In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder \
    .appName("MercadoPago") \
    .getOrCreate()

In [3]:
#historial de 1 mes de pagos realizados por los usuarios
pays_path = '/home/jovyan/work/spark_env/pays.csv'

#historial de 1 mes de value props que fueron mostradas a cada usuario
prints_path = '/home/jovyan/work/spark_env/prints.json'

#historial de 1 mes de value props que fueron clickeadas por un usuario
taps_path = '/home/jovyan/work/spark_env/taps.json'

In [4]:
from pyspark.sql.functions import to_date, when, count, window, desc, asc, sum
from pyspark.sql.types import IntegerType, StringType

In [5]:
paysDF = spark.read.csv(path=pays_path, header=True)

In [6]:
# printsDF
# historial de 1 mes de value props que fueron mostradas a cada usuario

In [7]:
printsDF = spark.read.json(prints_path)

In [8]:
printsDF = printsDF.withColumn('day_prints', to_date(printsDF['day'], 'yyyy-MM-dd'))
printsDF = printsDF.withColumn('position_prints', printsDF['event_data.position'].cast(IntegerType())) \
                           .withColumn('value_prop_prints', printsDF['event_data.value_prop'].cast(StringType()))
printsDF = printsDF.drop('event_data')
printsDF = printsDF.drop('day')
printsDF = printsDF.withColumn('user_id', printsDF['user_id'].cast(IntegerType()))

In [9]:
printsDF.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- day_prints: date (nullable = true)
 |-- position_prints: integer (nullable = true)
 |-- value_prop_prints: string (nullable = true)



In [10]:
printsDF.show()

+-------+----------+---------------+------------------+
|user_id|day_prints|position_prints| value_prop_prints|
+-------+----------+---------------+------------------+
|  98702|2020-11-01|              0|cellphone_recharge|
|  98702|2020-11-01|              1|           prepaid|
|  63252|2020-11-01|              0|           prepaid|
|  24728|2020-11-01|              0|cellphone_recharge|
|  24728|2020-11-01|              1|        link_cobro|
|  24728|2020-11-01|              2|  credits_consumer|
|  24728|2020-11-01|              3|             point|
|  25517|2020-11-01|              0|             point|
|  25517|2020-11-01|              1|  credits_consumer|
|  25517|2020-11-01|              2|         transport|
|  57587|2020-11-01|              0|             point|
|  13609|2020-11-01|              0|         transport|
|   3708|2020-11-01|              0|cellphone_recharge|
|   3708|2020-11-01|              1|           prepaid|
|   3708|2020-11-01|              2|            

In [11]:
#tapsDF
#historial de 1 mes de value props que fueron clickeadas por un usuario

In [12]:
tapsDF = spark.read.json(taps_path)

In [13]:
tapsDF = tapsDF.withColumn('day_taps', to_date(tapsDF['day'], 'yyyy-MM-dd'))
tapsDF = tapsDF.withColumn('position_taps', tapsDF['event_data.position'].cast(IntegerType())) \
                           .withColumn('value_prop_taps', tapsDF['event_data.value_prop'].cast(StringType()))
tapsDF = tapsDF.drop('event_data')
tapsDF = tapsDF.drop('day')
tapsDF = tapsDF.withColumn('user_id', tapsDF['user_id'].cast(IntegerType()))

In [14]:
tapsDF.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- day_taps: date (nullable = true)
 |-- position_taps: integer (nullable = true)
 |-- value_prop_taps: string (nullable = true)



In [15]:
# paysDF

# historial de 1 mes de pagos realizados por los usuarios

In [16]:
paysDF = spark.read.csv(path=pays_path, header=True)

In [17]:
paysDF = paysDF.withColumn('pay_date', to_date(paysDF['pay_date'], 'yyyy-MM-dd'))
paysDF = paysDF.withColumn('total', paysDF['total'].cast(IntegerType()))
paysDF = paysDF.withColumn('user_id', paysDF['user_id'].cast(IntegerType()))
paysDF = paysDF.withColumn('value_prop', paysDF['value_prop'].cast(StringType()))

In [18]:
paysDF.printSchema()

root
 |-- pay_date: date (nullable = true)
 |-- total: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- value_prop: string (nullable = true)



In [19]:
paysDF.show()

+----------+-----+-------+------------------+
|  pay_date|total|user_id|        value_prop|
+----------+-----+-------+------------------+
|2020-11-01|    7|  35994|        link_cobro|
|2020-11-01|   37|  79066|cellphone_recharge|
|2020-11-01|   15|  19321|cellphone_recharge|
|2020-11-01|   26|  19321|        send_money|
|2020-11-01|   35|  38438|        send_money|
|2020-11-01|   20|  85939|         transport|
|2020-11-01|   74|  14372|           prepaid|
|2020-11-01|   31|  14372|        link_cobro|
|2020-11-01|   83|  65274|         transport|
|2020-11-01|   93|  65274|           prepaid|
|2020-11-01|   37|  97428|        link_cobro|
|2020-11-01|   26|  82163|        link_cobro|
|2020-11-01|   92|   9816|        send_money|
|2020-11-01|  122|   9816|           prepaid|
|2020-11-01|   83|  28929|           prepaid|
|2020-11-01|  136|  97275|        link_cobro|
|2020-11-01|   17|  85001|cellphone_recharge|
|2020-11-01|   41|  85001|        link_cobro|
|2020-11-01|   31|     42|        

In [20]:
# Desarrollo

In [21]:
# Campo que indique si se hizo click o no

In [22]:
prints_click_DF = printsDF.withColumn('click', when(printsDF['position_prints'] != 0, True).otherwise(False))
prints_click_DF.show()

+-------+----------+---------------+------------------+-----+
|user_id|day_prints|position_prints| value_prop_prints|click|
+-------+----------+---------------+------------------+-----+
|  98702|2020-11-01|              0|cellphone_recharge|false|
|  98702|2020-11-01|              1|           prepaid| true|
|  63252|2020-11-01|              0|           prepaid|false|
|  24728|2020-11-01|              0|cellphone_recharge|false|
|  24728|2020-11-01|              1|        link_cobro| true|
|  24728|2020-11-01|              2|  credits_consumer| true|
|  24728|2020-11-01|              3|             point| true|
|  25517|2020-11-01|              0|             point|false|
|  25517|2020-11-01|              1|  credits_consumer| true|
|  25517|2020-11-01|              2|         transport| true|
|  57587|2020-11-01|              0|             point|false|
|  13609|2020-11-01|              0|         transport|false|
|   3708|2020-11-01|              0|cellphone_recharge|false|
|   3708

In [23]:
# Cantidad de veces que el usuario vio cada value prop en las 3 semanas previas a ese print.

In [24]:
prints_view_quantity_DF = printsDF.groupBy('user_id', 'value_prop_prints', window('day_prints', '3 weeks').alias('three_weeks')).agg(count('*').alias('view_quantity'))
prints_view_quantity_DF.show()

+-------+------------------+--------------------+-------------+
|user_id| value_prop_prints|         three_weeks|view_quantity|
+-------+------------------+--------------------+-------------+
|  42041|  credits_consumer|{2020-10-29 00:00...|            2|
|  56144|             point|{2020-10-29 00:00...|            1|
|  98034|             point|{2020-10-29 00:00...|            1|
|  75123|        send_money|{2020-10-29 00:00...|            2|
|  59128|           prepaid|{2020-10-29 00:00...|            2|
|  89301|        send_money|{2020-10-29 00:00...|            1|
|  37961|         transport|{2020-10-29 00:00...|            1|
|  94602|             point|{2020-10-29 00:00...|            2|
|  69738|        send_money|{2020-10-29 00:00...|            2|
|  11260|        link_cobro|{2020-10-29 00:00...|            1|
|  90349|  credits_consumer|{2020-10-29 00:00...|            1|
|  38868|cellphone_recharge|{2020-10-29 00:00...|            1|
|  86185|        link_cobro|{2020-10-29 

In [25]:
#Contar la cantidad de veces que el usuario clickeó cada value prop en las 3 semanas previas

In [26]:
taps_clicks_quantity_DF = tapsDF.groupBy('user_id', 'value_prop_taps', window('day_taps', '3 weeks').alias('three_weeks')).agg(count('*').alias('clicks_quantity'))
taps_clicks_quantity_DF.show()

+-------+------------------+--------------------+---------------+
|user_id|   value_prop_taps|         three_weeks|clicks_quantity|
+-------+------------------+--------------------+---------------+
|  60205|        send_money|{2020-10-29 00:00...|              1|
|  43783|         transport|{2020-10-29 00:00...|              1|
|  38580|             point|{2020-10-29 00:00...|              1|
|  21441|        send_money|{2020-10-29 00:00...|              1|
|  14647|cellphone_recharge|{2020-10-29 00:00...|              1|
|  83278|         transport|{2020-10-29 00:00...|              1|
|  76897|         transport|{2020-10-29 00:00...|              1|
|  15385|cellphone_recharge|{2020-10-29 00:00...|              1|
|  99988|             point|{2020-10-29 00:00...|              1|
|  70644|         transport|{2020-10-29 00:00...|              1|
|   6157|        send_money|{2020-10-29 00:00...|              1|
|  12904|        send_money|{2020-10-29 00:00...|              1|
|  94177| 

In [27]:
# Contar la cantidad de pagos que el usuario realizó para cada value prop en las 3 semanas previas

In [28]:
pays_count_DF = paysDF.groupBy('user_id', 'value_prop', window('pay_date', '3 weeks').alias('three_weeks')).agg(count('*').alias('pays_count'))
pays_count_DF.show()

+-------+------------------+--------------------+----------+
|user_id|        value_prop|         three_weeks|pays_count|
+-------+------------------+--------------------+----------+
|   5622|             point|{2020-10-29 00:00...|         1|
|  91821|           prepaid|{2020-10-29 00:00...|         1|
|  70766|        link_cobro|{2020-10-29 00:00...|         1|
|  57886|  credits_consumer|{2020-10-29 00:00...|         2|
|  51134|             point|{2020-10-29 00:00...|         2|
|   1902|        send_money|{2020-10-29 00:00...|         1|
|  36042|  credits_consumer|{2020-10-29 00:00...|         1|
|  66483|  credits_consumer|{2020-10-29 00:00...|         1|
|  39625|             point|{2020-10-29 00:00...|         1|
|  56407|        send_money|{2020-10-29 00:00...|         2|
|  11599|cellphone_recharge|{2020-10-29 00:00...|         3|
|  30829|           prepaid|{2020-10-29 00:00...|         1|
|  49762|         transport|{2020-10-29 00:00...|         2|
|  86770|           prep

In [29]:
# Calcular importes acumulados que el usuario gastó para cada value prop en las 3 semanas previas

In [30]:
pays_total_DF = paysDF.groupBy('user_id', 'value_prop', window('pay_date', '3 weeks').alias('three_Weeks')).agg(sum('total').alias('pays_total'))
pays_total_DF.show()

+-------+------------------+--------------------+----------+
|user_id|        value_prop|         three_Weeks|pays_total|
+-------+------------------+--------------------+----------+
|   5622|             point|{2020-10-29 00:00...|       178|
|  91821|           prepaid|{2020-10-29 00:00...|        12|
|  70766|        link_cobro|{2020-10-29 00:00...|         5|
|  57886|  credits_consumer|{2020-10-29 00:00...|        45|
|  51134|             point|{2020-10-29 00:00...|        64|
|   1902|        send_money|{2020-10-29 00:00...|       185|
|  36042|  credits_consumer|{2020-10-29 00:00...|       184|
|  66483|  credits_consumer|{2020-10-29 00:00...|        10|
|  39625|             point|{2020-10-29 00:00...|        72|
|  56407|        send_money|{2020-10-29 00:00...|       127|
|  11599|cellphone_recharge|{2020-10-29 00:00...|       153|
|  30829|           prepaid|{2020-10-29 00:00...|        41|
|  49762|         transport|{2020-10-29 00:00...|       281|
|  86770|           prep

In [31]:
#DB conection

In [32]:
!pip install sqlalchemy databases
!pip install asyncpg
!pip install psycopg2-binary



Collecting databases
  Downloading databases-0.9.0-py3-none-any.whl.metadata (5.4 kB)
Downloading databases-0.9.0-py3-none-any.whl (25 kB)
Installing collected packages: databases
Successfully installed databases-0.9.0
Collecting asyncpg
  Downloading asyncpg-0.29.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Collecting async-timeout>=4.0.3 (from asyncpg)
  Downloading async_timeout-4.0.3-py3-none-any.whl.metadata (4.2 kB)
Downloading asyncpg-0.29.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25hDownloading async_timeout-4.0.3-py3-none-any.whl (5.7 kB)
Installing collected packages: async-timeout, asyncpg
Successfully installed async-timeout-4.0.3 asyncpg-0.29.0
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4

In [33]:
import sqlalchemy
import databases


DATABASE_URL = f"postgresql://myuser:mypassword@postgres-db:5432/mercadoPago"

database = databases.Database(DATABASE_URL)

metadata = sqlalchemy.MetaData()

engine = sqlalchemy.create_engine(DATABASE_URL)

metadata.create_all(engine)

In [34]:
async def check_connection():
    try:
        await database.connect()
        print("La conexión con la base de datos se estableció correctamente. {database.status_code}")
    except Exception as e:
        print(f"Error al conectar a la base de datos: {e}")
    finally:
        await database.disconnect()

# Llama a la función para verificar la conexión
await check_connection()

La conexión con la base de datos se estableció correctamente. {database.status_code}


In [35]:
import pandas as pd

In [36]:
printsDF_pandas = printsDF.toPandas()
tapsDF_pandas = tapsDF.toPandas()
paysDF_pandas = paysDF.toPandas()

In [37]:
printsDF_pandas.head()

Unnamed: 0,user_id,day_prints,position_prints,value_prop_prints
0,98702,2020-11-01,0,cellphone_recharge
1,98702,2020-11-01,1,prepaid
2,63252,2020-11-01,0,prepaid
3,24728,2020-11-01,0,cellphone_recharge
4,24728,2020-11-01,1,link_cobro


In [38]:
printsDF_insert = printsDF_pandas.to_dict(orient='records')
tapsDF_insert = tapsDF_pandas.to_dict(orient='records')
paysDF_insert = paysDF_pandas.to_dict(orient='records')

In [39]:
print([x for x in paysDF_insert[:5]])

[{'pay_date': datetime.date(2020, 11, 1), 'total': 7, 'user_id': 35994, 'value_prop': 'link_cobro'}, {'pay_date': datetime.date(2020, 11, 1), 'total': 37, 'user_id': 79066, 'value_prop': 'cellphone_recharge'}, {'pay_date': datetime.date(2020, 11, 1), 'total': 15, 'user_id': 19321, 'value_prop': 'cellphone_recharge'}, {'pay_date': datetime.date(2020, 11, 1), 'total': 26, 'user_id': 19321, 'value_prop': 'send_money'}, {'pay_date': datetime.date(2020, 11, 1), 'total': 35, 'user_id': 38438, 'value_prop': 'send_money'}]


In [None]:
async def insert_data():
    try:
        await database.connect()
        for record in printsDF_insert[:20]:
            await database.execute("INSERT INTO prints (user_id, day_prints, position_prints, value_prop_prints) VALUES (:user_id, :day_prints, :position_prints, :value_prop_prints)", values=record)
        
        for record in tapsDF_insert[:20]:
            await database.execute("INSERT INTO taps (user_id, day_taps, position_taps, value_prop_taps) VALUES (:user_id, :day_taps, :position_taps, :value_prop_taps)", values=record)
        
        for record in paysDF_insert[:20]:
            await database.execute("INSERT INTO pays (pay_date, total, user_id, value_prop) VALUES (:pay_date, :total, :user_id, :value_prop)", values=record)
    except Exception as e:
        print(f"Error al insertar datos: {e}")
    finally:
        await database.disconnect()

await insert_data()

In [None]:
async def select_data(table_name):
    try:
        await database.connect()
        
        query = f"SELECT * FROM {table_name};"
        result = await database.fetch_all(query)
        
        print(f"Contenido de la tabla '{table_name}':")
        for row in result:
            row_dict = dict(row)
            for column, value in row_dict.items():
                print(f"{column}: {value}")
            print('----------------')
    except Exception as e:
        print(f"Error al seleccionar datos de la tabla '{table_name}': {e}")
    finally:
        await database.disconnect()

await select_data('prints')
