# TRATAMIENTO DE DATOS CON PYSPARK

In [1]:
!pip install pyspark



In [None]:
### Importar las librerías necesarias ###
### INICIA LA SPARK-SESSION ###
### Añade el spark context ###
### Añade el path para acceder a los datos y nómbralo como DATA_PATH###
### Lee la carpeta vuelos con pyspark y nómbralo ###

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

# Crear la sesión de Spark
spark = SparkSession.builder.appName("FinPlus-Analytics").getOrCreate()

DATA_PATH = "/home/jovyan/work/"

beh = spark.read.parquet(DATA_PATH + "BEHAVIOURAL.parquet", header=True, inferSchema=True)
clients = spark.read.csv(DATA_PATH + "CLIENTS.csv", header=True, inferSchema=True)



In [7]:
import os
os.listdir("/home/jovyan/work")


['BEHAVIOURAL.parquet', 'CLIENTS.csv', 'Data_Dictionary.xlsx']

## INDICADORES Y ANÁLISIS DE COMPORTAMIENTO


### ACTIVIDAD CLIENTE

In [None]:
beh.printSchema()
beh.show(5, truncate=False)


root
 |-- CONTRACT_ID: string (nullable = true)
 |-- CLIENT_ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- CREDICT_CARD_BALANCE: double (nullable = true)
 |-- CREDIT_CARD_LIMIT: double (nullable = true)
 |-- CREDIT_CARD_DRAWINGS_ATM: double (nullable = true)
 |-- CREDIT_CARD_DRAWINGS: double (nullable = true)
 |-- CREDIT_CARD_DRAWINGS_POS: double (nullable = true)
 |-- CREDIT_CARD_DRAWINGS_OTHER: double (nullable = true)
 |-- CREDIT_CARD_PAYMENT: double (nullable = true)
 |-- NUMBER_DRAWINGS_ATM: double (nullable = true)
 |-- NUMBER_DRAWINGS: long (nullable = true)
 |-- NUMBER_INSTALMENTS: double (nullable = true)
 |-- CURRENCY: string (nullable = true)

+------------------+------------+----------+--------------------+-----------------+------------------------+--------------------+------------------------+--------------------------+-------------------+-------------------+---------------+------------------+--------+
|CONTRACT_ID       |CLIENT_ID   |DATE      |CRED

In [16]:
clients.printSchema()
clients.show(10)


root
 |-- CLIENT_ID: string (nullable = true)
 |-- NON_COMPLIANT_CONTRACT: integer (nullable = true)
 |-- NAME_PRODUCT_TYPE: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- TOTAL_INCOME: double (nullable = true)
 |-- AMOUNT_PRODUCT: double (nullable = true)
 |-- INSTALLMENT: double (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- HOME_SITUATION: string (nullable = true)
 |-- REGION_SCORE: double (nullable = true)
 |-- AGE_IN_YEARS: double (nullable = true)
 |-- JOB_SENIORITY: double (nullable = true)
 |-- HOME_SENIORITY: double (nullable = true)
 |-- LAST_UPDATE: double (nullable = true)
 |-- OWN_INSURANCE_CAR: string (nullable = true)
 |-- CAR_AGE: double (nullable = true)
 |-- FAMILY_SIZE: double (nullable = true)
 |-- REACTIVE_SCORING: double (nullable = true)
 |-- PROACTIVE_SCORING: double (nullable = true)
 |-- BEHAVIORAL_SCORING: double (nullable = true)
 |-- DAYS_LAST_INFO_CHANGE: double (nullable =

In [17]:
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import DoubleType, IntegerType, DateType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# 3) Normalización / parsing de la fecha
# La columna se llama 'DATE' (string 'YYYY-MM-DD' según tu muestra). Convertimos a tipo Date.
beh = beh.withColumn("event_date", F.to_date(F.col("DATE"), "yyyy-MM-dd"))

# 4) Definir un importe de transacción representativo (monetary)
# Usaremos CREDIT_CARD_DRAWINGS como proxy transaccional (suma de otras columnas si conviene)
# Crear columna transaction_amount con la mejor aproximación disponible
beh = beh.withColumn("transaction_amount",
                     (F.coalesce(F.col("CREDIT_CARD_DRAWINGS"), F.lit(0.0))
                      + F.coalesce(F.col("CREDIT_CARD_DRAWINGS_POS"), F.lit(0.0))
                      + F.coalesce(F.col("CREDIT_CARD_DRAWINGS_ATM"), F.lit(0.0))
                      + F.coalesce(F.col("CREDIT_CARD_DRAWINGS_OTHER"), F.lit(0.0))
                     ).cast(DoubleType())
                    )

# 5) Fecha de referencia (max fecha del dataset) — útil para recency
max_date_row = beh.select(F.max("event_date").alias("max_dt")).collect()[0]
max_date = max_date_row["max_dt"]
print("Fecha máxima en BEHAVIOURAL (fecha de corte):", max_date)

# 6) Agregados por cliente (RFM básico + medidas adicionales)
# Recency: días desde la última transacción (respecto a max_date)
recency_df = beh.groupBy("CLIENT_ID") \
    .agg(F.max("event_date").alias("last_activity_date")) \
    .withColumn("recency_days", F.datediff(F.lit(max_date), F.col("last_activity_date")))

# Frequency: conteo de eventos (filas) y suma de NUMBER_DRAWINGS (si disponible)
freq_df = beh.groupBy("CLIENT_ID") \
    .agg(
         F.count("*").alias("n_events"),
         F.sum(F.coalesce(F.col("NUMBER_DRAWINGS"), F.lit(0))).alias("sum_number_drawings")
    )

# Monetary: sumas y medias
mon_df = beh.groupBy("CLIENT_ID") \
    .agg(
        F.sum("transaction_amount").alias("monetary_total"),
        F.avg("transaction_amount").alias("monetary_avg"),
        F.expr("percentile_approx(transaction_amount, 0.5)").alias("monetary_median"),
        F.max("transaction_amount").alias("monetary_max")
    )

# 7) Ventanas temporales: eventos últimos 30/90/365 días (usamos max_date)
beh_30 = beh.filter(F.col("event_date") >= F.date_sub(F.lit(max_date), 30))
beh_90 = beh.filter(F.col("event_date") >= F.date_sub(F.lit(max_date), 90))
beh_365 = beh.filter(F.col("event_date") >= F.date_sub(F.lit(max_date), 365))

tmp_30 = beh_30.groupBy("CLIENT_ID").agg(F.count("*").alias("events_30d"),
                                        F.sum("transaction_amount").alias("monetary_30d"))
tmp_90 = beh_90.groupBy("CLIENT_ID").agg(F.count("*").alias("events_90d"),
                                        F.sum("transaction_amount").alias("monetary_90d"))
tmp_365 = beh_365.groupBy("CLIENT_ID").agg(F.count("*").alias("events_365d"),
                                          F.sum("transaction_amount").alias("monetary_365d"))

# 8) Actividad por periodo (meses activos) y estabilidad (std dev diaria)
from pyspark.sql.functions import date_format

monthly = beh.withColumn("year_month", date_format("event_date", "yyyy-MM")) \
             .groupBy("CLIENT_ID", "year_month").agg(F.count("*").alias("events_month"))

months_active = monthly.groupBy("CLIENT_ID").agg(F.count("year_month").alias("n_months_active"),
                                                F.avg("events_month").alias("avg_events_per_month"),
                                                F.expr("stddev_pop(events_month)").alias("std_events_per_month"))

# 9) Unir todo en customer_activity_master
customer_activity = clients.select("CLIENT_ID", "NUMBER_OF_PRODUCTS", "AGE_IN_YEARS", "GENDER", "TOTAL_INCOME") \
    .join(recency_df, on="CLIENT_ID", how="left") \
    .join(freq_df, on="CLIENT_ID", how="left") \
    .join(mon_df, on="CLIENT_ID", how="left") \
    .join(tmp_30, on="CLIENT_ID", how="left") \
    .join(tmp_90, on="CLIENT_ID", how="left") \
    .join(tmp_365, on="CLIENT_ID", how="left") \
    .join(months_active, on="CLIENT_ID", how="left")

# Rellenar nulos con 0s o valores razonables
customer_activity = customer_activity.fillna({
    "n_events": 0,
    "sum_number_drawings": 0,
    "monetary_total": 0.0,
    "monetary_avg": 0.0,
    "monetary_median": 0.0,
    "monetary_max": 0.0,
    "events_30d": 0, "monetary_30d": 0.0,
    "events_90d": 0, "monetary_90d": 0.0,
    "events_365d": 0, "monetary_365d": 0.0,
    "n_months_active": 0, "avg_events_per_month": 0.0, "std_events_per_month": 0.0,
    "recency_days": 9999
})

# 10) Indicadores compuestos: Score de actividad
# Normalizamos tres señales: recency (inversa), frequency, monetary_total
# Para normalizar con Spark ML necesitamos VectorAssembler + StandardScaler (pero lo haremos por min-max simple)
# Calcular min/max para cada columna
mins_maxs = customer_activity.agg(
    F.min("recency_days").alias("min_recency"), F.max("recency_days").alias("max_recency"),
    F.min("n_events").alias("min_freq"), F.max("n_events").alias("max_freq"),
    F.min("monetary_total").alias("min_mon"), F.max("monetary_total").alias("max_mon")
).collect()[0]

min_rec, max_rec = mins_maxs["min_recency"], mins_maxs["max_recency"]
min_freq, max_freq = mins_maxs["min_freq"], mins_maxs["max_freq"]
min_mon, max_mon = mins_maxs["min_mon"], mins_maxs["max_mon"]

# UDFs not necessary: use column expressions with safe denom
customer_activity = customer_activity \
    .withColumn("recency_norm", (F.lit(max_rec) - F.col("recency_days")) / F.when(F.lit(max_rec) - F.lit(min_rec) == 0, F.lit(1)).otherwise(F.lit(max_rec) - F.lit(min_rec))) \
    .withColumn("freq_norm", (F.col("n_events") - F.lit(min_freq)) / F.when(F.lit(max_freq) - F.lit(min_freq) == 0, F.lit(1)).otherwise(F.lit(max_freq) - F.lit(min_freq))) \
    .withColumn("mon_norm", (F.col("monetary_total") - F.lit(min_mon)) / F.when(F.lit(max_mon) - F.lit(min_mon) == 0, F.lit(1)).otherwise(F.lit(max_mon) - F.lit(min_mon)))

# Activity Score: ponderación (puedes ajustar según negocio)
customer_activity = customer_activity.withColumn("activity_score",
                            0.4 * F.col("recency_norm") + 0.35 * F.col("freq_norm") + 0.25 * F.col("mon_norm"))

# 11) Riesgo de churn (heurístico + modelo baseline)
# Etiqueta heurística (proxy): churn si recency_days > 90
customer_activity = customer_activity.withColumn("is_churn_proxy", (F.col("recency_days") > 90).cast(IntegerType()))

# Entrenar un modelo simple (baseline): features = events_30d, events_90d, events_365d, monetary_365d, n_months_active
feature_cols = ["events_30d", "events_90d", "events_365d", "monetary_365d", "n_months_active", "NUMBER_OF_PRODUCTS"]
# Reemplazar nulos y convertir a double
for c in feature_cols:
    if c not in customer_activity.columns:
        customer_activity = customer_activity.withColumn(c, F.lit(0.0))
customer_activity = customer_activity.fillna({c:0.0 for c in feature_cols})

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
gbt = GBTClassifier(featuresCol="features_vec", labelCol="is_churn_proxy", maxIter=50)
pipe = Pipeline(stages=[assembler, gbt])

# split
train, test = customer_activity.select(feature_cols + ["is_churn_proxy", "CLIENT_ID", "activity_score"]).randomSplit([0.7, 0.3], seed=42)
model = pipe.fit(train)
preds = model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="is_churn_proxy", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(preds)
print("Baseline churn model AUC:", auc)

# Extraer feature importances (del GBT)
gbt_model = model.stages[-1]
try:
    importances = list(gbt_model.featureImportances)
    print("Feature importances (order):", list(zip(feature_cols, importances)))
except Exception as e:
    print("No se pudo extraer importances:", e)

# 12) Outputs: ejemplos y guardado
print("Top 10 clientes por activity_score:")
customer_activity.select("CLIENT_ID", "activity_score", "recency_days", "n_events", "monetary_total") \
    .orderBy(F.col("activity_score").desc()).show(10, truncate=False)

print("Clientes en riesgo (churn proxy) - top 10 por activity_score inverso (baja actividad pero alto valor):")
customer_activity.filter(F.col("is_churn_proxy") == 1) \
    .orderBy(F.col("activity_score").asc()).select("CLIENT_ID", "activity_score", "recency_days", "n_events", "monetary_total").show(10, truncate=False)

# Guardar customer_activity
customer_activity.write.mode("overwrite").parquet(OUTPUT_PATH + "customer_activity.parquet")
print("Saved customer_activity to:", OUTPUT_PATH + "customer_activity.parquet")

# 13) Indicadores agregados globales para dashboard (ejemplos)
agg_kpis = {
    "total_customers": customer_activity.select(F.countDistinct("CLIENT_ID")).collect()[0][0],
    "avg_activity_score": float(customer_activity.agg(F.avg("activity_score")).collect()[0][0]),
    "pct_churn_proxy": float(customer_activity.agg(F.avg(F.col("is_churn_proxy").cast("double"))).collect()[0][0])
}
print("KPIs summary:", agg_kpis)

# 14) Guardar un CSV resumido para consumo de BI
customer_activity.select("CLIENT_ID", "activity_score", "recency_days", "n_events", "monetary_total",
                         "events_30d", "events_90d", "events_365d", "n_months_active", "is_churn_proxy") \
    .coalesce(1).write.mode("overwrite").option("header", True).csv(OUTPUT_PATH + "customer_activity_summary_csv")

print("Resumen listo en:", OUTPUT_PATH + "customer_activity_summary_csv")


Fecha máxima en BEHAVIOURAL (fecha de corte): 2021-12-31
Baseline churn model AUC: 1.0
Feature importances (order): [('events_30d', 3.688843655499395e-16), ('events_90d', 0.9999999999999604), ('events_365d', 1.8570364373642948e-15), ('monetary_365d', 7.705967862708162e-16), ('n_months_active', 6.002748245689171e-16), ('NUMBER_OF_PRODUCTS', 3.6086727050368835e-14)]
Top 10 clientes por activity_score:
+------------+------------------+------------+--------+------------------+
|CLIENT_ID   |activity_score    |recency_days|n_events|monetary_total    |
+------------+------------------+------------+--------+------------------+
|ES182186401T|0.7574528900703528|0           |192     |11458.8           |
|ES182348429U|0.7192708333333333|0           |38      |384374.38         |
|ES182422431E|0.6724174712884138|31          |109     |115250.31999999999|
|ES182132454N|0.6686185388529928|0           |87      |169162.84000000003|
|ES182283225D|0.6444128964893115|0           |122     |33851.18000000001

NameError: name 'OUTPUT_PATH' is not defined

In [19]:
"""
FinPlus - Activity Metrics (PySpark)

Archivo: FinPlus_Activity_Metrics_pyspark.py
Objetivo: Pipeline PySpark para calcular métricas de actividad de cliente (recencia, frecuencia, intensidad, cohortes, RFM, segmentación y features derived) y generar un dataset "curated" listo para modelos y dashboard.

Instrucciones de ejecución (en VSCode dentro del contenedor Docker):
- Coloca BEHAVIOURAL.parquet y CLIENTS.csv en /home/jovyan/work/ (o ajusta DATA_PATH)
- Ejecutar con spark-submit o dentro de una sesión PySpark:
  spark-submit --master local[*] FinPlus_Activity_Metrics_pyspark.py

Notas:
- Diseñado para PySpark 3.x
- Explicaciones en comentarios y bloques de funciones.
"""

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
import datetime


# ---------------------------
# Etapa de limpieza y control de calidad
# - Normalizar nombres
# - Convertir tipos
# - Manejo nulos
# - Eliminar duplicados exactos
# ---------------------------

def qc_and_clean(beh_df):
    # Renombrar columnas para evitar errores y trim
    for c in beh_df.columns:
        beh_df = beh_df.withColumnRenamed(c, c.strip())

    # Cast DATE a DateType (suponiendo formato yyyy-MM-dd)
    beh_df = beh_df.withColumn("DATE", F.to_date(F.col("DATE"), "yyyy-MM-dd"))

    # Reemplazar strings de moneda y limpiar columnas numéricas si vinieran como string
    numeric_cols = [c for c, t in beh_df.dtypes if t in ("string",) and c not in ("CONTRACT_ID","CLIENT_ID","CURRENCY")]
    # Intentar cast seguro en cada columna potencialmente numérica
    for c in beh_df.columns:
        if c not in ("CONTRACT_ID","CLIENT_ID","DATE","CURRENCY"):
            beh_df = beh_df.withColumn(c, F.col(c).cast("double"))

    # Detección y conteo de nulos por columna
    null_counts = {r[0]: r[1] for r in beh_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in beh_df.columns]).collect()[0].asDict().items()}
    print("Null counts (sample):", list(null_counts.items())[:10])

    # Eliminar filas sin CLIENT_ID o DATE (no podemos ubicarlas en el tiempo)
    beh_df = beh_df.filter(F.col("CLIENT_ID").isNotNull() & F.col("DATE").isNotNull())

    # Eliminar duplicados exactos
    beh_df = beh_df.dropDuplicates()

    return beh_df

beh = qc_and_clean(beh)

# ---------------------------
# Feature engineering: métricas básicas por fila
# - Crear columna AMOUNT: sumar distintos tipos de drawing (si aplica)
# - Flag de actividad (si hubo movimiento monetario)
# ---------------------------
beh = beh.withColumn("AMOUNT", (F.coalesce(F.col("CREDIT_CARD_DRAWINGS"), F.lit(0.0))))
beh = beh.withColumn("HAS_ACTIVITY", F.when(F.col("AMOUNT") > 0, 1).otherwise(0))

# ---------------------------
# Agregados temporales por cliente
# Definimos ventana por cliente ordenada por fecha
# ---------------------------
w_client_date = Window.partitionBy("CLIENT_ID").orderBy(F.col("DATE").asc())

# Última fecha disponible global (para cálculos de recencia)
max_date = beh.select(F.max("DATE").alias("maxd")).collect()[0]["maxd"]
print("Max date in data:", max_date)

# ---------------------------
# 1) Métricas RFM (Recency, Frequency, Monetary)
# - Recency: días desde última transacción hasta max_date
# - Frequency: número de transacciones en el periodo
# - Monetary: suma total de AMOUNT
# ---------------------------
rfm = beh.groupBy("CLIENT_ID").agg(
    F.max("DATE").alias("last_date"),
    F.count(F.when(F.col("HAS_ACTIVITY") == 1, True)).alias("freq_transactions"),
    F.sum("AMOUNT").alias("monetary_total"),
    F.avg("AMOUNT").alias("avg_ticket"),
    F.sum("CREDIT_CARD_PAYMENT").alias("total_payments"),
    F.avg("CREDICT_CARD_BALANCE").alias("avg_balance")
)

rfm = rfm.withColumn("recency_days", F.datediff(F.lit(max_date), F.col("last_date")))

# Normalizar nulls
rfm = rfm.fillna({"freq_transactions": 0, "monetary_total": 0.0, "avg_ticket": 0.0, "total_payments": 0.0, "avg_balance": 0.0})

# ---------------------------
# 2) Métricas periódicas (mensuales)
# - Agregar por cliente-mes: transacciones, monto, últimos 3/6/12 meses
# ---------------------------
beh_month = beh.withColumn("year_month", F.date_format(F.col("DATE"), "yyyy-MM"))

monthly_agg = beh_month.groupBy("CLIENT_ID", "year_month").agg(
    F.count(F.when(F.col("HAS_ACTIVITY") == 1, True)).alias("tx_count"),
    F.sum("AMOUNT").alias("tx_amount"),
    F.sum("CREDIT_CARD_PAYMENT").alias("payments_amount"),
    F.avg("CREDICT_CARD_BALANCE").alias("avg_balance_month")
)

# Ventanas temporales para rolling (usaremos timestamp + months_between para ordenamiento si hiciera falta)
# Convertimos year_month a primera fecha de mes
monthly_agg = monthly_agg.withColumn("month_date", F.to_date(F.concat_ws("-", F.col("year_month"), F.lit("01")), "yyyy-MM-dd"))

w_month_client = Window.partitionBy("CLIENT_ID").orderBy(F.col("month_date").cast('long')).rowsBetween(-11, 0)  # rolling 12 meses

monthly_agg = monthly_agg.withColumn("rolling_12m_tx_count", F.sum("tx_count").over(w_month_client))
monthly_agg = monthly_agg.withColumn("rolling_12m_tx_amount", F.sum("tx_amount").over(w_month_client))
monthly_agg = monthly_agg.withColumn("active_months_12m", F.sum(F.when(F.col("tx_count") > 0, 1).otherwise(0)).over(w_month_client))

# ---------------------------
# 3) Cohortes e inactividad
# - Cohort: primer mes de actividad del cliente
# - Inactivity flag: sin movimiento en N meses (ej. 3 meses)
# ---------------------------
first_tx = beh.groupBy("CLIENT_ID").agg(F.min(F.col("DATE")).alias("first_tx_date"))
first_tx = first_tx.withColumn("cohort_month", F.date_format(F.col("first_tx_date"), "yyyy-MM"))

# Merge rfm + first_tx
cust_profile = rfm.join(first_tx, on="CLIENT_ID", how="left")

# Inactivity en 3 meses: si recency_days > 90
cust_profile = cust_profile.withColumn("inactive_3m", F.when(F.col("recency_days") > 90, 1).otherwise(0))
cust_profile = cust_profile.withColumn("inactive_6m", F.when(F.col("recency_days") > 180, 1).otherwise(0))

# ---------------------------
# 4) Segmentación RFM simple (quintiles)
# - Score R (más bajo recency mejor => invertimos)
# - Score F y M (mayor mejor)
# ---------------------------
from pyspark.ml.feature import QuantileDiscretizer

def add_rfm_scores(df, spark):
    # Recency: discretize into 5 (0 = best)
    # Add a recency_score where smaller recency_days -> higher score (5 best)
    q = QuantileDiscretizer(numBuckets=5, inputCol="recency_days", outputCol="recency_bucket", handleInvalid='keep')
    df_q = q.fit(df).transform(df)
    # recency_bucket: 0 = lowest recency_days (closest to 0) => we want reverse scoring
    df_q = df_q.withColumn("recency_score", (F.lit(4) - F.col("recency_bucket")).cast("int") + 1)

    # frequency
    qf = QuantileDiscretizer(numBuckets=5, inputCol="freq_transactions", outputCol="freq_bucket", handleInvalid='keep')
    df_q = qf.fit(df_q).transform(df_q)
    df_q = df_q.withColumn("frequency_score", (F.col("freq_bucket")).cast("int") + 1)

    # monetary
    qm = QuantileDiscretizer(numBuckets=5, inputCol="monetary_total", outputCol="monetary_bucket", handleInvalid='keep')
    df_q = qm.fit(df_q).transform(df_q)
    df_q = df_q.withColumn("monetary_score", (F.col("monetary_bucket")).cast("int") + 1)

    df_q = df_q.withColumn("rfm_score", F.concat(F.col("recency_score").cast("string"), F.lit("-"), F.col("frequency_score").cast("string"), F.lit("-"), F.col("monetary_score").cast("string")))
    return df_q

cust_profile = add_rfm_scores(cust_profile, spark)

# ---------------------------
# 5) Señales de oportunidad (ejemplos de features útiles para cross-sell)
# - HighValueLowActivity: high monetary_total pero baja frecuencia/recency alta
# - AtRisk: alta recency_days, baja freq
# ---------------------------
cust_profile = cust_profile.withColumn("high_value_low_activity", F.when((F.col("monetary_total") > F.expr("percentile_approx(monetary_total, 0.75) over ()")) & (F.col("freq_transactions") < 2), 1).otherwise(0))

# Nota: percentile_approx in expression window not available; en Spark standalone, calculamos umbral global:
monetary_75 = cust_profile.approxQuantile("monetary_total", [0.75], 0.01)[0]
print("Monetary 75%:", monetary_75)
cust_profile = cust_profile.withColumn("high_value_low_activity", F.when((F.col("monetary_total") > F.lit(monetary_75)) & (F.col("freq_transactions") < 2), 1).otherwise(0))

cust_profile = cust_profile.withColumn("at_risk", F.when((F.col("recency_days") > 90) & (F.col("freq_transactions") < 3), 1).otherwise(0))

# ---------------------------
# 6) Agregar features adicionales desde BEHAVIOURAL
# - avg_drawings_pos_ratio, avg_atm_drawings_ratio, avg_number_drawings
# ---------------------------
beh_feats = beh.groupBy("CLIENT_ID").agg(
    F.avg("NUMBER_DRAWINGS").alias("avg_number_drawings"),
    F.sum("NUMBER_DRAWINGS_ATM").alias("sum_drawings_atm"),
    F.sum("NUMBER_DRAWINGS").alias("sum_drawings"),
    F.avg("CREDIT_CARD_LIMIT").alias("avg_cc_limit"),
    F.avg("CREDICT_CARD_BALANCE").alias("avg_balance_global"),
    F.sum("CREDIT_CARD_DRAWINGS_POS").alias("sum_drawings_pos"),
    F.sum("CREDIT_CARD_DRAWINGS_ATM").alias("sum_drawings_atm_amount")
)

beh_feats = beh_feats.withColumn("pos_drawings_ratio", F.when(F.col("sum_drawings") > 0, F.col("sum_drawings_pos")/F.col("sum_drawings")).otherwise(0.0))
beh_feats = beh_feats.withColumn("atm_drawings_ratio", F.when(F.col("sum_drawings") > 0, F.col("sum_drawings_atm")/F.col("sum_drawings")).otherwise(0.0))

# Merge todas las features al perfil
final_profile = cust_profile.join(beh_feats, on="CLIENT_ID", how="left")

# Rellenar nulos razonables
final_profile = final_profile.fillna({
    "avg_number_drawings": 0.0,
    "sum_drawings_atm": 0.0,
    "sum_drawings": 0.0,
    "avg_cc_limit": 0.0,
    "avg_balance_global": 0.0,
    "sum_drawings_pos": 0.0,
    "sum_drawings_atm_amount": 0.0,
    "pos_drawings_ratio": 0.0,
    "atm_drawings_ratio": 0.0
})

# ---------------------------
# 7) Guardado del dataset curated
# - Formato parquet particionado por year_month de última actividad
# ---------------------------
final_profile = final_profile.withColumn("last_activity_month", F.date_format(F.col("last_date"), "yyyy-MM"))

final_profile.repartition(200).write.mode("overwrite").partitionBy("last_activity_month").parquet(OUTPUT_PATH + "client_activity_profile.parquet")

# También guardar monthly_agg para análisis temporal
monthly_agg.repartition(200).write.mode("overwrite").parquet(OUTPUT_PATH + "monthly_client_activity.parquet")

print("Pipeline finished. Curated datasets saved to:", OUTPUT_PATH)

# ---------------------------
# 8) Quick checks (muestreo)
# ---------------------------
print("Sample final profile:")
final_profile.select("CLIENT_ID", "recency_days", "freq_transactions", "monetary_total", "rfm_score", "at_risk", "high_value_low_activity").show(10, truncate=False)

spark.stop()


Null counts (sample): [('CONTRACT_ID', 0), ('CLIENT_ID', 0), ('DATE', 0), ('CREDICT_CARD_BALANCE', 0), ('CREDIT_CARD_LIMIT', 0), ('CREDIT_CARD_DRAWINGS_ATM', 0), ('CREDIT_CARD_DRAWINGS', 0), ('CREDIT_CARD_DRAWINGS_POS', 0), ('CREDIT_CARD_DRAWINGS_OTHER', 0), ('CREDIT_CARD_PAYMENT', 0)]
Max date in data: 2021-12-31
Monetary 75%: 4320.0
Pipeline finished. Curated datasets saved to: /home/jovyan/work/curated/
Sample final profile:
+------------+------------+-----------------+------------------+---------+-------+-----------------------+
|CLIENT_ID   |recency_days|freq_transactions|monetary_total    |rfm_score|at_risk|high_value_low_activity|
+------------+------------+-----------------+------------------+---------+-------+-----------------------+
|ES182220324Q|0           |14               |8865.39           |4-5-5    |0      |0                      |
|ES182420166I|0           |9                |6075.0            |4-4-5    |0      |0                      |
|ES182303796D|0           |14    

### VALOR ECONÓMICO

### INTERACCIÓN Y FIDELIDAD

### RIESGO POTENCIAL

### OPORTUNIDADES COMERCIALES

### ANÁLISIS DE CAUSALIDAD / UPLIFT (para identificar qué ofertas realmente causan más retención)

### EMBEDDING DE COMPORTAMIENTO - SEQUENCE MODELING - (para recomendar productos RNNS/transformers si hay secuencias largas)

### ANOMALÍA TRANSACCIONAL (para detectar fraudes o glitches del sistema)