# Pipeline de procesamiento

### Importar librerias

In [1]:
import pandas as pd
import dask.dataframe as dd

### Cargar la data

Si bien el formato del archivo ```.parquet``` hace referencia a una forma de almacenar de manera optima grandes volumenes de datos de manera eficiente y comprimida. Sin embargo, en ciertos escenarios, como cuando se trabaja con un volumen de datos relativamente pequeño y en un solo computador, su uso puede generar un tiempo de carga y procesamiento innecesariamente prolongado. <br>

Para el desarrollo de esta prueba, se optó por Dask para cargar los archivos de manera distribuida ademas del procesamiento en un solo nodo y Pandas para la visualizacion de la data. Esta combinación permitió un enfoque más ágil y eficiente para el procesamiento de 20 millones de datos, aprovechando la capacidad de paralelismo de Dask y la versatilidad de Pandas para operaciones de análisis y visualizacion. 

In [2]:
route1 = 'sample_data_0006_part_00.parquet'
route2 = 'sample_data_0007_part_00.parquet'

df_0006 = dd.read_parquet(route1)
df_0007 = dd.read_parquet(route2)
df = dd.concat([df_0006, df_0007], ignore_index=True)
df = df.compute()

In [8]:
df.head()

Unnamed: 0,merchant_id,_id,subsidiary,transaction_date,account_number,user_id,transaction_amount,transaction_type
0,075d178871d8d48502bf1f54887e52fe,aa8dacff663072244d0a8ab6bbe36b93,824b2af470cbe6a65b15650e03b740fc,2021-09-12 18:32:03,648e257c9d74909a1f61c54b93a9e1b3,ba42d192a145583ba8e7bf04875f837f,178.33365037,CREDITO
1,075d178871d8d48502bf1f54887e52fe,a53bb81bd0bba2ae2535bda7ea5a550c,2d8d34be7509a6b1262336d036fdb324,2021-09-12 18:31:58,c0b62f9046c83ea5543ea46a497a4d6e,5cfff960ea6d732c1ba3e63d24f3be52,35.66673007,CREDITO
2,075d178871d8d48502bf1f54887e52fe,79f893ea65c06fe2933f3847c88c272f,5eeb18254850b21af0a6bb2697913cd3,2021-09-12 18:31:56,872d10143fc0ac7d5de467806f6bef81,c97e63a92c82c7217b333635d75928ed,142.66692029,CREDITO
3,075d178871d8d48502bf1f54887e52fe,ce577223699dbdc119df2ab8a35457fe,5221a599856c0a0588ed9ffd150edd3b,2021-09-12 18:31:10,2e35cfe7860a480a93e1c83e99843579,fc09bdd00f283222d65eaff4d00a6594,8.32223701,CREDITO
4,075d178871d8d48502bf1f54887e52fe,0ce7ab2950e7a788a608f881aec0f8f0,971e55ef12d80ec070ea4f6750c8b892,2021-09-12 18:31:07,995bc89e4c4e00334f1f90c4a55f4729,213527e8ba94fcaf2f9378969f9f6abc,32.10005706,CREDITO


### Evaluación de calidad de datos

Se evalúa la calidad de los datos en función de un framework basico de dimensiones de calidad de datos para determinar las instancias de operación durante el procesamiento y las transformación. Lo primero es hacer una revision de **la unicidad** y **la completitud**, comprobando si existen valores duplicados y valores nulos, respectivamente. 

In [9]:
total_duplicates = df.duplicated().sum()
print(f"El conjunto de datos tiene {total_duplicates} registros duplicados")

El conjunto de datos tiene 11 registros duplicados


In [10]:
have_nulls= df.isnull().any().any()
print(f"¿El dataset tiene valores nulos? {have_nulls}")

¿El dataset tiene valores nulos? False


En segunda instancia se revisa **la consistencia**. Como los datos deben tener uniformidad y coherencia se realiza una evaluación en términos de formato, unidades y valores, para esta etapa se revisa el diccionario de datos y se contrasta la información con el dataframe. 

In [11]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 21516918 entries, 0 to 831362
Data columns (total 8 columns):
 #   Column              Dtype         
---  ------              -----         
 0   merchant_id         string        
 1   _id                 string        
 2   subsidiary          string        
 3   transaction_date    datetime64[ns]
 4   account_number      string        
 5   user_id             string        
 6   transaction_amount  string        
 7   transaction_type    string        
dtypes: datetime64[ns](1), string(7)
memory usage: 4.4 GB


Como la información esta codificada con carcateres alfanúmericos es coherente el formato string en la mayoria de las columnas. Sin embargo, la columna ```transaction_amount``` que representa la moneda ficticia debe tener un formato numerico para realizar operaciones futuras. En este escenario se decide cambiar este dato de string a float

Finalmente se evalúa **la validez** de los datos, es decir, que los datos esten dentro de los limites establecidos en función de las definiciones expuestas en el diccionario de datos

In [9]:
max_transaction_date = df['transaction_date'].max()
min_transaction_date = df['transaction_date'].min()

print(f" El rango de transacciones esta entre {min_transaction_date} y {max_transaction_date}")

 El rango de transacciones esta entre 2021-01-01 00:00:40 y 2021-11-30 23:59:49


### Transformación de la data

In [4]:
def string_to_float(column_name):

    df[column_name] = df[column_name].astype(float).round(3)
    return df

In [None]:
# Eliminar registros duplicados
df = df.drop_duplicates()
# Conversion de tipo de datos
df = string_to_float('transaction_amount')

# Se crea una columna a partir de la extracción de la fecha desde transaction_date
df.loc[:, 'date'] = df['transaction_date'].dt.date
df['date'] = pd.to_datetime(df['date'])

# Ordenar los datos por fecha
df = df.sort_values('transaction_date')

In [12]:
df.head()

Unnamed: 0,merchant_id,_id,subsidiary,transaction_date,account_number,user_id,transaction_amount,transaction_type,date
16066,838a8fa992a4aa2fb5a0cf8b15b63755,4a101c7ee6f4f8f1fcdbd4bc044c59d8,f54e0b6b32831a6307361ed959903e76,2021-01-01 00:00:40,8fee49f3fedc5590551fcb57a2f58a3e,2fa852d1bd38bbbcf4afd629f4a7c51b,5.944,CREDITO,2021-01-01
341948,817d18cd3c31e40e9bff0566baae7758,47d6c9460c5c27d63d6e66d23e598695,6f19909d89f3178ea74b3cee1f20af13,2021-01-01 00:01:08,f75a7c37888408308e02fef9086046fe,516c209606ac9f7e870802c90bffbaf8,59.445,DEBITO,2021-01-01
1097759,817d18cd3c31e40e9bff0566baae7758,8c2df593436163e9cbff2fe3050f0ae1,c1b186a762110afc5d510517b04b6329,2021-01-01 00:01:13,a49b390b90de1454f966c604da9687d3,8c8b5512b0772dc891bef619ecd2acfb,47.556,DEBITO,2021-01-01
124971,838a8fa992a4aa2fb5a0cf8b15b63755,6ed538998f04c804c299dc9fbc4d6e90,dff70ce33784a932ce4a7efc81a43863,2021-01-01 00:01:17,666e5e749c3ae3dea220e023a51c88f4,d0a25a8f11f7228d912e6874a0544d41,5.944,CREDITO,2021-01-01
1401278,838a8fa992a4aa2fb5a0cf8b15b63755,8801042a2192cdb8d96074442e5d4e06,d4b31b123120a4eefd51ba95975f2ae4,2021-01-01 00:01:45,c039cca9581596ee863c3c812835bfff,6b6113f0dfead46eb7f0c21ecb78d567,5.944,DEBITO,2021-01-01


In [13]:
# Verificación del cambio en el tipo de datos
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 21516907 entries, 16066 to 906278
Data columns (total 9 columns):
 #   Column              Dtype         
---  ------              -----         
 0   merchant_id         string        
 1   _id                 string        
 2   subsidiary          string        
 3   transaction_date    datetime64[ns]
 4   account_number      string        
 5   user_id             string        
 6   transaction_amount  float64       
 7   transaction_type    string        
 8   date                datetime64[ns]
dtypes: datetime64[ns](2), float64(1), string(6)
memory usage: 4.5 GB


# Output Pipeline de procesamiento

Decido guardar el archivo .parquet con el process pipeline para usarlo en el futuro sin necesidad de correr todo el proceso dada las limitaciones de computo. 

In [14]:
# Guardar el DataFrame procesado en formato Parquet
df.to_parquet('processing_data_0001_dummie.parquet')

In [15]:
df.head()

Unnamed: 0,merchant_id,_id,subsidiary,transaction_date,account_number,user_id,transaction_amount,transaction_type,date
16066,838a8fa992a4aa2fb5a0cf8b15b63755,4a101c7ee6f4f8f1fcdbd4bc044c59d8,f54e0b6b32831a6307361ed959903e76,2021-01-01 00:00:40,8fee49f3fedc5590551fcb57a2f58a3e,2fa852d1bd38bbbcf4afd629f4a7c51b,5.944,CREDITO,2021-01-01
341948,817d18cd3c31e40e9bff0566baae7758,47d6c9460c5c27d63d6e66d23e598695,6f19909d89f3178ea74b3cee1f20af13,2021-01-01 00:01:08,f75a7c37888408308e02fef9086046fe,516c209606ac9f7e870802c90bffbaf8,59.445,DEBITO,2021-01-01
1097759,817d18cd3c31e40e9bff0566baae7758,8c2df593436163e9cbff2fe3050f0ae1,c1b186a762110afc5d510517b04b6329,2021-01-01 00:01:13,a49b390b90de1454f966c604da9687d3,8c8b5512b0772dc891bef619ecd2acfb,47.556,DEBITO,2021-01-01
124971,838a8fa992a4aa2fb5a0cf8b15b63755,6ed538998f04c804c299dc9fbc4d6e90,dff70ce33784a932ce4a7efc81a43863,2021-01-01 00:01:17,666e5e749c3ae3dea220e023a51c88f4,d0a25a8f11f7228d912e6874a0544d41,5.944,CREDITO,2021-01-01
1401278,838a8fa992a4aa2fb5a0cf8b15b63755,8801042a2192cdb8d96074442e5d4e06,d4b31b123120a4eefd51ba95975f2ae4,2021-01-01 00:01:45,c039cca9581596ee863c3c812835bfff,6b6113f0dfead46eb7f0c21ecb78d567,5.944,DEBITO,2021-01-01


He creado un set de datos filtrado por los meses enero y febrero para testear los procesos. Esto me servira a mi de soporte para pruebas unitarias con las transformaciones o analisis futuros. 

In [6]:
df_may = df[df['date'].dt.month.isin([5])]

# Guardar el DataFrame procesado en formato Parquet
df_may.to_parquet('processing_data_0003_may.parquet')

In [9]:
df_may.head()

Unnamed: 0,merchant_id,_id,subsidiary,transaction_date,account_number,user_id,transaction_amount,transaction_type,date
1308460,817d18cd3c31e40e9bff0566baae7758,cccd4e8518a582b39ce1cffe44edc8af,96276bee7fbbf1ca462f3d47764d377a,2021-05-01 00:01:21,8bec2d575c1e33c74f92dd0a09501702,1dbae7a18837ce218aae142cc83c23a2,59.445,DEBITO,2021-05-01
1106433,817d18cd3c31e40e9bff0566baae7758,b24404b20feb753d1f5bd8f2011c3d27,4ba2bf9dfd5681f4f0da17ca275aa023,2021-05-01 00:01:29,3d9a19bd785dda7ba7dfdb8221a0e562,5e9f571ca5c463c40116da034975b60b,59.445,DEBITO,2021-05-01
1637920,817d18cd3c31e40e9bff0566baae7758,bd52552db927fca11ab0e6ef56313545,96c5fb314a29a3823d859c5e42715504,2021-05-01 00:01:38,ee9333ad67115b5c0e4f470005d70690,1cec9761413161c9fe880e3da3e2df8f,59.445,DEBITO,2021-05-01
1427732,817d18cd3c31e40e9bff0566baae7758,fd6ce552d6bb1c08962972f76c0542ba,e5bbc3d7754ee7365c6a891bdc430e55,2021-05-01 00:02:29,d31c77f805ea71310e87dc7f50ec23cc,d4c9a85282784a6d345bf4ee7b16bd8e,59.445,DEBITO,2021-05-01
1162276,817d18cd3c31e40e9bff0566baae7758,a386ff46edf4fb56d9f0bdd72f34008a,e276b9fe27f12e216543bf51b0094dc2,2021-05-01 00:06:18,263fc0d031917c95c98cb61be2ba3f54,d51a637a41280f462d06fa15b361ad09,23.778,DEBITO,2021-05-01
