In [None]:
# Se instala la libreria astrapy para conexion con la BD de Cassandra
!pip install --upgrade astrapy

Collecting astrapy
  Downloading astrapy-2.0.1-py3-none-any.whl.metadata (23 kB)
Collecting deprecation<2.2.0,>=2.1.0 (from astrapy)
  Downloading deprecation-2.1.0-py2.py3-none-any.whl.metadata (4.6 kB)
Collecting pymongo>=3 (from astrapy)
  Downloading pymongo-4.13.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting uuid6>=2024.1.12 (from astrapy)
  Downloading uuid6-2025.0.0-py3-none-any.whl.metadata (10 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo>=3->astrapy)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading astrapy-2.0.1-py3-none-any.whl (300 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m300.5/300.5 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading deprecation-2.1.0-py2.py3-none-any.whl (11 kB)
Downloading pymongo-4.13.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m29.6 M

In [None]:
# Se importan las librerias necesarias
from astrapy import DataAPIClient
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col, sum, when
from pyspark.sql import functions as F

In [None]:
# Read the Astra DB application token from Colab's secret store (secret name 'token'),
# so the credential never appears in the notebook itself.
from google.colab import userdata

token = userdata.get('token')

In [None]:
# Open a handle to the Astra DB database through its Data API endpoint.
client = DataAPIClient(token)
db = client.get_database_by_api_endpoint(
    "https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com"
)

# Smoke test: list the collections visible in the default keyspace.
print(f"Connected to Astra DB: {db.list_collection_names()}")

Connected to Astra DB: []


In [None]:
# Helper that writes a pandas DataFrame into a newly created Astra DB collection.
def insertar(df, coleccion, keyspace, database=None):
    """Create collection `coleccion` in `keyspace` and insert every row of `df` as a document.

    Datetime columns are serialised to ISO-8601 strings (missing values -> None) so the
    documents are JSON-serialisable. The caller's DataFrame is left unmodified.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to insert; each row becomes one document (column name -> value).
    coleccion : str
        Name of the collection to create.
    keyspace : str
        Astra keyspace to create the collection in (this notebook uses one per data layer,
        e.g. 'Landing' or 'Universal').
    database : optional
        Database handle exposing ``create_collection``. Defaults to the notebook-level ``db``.
    """
    if database is None:
        database = db

    # Work on a copy so the caller's DataFrame keeps its original dtypes.
    documentos = df.copy()
    for columna in documentos.columns:
        # Matches any datetime64 resolution or timezone, not only the exact 'datetime64[ns]'.
        if pd.api.types.is_datetime64_any_dtype(documentos[columna]):
            # Series.dt has no isoformat(); format each Timestamp and send NaT as None (null).
            documentos[columna] = pd.Series(
                [valor.isoformat() if pd.notna(valor) else None for valor in documentos[columna]],
                index=documentos.index,
                dtype=object,
            )

    collection = database.create_collection(coleccion, keyspace=keyspace)
    collection.insert_many(documentos.to_dict(orient="records"))
    print(f"Colección {coleccion} fue correctamente poblada...!")

In [None]:
# Read the three source CSV files: the users dimension and the onboarding / transactions facts.
users, onboarding, transactions = (
    pd.read_csv(ruta)
    for ruta in ('/dim_users.csv', '/fact_users_onboarding.csv', '/fact_users_transactions.csv')
)

In [None]:
users.head(5)  # first rows of the raw users CSV

Unnamed: 0.1,Unnamed: 0,user_id,name,email,address,birth_dt,phone,type,rubro
0,0,MLB410638994850,Carlos Eduardo Moura,cardosoisabel@example.com,"Feira Sales, 51\nCarmo\n49826-972 da Mata do C...",1958-03-07,84 8887 0394,8.0,1.0
1,1,MLB44585068070,João Vitor Cunha,ana-luizarodrigues@example.com,"Distrito da Luz, 4\nFrei Leopoldo\n71718588 Co...",1978-06-09,51 9358 7614,8.0,9.0
2,2,MLB4815806440,Fernando Costa,nogueirathiago@example.org,"Passarela da Rosa, 95\nGlória\n46105-403 Melo ...",2001-03-13,0300 560 1075,9.0,2.0
3,3,MLB3944955860,Maria Clara Azevedo,souzafrancisco@example.org,"Setor Yago Almeida, 7\nParaíso\n88387-297 Silv...",1915-03-05,+55 (011) 5809 4712,,
4,4,MLB747760490,Srta. Fernanda Jesus,nicolas88@example.org,"Lago de da Rosa, 85\nBelvedere\n41665068 da Ro...",1994-09-27,+55 51 6154 7175,,


In [None]:
onboarding.head(5)  # first rows of the raw onboarding CSV

Unnamed: 0.2,Unnamed: 0.1,Unnamed: 0,first_login_dt,week_year,user_id,habito,habito_dt,activacion,activacion_dt,setup,setup_dt,return,return_dt
0,0,9847.0,2022-01-03,1,MLB7745503990,0.0,,0,,0,,0,
1,1,3346.0,2022-01-04,1,MLB10508061470,0.0,,1,2022-01-06,1,2022-01-04,1,2022-01-06
2,2,11261.0,2022-01-23 00:00:00,3,MLB10618286450,0.0,,0,,0,,0,
3,3,6273.0,2022-01-03,1,MLB743362200,0.0,,1,2022-01-03,0,,0,
4,4,860.0,2022-01-16,2,MLB2467761450,0.0,,1,2022-01-17,1,2022-01-17,1,2022-01-17


In [None]:
transactions.head(5)  # first rows of the raw transactions CSV

Unnamed: 0.1,Unnamed: 0,user_id,transaction_dt,type,segment
0,0,MLB410542856680,2022-01-20 23:05:07.884739087,9,2
1,1,MLB10610169410,2022-02-06 07:55:58.674812703,1,1
2,2,MLB7813965430,2022-01-25 17:08:53.753615635,6,1
3,3,MLB410646227340,2022-01-28 00:00:00.000000000,8,2
4,4,MLB10664964390,2022-02-02 19:04:18.359600977,7,1


### Carga de datos en la capa Landing

In [None]:
# Load the users table into the Landing layer.
# NaN in the numeric 'type' / 'rubro' columns becomes None so it is stored as JSON null,
# and the CSV index artifact column 'Unnamed: 0' is dropped.
users = (
    users
    .assign(
        type=users['type'].replace({np.nan: None}),
        rubro=users['rubro'].replace({np.nan: None}),
    )
    .drop(columns=['Unnamed: 0'])
)

insertar(users, 'users', 'Landing')

Colección users fue correctamente poblada...!


In [None]:
# Load the onboarding table into the Landing layer.

# Drop one of the CSV index artifact columns ('Unnamed: 0' is kept and read back later).
onboarding = onboarding.drop(columns=['Unnamed: 0.1'])

# NaN -> None so that missing values are sent as JSON null.
onboarding = onboarding.replace({np.nan: None})

# Date columns are stored as text; missing dates stay None (null) instead of becoming
# the literal text 'None'. A 'NaT' marker is also treated as missing.
# The loop variable is deliberately not named `col`: at module level that would overwrite
# pyspark.sql.functions.col imported at the top of the notebook.
date_cols = ['first_login_dt', 'habito_dt', 'activacion_dt', 'setup_dt', 'return_dt']
for date_col in date_cols:
    onboarding[date_col] = pd.Series(
        [None if valor is None or str(valor) == 'NaT' else str(valor) for valor in onboarding[date_col]],
        index=onboarding.index,
        dtype=object,
    )

insertar(onboarding, 'onboarding', 'Landing')

Colección onboarding fue correctamente poblada...!


In [None]:
# Load the transactions table into the Landing layer.

# Drop the CSV index artifact column.
transactions = transactions.drop(columns=['Unnamed: 0'])

# NaN -> None so missing values are sent as JSON null. The previous loop only touched
# integer-typed columns, which can never hold NaN, so it never replaced anything. It also
# rebound the global name `col`, shadowing pyspark.sql.functions.col.
transactions = transactions.replace({np.nan: None})

# transaction_dt is stored as text; a missing timestamp stays None instead of the literal
# string 'nan' that astype(str) produced.
transactions['transaction_dt'] = pd.Series(
    [None if valor is None else str(valor) for valor in transactions['transaction_dt']],
    index=transactions.index,
    dtype=object,
)

insertar(transactions, 'transactions', 'Landing')

Colección transactions fue correctamente poblada...!


### Descarga desde Astra (capa Landing)

In [None]:
# Start (or reuse) the Spark session used for every transformation below.
# Hive support is enabled so the SMART schema and tables created later use the Hive catalog.
spark = (
    SparkSession.builder
    .appName("Proyecto Mineria de datos II")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Handle to the 'users' collection in the Landing keyspace; documents are read later via .find().
users = db.get_collection('users', keyspace='Landing')

users

Collection(name="users", keyspace="Landing", database.api_endpoint="https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com", api_options=FullAPIOptions(token=StaticTokenProvider(AstraCS:ZuQC...), ...))

In [None]:
# Handle to the 'onboarding' collection in the Landing keyspace; documents are read later via .find().
onboarding = db.get_collection('onboarding', keyspace='Landing')

onboarding

Collection(name="onboarding", keyspace="Landing", database.api_endpoint="https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com", api_options=FullAPIOptions(token=StaticTokenProvider(AstraCS:ZuQC...), ...))

In [None]:
# Handle to the 'transactions' collection in the Landing keyspace; documents are read later via .find().
transactions = db.get_collection('transactions', keyspace='Landing')

transactions

Collection(name="transactions", keyspace="Landing", database.api_endpoint="https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com", api_options=FullAPIOptions(token=StaticTokenProvider(AstraCS:ZuQC...), ...))

In [None]:
# Spark schema for the Landing users documents; field order matches the tuples in reg_users.
schema_users = StructType([
    StructField(nombre, tipo)
    for nombre, tipo in (
        ("user_id", StringType()),
        ("name", StringType()),
        ("email", StringType()),
        ("address", StringType()),
        ("birth_dt", DateType()),
        ("phone", StringType()),
        ("type", FloatType()),
        ("rubro", StringType()),
    )
])

In [None]:
# Spark schema for the Landing onboarding documents; field order matches the tuples in reg_onboarding.
schema_onboarding = StructType([
    StructField(nombre, tipo)
    for nombre, tipo in (
        ("Unnamed: 0", StringType()),
        ("first_login_dt", StringType()),
        ("week_year", IntegerType()),
        ("user_id", StringType()),
        ("habito", FloatType()),
        ("habito_dt", StringType()),
        ("activacion", IntegerType()),
        ("activacion_dt", StringType()),
        ("setup", IntegerType()),
        ("setup_dt", StringType()),
        ("return", IntegerType()),
        ("return_dt", StringType()),
    )
])

In [None]:
# Spark schema for the Landing transactions documents; field order matches the tuples in reg_transactions.
schema_transactions = StructType([
    StructField(nombre, tipo)
    for nombre, tipo in (
        ("user_id", StringType()),
        ("transaction_dt", StringType()),
        ("type", IntegerType()),
        ("segment", IntegerType()),
    )
])

In [None]:
# Turn every Landing users document into a tuple ordered like schema_users.
# Text fields fall back to "" when absent; a stored None becomes the text 'None'.
reg_users = [
    (
        str(doc.get("user_id", "")),
        str(doc.get("name", "")),
        str(doc.get("email", "")),
        str(doc.get("address", "")),
        # 'YYYY-MM-DD' text -> datetime.date; empty or missing -> None
        None if not doc.get("birth_dt") else datetime.strptime(doc.get("birth_dt", ""), "%Y-%m-%d").date(),
        str(doc.get("phone", "")),
        # numeric type -> float; missing -> None
        None if doc.get("type") is None else float(doc.get("type", None)),
        str(doc.get("rubro", "")),
    )
    for doc in users.find()
]

In [None]:
# Turn every Landing onboarding document into a tuple ordered like schema_onboarding.
# Text fields fall back to "" (a stored None becomes the text 'None'), integer flags fall
# back to 0 when absent, and habito stays None when missing.
reg_onboarding = [
    (
        str(doc.get("Unnamed: 0", "")),
        str(doc.get("first_login_dt", "")),
        int(doc.get("week_year", 0)),
        str(doc.get("user_id", "")),
        None if doc.get("habito") is None else float(doc.get("habito", None)),
        str(doc.get("habito_dt", "")),
        int(doc.get("activacion", 0)),
        str(doc.get("activacion_dt", "")),
        int(doc.get("setup", 0)),
        str(doc.get("setup_dt", "")),
        int(doc.get("return", 0)),
        str(doc.get("return_dt", "")),
    )
    for doc in onboarding.find()
]

In [None]:
# Turn every Landing transactions document into a (user_id, transaction_dt, type, segment) tuple.
reg_transactions = [
    (
        str(doc.get("user_id", "")),
        str(doc.get("transaction_dt", "")),
        int(doc.get("type", 0)),
        int(doc.get("segment", 0)),
    )
    for doc in transactions.find()
]

In [None]:
# Spark DataFrame built from the users tuples
users_df = spark.createDataFrame(data=reg_users, schema=schema_users)

In [None]:
# Spark DataFrame built from the onboarding tuples
onboarding_df = spark.createDataFrame(data=reg_onboarding, schema=schema_onboarding)

In [None]:
# Spark DataFrame built from the transactions tuples
transactions_df = spark.createDataFrame(data=reg_transactions, schema=schema_transactions)

### Limpieza y transformación de datos

In [None]:
# Keep only the columns the analysis needs from users (everything else is dropped).
users_df = users_df.select('user_id', 'rubro')

In [None]:
# Remove the CSV index artifact and return_dt, which the analysis does not use.
columnas_descartadas = ('Unnamed: 0', 'return_dt')
onboarding_df = onboarding_df.drop(*columnas_descartadas)

In [None]:
users_df.show(20)  # 20 rows is Spark's default preview size

+---------------+-----+
|        user_id|rubro|
+---------------+-----+
|  MLB7383247150| None|
|  MLB6042969380| None|
| MLB10648640630| None|
|  MLB4854341820| None|
| MLB10611434700| None|
|  MLB1330275630| None|
|MLB4843136640-1| None|
|  MLB7912215780| None|
| MLB10546093560| None|
|MLB8218727920-1| None|
| MLB10546394350| None|
|  MLB1270429620| None|
|  MLB2843378020| None|
|  MLB7709809260| None|
| MLB10501603390| None|
|  MLB3376605710| None|
| MLB10542704690| None|
| MLB42261029540|    5|
| MLB10594263420| None|
| MLB10505782870| None|
+---------------+-----+
only showing top 20 rows



In [None]:
# rubro arrives as text. A missing value is either 'None' (str(None)) or '' (field absent
# from the document); both map to 0. Anything else that does not parse as an integer also
# falls back to 0 instead of silently becoming NULL.
users_df = users_df.replace({'None': '0', '': '0'}, subset=['rubro'])
users_df = users_df.withColumn(
    'rubro', F.coalesce(users_df['rubro'].cast(IntegerType()), F.lit(0))
)

# Drop duplicate (user_id, rubro) rows, if any
users_df = users_df.dropDuplicates(['user_id', 'rubro'])

users_df.show()


+----------------+-----+
|         user_id|rubro|
+----------------+-----+
|    MLB307947990|    0|
|  MLB43711440690|    7|
| MLB1465538830-1|    0|
| MLB3144375440-1|    0|
|   MLB4467229230|    0|
|  MLB10546351270|    0|
|  MLB10004759110|    0|
|MLB10642449350-1|    0|
|   MLB2743543900|    0|
|   MLB1576067360|    0|
|    MLB311099390|    0|
|   MLB4125428140|    0|
|   MLB2081112380|    0|
|   MLB6767805510|    0|
|   MLB3802172970|    0|
|  MLB10289924710|    0|
|    MLB832900320|    0|
|   MLB4215323460|    0|
|   MLB1931306250|    0|
| MLB410642554250|    1|
+----------------+-----+
only showing top 20 rows



In [None]:
# Check for missing user ids. The ids were built with str(doc.get("user_id", "")), so a
# missing id shows up as '' or 'None' rather than SQL NULL; look for all three.
users_df.filter("user_id IS NULL OR user_id = '' OR user_id = 'None'").show()

+-------+-----+
|user_id|rubro|
+-------+-----+
+-------+-----+



In [None]:
onboarding_df.show(20)  # 20 rows is Spark's default preview size

+-------------------+---------+----------------+------+----------+----------+-------------------+-----+-------------------+------+
|     first_login_dt|week_year|         user_id|habito| habito_dt|activacion|      activacion_dt|setup|           setup_dt|return|
+-------------------+---------+----------------+------+----------+----------+-------------------+-----+-------------------+------+
|         2022-01-28|        4|  MLB41927705020|   0.0|      None|         0|               None|    0|               None|     0|
|         2022-01-12|        2|  MLB45561165510|   0.0|      None|         0|               None|    0|               None|     0|
|         2022-01-04|        1|  MLB10510053700|   1.0|2022-01-24|         1|         2022-01-04|    1|         2022-01-04|     1|
|         2022-01-09|        1|    MLB759679770|   0.0|      None|         0|               None|    0|               None|     0|
|         2022-01-13|        2| MLB410562268630|   0.0|      None|         0|      

In [None]:
# Normalise onboarding:
#   * missing habito -> 0.0, then habito becomes an integer flag;
#   * each date column is parsed as DATE (homogenising mixed text such as '2022-01-23'
#     and '2022-01-23 00:00:00') and written back as text, which the database handles
#     more easily;
#   * exact duplicate rows are removed.
onboarding_df = (
    onboarding_df
    .fillna({"habito": 0.0})
    .withColumn('first_login_dt', F.col('first_login_dt').cast(DateType()).cast(StringType()))
    .withColumn('habito_dt', F.col('habito_dt').cast(DateType()).cast(StringType()))
    .withColumn('activacion_dt', F.col('activacion_dt').cast(DateType()).cast(StringType()))
    .withColumn('setup_dt', F.col('setup_dt').cast(DateType()).cast(StringType()))
    .withColumn('habito', F.col('habito').cast(IntegerType()))
    .dropDuplicates()
)

onboarding_df.show()

+--------------+---------+---------------+------+----------+----------+-------------+-----+----------+------+
|first_login_dt|week_year|        user_id|habito| habito_dt|activacion|activacion_dt|setup|  setup_dt|return|
+--------------+---------+---------------+------+----------+----------+-------------+-----+----------+------+
|    2022-01-24|        4|  MLB6441174910|     1|2022-02-14|         1|   2022-01-19|    1|2022-01-28|     1|
|    2022-01-06|        1| MLB10523352060|     1|2022-01-19|         1|   2022-01-06|    1|2022-01-06|     1|
|    2022-01-14|        2| MLB10570013550|     1|2022-01-15|         1|   2022-01-14|    1|2022-01-17|     1|
|    2022-01-15|        2|  MLB3999980600|     1|2022-01-17|         1|   2022-01-16|    1|2022-01-15|     1|
|    2022-01-27|        4|MLB410635680280|     1|2022-02-10|         1|   2022-01-28|    1|2022-01-28|     1|
|    2022-01-17|        3|  MLB5418067050|     1|2022-01-30|         1|   2022-01-17|    1|2022-01-17|     1|
|    2022-

In [None]:
# Derive activation from the transactions table.
# Transactions are aggregated to one row per user BEFORE the join. Joining the raw table
# would repeat each onboarding user once per transaction date, and dropDuplicates cannot
# collapse those rows because their activacion_dt differs.
primera_transaccion = (
    transactions_df
    .groupBy("user_id")
    .agg(
        F.count(F.lit(1)).alias("n_transactions"),
        F.min(F.col("transaction_dt").cast(DateType())).alias("first_transaction_dt"),
    )
)

# Left join: users without transactions keep their row, with nulls in the new columns.
onboarding_with_transactions = onboarding_df.join(primera_transaccion, "user_id", "left")

# activacion    = 1 if the user has at least one transaction, else 0
# activacion_dt = date of the user's first transaction as 'yyyy-MM-dd' text (null if none)
# Both columns are derived in one chain. Previously the activacion_dt step restarted from
# the joined frame and silently discarded the activacion update.
onboarding_df = (
    onboarding_with_transactions
    .withColumn("activacion", F.when(F.col("n_transactions").isNotNull(), 1).otherwise(0))
    .withColumn("activacion_dt", F.col("first_transaction_dt").cast(StringType()))
    .drop("n_transactions", "first_transaction_dt")
)

# Drop duplicate rows, if any
onboarding_df = onboarding_df.dropDuplicates()

onboarding_df.show()

+---------------+--------------+---------+------+----------+----------+-------------+-----+----------+------+
|        user_id|first_login_dt|week_year|habito| habito_dt|activacion|activacion_dt|setup|  setup_dt|return|
+---------------+--------------+---------+------+----------+----------+-------------+-----+----------+------+
|  MLB4854341820|    2022-01-03|        1|     1|2022-01-18|         1|   2022-01-15|    1|2022-01-03|     1|
|  MLB6493787800|    2022-01-29|        4|     1|2022-02-20|         1|   2022-02-16|    1|2022-02-01|     1|
| MLB10564105150|    2022-01-14|        2|     1|2022-02-01|         1|   2022-01-30|    1|2022-01-14|     1|
|  MLB2274734870|    2022-01-05|        1|     1|2022-01-28|         1|   2022-01-23|    1|2022-01-05|     1|
|MLB410390720050|    2022-01-16|        2|     1|2022-01-16|         1|   2022-01-16|    1|2022-01-21|     1|
| MLB10263203860|    2022-01-07|        1|     1|2022-01-23|         1|   2022-01-19|    1|2022-01-07|     1|
|MLB410523

In [None]:
# Business rules derived from the funnel:
#   habito     => activacion
#   activacion => setup
#   habito     => return
# Each rule forces the implied flag to 1 and otherwise keeps the existing value. They run
# in this order so that the setup rule sees the already-updated activacion.
# Flags that are still null afterwards mean "stage not reached" and become 0.
onboarding_df = (
    onboarding_df
    .withColumn("activacion", F.when(F.col("habito") == 1, 1).otherwise(F.col("activacion")))
    .withColumn("setup", F.when(F.col("activacion") == 1, 1).otherwise(F.col("setup")))
    .withColumn("return", F.when(F.col("habito") == 1, 1).otherwise(F.col("return")))
    .fillna({"activacion": 0, "setup": 0, "return": 0})
)

onboarding_df.show()

+---------------+--------------+---------+------+----------+----------+-------------+-----+----------+------+
|        user_id|first_login_dt|week_year|habito| habito_dt|activacion|activacion_dt|setup|  setup_dt|return|
+---------------+--------------+---------+------+----------+----------+-------------+-----+----------+------+
|  MLB4854341820|    2022-01-03|        1|     1|2022-01-18|         1|   2022-01-15|    1|2022-01-03|     1|
|  MLB6493787800|    2022-01-29|        4|     1|2022-02-20|         1|   2022-02-16|    1|2022-02-01|     1|
| MLB10564105150|    2022-01-14|        2|     1|2022-02-01|         1|   2022-01-30|    1|2022-01-14|     1|
|  MLB2274734870|    2022-01-05|        1|     1|2022-01-28|         1|   2022-01-23|    1|2022-01-05|     1|
|MLB410390720050|    2022-01-16|        2|     1|2022-01-16|         1|   2022-01-16|    1|2022-01-21|     1|
| MLB10263203860|    2022-01-07|        1|     1|2022-01-23|         1|   2022-01-19|    1|2022-01-07|     1|
|MLB410523

In [None]:
# Check for missing user ids. The ids were built with str(doc.get("user_id", "")), so a
# missing id shows up as '' or 'None' rather than SQL NULL; look for all three.
onboarding_df.filter("user_id IS NULL OR user_id = '' OR user_id = 'None'").show()

+-------+--------------+---------+------+---------+----------+-------------+-----+--------+------+
|user_id|first_login_dt|week_year|habito|habito_dt|activacion|activacion_dt|setup|setup_dt|return|
+-------+--------------+---------+------+---------+----------+-------------+-----+--------+------+
+-------+--------------+---------+------+---------+----------+-------------+-----+--------+------+



In [None]:
transactions_df.show(20)  # 20 rows is Spark's default preview size

+---------------+--------------------+----+-------+
|        user_id|      transaction_dt|type|segment|
+---------------+--------------------+----+-------+
|  MLB7579492460|2022-02-01 06:00:...|   6|      1|
|  MLB2509208880|2022-02-02 19:04:...|   6|      1|
| MLB10058048790|2022-01-28 03:00:...|   1|      1|
| MLB10533683550|2022-01-13 04:17:...|   4|      1|
|  MLB4004971550|2022-01-19 04:17:...|   3|      1|
| MLB10495770230|2022-01-19 06:39:...|   5|      1|
|  MLB3746295080|2022-02-04 09:00:...|   7|      1|
|MLB410225567310|2022-01-30 18:58:...|   8|      2|
| MLB10590429150|2022-01-23 13:30:...|   3|      1|
|MLB410631445822|2022-02-10 18:03:...|   9|      2|
|  MLB1524022700|2022-01-30 00:00:...|   4|      1|
| MLB10523141070|2022-01-09 14:08:...|   3|      1|
| MLB10542683560|2022-01-10 00:00:...|   2|      1|
|  MLB3120760160|2022-02-04 22:04:...|   6|      1|
| MLB10638958250|2022-02-17 10:56:...|   4|      1|
|  MLB7846926710|2022-01-25 03:38:...|   4|      1|
| MLB1054252

In [None]:
# Normalise transaction_dt first (text -> TIMESTAMP -> text). The same instant written in
# different formats (e.g. '2022-01-28' vs '2022-01-28 00:00:00.000000000') then compares
# equal, so duplicates are dropped only AFTER homogenising. Text that cannot be parsed
# becomes NULL and is handled by the cells below.
transactions_df = transactions_df.withColumn('transaction_dt', F.col('transaction_dt').cast(TimestampType()))
transactions_df = transactions_df.withColumn('transaction_dt', transactions_df['transaction_dt'].cast(StringType()))

# Drop exact duplicate rows, if any
transactions_df = transactions_df.dropDuplicates()

transactions_df.show()

+----------------+--------------------+----+-------+
|         user_id|      transaction_dt|type|segment|
+----------------+--------------------+----+-------+
|   MLB5366135920|2022-01-23 23:21:...|   5|      1|
|   MLB4735384670|2022-01-29 06:00:...|   6|      1|
|  MLB10648848950|2022-02-14 02:21:...|   1|      1|
|  MLB41739682480|2022-02-02 21:42:...|   8|      2|
|   MLB2587363500|2022-01-31 03:00:...|   6|      1|
|   MLB2159161160| 2022-01-27 00:00:00|   1|      1|
|  MLB10640012380|2022-02-03 08:34:...|   7|      1|
|  MLB10637519180|2022-01-27 19:04:...|   1|      1|
|  MLB10626498740|2022-01-27 14:08:...|   5|      1|
|  MLB10352333000|2022-02-01 03:00:...|   5|      1|
|  MLB10640525210|2022-01-31 04:17:...|   5|      1|
|MLB10634012110-1|2022-02-01 08:34:...|   1|      1|
|  MLB10656547340|2022-02-09 23:21:...|   2|      1|
|   MLB2026375510|2022-01-27 02:21:...|   4|      1|
|   MLB2698910180|2022-02-03 17:47:...|   4|      1|
|   MLB6441174910|2022-02-08 15:51:...|   6|  

In [None]:
# Check for missing user ids. The ids were built with str(doc.get("user_id", "")), so a
# missing id shows up as '' or 'None' rather than SQL NULL; look for all three.
transactions_df.filter("user_id IS NULL OR user_id = '' OR user_id = 'None'").show()

+-------+--------------+----+-------+
|user_id|transaction_dt|type|segment|
+-------+--------------+----+-------+
+-------+--------------+----+-------+



In [None]:
# List transactions whose timestamp is NULL (some exist: text that could not be parsed).
transactions_df.filter(F.col("transaction_dt").isNull()).show()

+---------------+--------------+----+-------+
|        user_id|transaction_dt|type|segment|
+---------------+--------------+----+-------+
|MLB410554685650|          NULL|   8|      2|
| MLB10554685650|          NULL|   7|      1|
| MLB10554685650|          NULL|   4|      1|
| MLB10554685650|          NULL|   6|      1|
| MLB10661666960|          NULL|   7|      1|
|MLB410554685650|          NULL|   9|      2|
| MLB10660135740|          NULL|   2|      1|
| MLB10661666960|          NULL|   1|      1|
| MLB10660135740|          NULL|   5|      1|
| MLB10661666960|          NULL|   4|      1|
| MLB10554685650|          NULL|   5|      1|
| MLB10554685650|          NULL|   2|      1|
| MLB10660135740|          NULL|   3|      1|
| MLB10661666960|          NULL|   2|      1|
+---------------+--------------+----+-------+



In [None]:
# Transactions without a known date add nothing to the analysis, so keep only dated ones.
transactions_df = transactions_df.filter(F.col('transaction_dt').isNotNull())

In [None]:
# Re-check: no NULL transaction dates should remain.
transactions_df.filter(F.col("transaction_dt").isNull()).show()

+-------+--------------+----+-------+
|user_id|transaction_dt|type|segment|
+-------+--------------+----+-------+
+-------+--------------+----+-------+



### Subida a Astra (capa Universal)

In [None]:
# Upload the cleaned users table to the Universal layer
insertar(df=users_df.toPandas(), coleccion='users', keyspace='Universal')

Colección users fue correctamente poblada...!


In [None]:
# Upload the cleaned onboarding table to the Universal layer
insertar(df=onboarding_df.toPandas(), coleccion='onboarding', keyspace='Universal')

Colección onboarding fue correctamente poblada...!


In [None]:
# Upload the cleaned transactions table to the Universal layer
insertar(df=transactions_df.toPandas(), coleccion='transactions', keyspace='Universal')

Colección transactions fue correctamente poblada...!


### Descarga desde Astra (capa Universal)

In [None]:
# Handle to the 'users' collection in the Universal keyspace
users_univ = db.get_collection('users', keyspace='Universal')

users_univ

Collection(name="users", keyspace="Universal", database.api_endpoint="https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com", api_options=FullAPIOptions(token=StaticTokenProvider(AstraCS:ZuQC...), ...))

In [None]:
# Handle to the 'onboarding' collection in the Universal keyspace.
# Note: the variable is spelled 'onboardin_univ'; later cells read it under that name.
onboardin_univ = db.get_collection('onboarding', keyspace='Universal')

onboardin_univ

Collection(name="onboarding", keyspace="Universal", database.api_endpoint="https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com", api_options=FullAPIOptions(token=StaticTokenProvider(AstraCS:ZuQC...), ...))

In [None]:
# Handle to the 'transactions' collection in the Universal keyspace
transactions_univ = db.get_collection('transactions', keyspace='Universal')

transactions_univ

Collection(name="transactions", keyspace="Universal", database.api_endpoint="https://4d0012cb-974a-4358-8adc-9674bb16b79d-us-east-2.apps.astra.datastax.com", api_options=FullAPIOptions(token=StaticTokenProvider(AstraCS:ZuQC...), ...))

In [None]:
# Spark schema for the Universal users documents; field order matches reg_users_univ.
schema_users_univ = StructType([
    StructField(nombre, tipo)
    for nombre, tipo in (
        ("user_id", StringType()),
        ("rubro", IntegerType()),
    )
])

In [None]:
# Spark schema for the Universal onboarding documents; field order matches reg_onboarding_univ.
schema_onboarding_univ = StructType([
    StructField(nombre, tipo)
    for nombre, tipo in (
        ("user_id", StringType()),
        ("first_login_dt", StringType()),
        ("week_year", IntegerType()),
        ("habito", IntegerType()),
        ("habito_dt", StringType()),
        ("activacion", IntegerType()),
        ("activacion_dt", StringType()),
        ("setup", IntegerType()),
        ("setup_dt", StringType()),
        ("return", IntegerType()),
    )
])

In [None]:
# Spark schema for the Universal transactions documents; field order matches reg_transactions_univ.
schema_transactions_univ = StructType([
    StructField(nombre, tipo)
    for nombre, tipo in (
        ("user_id", StringType()),
        ("transaction_dt", StringType()),
        ("type", IntegerType()),
        ("segment", IntegerType()),
    )
])

In [None]:
# Turn every Universal users document into a (user_id, rubro) tuple.
reg_users_univ = [
    (str(doc.get("user_id", "")), int(doc.get("rubro", 0)))
    for doc in users_univ.find()
]

In [None]:
# Turn every Universal onboarding document into a tuple ordered like schema_onboarding_univ.
# Text fields fall back to "" and integer flags to 0 when the field is absent.
reg_onboarding_univ = [
    (
        str(doc.get("user_id", "")),
        str(doc.get("first_login_dt", "")),
        int(doc.get("week_year", 0)),
        int(doc.get("habito", 0)),
        str(doc.get("habito_dt", "")),
        int(doc.get("activacion", 0)),
        str(doc.get("activacion_dt", "")),
        int(doc.get("setup", 0)),
        str(doc.get("setup_dt", "")),
        int(doc.get("return", 0)),
    )
    for doc in onboardin_univ.find()
]

In [None]:
# Turn every Universal transactions document into a (user_id, transaction_dt, type, segment) tuple.
reg_transactions_univ = [
    (
        str(doc.get("user_id", "")),
        str(doc.get("transaction_dt", "")),
        int(doc.get("type", 0)),
        int(doc.get("segment", 0)),
    )
    for doc in transactions_univ.find()
]

In [None]:
# Spark DataFrame built from the Universal users tuples
users_df_univ = spark.createDataFrame(data=reg_users_univ, schema=schema_users_univ)

In [None]:
# Spark DataFrame built from the Universal onboarding tuples
onboarding_df_univ = spark.createDataFrame(data=reg_onboarding_univ, schema=schema_onboarding_univ)

In [None]:
# Spark DataFrame built from the Universal transactions tuples
transactions_df_univ = spark.createDataFrame(data=reg_transactions_univ, schema=schema_transactions_univ)

### Creación del esquema y las tablas SQL (capa Smart)

In [None]:
# Create the SMART schema (database) in the Hive catalog if it is not there yet.
spark.sql(
    sqlQuery="""
    CREATE SCHEMA IF NOT EXISTS SMART
    """
)

DataFrame[]

In [None]:
# Create SMART.users (user id + integer rubro) if it does not exist yet.
spark.sql(
    sqlQuery="""
    CREATE TABLE IF NOT EXISTS SMART.users (
        user_id STRING,
        rubro INT
    )

    """
)

DataFrame[]

In [None]:
# Load the Universal users into SMART.users.
# INSERT OVERWRITE replaces the table contents, so re-running the notebook does not
# append a second copy of every row (INSERT INTO would).
users_df_univ.createOrReplaceTempView("users_df_view")

spark.sql(
    """
    INSERT OVERWRITE TABLE SMART.users
    SELECT *
    FROM users_df_view
    """
)

DataFrame[]

In [None]:
# Create SMART.onboarding if it does not exist yet. Dates use DATE and the funnel flags
# use INT. habito is INT like the other flags, because the pipeline casts it to an
# integer and reads it back as one.
# Note: CREATE TABLE IF NOT EXISTS does not alter a table created by an earlier run.
spark.sql(
    """
    CREATE TABLE IF NOT EXISTS SMART.onboarding (
        user_id STRING,
        first_login_dt DATE,
        week_year INT,
        habito INT,
        habito_dt DATE,
        activacion INT,
        activacion_dt DATE,
        setup INT,
        setup_dt DATE,
        return INT
    )

    """
)

DataFrame[]

In [None]:
# Load the Universal onboarding rows into SMART.onboarding, converting the text dates to DATE.
# INSERT OVERWRITE replaces the table contents, so re-running the notebook does not
# append duplicates. The SELECT list follows the table's column order, so columns match
# by position.
onboarding_df_univ.createOrReplaceTempView("onboarding_df_view")

spark.sql(
    """
    INSERT OVERWRITE TABLE SMART.onboarding
    SELECT
        user_id,
        CAST(first_login_dt AS DATE),
        week_year,
        habito,
        CAST(habito_dt AS DATE),
        activacion,
        CAST(activacion_dt AS DATE),
        setup,
        CAST(setup_dt AS DATE),
        return
    FROM onboarding_df_view
    """
)

DataFrame[]

In [None]:
# Create SMART.transactions (one row per transaction, with a TIMESTAMP) if it does not exist yet.
spark.sql(
    sqlQuery="""
    CREATE TABLE IF NOT EXISTS SMART.transactions(
        user_id STRING,
        transaction_dt TIMESTAMP,
        type INT,
        segment INT

    )

    """
)

DataFrame[]

In [None]:
# Load the Universal transactions into SMART.transactions, converting the text timestamp.
# INSERT OVERWRITE replaces the table contents, so re-running the notebook does not
# append a second copy of every transaction (INSERT INTO would).
transactions_df_univ.createOrReplaceTempView("transactions_df_view")

spark.sql(
    """
    INSERT OVERWRITE TABLE SMART.transactions
    SELECT
        user_id,
        CAST(transaction_dt AS TIMESTAMP),
        type,
        segment
    FROM transactions_df_view
    """
)

DataFrame[]

Con las tablas creadas y listas para consumir desde Spark, utilizamos consultas SQL para responder las distintas preguntas de negocio planteadas.

In [None]:
# Drop rate = share of onboarding rows that never reached the 'return' stage.
spark.sql(
    sqlQuery="""
    SELECT 1- (SUM(return) / COUNT(*)) AS drop_rate
    FROM SMART.onboarding
    """
).show()

+-------------------+
|          drop_rate|
+-------------------+
|0.29046153846153844|
+-------------------+



In [None]:
# Activation rate = share of onboarding rows with activacion = 1.
spark.sql(
    sqlQuery="""
    SELECT SUM(activacion) / COUNT(*) AS activation_rate
    FROM SMART.onboarding
    """
).show()

+-------------------+
|    activation_rate|
+-------------------+
|0.11807692307692308|
+-------------------+



In [None]:
# Setup rate = share of onboarding rows with setup = 1.
spark.sql(
    sqlQuery="""
    SELECT SUM(setup) / COUNT(*) AS setup_rate
    FROM SMART.onboarding



    """
).show()

+------------------+
|        setup_rate|
+------------------+
|0.4460769230769231|
+------------------+



In [None]:
# Transactions made within 30 days of each user's first transaction in the same segment.
spark.sql(
    sqlQuery="""
    WITH FirstTransaction AS (
        SELECT
            user_id,
            segment,
            MIN(transaction_dt) AS first_transaction_dt
        FROM SMART.transactions
        GROUP BY user_id, segment
    )
    SELECT
        t.user_id,
        t.transaction_dt
    FROM SMART.transactions t
    JOIN FirstTransaction ft ON t.user_id = ft.user_id AND t.segment = ft.segment
    WHERE t.transaction_dt BETWEEN ft.first_transaction_dt AND ft.first_transaction_dt + INTERVAL 30 DAY

    """
).show()

+--------------+--------------------+
|       user_id|      transaction_dt|
+--------------+--------------------+
| MLB3120760160|2022-02-04 22:04:...|
| MLB3626692100| 2022-01-27 00:00:00|
| MLB4568373170|2022-02-27 17:47:...|
| MLB5742173940|2022-01-27 13:30:...|
|MLB10614454400|2022-02-18 16:30:...|
|MLB10617979960|2022-02-01 08:34:...|
| MLB6671538600|2022-02-18 07:17:...|
|MLB10628393100| 2022-01-25 00:00:00|
|MLB10644446360|2022-02-16 02:21:...|
|MLB10641386330|2022-01-28 19:04:...|
|MLB10575273520|2022-02-04 23:21:...|
|MLB10655448410|2022-02-06 22:43:...|
|MLB10495482340|2022-01-13 07:55:...|
| MLB8141537210|2022-01-19 03:00:...|
|MLB10512354980|2022-01-08 19:04:...|
|MLB10613812040| 2022-01-22 00:00:00|
|MLB10585813380|2022-01-30 23:21:...|
| MLB3350248620|2022-01-09 18:25:...|
| MLB7813965430|2022-01-25 17:08:...|
|MLB10640415220| 2022-02-02 00:00:00|
+--------------+--------------------+
only showing top 20 rows



In [None]:
# Segment 1 (individuals): count of transactions inside each user's first 30 days.
# NOTE(review): COUNT(user_id) counts qualifying transaction ROWS, not distinct users, even
# though the alias says "individuals". COUNT(DISTINCT user_id) alone would equal the number
# of segment-1 users, because every first transaction lies inside its own window. Confirm
# the intended "hábito" definition (e.g. a minimum number of transactions per user).
spark.sql(
    sqlQuery="""
    SELECT COUNT(user_id) AS Habito_individuals



    FROM (
      WITH FirstTransaction AS (
        SELECT
            user_id,
            segment,
            MIN(transaction_dt) AS first_transaction_dt

        FROM SMART.transactions
        WHERE segment = 1
        GROUP BY user_id, segment
    )
    SELECT
        t.user_id,
        t.transaction_dt
    FROM SMART.transactions t
    JOIN FirstTransaction ft ON t.user_id = ft.user_id AND t.segment = ft.segment
    WHERE t.transaction_dt BETWEEN ft.first_transaction_dt AND ft.first_transaction_dt + INTERVAL 30 DAY
        ) AS subquery


    """
).show()

+------------------+
|Habito_individuals|
+------------------+
|              4758|
+------------------+



In [None]:
# Segment 2 (sellers): count of transactions inside each user's first 30 days.
# NOTE(review): COUNT(user_id) counts qualifying transaction ROWS, not distinct sellers,
# even though the alias says "sellers". Confirm the intended "hábito" definition
# (e.g. a minimum number of transactions per user).
spark.sql(
    sqlQuery="""
    SELECT COUNT(user_id) AS Habito_sellers



    FROM (
      WITH FirstTransaction AS (
        SELECT
            user_id,
            segment,
            MIN(transaction_dt) AS first_transaction_dt

        FROM SMART.transactions
        WHERE segment = 2
        GROUP BY user_id, segment
    )
    SELECT
        t.user_id

    FROM SMART.transactions t
    JOIN FirstTransaction ft ON t.user_id = ft.user_id AND t.segment = ft.segment
    WHERE t.transaction_dt BETWEEN ft.first_transaction_dt AND ft.first_transaction_dt + INTERVAL 30 DAY
        ) AS subquery


    """
).show()

+--------------+
|Habito_sellers|
+--------------+
|           505|
+--------------+

