## Importar librerias

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, to_date, avg, count, sum, when, hour, date_sub, current_date

Creamos la SparkSession:

In [2]:
spark = SparkSession.builder \
    .appName("ETL Interactivo Churn Entel") \
    .getOrCreate()

print("SparkSession creada y conectada al clúster.")

25/09/30 04:19:06 WARN Utils: Your hostname, Debian12 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/09/30 04:19:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/30 04:19:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession creada y conectada al clúster.


### Fase: Extract

In [3]:
base_path = "hdfs://localhost:9000/user/bigdata_upao/churn_project/raw"

path_cdr = f"{base_path}/json/llamadas/"
path_facturacion = f"{base_path}/json/facturacion/"
path_social = f"{base_path}/parquet/social_media/"
path_clientes = f"{base_path}/clientes_maestro.csv"

print("Leyendo datos desde la Zona Cruda de HDFS...")

df_cdr_raw = spark.read.option("multiLine", "true").json(path_cdr)
df_facturacion_raw = spark.read.option("multiLine", "true").json(path_facturacion)

df_social_raw = spark.read.parquet(path_social)
df_clientes = spark.read.csv(path_clientes, header=True, inferSchema=True)

print("¡Lectura desde HDFS completada con éxito!")

Leyendo datos desde la Zona Cruda de HDFS...


                                                                                

¡Lectura desde HDFS completada con éxito!


### Fase: Transform

In [4]:
# Limpieza
print("Iniciando limpieza de datos (nulos y tipos)...")
df_cdr = df_cdr_raw.na.drop(subset=["id_cliente", "timestamp"]) \
                     .withColumn("fecha_hora", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))

df_facturacion = df_facturacion_raw.na.drop(subset=["id_cliente", "fecha_emision"]) \
                                   .withColumn("fecha_factura", to_date("fecha_emision", "yyyy-MM-dd")) \
                                   .na.fill({"reclamo_presentado": False})

df_social = df_social_raw.na.drop(subset=["id_cliente", "timestamp"]) \
                         .withColumn("fecha_hora", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))


# Filtrado de Datos Irrelevantes
print("Filtrando registros antiguos...")
df_cdr = df_cdr.filter(col("fecha_hora") >= date_sub(current_date(), 365*2)) # Solo últimos 2 años

print("Creando nuevas características y agregando datos por cliente...")

# Añadir hora del día a los CDRs
df_cdr = df_cdr.withColumn("hora_del_dia", hour(col("fecha_hora")))

# Agregación para crear features por cliente
cdr_features = df_cdr.groupBy("id_cliente").agg(
    count("*").alias("total_llamadas_registros"),
    avg("calidad_red").alias("promedio_calidad_red"),
    sum("datos_mb_consumidos").alias("total_mb_consumidos"),
    sum(when(col("duracion_segundos") == 0, 1).otherwise(0)).alias("total_llamadas_fallidas"),
    avg(when((col("hora_del_dia") >= 20) | (col("hora_del_dia") <= 6), 1).otherwise(0)).alias("pct_llamadas_nocturnas")
)

facturacion_features = df_facturacion.groupBy("id_cliente").agg(
    sum(when(col("estado_pago") == "VENCIDO", 1).otherwise(0)).alias("total_facturas_vencidas"),
    sum(when(col("reclamo_presentado") == True, 1).otherwise(0)).alias("total_reclamos_facturacion")
)

social_features = df_social.groupBy("id_cliente").agg(
    count("*").alias("total_comentarios_sociales")
)

# Unión
print("Unificando todas las fuentes de datos...")
df_final = df_clientes \
    .join(cdr_features, "id_cliente", "left") \
    .join(facturacion_features, "id_cliente", "left") \
    .join(social_features, "id_cliente", "left") \
    .na.fill(0) # Rellenar nulos post-join con 0

print("Tabla final de características creada:")
df_final.printSchema()
df_final.show(10)

Iniciando limpieza de datos (nulos y tipos)...
Filtrando registros antiguos...
Creando nuevas características y agregando datos por cliente...
Unificando todas las fuentes de datos...
Tabla final de características creada:
root
 |-- id_cliente: string (nullable = true)
 |-- nombre_completo: string (nullable = true)
 |-- fecha_alta: date (nullable = true)
 |-- id_plan: string (nullable = true)
 |-- region: string (nullable = true)
 |-- churn_futuro: boolean (nullable = true)
 |-- total_llamadas_registros: long (nullable = true)
 |-- promedio_calidad_red: double (nullable = false)
 |-- total_mb_consumidos: double (nullable = false)
 |-- total_llamadas_fallidas: long (nullable = true)
 |-- pct_llamadas_nocturnas: double (nullable = false)
 |-- total_facturas_vencidas: long (nullable = true)
 |-- total_reclamos_facturacion: long (nullable = true)
 |-- total_comentarios_sociales: long (nullable = true)



25/09/30 04:30:32 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 10.0.2.15:34863 in 10000 milliseconds
25/09/30 04:31:11 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scal

KeyboardInterrupt: 

25/09/30 05:10:04 WARN NettyRpcEnv: Ignored message: true
25/09/30 05:10:05 WARN NettyRpcEnv: Ignored message: true


### Fase: Load

In [None]:
path_salida = "/user/bigdata_upao/churn_project/processed/customer_features"

print(f"Guardando la tabla final en formato Parquet en: {path_salida}")
df_final.write.mode("overwrite").parquet(path_salida)

print("¡Proceso ETL completado con éxito!")

Guardando la tabla final en formato Parquet en: /user/bigdata_upao/churn_project/processed/customer_features


25/09/30 05:11:51 WARN NettyRpcEnv: Ignored message: true           (0 + 1) / 1]
25/09/30 05:12:12 WARN NettyRpcEnv: Ignored message: true
25/09/30 05:11:51 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1219)
	at org.apach

Detenemos la sesion:

In [None]:
spark.stop()