# 03 - Procesamiento con PySpark

Este notebook demuestra cómo usar PySpark para procesar datos de bases de datos a gran escala.

## Objetivos:
- Conectar a bases de datos usando Spark
- Procesar datos de manera distribuida
- Realizar transformaciones eficientes
- Guardar resultados en diferentes formatos
- Optimizar consultas para mejor rendimiento

In [None]:
# Importar librerías necesarias
import sys
sys.path.append('../src')

from etl.spark_loader import SparkDataLoader, create_spark_loader
from pyspark.sql import functions as F
from pyspark.sql.types import *
import yaml

# Crear instancia de SparkDataLoader
spark_loader = create_spark_loader("TSCDIA-Demo")
spark = spark_loader.spark

print("✅ Spark inicializado correctamente")
print(f"Versión de Spark: {spark.version}")
print(f"Contexto: {spark.sparkContext.appName}")

## 1. Cargar Datos desde Base de Datos

In [None]:
# Configurar parámetros de conexión (ajustar según tu BD)
db_params = {
    "url": "jdbc:postgresql://localhost:5432/mi_database",
    "user": "mi_usuario",
    "password": "mi_password"
}

# Opción 1: Cargar tabla completa
# df = spark_loader.read_from_postgres("mi_tabla", **db_params)

# Opción 2: Cargar con query personalizada
# query = """
# SELECT 
#     id,
#     fecha,
#     categoria,
#     valor,
#     region
# FROM mi_tabla 
# WHERE fecha >= '2024-01-01'
# """
# df = spark_loader.read_from_postgres(query, **db_params)

# Para este ejemplo, crear datos de muestra
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
import datetime

# Esquema de ejemplo
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("fecha", DateType(), True),
    StructField("categoria", StringType(), True),
    StructField("valor", DoubleType(), True),
    StructField("region", StringType(), True)
])

# Datos de ejemplo
data = [
    (1, datetime.date(2024, 1, 1), "A", 100.0, "Norte"),
    (2, datetime.date(2024, 1, 2), "B", 150.0, "Sur"),
    (3, datetime.date(2024, 1, 3), "A", 200.0, "Este"),
    (4, datetime.date(2024, 1, 4), "C", 175.0, "Oeste"),
    (5, datetime.date(2024, 1, 5), "B", 120.0, "Norte")
]

df = spark.createDataFrame(data, schema)

print(f"Datos cargados: {df.count()} filas")
df.show(5)

## 2. Exploración Inicial con Spark

In [None]:
# Información básica del DataFrame
print("=== INFORMACIÓN DEL DATAFRAME ===")
print(f"Número de filas: {df.count()}")
print(f"Número de columnas: {len(df.columns)}")
print(f"Columnas: {df.columns}")

# Schema
print("\n=== ESQUEMA ===")
df.printSchema()

# Estadísticas descriptivas
print("\n=== ESTADÍSTICAS DESCRIPTIVAS ===")
df.describe().show()

In [None]:
# Análisis de calidad de datos
quality_report = spark_loader.basic_data_quality_check(df)

print("=== REPORTE DE CALIDAD ===")
print(f"Total filas: {quality_report['total_rows']}")
print(f"Total columnas: {quality_report['total_columns']}")

print("\nAnálisis de valores nulos:")
for col, stats in quality_report['null_analysis'].items():
    print(f"  {col}: {stats['null_count']} nulos ({stats['null_percentage']:.2f}%)")

## 3. Transformaciones con Spark

In [None]:
# Transformaciones básicas
from pyspark.sql import functions as F

# Agregar nuevas columnas
df_transformed = df.withColumn("valor_log", F.log(F.col("valor"))) \
                   .withColumn("año", F.year(F.col("fecha"))) \
                   .withColumn("mes", F.month(F.col("fecha"))) \
                   .withColumn("valor_categoria", 
                              F.when(F.col("valor") > 150, "Alto")
                               .when(F.col("valor") > 100, "Medio")
                               .otherwise("Bajo"))

print("DataFrame transformado:")
df_transformed.show()

# Verificar schema actualizado
print("\nSchema actualizado:")
df_transformed.printSchema()

In [None]:
# Agregaciones y agrupaciones
print("=== AGREGACIONES ===")

# Por categoría
agg_categoria = df_transformed.groupBy("categoria") \
    .agg(
        F.count("*").alias("total_registros"),
        F.sum("valor").alias("valor_total"),
        F.avg("valor").alias("valor_promedio"),
        F.min("valor").alias("valor_minimo"),
        F.max("valor").alias("valor_maximo")
    ).orderBy(F.desc("valor_total"))

print("Agregación por categoría:")
agg_categoria.show()

# Por región
agg_region = df_transformed.groupBy("region") \
    .agg(
        F.count("*").alias("registros"),
        F.avg("valor").alias("promedio")
    )

print("Agregación por región:")
agg_region.show()

## 4. Filtros y Consultas Complejas

In [None]:
# Filtros múltiples
df_filtered = df_transformed.filter(
    (F.col("valor") > 120) & 
    (F.col("categoria").isin(["A", "B"])) &
    (F.col("region") != "Oeste")
)

print("Datos filtrados:")
df_filtered.show()

# Usar SQL sobre DataFrame
df_transformed.createOrReplaceTempView("datos")

sql_result = spark.sql("""
    SELECT 
        categoria,
        region,
        COUNT(*) as total,
        AVG(valor) as promedio,
        PERCENTILE_APPROX(valor, 0.5) as mediana
    FROM datos
    WHERE valor > 100
    GROUP BY categoria, region
    ORDER BY promedio DESC
""")

print("\nResultado de consulta SQL:")
sql_result.show()

## 5. Joins y Combinaciones

In [None]:
# Crear DataFrame adicional para join
schema_info = StructType([
    StructField("categoria", StringType(), True),
    StructField("descripcion", StringType(), True),
    StructField("prioridad", IntegerType(), True)
])

data_info = [
    ("A", "Categoría Premium", 1),
    ("B", "Categoría Estándar", 2),
    ("C", "Categoría Básica", 3)
]

df_info = spark.createDataFrame(data_info, schema_info)

print("DataFrame de información:")
df_info.show()

# Realizar join
df_joined = df_transformed.join(
    df_info, 
    on="categoria", 
    how="left"
)

print("DataFrame con join:")
df_joined.select("id", "categoria", "descripcion", "valor", "prioridad").show()

## 6. Optimización y Performance

In [None]:
# Cache para DataFrames que se usan múltiples veces
df_transformed.cache()

# Verificar particiones
print(f"Número de particiones: {df_transformed.rdd.getNumPartitions()}")

# Reparticionar si es necesario
df_repartitioned = df_transformed.repartition(2, "categoria")
print(f"Particiones después de repartition: {df_repartitioned.rdd.getNumPartitions()}")

# Explain plan para ver optimizaciones
print("\nPlan de ejecución:")
df_transformed.groupBy("categoria").count().explain(True)

## 7. Guardar Resultados

In [None]:
# Guardar como Parquet (formato recomendado)
output_path = "../data/processed/spark_output"

# Guardar particionado por categoría
spark_loader.save_to_parquet(
    df_transformed, 
    output_path + "/parquet", 
    partition_by="categoria"
)

# Guardar como CSV (para compatibilidad)
df_transformed.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path + "/csv")

# Guardar agregaciones
agg_categoria.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path + "/agregaciones")

print("✅ Datos guardados exitosamente")

In [None]:
# Opcional: Escribir de vuelta a base de datos
# spark_loader.write_to_database(
#     df_transformed,
#     "tabla_procesada",
#     mode="overwrite",
#     **db_params
# )

## 8. Monitoreo y Métricas

In [None]:
# Obtener métricas del job
print("=== MÉTRICAS DE SPARK ===")
print(f"Aplicación: {spark.sparkContext.appName}")
print(f"ID de aplicación: {spark.sparkContext.applicationId}")
print(f"Modo de deploy: {spark.sparkContext.deployMode}")
print(f"Master: {spark.sparkContext.master}")

# Estadísticas de memoria
status = spark.sparkContext.statusTracker()
print(f"\nEjecutores activos: {len(status.getExecutorInfos())}")

# Acceder a Spark UI
print(f"\nSpark UI disponible en: {spark.sparkContext.uiWebUrl}")

## 9. Limpieza y Cierre

In [None]:
# Limpiar cache
df_transformed.unpersist()

# Detener Spark (opcional - se puede mantener para otros notebooks)
# spark_loader.stop_spark()

print("✅ Procesamiento completado")

## 10. Mejores Prácticas y Tips

### Performance:
- Usar `cache()` para DataFrames reutilizados
- Particionar datos por columnas que se usan frecuentemente en filtros
- Usar formatos eficientes como Parquet
- Evitar `collect()` en DataFrames grandes

### Bases de Datos:
- Usar particionado al leer tablas grandes
- Filtrar en la query SQL cuando sea posible
- Considerar índices en las tablas fuente
- Usar connection pooling para múltiples conexiones

### Desarrollo:
- Testear con muestras pequeñas primero
- Usar `explain()` para entender planes de ejecución
- Monitorear Spark UI durante desarrollo
- Configurar logs apropiadamente

### Producción:
- Configurar recursos apropiados (memoria, cores)
- Implementar manejo de errores robusto
- Usar checkpointing para jobs largos
- Monitorear métricas de performance