## 1. Project Intro - Patterns Decisions and Implementation Overview

For this project, a **Python library structure was built** to centralize all the data transformation processes, ensuring consistency, reusability, and maintainability across the various data handling tasks.

Library tree:

```bash
etl
├── factory.py
└── __init__.py
```

This project is fully containerized, so to use the library inside this notebook we only need to import it, as we do in a later step.

This project adopts a [**medallion architecture**](https://www.databricks.com/glossary/medallion-architecture), processing data through *Bronze*, *Silver*, and *Gold* layers, which emulates a data lake architecture locally. This setup allows for scalable and modular data processing.

For this project all the data is saved as .csv or .json files. Since the idea is to simulate a data lake architecture, we could connect the pipeline to a real data lake and save the files there instead (as Parquet, for example).

We opted for Pandas as the primary data manipulation library because it is efficient with medium-sized datasets and has a rich feature set for data analysis and transformation. At the current scale of our data, which does not require distributed computing, Pandas offers a straightforward and developer-friendly environment to achieve our objectives.

Moreover, the ETL system is modular and built around design patterns such as the factory in `etl/factory.py`, which helps it scale. Should we need to handle larger datasets or leverage distributed computing, the architecture allows a relatively smooth transition to frameworks like Spark, Dask, or Ray, or to a faster single-node engine like Polars.

More detail about each layer in this project:

**Landing** 
> This folder acts as the initial ingestion point where raw data is dumped directly from the source systems. No processing or transformation is done at this stage. It’s purely for data intake.

**Bronze** 
> Data moves from the Landing folder to the Bronze folder after initial logging and perhaps some minor processing like adding metadata (load timestamps, source identifiers, etc.). The Bronze layer should provide capabilities to handle Change Data Capture (CDC), ensure data lineage and auditability, and support reprocessing if necessary without needing to re-read from the source systems.

**Silver**
> The Silver folder represents an intermediate stage of data processing, where data is further cleansed, merged, and conformed into more consistent and usable forms. The Silver layer supports the more complex data operations needed to derive meaningful business insights.

**Gold** 
> Gold data is the endpoint of the data transformation process. It holds the output of the tasks assigned to this project (tasks 1 to 5). In a real scenario, it represents the highest value in terms of data usability and relevance to business users.
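
To make the Bronze step more concrete, here is a minimal sketch of attaching load metadata to a raw frame. The function name `add_bronze_metadata` and the column names `_load_ts` and `_source_file` are assumptions made for illustration; they are not taken from `etl/factory.py`.

```python
from datetime import datetime, timezone

import pandas as pd


def add_bronze_metadata(df: pd.DataFrame, source_file: str) -> pd.DataFrame:
    """Return a copy of df with hypothetical lineage columns attached."""
    out = df.copy()
    # Load timestamp in UTC, stored as an ISO 8601 string.
    out["_load_ts"] = datetime.now(timezone.utc).isoformat()
    # Name of the landing file the rows came from.
    out["_source_file"] = source_file
    return out


raw = pd.DataFrame({"user_id": [1, 2], "total": [10.0, 5.5]})
bronze = add_bronze_metadata(raw, "pays.csv")
print(bronze.columns.tolist())
```

Working on a copy keeps the landing data untouched, which matches the idea that Bronze can be rebuilt without re-reading the source systems.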

## 2. Pipeline runs through the layers (Bronze, Silver and Gold)

In [1]:
from etl.factory import DataProcessorFactory

In [2]:
def pipeline(layer, origin_layer_dir, destiny_layer_dir):
    """
    Executes the data processing pipeline for a specified layer using a data processor factory.

    This function initializes the data processor for a given layer, sets the directories for data input and output, and triggers the data processing.

    Args:
        layer (str): The layer of the data processing pipeline to execute. Valid options are 'bronze', 'silver', or 'gold'.
        origin_layer_dir (str): The directory where the input data is located. This is the starting point for data processing.
        destiny_layer_dir (str): The directory where the processed data should be saved. This acts as the output location for the data.

    Returns:
        None: This function does not return anything but will trigger the processing of data as per the specified layer.
    """
    processor = DataProcessorFactory(origin_layer_dir, destiny_layer_dir)
    processor = processor.get_processor(layer)
    processor.process_data()
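
The real `DataProcessorFactory` lives in `etl/factory.py` and is not reproduced in this notebook. As a rough sketch of the factory pattern it relies on, the snippet below maps a layer name to a processor class. All class names here (`SketchProcessor`, `SketchBronzeProcessor`, `SketchProcessorFactory`, and so on) are hypothetical and only illustrate the idea.

```python
from abc import ABC, abstractmethod


class SketchProcessor(ABC):
    """Hypothetical base class: one processor per layer."""

    def __init__(self, origin_dir: str, destiny_dir: str) -> None:
        self.origin_dir = origin_dir
        self.destiny_dir = destiny_dir

    @abstractmethod
    def process_data(self) -> str:
        """Run the layer and return a short status message."""


class SketchBronzeProcessor(SketchProcessor):
    def process_data(self) -> str:
        return f"bronze: {self.origin_dir} -> {self.destiny_dir}"


class SketchSilverProcessor(SketchProcessor):
    def process_data(self) -> str:
        return f"silver: {self.origin_dir} -> {self.destiny_dir}"


class SketchGoldProcessor(SketchProcessor):
    def process_data(self) -> str:
        return f"gold: {self.origin_dir} -> {self.destiny_dir}"


class SketchProcessorFactory:
    """Hypothetical factory: looks up the processor class by layer name."""

    _registry = {
        "bronze": SketchBronzeProcessor,
        "silver": SketchSilverProcessor,
        "gold": SketchGoldProcessor,
    }

    def __init__(self, origin_dir: str, destiny_dir: str) -> None:
        self.origin_dir = origin_dir
        self.destiny_dir = destiny_dir

    def get_processor(self, layer: str) -> SketchProcessor:
        try:
            processor_cls = self._registry[layer]
        except KeyError:
            raise ValueError(f"Unknown layer: {layer!r}") from None
        return processor_cls(self.origin_dir, self.destiny_dir)


factory = SketchProcessorFactory("../data/landing", "../data/bronze")
status = factory.get_processor("bronze").process_data()
print(status)
```

With this shape, adding a new layer means registering one more class, and callers such as `pipeline` do not change.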


### 2.1 Landing -> Bronze

In [3]:
layer = 'bronze'
origin_layer_dir = '../data/landing'
destiny_layer_dir = '../data/bronze'

# Run pipeline
pipeline(layer, origin_layer_dir, destiny_layer_dir)

2024-04-29 23:56:42.559 | INFO     | etl.factory:process_data:63 - Starting processing for Bronze layer
2024-04-29 23:56:42.560 | INFO     | etl.factory:read_data:26 - Reading all files in the landing directory
2024-04-29 23:56:45.433 | INFO     | etl.factory:save_data:56 - Data saved to Bronze layer at ../data/bronze/prints.csv
2024-04-29 23:56:45.434 | INFO     | etl.factory:process_json:39 - Processed and saved 508617 records from prints.json to Bronze layer
2024-04-29 23:56:47.175 | INFO     | etl.factory:save_data:56 - Data saved to Bronze layer at ../data/bronze/pays.csv
2024-04-29 23:56:47.175 | INFO     | etl.factory:process_csv:46 - Processed and saved 756483 records from pa

### 2.2 Bronze -> Silver

In [4]:
layer = 'silver'
origin_layer_dir = '../data/bronze'
destiny_layer_dir = '../data/silver'

# Run pipeline
pipeline(layer, origin_layer_dir, destiny_layer_dir)

2024-04-29 23:56:47.403 | INFO     | etl.factory:process_data:93 - Processing data for Silver layer
2024-04-29 23:56:47.404 | INFO     | etl.factory:read_data:74 - Reading all files in the bronze directory
Extracting value_prop from event_data: 100%|██████████| 508617/508617 [00:03<00:00, 131406.68it/s]
Extracting value_prop from event_data: 100%|██████████| 50859/50859 [00:00<00:00, 129476.50it/s]
2024-04-29 23:56:52.650 | INFO     | etl.factory:process_data:130 - Processed and saved data to Gold layer at ../data/gold/task0_prints_last_3_weeks.json
2024-04-29 23:56:54.255 | INFO     | etl.factory:save_data:179 - Processed and saved to Silver layer at ../data/silver/prints_taps_and_pays_daily.csv
2024-04-29 23:56:54.256 | INFO     | etl.factory:proces

### 2.3 Silver -> Gold

In [5]:
# Silver
layer = 'gold'
origin_layer_dir = '../data/silver'
destiny_layer_dir = '../data/gold'

# Run pipeline
pipeline(layer, origin_layer_dir, destiny_layer_dir)

2024-04-29 23:56:54.288 | INFO     | etl.factory:process_data:195 - Processing data for Gold layer
2024-04-29 23:56:54.289 | INFO     | etl.factory:read_data:188 - Reading all files in the Silver directory
2024-04-29 23:56:54.477 | INFO     | etl.factory:process_data:200 - Data read successfully. Ready for further processing.
2024-04-29 23:56:54.813 | INFO     | etl.factory:save_task1_json:287 - Processed and saved data to Gold layer at ../data/gold/task1_prints_with_clicked_parameter.json
2024-04-29 23:56:56.201 | INFO     | etl.factory:save_data:276 - Processed and saved data to Gold layer at ../data/gold/task_2_views_on_each_value_prop_last_3_weeks.csv
2024-04-29 23:56:57.618 | INFO     | etl.factory:

## 3. Expected Results and Show Results

### 3.1 Tasks Definition
- **task_0**: *Prints from the Last Week*:
- *For each print*:
    - **task_1**: A field indicating if the value props were clicked or not.
    - **task_2**: The number of views each value proposition received in the last 3 weeks prior to the print.
    - **task_3**: The number of times a user clicked on each of the value props in the last 3 weeks prior to the print.
    - **task_4**: The number of payments made by the user for each value proposition in the last 3 weeks prior to the print.
    - **task_5**: The total amount of payments made by the user for each value proposition in the last 3 weeks prior to the print.

### 3.2 Show Results

In [6]:
# Let's first import some utils to print the results:
from utils import read_task0_data, read_task1_data, read_task2_data, read_task3_data, read_tasks4_and_5_data

json_task0 = read_task0_data()
json_task1 = read_task1_data()
df_task2 = read_task2_data()
df_task3 = read_task3_data()
df_task4_and_task5 = read_tasks4_and_5_data()
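
The reader functions in `utils` are not shown in this notebook. As a hedged sketch of how such a reader could load a CSV whose cells hold dictionaries (as the task 2 to 5 outputs do), the example below round-trips a tiny frame through an in-memory CSV. The frame contents are toy data, and the approach (`index_col=0` plus `ast.literal_eval` as a converter) is an assumption, not necessarily what `utils` does.

```python
import ast
import io

import pandas as pd

# Build a tiny CSV in memory that mimics the shape of the task 2 output.
original = pd.DataFrame(
    {
        "user_id": [1, 2],
        "value_prop_prints": [{"link_cobro": 2, "prepaid": 1}, {"point": 1}],
    }
)
buffer = io.StringIO()
original.to_csv(buffer)  # the index is written as an unnamed first column
buffer.seek(0)

# index_col=0 consumes the unnamed index column instead of exposing "Unnamed: 0".
# The converter turns each stringified dict back into a real dict.
restored = pd.read_csv(
    buffer,
    index_col=0,
    converters={"value_prop_prints": ast.literal_eval},
)
print(restored.loc[0, "value_prop_prints"])
```

`ast.literal_eval` only evaluates Python literals, so it is a safer choice than `eval` for parsing these cells.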

##### **important** 
> Most of the code is implemented in [factory.py](../etl/factory.py), so the idea here is to show the final results with a brief explanation of the decisions made.


#### task_0
Below is one line of the [**task0_prints_last_3_weeks.json**](../data/gold/task0_prints_last_3_weeks.json) file:

In [7]:
# One observation from the task 0 output.
# The JSON file holds all the prints selected for task 0.

# The first record of ../data/gold/task0_prints_last_3_weeks.json (check the file itself for more examples):
print(f"The first record on the ../data/gold/task0_prints_last_3_weeks.json:\n\n{json_task0[:78]}")


# As we can see, the file only holds data from the last week of November 2020:
print(f"Some other data inside the ../data/gold/task0_prints_last_3_weeks.json:\n\n{json_task0[78:541]}")


The first record on the ../data/gold/task0_prints_last_3_weeks.json:

{"day":"2020-11-30","event_data":{"value_prop":"send_money"},"user_id":59706}

Some other data inside the ../data/gold/task0_prints_last_3_weeks.json:

{"day":"2020-11-30","event_data":{"value_prop":"link_cobro"},"user_id":32191}
{"day":"2020-11-30","event_data":{"value_prop":"transport"},"user_id":32191}
{"day":"2020-11-30","event_data":{"value_prop":"send_money"},"user_id":32191}
{"day":"2020-11-30","event_data":{"value_prop":"prepaid"},"user_id":53960}
{"day":"2020-11-30","event_data":{"value_prop":"link_cobro"},"user_id":53960}
{"day":"2020-11-30","event_data":{"value_prop":"send_money"},"user_id":53960}


To create the task_0 prints output, we take these steps:
1. Extract `value_prop` from `event_data`.
2. Define a `week` column holding the ISO week number of the year (based on the [Series.dt.isocalendar().week](https://pandas.pydata.org/docs/reference/api/pandas.Series.dt.isocalendar.html) accessor):
```python
            # Extract value_prop from event_data
            df_w_prints = self.extract_event_data(data_frames['prints'])
            df_w_taps = self.extract_event_data(data_frames['taps'])
            df_w_pays = data_frames['pays'].copy()

            # Add 'week' column with the ISO week number of the year
            df_w_prints['week'] = df_w_prints['day'].dt.isocalendar().week
```
After that, we found that the last week was number 49, so we could easily extract the last week's data by filtering the frame already being transformed in the Silver layer:
```python
            # task 0:
            # Step 1: Filter data for week 49
            df_week_49 = df_w_prints[df_w_prints['week'] == 49]
            
            # Step 2: Transform the DataFrame to match the JSON structure
            json_data = df_week_49.apply(self._transform_row_to_prints_format, axis=1).to_json(orient='records', lines=True)

```
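
To see why the last week comes out as 49, note that the ISO calendar numbers weeks within the year, not within the month. The sketch below uses the standard library's `date.isocalendar()`, which follows the same ISO 8601 definition as the pandas accessor. The helper names `iso_week` and `week_of_month` are introduced here only for illustration.

```python
from datetime import date


def iso_week(d: date) -> int:
    """ISO 8601 week number of the year (weeks start on Monday)."""
    return d.isocalendar()[1]


def week_of_month(d: date) -> int:
    """Naive week of month: days 1-7 are week 1, days 8-14 are week 2, and so on."""
    return (d.day - 1) // 7 + 1


last_print_day = date(2020, 11, 30)
print(iso_week(last_print_day))       # 49
print(week_of_month(last_print_day))  # 5
print(iso_week(date(2020, 11, 29)))   # 48
```

Since 2020-11-30 is a Monday, it opens ISO week 49, while the preceding Sunday still belongs to week 48.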

`self._transform_row_to_prints_format` is a helper method that recreates the prints.json structure:
```python
    def _transform_row_to_prints_format(self, row):
        return {
            "day": row['day'].strftime('%Y-%m-%d'),  # Format day as string if it's a datetime object
            "event_data": {
                "value_prop": row['value_prop']
            },
            "user_id": row['user_id']
        }
```

3. Save the data in the Gold layer. I chose to save all task outputs in the Gold layer, even though this one is not really "ready to use" data.
   ```python
            with open(output_path, 'w') as file:
                file.write(json_data)

            logger.info(f"Processed and saved data to Gold layer at {output_path}")
   ```
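
The steps above can be sketched end to end on toy data. This is an illustrative variant that builds the JSON Lines text with the standard `json` module instead of `DataFrame.apply(...).to_json(...)`. The function name `row_to_print_record` is hypothetical, and the two rows are copied from the sample output shown earlier.

```python
import json

import pandas as pd

df_week = pd.DataFrame(
    {
        "day": pd.to_datetime(["2020-11-30", "2020-11-30"]),
        "value_prop": ["send_money", "link_cobro"],
        "user_id": [59706, 32191],
    }
)


def row_to_print_record(row: pd.Series) -> dict:
    """Rebuild the nested prints.json structure from a flat row."""
    return {
        "day": row["day"].strftime("%Y-%m-%d"),
        "event_data": {"value_prop": row["value_prop"]},
        "user_id": int(row["user_id"]),  # plain int so json can serialize it
    }


# One compact JSON object per line (JSON Lines).
lines = [
    json.dumps(row_to_print_record(row), separators=(",", ":"))
    for _, row in df_week.iterrows()
]
json_lines = "\n".join(lines)
print(json_lines)
```

Row-wise iteration is slow on large frames, so this form is mainly useful for checking the expected record shape on a small sample.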

#### task_1
Below is one line of the [**task1_prints_with_clicked_parameter.json**](../data/gold/task1_prints_with_clicked_parameter.json) file:

In [8]:
# 1 observation of the prints with the clicked parameter
json_task1[:77]

'{"user_id":96544,"day_prints":"2020-11-09","value_prop":"point","clicked":0}\n'

In [9]:
# 2 observations...
json_task1[:150]

'{"user_id":96544,"day_prints":"2020-11-09","value_prop":"point","clicked":0}\n{"user_id":96544,"day_prints":"2020-11-09","value_prop":"cellphone_rechar'

The idea was to build a structure similar to the [prints.json](../data/landing/prints.json) file:

```json
{"day":"2020-11-01","event_data":{"position":0,"value_prop":"cellphone_recharge"},"user_id":98702}
```

An example of the output:

```json
{"user_id":96544,"day_prints":"2020-11-09","value_prop":"point","clicked":0}
```

To create the `clicked` label (0: not clicked, 1: clicked), I made the following decisions:
1. Extract `value_prop` from `event_data`.
2. Add a `week` column with the ISO week number of the year (based on `Series.dt.isocalendar().week`).
3. Define what a click is:
    i. For the same `user_id` and `day`, a print counts as a click when its `day` + `value_prop` key in prints.json equals the key of a row in taps.json.

    ```python
                # Extract value_prop from event_data
            df_w_prints = self.extract_event_data(data_frames['prints'])
            df_w_taps = self.extract_event_data(data_frames['taps'])
            df_w_pays = data_frames['pays'].copy()

            # Add 'week' column with the ISO week number of the year
            df_w_prints['week'] = df_w_prints['day'].dt.isocalendar().week

            # Creating a 'day+value_prop' identifier for merging
            df_w_prints['day_value_prop'] = df_w_prints['day'].astype(str) + df_w_prints['value_prop']
            df_w_taps['day_value_prop'] = df_w_taps['day'].astype(str) + df_w_taps['value_prop']

            # Merge to find if prints were clicked
            merged_df = pd.merge(df_w_prints, df_w_taps, 
                                 on=['user_id', 'day'], how='left', suffixes=('_prints', '_taps'))
            merged_df['clicked'] = merged_df['day_value_prop_prints'] == merged_df['day_value_prop_taps']
            merged_df['clicked'] = merged_df['clicked'].astype(int)  # Convert boolean to 1 or 0
    ```
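
As an alternative sketch (not the project's code), the same click definition can be expressed by merging directly on `user_id`, `day`, and `value_prop` and reading the result from pandas' `indicator` column. This keeps exactly one output row per print, whereas a left merge on `user_id` and `day` alone yields one row per print and tap pair. The frames below are toy data.

```python
import pandas as pd

prints = pd.DataFrame(
    {
        "user_id": [1, 1, 2],
        "day": ["2020-11-09", "2020-11-09", "2020-11-09"],
        "value_prop": ["point", "prepaid", "point"],
    }
)
taps = pd.DataFrame(
    {
        "user_id": [1],
        "day": ["2020-11-09"],
        "value_prop": ["prepaid"],
    }
)

# Left merge keeps every print; "_merge" says whether a matching tap exists.
merged = prints.merge(
    taps.drop_duplicates(),  # avoid duplicating prints when a tap repeats
    on=["user_id", "day", "value_prop"],
    how="left",
    indicator=True,
)
merged["clicked"] = (merged["_merge"] == "both").astype(int)
merged = merged.drop(columns="_merge")
print(merged)
```

Only the `prepaid` print of user 1 has a matching tap here, so it is the only row labelled as clicked.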

#### task_2
Below is a summary of the [**task_2_views_on_each_value_prop_last_3_weeks**](../data/gold/task_2_views_on_each_value_prop_last_3_weeks.csv) CSV output file:

In [10]:
# we filtered last-3 weeks (the last 3 weeks before the last week of the month)
df_task2.head()

| Unnamed: 0 | user_id | week_prints | value_prop_prints |
|---|---|---|---|
| 0 | 1 | last-3 | {'link_cobro': 2, 'prepaid': 1, 'credits_consu... |
| 1 | 2 | last-3 | {'cellphone_recharge': 1, 'point': 1, 'send_mo... |
| 2 | 3 | last-3 | {'transport': 1, 'point': 1, 'cellphone_rechar... |
| 3 | 4 | last-3 | {'cellphone_recharge': 1, 'credits_consumer': ... |
| 4 | 5 | last-3 | {'transport': 1, 'point': 1} |


In [11]:
# As the counts below show, there is 1 observation for each user_id, as expected.
print(df_task2.user_id.value_counts(dropna=False))

100000    1
1         1
2         1
3         1
99949     1
         ..
8         1
9         1
11        1
12        1
13        1
Name: user_id, Length: 76804, dtype: int64


In [12]:
# So for the last-3 weeks, we have this distribution of value_prop_prints that appeared for the users:

for ix,row in df_task2.head().iterrows():
    print(f"for the user_id:{row.user_id}\n{row.value_prop_prints}\n")

for the user_id:1
{'link_cobro': 2, 'prepaid': 1, 'credits_consumer': 1, 'transport': 2, 'point': 1, 'send_money': 1, 'cellphone_recharge': 1}

for the user_id:2
{'cellphone_recharge': 1, 'point': 1, 'send_money': 1}

for the user_id:3
{'transport': 1, 'point': 1, 'cellphone_recharge': 1, 'link_cobro': 1, 'prepaid': 1, 'send_money': 1}

for the user_id:4
{'cellphone_recharge': 1, 'credits_consumer': 1, 'prepaid': 1, 'link_cobro': 1}

for the user_id:5
{'transport': 1, 'point': 1}



For this task we also used the whole SilverDataProcessor pipeline, and on the merged data we applied an aggregation with `groupby`.

In addition, I used the [collections.Counter](https://docs.python.org/3/library/collections.html#collections.Counter) class, which provides an optimized implementation for counting hashable items:

```python
            # ○ task2: Each of the value props views number in the last 3 weeks prior to the print mentioned before.
            f_counter = lambda x: dict(Counter(list(x)))
            df_agg_value_prop_views = data.groupby(['user_id', 'week_prints']).agg({"value_prop_prints": f_counter}).reset_index()
```
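
To show what this aggregation produces, here is a small self-contained sketch on toy data. The column names follow the excerpt above, and the rows are made up for illustration.

```python
from collections import Counter

import pandas as pd

data = pd.DataFrame(
    {
        "user_id": [1, 1, 1, 2],
        "week_prints": ["last-3", "last-3", "last-3", "last-3"],
        "value_prop_prints": ["link_cobro", "link_cobro", "prepaid", "point"],
    }
)


def count_values(values: pd.Series) -> dict:
    """Count how often each value prop appears within one group."""
    return dict(Counter(values))


views = (
    data.groupby(["user_id", "week_prints"])
    .agg({"value_prop_prints": count_values})
    .reset_index()
)
print(views)
```

Each group collapses to a single dictionary, which is why the output has one row per `user_id` and `week_prints` pair.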

#### task_3
Below is a summary of the [**task_3_clickes_on_each_value_prop_last_3_weeks**](../data/gold/task_3_clickes_on_each_value_prop_last_3_weeks.csv) CSV output file:

In [13]:
# we filtered last-3 weeks (the last 3 weeks before the last week of the month)
df_task3.head()

| Unnamed: 0 | user_id | clicked |
|---|---|---|
| 0 | 1 | {'point': 0, 'cellphone_recharge': 0, 'credits... |
| 1 | 2 | {'point': 0, 'cellphone_recharge': 0} |
| 2 | 3 | {'point': 0, 'cellphone_recharge': 1, 'credits... |
| 3 | 4 | {'point': 1, 'cellphone_recharge': 0} |
| 4 | 5 | {'point': 0, 'cellphone_recharge': 0} |


In [14]:
# As the counts below show, there is 1 observation for each user_id, as expected.
print(df_task3.user_id.value_counts(dropna=False))

100000    1
1         1
2         1
3         1
99949     1
         ..
8         1
9         1
11        1
12        1
13        1
Name: user_id, Length: 76804, dtype: int64


In [15]:
# So for the last-3 weeks, we have this distribution of
# clicks over each print

for ix,row in df_task3.head().iterrows():
    print(f"for the user_id:{row.user_id}\n{row.clicked}\n")

for the user_id:1
{'point': 0, 'cellphone_recharge': 0, 'credits_consumer': 0, 'link_cobro': 0, 'transport': 0}

for the user_id:2
{'point': 0, 'cellphone_recharge': 0}

for the user_id:3
{'point': 0, 'cellphone_recharge': 1, 'credits_consumer': 0}

for the user_id:4
{'point': 1, 'cellphone_recharge': 0}

for the user_id:5
{'point': 0, 'cellphone_recharge': 0}



For task 3 we took a very similar approach:

- used the transformed data from the Silver layer;
- applied the `groupby` and `agg` functions:

  ```python
  # task3: Number of times a user clicked on each of the value props in the last 3 weeks prior to the print mentioned before
  msk = data.week_prints == 'last'
  df_agg_clicks_for_each_value_prop = self._agg_clicks_data_for_each_user(data[~msk])

  # The auxiliary method used in this case:
  def _agg_clicks_data_for_each_user(self, df):
      # Aggregate the data
      result_df = df.groupby('user_id').agg({
          'clicked': lambda x: dict(zip(df['value_prop_prints'], x))
      }).reset_index()

      return result_df
  ```
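
One detail in the helper above is worth a closer look: `zip(df['value_prop_prints'], x)` appears to pair each group's `clicked` values with the leading rows of the whole frame passed to the helper, not with the group's own rows, so the dictionary keys may not match the user's actual value props. The sketch below is one possible index-safe variant, offered as an assumption about the intended result and not as the project's implementation. It also sums clicks per value prop, so repeated prints are not overwritten. The function name `agg_clicks_per_user` is hypothetical and the frame is toy data.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "user_id": [1, 1, 2, 2],
        "value_prop_prints": ["point", "prepaid", "transport", "send_money"],
        "clicked": [0, 1, 1, 0],
    }
)


def agg_clicks_per_user(frame: pd.DataFrame) -> pd.DataFrame:
    """Sum clicks per (user_id, value prop), then fold into one dict per user."""
    per_prop = (
        frame.groupby(["user_id", "value_prop_prints"])["clicked"]
        .sum()
        .reset_index()
    )
    rows = [
        {
            "user_id": user_id,
            # Keys and values come from the same group, so they stay aligned.
            "clicked": dict(zip(group["value_prop_prints"], group["clicked"].tolist())),
        }
        for user_id, group in per_prop.groupby("user_id")
    ]
    return pd.DataFrame(rows)


result = agg_clicks_per_user(df)
print(result)
```

In this toy frame, user 2 only saw `transport` and `send_money`, and the resulting dictionary contains exactly those keys.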

#### task_4 and task_5:
Below is a summary of the [**tasks_4_and_5_user_payments_summary_last_3_weeks**](../data/gold/tasks_4_and_5_user_payments_summary_last_3_weeks.csv) CSV output file:

In [16]:
# we filtered last-3 weeks (the last 3 weeks before the last week of the month)
df_task4_and_task5.head()

| Unnamed: 0 | user_id | count_of_pays_for_each_value_props | sum_of_pays_for_each_value_props |
|---|---|---|---|
| 0 | 12 | {'credits_consumer': 1, 'point': 1} | {'credits_consumer': 17.23, 'point': 53.15} |
| 1 | 15 | {'credits_consumer': 2} | {'credits_consumer': 90.50999999999999} |
| 2 | 30 | {'credits_consumer': 1, 'point': 1} | {'credits_consumer': 171.96, 'point': 7.39} |
| 3 | 33 | {'credits_consumer': 1} | {'credits_consumer': 4.66} |
| 4 | 44 | {'credits_consumer': 1} | {'credits_consumer': 101.38} |


In [17]:
# As the counts below show, there is 1 observation for each user_id, as expected.
print(df_task4_and_task5.user_id.value_counts(dropna=False))

99993    1
12       1
15       1
30       1
33       1
        ..
83       1
84       1
91       1
100      1
101      1
Name: user_id, Length: 21413, dtype: int64


In [18]:
# So for the last-3 weeks, we have this count and sum of payments per value prop for the users:

for ix,row in df_task4_and_task5.head().iterrows():
    print(f"for the user_id:{row.user_id}\n{row.count_of_pays_for_each_value_props}\n{row.sum_of_pays_for_each_value_props}\n\n")

for the user_id:12
{'credits_consumer': 1, 'point': 1}
{'credits_consumer': 17.23, 'point': 53.15}


for the user_id:15
{'credits_consumer': 2}
{'credits_consumer': 90.50999999999999}


for the user_id:30
{'credits_consumer': 1, 'point': 1}
{'credits_consumer': 171.96, 'point': 7.39}


for the user_id:33
{'credits_consumer': 1}
{'credits_consumer': 4.66}


for the user_id:44
{'credits_consumer': 1}
{'credits_consumer': 101.38}




This task was more interesting because we needed a few more `groupby` steps before applying `agg` with the custom function:

1. We needed to create `count_pays` and `sum_pays` for each week of the last 3 weeks, grouped by 3 keys: `user_id`, `week_prints`, and `value_prop_pays`. This gives the count and sum per week, per user, per value prop (`value_prop_pays` is the value prop from the pays.csv data after transformation and merging).
2. After that, we dropped the last week and applied a final aggregation per user:
```python
            df_tmp_count_and_sum_pays = data.groupby(['user_id', 'week_prints', 'value_prop_pays']).agg({"total": set})
            df_tmp_count_and_sum_pays["count_pays"] = df_tmp_count_and_sum_pays.total.apply(lambda x: len(x))
            df_tmp_count_and_sum_pays["sum_pays"] = df_tmp_count_and_sum_pays.total.apply(lambda x: sum(x))
            df_tmp_count_and_sum_pays = df_tmp_count_and_sum_pays.reset_index()

            # Just w-3 data
            msk_last_week = df_tmp_count_and_sum_pays.week_prints == "last"
            df_tmp_count_and_sum_pays = df_tmp_count_and_sum_pays[~msk_last_week]

            # agg last 3 weeks data
            df_agg_count_and_sum_pays = self._agg_pays_data_for_each_user(df_tmp_count_and_sum_pays)

            df_agg_count_and_sum_pays.rename(columns={"count_pays":"count_of_pays_for_each_value_props","sum_pays":"sum_of_pays_for_each_value_props"}, inplace=True)
```
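
The helper `_agg_pays_data_for_each_user` is not shown in this notebook. Below is a minimal sketch of what such a helper could look like, assuming its input already has `count_pays` and `sum_pays` per `user_id`, `week_prints`, and `value_prop_pays`. The function name `agg_pays_per_user`, the rounding to two decimals, and the toy values are all assumptions. Note also that aggregating `total` into a `set`, as in the excerpt above, collapses equal amounts within a group. That is presumably meant to undo row duplication from the earlier merges, but it would also merge two distinct payments of the same amount.

```python
import pandas as pd

weekly = pd.DataFrame(
    {
        "user_id": [12, 12, 15, 15],
        "week_prints": ["last-3", "last-3", "last-3", "last-2"],
        "value_prop_pays": ["credits_consumer", "point", "credits_consumer", "credits_consumer"],
        "count_pays": [1, 1, 1, 1],
        "sum_pays": [17.23, 53.15, 40.25, 50.25],
    }
)


def agg_pays_per_user(frame: pd.DataFrame) -> pd.DataFrame:
    """Fold weekly counts and sums into one dict per user and metric."""
    rows = []
    for user_id, group in frame.groupby("user_id"):
        # Add up the weeks for each value prop of this user.
        totals = group.groupby("value_prop_pays")[["count_pays", "sum_pays"]].sum()
        rows.append(
            {
                "user_id": user_id,
                "count_pays": {k: int(v) for k, v in totals["count_pays"].items()},
                # Rounding hides float noise in the summed amounts.
                "sum_pays": {k: round(float(v), 2) for k, v in totals["sum_pays"].items()},
            }
        )
    return pd.DataFrame(rows)


summary = agg_pays_per_user(weekly)
print(summary)
```

Rounding the sums is a presentation choice; keeping amounts as integer cents or `Decimal` values would avoid the float noise at its source.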