In [0]:
from pyspark.sql.functions import col, when
from pyspark.sql.functions import sum
from pyspark.sql.functions import mean, min, max
from pyspark.sql.types import ArrayType, DoubleType


In [0]:
base_atributos = spark.table("databricks_clase.prueba_schema.base_atributos")
base_cliente = spark.table("databricks_clase.prueba_schema.base_cliente")
base_trx = spark.table("databricks_clase.prueba_schema.base_trx")

In [0]:
join_1 = base_atributos.join(
    base_cliente,
    on=["periodo", "id_cliente"],  
    how="left"  
)

tabla_consolidada = join_1.join(
    base_trx,
    on=["periodo", "id_cliente"],  
    how="left"  
)


In [0]:
# Reemplazar valores nulos en columnas categóricas
tabla_consolidada = tabla_consolidada.fillna({
    "tipo_producto": "Desconocido",
    "departamento": "Desconocido",
    "canal": "Desconocido"
})

# Reemplazar valores nulos en columnas numéricas con la mediana
numeric_cols = ["monto_1m", "monto_2m", "monto_3m", "frecuencia_1m", "frecuencia_2m", "frecuencia_3m"]
for col_name in numeric_cols:
    median_value = tabla_consolidada.approxQuantile(col_name, [0.5], 0.01)[0]
    tabla_consolidada = tabla_consolidada.fillna({col_name: median_value})

from pyspark.sql.functions import expr

# Crear nuevas características
tabla_consolidada = tabla_consolidada.withColumn(
    "monto_total",
    expr("monto_1m + monto_2m + monto_3m + monto_4m + monto_5m + monto_6m")
)

tabla_consolidada = tabla_consolidada.withColumn(
    "tendencia_monto",
    expr("(monto_1m - monto_6m) / monto_6m")
)

In [0]:
tabla_consolidada = tabla_consolidada.drop('__index_level_0__')

In [0]:
tabla_consolidada = tabla_consolidada.filter(col("flg_churn").isNotNull())

In [0]:
# Crear una lista de columnas
columnas = tabla_consolidada.columns

# Contar valores nulos en cada columna
nulos = tabla_consolidada.agg(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in columnas]
)

# Mostrar el resultado
nulos.show(vertical=True)

-RECORD 0--------------------
 periodo            | 0      
 id_cliente         | 0      
 tiempo_permanencia | 0      
 flg_vip            | 0      
 incidencias_a      | 357723 
 incidencias_b      | 307397 
 tipo_producto      | 0      
 periodo_creacion   | 388469 
 departamento       | 0      
 segmento_pago      | 388469 
 canal              | 0      
 segmento_cliente   | 388469 
 crossell           | 388469 
 tasa               | 504    
 monto_1m           | 0      
 monto_2m           | 0      
 monto_3m           | 0      
 monto_4m           | 0      
 monto_5m           | 0      
 monto_6m           | 0      
 cantidad_1m        | 0      
 cantidad_2m        | 0      
 cantidad_3m        | 0      
 cantidad_6m        | 0      
 frecuencia_1m      | 0      
 frecuencia_2m      | 0      
 frecuencia_3m      | 0      
 ultima_compra_1m   | 0      
 ultima_compra_2m   | 55791  
 ultima_compra_3m   | 80756  
 flg_churn          | 0      
 monto_total        | 0      
 tendencia

In [0]:

# Reemplazar con 0
columnas_cero = ["incidencias_a", "incidencias_b", "crossell", "ultima_compra_2m", "ultima_compra_3m"]
tabla_consolidada = tabla_consolidada.fillna({col: 0 for col in columnas_cero})

# Reemplazar con valor predeterminado
periodo_minimo = tabla_consolidada.select(min("periodo")).collect()[0][0]
tabla_consolidada = tabla_consolidada.fillna({"periodo_creacion": periodo_minimo})

# Reemplazar con -1
columnas_menos_uno = ["segmento_pago", "segmento_cliente"]
tabla_consolidada = tabla_consolidada.fillna({col: -1 for col in columnas_menos_uno})

# Imputar con la media
tasa_media = tabla_consolidada.select(mean("tasa")).collect()[0][0]
tabla_consolidada = tabla_consolidada.fillna({"tasa": tasa_media})

In [0]:
nulos_final = tabla_consolidada.agg(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in columnas]
)

nulos_final.show(vertical=True)

-RECORD 0-----------------
 periodo            | 0   
 id_cliente         | 0   
 tiempo_permanencia | 0   
 flg_vip            | 0   
 incidencias_a      | 0   
 incidencias_b      | 0   
 tipo_producto      | 0   
 periodo_creacion   | 0   
 departamento       | 0   
 segmento_pago      | 0   
 canal              | 0   
 segmento_cliente   | 0   
 crossell           | 0   
 tasa               | 0   
 monto_1m           | 0   
 monto_2m           | 0   
 monto_3m           | 0   
 monto_4m           | 0   
 monto_5m           | 0   
 monto_6m           | 0   
 cantidad_1m        | 0   
 cantidad_2m        | 0   
 cantidad_3m        | 0   
 cantidad_6m        | 0   
 frecuencia_1m      | 0   
 frecuencia_2m      | 0   
 frecuencia_3m      | 0   
 ultima_compra_1m   | 0   
 ultima_compra_2m   | 0   
 ultima_compra_3m   | 0   
 flg_churn          | 0   
 monto_total        | 0   
 tendencia_monto    | 0   



In [0]:
%sql
DELETE FROM databricks_clase.prueba_schema.base_consolidada;

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `databricks_clase`.`prueba_schema`.`base_consolidada` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 1 pos 12;
'DeleteFromTable true
+- 'UnresolvedRelation [databricks_clase, prueba_schema, base_consolidada], [__required_write_privileges__=DELETE], false

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:94)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:322)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:286)
	at org.apache.spark.sql.catalyst.trees.TreeNo

In [0]:
spark.sql("DROP TABLE IF EXISTS databricks_clase.prueba_schema.base_consolidada PURGE")

DataFrame[]

In [0]:
tabla_consolidada.printSchema()

root
 |-- periodo: long (nullable = true)
 |-- id_cliente: long (nullable = true)
 |-- tiempo_permanencia: long (nullable = true)
 |-- flg_vip: double (nullable = true)
 |-- incidencias_a: double (nullable = false)
 |-- incidencias_b: double (nullable = false)
 |-- tipo_producto: string (nullable = false)
 |-- periodo_creacion: long (nullable = false)
 |-- departamento: string (nullable = false)
 |-- segmento_pago: long (nullable = false)
 |-- canal: string (nullable = false)
 |-- segmento_cliente: long (nullable = false)
 |-- crossell: double (nullable = false)
 |-- tasa: double (nullable = false)
 |-- monto_1m: double (nullable = false)
 |-- monto_2m: double (nullable = false)
 |-- monto_3m: double (nullable = false)
 |-- monto_4m: double (nullable = true)
 |-- monto_5m: double (nullable = true)
 |-- monto_6m: double (nullable = true)
 |-- cantidad_1m: double (nullable = true)
 |-- cantidad_2m: double (nullable = true)
 |-- cantidad_3m: double (nullable = true)
 |-- cantidad_6m: doub

In [0]:
%sql
CREATE TABLE databricks_clase.prueba_schema.base_consolidada (
    periodo BIGINT,
    id_cliente BIGINT,
    tiempo_permanencia BIGINT,
    flg_vip DOUBLE,
    incidencias_a DOUBLE NOT NULL,
    incidencias_b DOUBLE NOT NULL,
    tipo_producto STRING NOT NULL,
    periodo_creacion BIGINT NOT NULL,
    departamento STRING NOT NULL,
    segmento_pago BIGINT NOT NULL,
    canal STRING NOT NULL,
    segmento_cliente BIGINT NOT NULL,
    crossell DOUBLE NOT NULL,
    tasa DOUBLE NOT NULL,
    monto_1m DOUBLE NOT NULL,
    monto_2m DOUBLE NOT NULL,
    monto_3m DOUBLE NOT NULL,
    monto_4m DOUBLE,
    monto_5m DOUBLE,
    monto_6m DOUBLE,
    cantidad_1m DOUBLE,
    cantidad_2m DOUBLE,
    cantidad_3m DOUBLE,
    cantidad_6m DOUBLE,
    frecuencia_1m DOUBLE NOT NULL,
    frecuencia_2m DOUBLE NOT NULL,
    frecuencia_3m DOUBLE NOT NULL,
    ultima_compra_1m DOUBLE,
    ultima_compra_2m DOUBLE NOT NULL,
    ultima_compra_3m DOUBLE NOT NULL,
    flg_churn DOUBLE,
    monto_total DOUBLE,
    tendencia_monto DOUBLE
)
USING DELTA;


In [0]:
tabla_consolidada.toPandas().head(2)

Unnamed: 0,periodo,id_cliente,tiempo_permanencia,flg_vip,incidencias_a,incidencias_b,tipo_producto,periodo_creacion,departamento,segmento_pago,canal,segmento_cliente,crossell,tasa,monto_1m,monto_2m,monto_3m,monto_4m,monto_5m,monto_6m,cantidad_1m,cantidad_2m,cantidad_3m,cantidad_6m,frecuencia_1m,frecuencia_2m,frecuencia_3m,ultima_compra_1m,ultima_compra_2m,ultima_compra_3m,flg_churn,monto_total,tendencia_monto
0,202401,131,39,0.0,0.0,0.0,Desconocido,202401,Desconocido,-1,Desconocido,-1,0.0,0.0344,6.708084,-16.118096,6.579251,-16.118096,5.991465,-16.118096,1.386294,-16.118096,0.693147,-16.118096,4.0,0.0,2.0,29.0,0.0,24.0,0.0,-29.075487,-1.416183
1,202401,164,16,0.0,0.0,3.0,Desconocido,202401,Desconocido,-1,Desconocido,-1,0.0,0.0344,10.190057,9.484025,9.330521,9.67574,9.978711,9.18881,3.091042,2.484907,2.564949,2.484907,16.0,10.0,12.0,31.0,30.0,30.0,0.0,57.847863,0.108964


In [0]:
# Guardar la tabla en Unity Catalog
tabla_consolidada.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("databricks_clase.prueba_schema.base_consolidada")


In [0]:
df_consolidado_lec = spark.read.table("databricks_clase.prueba_schema.base_consolidada")

In [0]:
df_consolidado_lec.show()

+-------+----------+------------------+-------+-------------+-------------+-------------+----------------+------------+-------------+-----------+----------------+--------+------+------------------+------------------+------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------+-------------+-------------+----------------+----------------+----------------+---------+-------------------+--------------------+
|periodo|id_cliente|tiempo_permanencia|flg_vip|incidencias_a|incidencias_b|tipo_producto|periodo_creacion|departamento|segmento_pago|      canal|segmento_cliente|crossell|  tasa|          monto_1m|          monto_2m|          monto_3m|            monto_4m|          monto_5m|          monto_6m|         cantidad_1m|         cantidad_2m|         cantidad_3m|         cantidad_6m|frecuencia_1m|frecuencia_2m|frecuencia_3m|ultima_compra_1m|ultima_compra_2m|ultima_compra_3m|flg_ch

In [0]:
periodo_max =df_consolidado_lec.agg(max("periodo")).collect()[0][0]

In [0]:
tabla_consolidada_mensual = df_consolidado_lec.filter(df_consolidado_lec.periodo == periodo_max)

In [0]:
spark.sql("DROP TABLE IF EXISTS databricks_clase.prueba_schema.base_consolidada_mensual PURGE")

DataFrame[]

In [0]:
%sql
CREATE TABLE databricks_clase.prueba_schema.base_consolidada_mensual (
    periodo BIGINT,
    id_cliente BIGINT,
    tiempo_permanencia BIGINT,
    flg_vip DOUBLE,
    incidencias_a DOUBLE NOT NULL,
    incidencias_b DOUBLE NOT NULL,
    tipo_producto STRING NOT NULL,
    periodo_creacion BIGINT NOT NULL,
    departamento STRING NOT NULL,
    segmento_pago BIGINT NOT NULL,
    canal STRING NOT NULL,
    segmento_cliente BIGINT NOT NULL,
    crossell DOUBLE NOT NULL,
    tasa DOUBLE NOT NULL,
    monto_1m DOUBLE NOT NULL,
    monto_2m DOUBLE NOT NULL,
    monto_3m DOUBLE NOT NULL,
    monto_4m DOUBLE,
    monto_5m DOUBLE,
    monto_6m DOUBLE,
    cantidad_1m DOUBLE,
    cantidad_2m DOUBLE,
    cantidad_3m DOUBLE,
    cantidad_6m DOUBLE,
    frecuencia_1m DOUBLE NOT NULL,
    frecuencia_2m DOUBLE NOT NULL,
    frecuencia_3m DOUBLE NOT NULL,
    ultima_compra_1m DOUBLE,
    ultima_compra_2m DOUBLE NOT NULL,
    ultima_compra_3m DOUBLE NOT NULL,
    flg_churn DOUBLE,
    monto_total DOUBLE,
    tendencia_monto DOUBLE
)
USING DELTA;


In [0]:
tabla_consolidada_mensual.write.format("delta").mode("overwrite").saveAsTable("databricks_clase.prueba_schema.base_consolidada_mensual")