**Introducción**  

Contruir un pipeline de datos utilizando Python que considere los siguientes requisitos:

**Data Sources**

- Prints (prints.json) - historial de 1 mes de value props que fueron mostradas a cada usuario, en formato json lines
- Taps (taps.json) - historial de 1 mes de value props que fueron clickeadas por un usuario, en formato json lines
- Payments (pays.csv) - historial de 1 mes de pagos realizados por los usuarios, en formato csv

**Resultado esperado**

Un dataset de salida con la siguiente información:

- prints de la última semana
- por cada print:
  - un campo que indique si se hizo click o no
  - cantidad de veces que el usuario vio cada value prop en las 3 semanas previas a ese print.
 - cantidad de veces que el usuario clickeo cada value prop en las 3 semanas previas a ese print.
  - cantidad de pagos que el usuario realizó para cada value prop en las 3 semanas previas a ese print.
  - importes acumulados que el usuario gasto para cada value prop en las 3 semanas previas a ese print.

**Importación de librerias a utilizar**

In [11]:
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import matplotlib.ticker as ticker
import csv
import io
import os
import re

import warnings
warnings.filterwarnings("ignore")

sns.set(color_codes=True)
pd.set_option('display.max_columns', None)

**Carga del conjunto de datos**  

Se hace uso de la libreria "pandas" para cargar los respectivos conjuntos de datos

In [49]:
df_prints = pd.read_json('data/prints.json', lines=True)
df_prints.head(10)

Unnamed: 0,day,event_data,user_id
0,2020-11-01,"{'position': 0, 'value_prop': 'cellphone_recha...",98702
1,2020-11-01,"{'position': 1, 'value_prop': 'prepaid'}",98702
2,2020-11-01,"{'position': 0, 'value_prop': 'prepaid'}",63252
3,2020-11-01,"{'position': 0, 'value_prop': 'cellphone_recha...",24728
4,2020-11-01,"{'position': 1, 'value_prop': 'link_cobro'}",24728
5,2020-11-01,"{'position': 2, 'value_prop': 'credits_consumer'}",24728
6,2020-11-01,"{'position': 3, 'value_prop': 'point'}",24728
7,2020-11-01,"{'position': 0, 'value_prop': 'point'}",25517
8,2020-11-01,"{'position': 1, 'value_prop': 'credits_consumer'}",25517
9,2020-11-01,"{'position': 2, 'value_prop': 'transport'}",25517


In [47]:
df_taps = pd.read_json('data/taps.json', lines=True)
df_taps.head(10)

Unnamed: 0,day,event_data,user_id
0,2020-11-01,"{'position': 0, 'value_prop': 'cellphone_recha...",98702
1,2020-11-01,"{'position': 2, 'value_prop': 'point'}",3708
2,2020-11-01,"{'position': 3, 'value_prop': 'send_money'}",3708
3,2020-11-01,"{'position': 0, 'value_prop': 'transport'}",93963
4,2020-11-01,"{'position': 1, 'value_prop': 'cellphone_recha...",93963
5,2020-11-01,"{'position': 0, 'value_prop': 'link_cobro'}",94945
6,2020-11-01,"{'position': 1, 'value_prop': 'cellphone_recha...",94945
7,2020-11-01,"{'position': 2, 'value_prop': 'prepaid'}",89026
8,2020-11-01,"{'position': 0, 'value_prop': 'link_cobro'}",7616
9,2020-11-01,"{'position': 0, 'value_prop': 'link_cobro'}",63471


In [48]:
df_pays = pd.read_csv('data/pays.csv')
df_pays.head(10)

Unnamed: 0,pay_date,total,user_id,value_prop
0,2020-11-01,7.04,35994,link_cobro
1,2020-11-01,37.36,79066,cellphone_recharge
2,2020-11-01,15.84,19321,cellphone_recharge
3,2020-11-01,26.26,19321,send_money
4,2020-11-01,35.35,38438,send_money
5,2020-11-01,20.95,85939,transport
6,2020-11-01,74.48,14372,prepaid
7,2020-11-01,31.52,14372,link_cobro
8,2020-11-01,83.76,65274,transport
9,2020-11-01,93.54,65274,prepaid


**Analisis exploratorio de las fuentes**  

El objetivo de esta fase es realizar un analisis descriptivo de las fuentes de datos a procesar con el fin de identificar su estructura, los tipos de datos utilizados y la distribución de los mismos.

In [44]:
df_prints.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 508617 entries, 0 to 508616
Data columns (total 3 columns):
 #   Column      Non-Null Count   Dtype 
---  ------      --------------   ----- 
 0   day         508617 non-null  object
 1   event_data  508617 non-null  object
 2   user_id     508617 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 11.6+ MB


In [41]:
df_taps.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50859 entries, 0 to 50858
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   day         50859 non-null  object
 1   event_data  50859 non-null  object
 2   user_id     50859 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 1.2+ MB


In [42]:
df_pays.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 756483 entries, 0 to 756482
Data columns (total 4 columns):
 #   Column      Non-Null Count   Dtype  
---  ------      --------------   -----  
 0   pay_date    756483 non-null  object 
 1   total       756483 non-null  float64
 2   user_id     756483 non-null  int64  
 3   value_prop  756483 non-null  object 
dtypes: float64(1), int64(1), object(2)
memory usage: 23.1+ MB


- El set de datos "prints" tiene un total de 508.617 entradas y 3 columnas day, event_data y user_id, no presenta valores nulos.
- El set de datos "taps" tiene un total de 50.859 entradas y 3 columnas day, event_data y user_id, no presenta valores nulos.
- El set de datos "pays" tiene un total de 756.483 entradas y 4 columnas pay_date, total, user_id y value_prop, no presenta valores nulos.

De los 3 conjuntos de datos se infiere que los campos "user_id" y "value_prop" serán propicios para realizar las operaciones de agrupación a través del tiempo para el dataset de salida requerido.

**Preparación y limpieza de los datos**

Como parte de la fase de preparación de los datos se aplicaran las siguientes tranformaciones:

+ Convertir las columnas que representan fechas al tipo "datetime" conservando el formato 'YYYY-MM-DD' y agregar una nueva columna 'day_week' para representar la semana correspondiente a la fecha en el respectivo año calendario
+ Aplicar una operación tipo "explode/flatten" para el campo event_data, que significa transformar una estructura jerárquica potencialmente anidada (json) en un formato tabular en la que cada par clave-valor se convierte en columnas y filas.
+ Ordernar los conjuntos de datos por la fecha del evento y el user_id en orden descendente

In [85]:
def object_to_datetime(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    df_converted = df.copy()
    try:
            converted = pd.to_datetime(df_converted[column_name], errors='raise')
            df_converted[column_name] = converted
            df_converted['day_week'] = converted.dt.isocalendar().week
    except (ValueError, TypeError):
           print("Error converting object to datetime type")
    return df_converted

In [58]:
def flatten_json_column(df: pd.DataFrame, column_name: str, record_prefix: str = '') -> pd.DataFrame:
    """
    Expande una columna que contiene objetos JSON y adiciona los campos resultantes al df orginal.

    Parameters:
    - df: The original dataframe.
    - column_name: The name of the column containing JSON/dict objects.
    - record_prefix: Prefix to add to flattened columns (optional).

    Returns:
    - A new dataframe with the JSON column flattened and merged.
    """
    # Ensure the column contains dict-like structures
    json_series = df[column_name].apply(lambda x: x if isinstance(x, dict) else {})

    # Normalize (flatten) the JSON column
    flattened = pd.json_normalize(json_series, sep='_')
    if record_prefix:
        flattened = flattened.add_prefix(record_prefix)

    # Combine with original DataFrame
    df_result = pd.concat([df.drop(columns=[column_name]), flattened], axis=1)
    return df_result

In [132]:
def clean_data(df1: pd.DataFrame, df2: pd.DataFrame, df3: pd.DataFrame):
  _df_prints = flatten_json_column(object_to_datetime(df1,'day'), 'event_data').sort_values(["day","user_id"], ascending=False)
  _df_taps   = flatten_json_column(object_to_datetime(df2,'day'), 'event_data').sort_values(["day","user_id"], ascending=False)
  _df_pays   = object_to_datetime(df3,'pay_date').sort_values(["pay_date","user_id"], ascending=False)

  return _df_prints, _df_taps, _df_pays

In [133]:
df_prints_final, df_taps_final, df_pays_final = clean_data(df_prints,df_taps,df_pays)

In [134]:
print("Prints :")
print(df_prints_final.head(10))
print("\nTaps :")
print(df_taps_final.head(10))
print("\nPayments :")
print(df_pays_final.head(10))

Prints :
              day  user_id  day_week  position          value_prop
502448 2020-11-30    99990        49         0           transport
502449 2020-11-30    99990        49         1    credits_consumer
502450 2020-11-30    99990        49         2               point
499856 2020-11-30    99989        49         0    credits_consumer
499857 2020-11-30    99989        49         1               point
503208 2020-11-30    99969        49         0          link_cobro
503436 2020-11-30    99947        49         0          send_money
501252 2020-11-30    99939        49         0  cellphone_recharge
501253 2020-11-30    99939        49         1               point
501254 2020-11-30    99939        49         2             prepaid

Taps :
             day  user_id  day_week  position          value_prop
49383 2020-11-30    99929        49         0          send_money
50749 2020-11-30    99912        49         0    credits_consumer
50750 2020-11-30    99912        49         1  c

Examinando valores minimos y maximos para las columnas tipo fecha

In [145]:
df_prints_final[['day','day_week']].agg(['min', 'max'])

Unnamed: 0,day,day_week
min,2020-11-01,44
max,2020-11-30,49


In [146]:
df_taps_final[['day','day_week']].agg(['min', 'max']).agg(['min', 'max'])

Unnamed: 0,day,day_week
min,2020-11-01,44
max,2020-11-30,49


In [147]:
df_pays_final[['pay_date','day_week']].agg(['min', 'max'])

Unnamed: 0,pay_date,day_week
min,2020-11-01,44
max,2020-11-30,49


El análisis confirma que el intervalo de tiempo para el análisis corresponde al mes de Noviembre de 2020, para las semanas 44 a la 49 de ese año calendario

**Transformación de los datos para el dataset de salida**

Se define una nueva función *transform_data*, la cual recibe como parametros los 3 conjuntos de datos obtenidos del paso anterior y deberá retornar un solo dataset con lo requerimientos descritos en el apartado de Introducción, así:

- Seleccionar los print correspondientes a la semana 49, por cada print, crear los siguientes atributos:
  - 'qty_clicked_current': indica si se hizo click o no en el print
  - 'qty_viewed_3weeks: cantidad de veces que el usuario vio cada value prop en las 3 semanas previas a ese print.
  - 'qty_clicked_3weeks': cantidad de veces que el usuario clickeo cada value prop en las 3 semanas previas a ese print.
  - 'qty_paid_3weeks': cantidad de pagos que el usuario realizó para cada value prop en las 3 semanas previas a ese print.
  - 'amt_paid_3weeks': importes acumulados que el usuario gasto para cada value prop en las 3 semanas previas a ese print.