# Clase 1: Introducción a Big Data y PySpark

**Rol del docente:** Profesor de Big Data.

**Objetivo general:** Al finalizar la sesión, el estudiante comprende los fundamentos de Big Data, la arquitectura básica de Apache Spark y es capaz de ejecutar un análisis inicial de un *dataset* grande con PySpark aplicando buenas prácticas de ingeniería, calidad de datos y estándares.

---


## **1) Fundamentos de Big Data**

### **1.1 ¿Qué es Big Data?**

Big Data **no se trata únicamente de tener “muchos datos”**, sino de la **capacidad de procesarlos, analizarlos y generar valor a partir de ellos** usando tecnologías avanzadas y equipos humanos especializados. Veamos sus características fundamentales y por qué es un concepto mucho más profundo que simplemente “datos grandes”.

---

### **Las 5V (y extensiones 7V)**

1. **Volumen**

   * Se refiere a la **cantidad masiva de datos** que generan las empresas, dispositivos IoT, redes sociales, sensores, transacciones, etc.
   * Ejemplo: Facebook procesa más de *4 petabytes* de datos diarios.

**Caso real:** Lectura del dataset de “Yellow Taxi Trip Records” de Nueva York (disponible públicamente en Amazon S3), que contiene millones de viajes de taxi por mes.

#### Instalar libreria

```
pip install pyspark
```

#### Solución

In [None]:
# PySpark: Framework de procesamiento distribuido para grandes volúmenes de datos
# (Este script descarga y analiza un parquet público de NYC Taxi de forma local)

# Importar SparkSession: punto de entrada para crear DataFrames y ejecutar operaciones Spark
from pyspark.sql import SparkSession

# Importar requests para descargar archivos vía HTTP(S)
import requests

# Importar warnings para suprimir advertencias no relevantes en la salida
import warnings

# Suprimir advertencias de la librería warnings para mantener la salida limpia
warnings.filterwarnings('ignore')

# -------------------------
# 1. Crear la sesión de Spark
# -------------------------
# Iniciamos el "builder" para configurar la SparkSession
# .appName() define el nombre que verá en la UI de Spark
# .config(...) añade configuraciones específicas (aquí ejemplos de ajustes)
# .getOrCreate() crea la sesión si no existe o devuelve la existente
spark = SparkSession.builder \
    .appName("BigData_Volumen") \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()

# Ajustar el nivel de logs del SparkContext para evitar ver mucho detalle (INFO/DEBUG)
spark.sparkContext.setLogLevel("ERROR")

# 2. Definir URL y ruta local para descargar dataset
# URL pública hacia el archivo Parquet (CloudFront público que sirve datos de NYC taxi)
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
# Nombre del archivo local donde lo guardaremos
local_path = "yellow_tripdata_2024-01.parquet"

# 2b. Descargar dataset (manejo de errores básico)
try:
    # Mensaje informativo para el usuario/alumno
    print("Descargando dataset...")
    # Realiza la petición HTTP con un timeout para no colgar indefinidamente
    response = requests.get(url, timeout=30)
    # Abrir el archivo local en modo binario y escribir el contenido descargado
    with open(local_path, "wb") as f:
        f.write(response.content)
    # Mensaje de éxito
    print("Descarga completada")
except Exception as e:
    # Si ocurre cualquier error de red o IO, lo mostramos (podrías manejarlo más finamente)
    print(f"Error en descarga: {e}")

# 3. Leer el archivo Parquet con Spark
# read.parquet() es la forma nativa y más eficiente de leer Parquet en Spark
df = spark.read.parquet(local_path)

# 4. Inspección básica del dataset
print("INFORMACIÓN DEL DATASET NYC TAXI")

# Contar el número total de registros (count() es una acción que dispara ejecución)
total_records = df.count()
# Imprimir el número total con formato de miles
print(f"\nTOTAL DE REGISTROS: {total_records:,}")

# Mostrar sólo las primeras 3 filas con algunas columnas relevantes, sin truncamiento
print("\nPRIMERAS 3 FILAS:")
df.select("VendorID", "tpep_pickup_datetime", "fare_amount", "tip_amount", "total_amount") \
  .show(3, truncate=False)

# 4b. Mostrar esquema de forma ordenada (campo → tipo)
print("\nESQUEMA DEL DATASET:")
# Creamos un diccionario para ordenar/mostrar el esquema más legible
schema_dict = {}
for field in df.schema.fields:
    # field.name es el nombre de la columna; field.dataType es su tipo (p. ej., TimestampType, DoubleType)
    schema_dict[field.name] = str(field.dataType)

# Imprimir el esquema enumerado y con alineación para que sea fácil de leer en clase
for i, (field_name, field_type) in enumerate(schema_dict.items(), 1):
    print(f"  {i:2d}. {field_name:<25} → {field_type}")

# 5. Análisis estadístico básico (tarifas y propinas)
print("\nANÁLISIS ESTADÍSTICO BÁSICO")

# Columnas numéricas de interés para el ejemplo
numeric_cols = ['fare_amount', 'tip_amount', 'total_amount', 'trip_distance']

# Iteramos sobre las columnas numéricas y mostramos estadísticas resumidas
for col in numeric_cols:
    # Comprobar que la columna exista en el DataFrame (por compatibilidad entre versiones de dataset)
    if col in df.columns:
        # summary() produce métricas: count, mean, stddev, min, max (entre otras si se pide)
        # collect() trae las filas resultantes al driver como lista de Rows
        stats = df.select(col).summary("count", "mean", "stddev", "min", "max").collect()
        # Si stats no está vacío procedemos a extraer los valores
        if stats:
            # Cada entrada de stats es una fila; accedemos por índice y luego por nombre de columna
            count = stats[0][col]    # 'count' viene como string (por eso mostramos tal cual)
            mean = stats[1][col]     # 'mean' normalmente es string convertible a float
            std = stats[2][col]      # 'stddev' como string
            min_val = stats[3][col]  # 'min'
            max_val = stats[4][col]  # 'max'
            
            # Imprimir las estadísticas con formato legible (conversión a float para formateo)
            print(f"\n  {col.upper()}:")
            print(f"    • Total: {count}")
            # Guardamos un manejo seguro al convertir a float (siempre que no sea 'null' o cadena vacía)
            try:
                print(f"    • Promedio: ${float(mean):.2f}")
            except Exception:
                print(f"    • Promedio: {mean} (no convertible a float)")
            try:
                print(f"    • Desviación: ${float(std):.2f}")
            except Exception:
                print(f"    • Desviación: {std} (no convertible a float)")
            # Min / Max también formateados si son convertibles
            try:
                print(f"    • Rango: ${float(min_val):.2f} - ${float(max_val):.2f}")
            except Exception:
                print(f"    • Rango: {min_val} - {max_val}")

# 6. Cerrar la sesión de Spark (liberar recursos)
spark.stop()


2. **Velocidad**

   * Describe la **rapidez con la que los datos se generan y deben procesarse**.
   * Ejemplo: Transacciones de tarjetas de crédito que deben validarse en milisegundos para evitar fraude.


**Caso empresarial:** detección de fraude en tiempo real.

3. **Variedad**

   * Son los **diferentes formatos y fuentes**: datos estructurados (bases de datos), semiestructurados (JSON, XML) y no estructurados (imágenes, videos, texto libre).
   * Ejemplo: En salud, los datos provienen de historiales clínicos, estudios de imagen, sensores portátiles, etc.

**Caso empresarial:** combinar ventas (estructuradas) con clics web (semiestructurados) para análisis omnicanal.

In [None]:
# Importar SparkSession: punto de entrada para crear DataFrames y usar Spark SQL
from pyspark.sql import SparkSession

# Importar requests: librería para realizar peticiones HTTP (descargar archivos desde internet)
import requests

# Importar os: utilidades del sistema de archivos (rutas, creación de carpetas, comprobaciones)
import os

# Crear sesión de Spark
# builder() arma la configuración; appName() nombra la aplicación (aparece en la UI de Spark);
# getOrCreate() crea la sesión si no existe o devuelve la existente.
spark = SparkSession.builder.appName("Streaming_Clase").getOrCreate()

print("=== DESCARGANDO DATASETS DESDE INTERNET ===")

# Definición de la URL del dataset 1 (aeropuertos). Es un CSV público alojado en GitHub.
airports_url = "https://raw.githubusercontent.com/datasets/airport-codes/master/data/airport-codes.csv"

# Ruta local donde se pretende guardar el CSV de aeropuertos
airports_file = "airports_from_internet.csv"

# Mostrar en consola de dónde se va a descargar el archivo (útil para depuración y explicación)
print(f"Descargando aeropuertos desde: {airports_url}")

# En producción conviene crear la carpeta con os.makedirs(output_dir, exist_ok=True) antes de escribir.
try:
    # Realizar la petición HTTP GET hacia la URL del CSV (puede tardar según la red)
    response = requests.get(airports_url, timeout=30)
    # Abrir (o crear) el archivo local en modo texto con codificación UTF-8 y escribir el contenido
    with open(airports_file, 'w', encoding='utf-8') as f:
        f.write(response.text)
    # Indicar que la descarga fue exitosa
    print("Dataset de aeropuertos descargado exitosamente")
except Exception as e:
    # Si ocurre cualquier error (red, permiso, path inválido), lo imprimimos para diagnóstico
    print(f"Error descargando aeropuertos: {e}")

# -------------------------
# Dataset 2: Países (CSV real)
# -------------------------

# URL pública del dataset de códigos de país (CSV en GitHub)
countries_url = "https://raw.githubusercontent.com/datasets/country-codes/master/data/country-codes.csv"

# Ruta local donde se guardará el CSV de países
countries_file = "countries_from_internet.csv"

# Mensaje en consola indicando la URL de descarga del segundo dataset
print(f"Descargando países desde: {countries_url}")

try:
    # Petición HTTP GET para descargar el CSV de países
    response = requests.get(countries_url, timeout=30)
    # Escribir el contenido descargado en el archivo local con codificación UTF-8
    with open(countries_file, 'w', encoding='utf-8') as f:
        f.write(response.text)
    # Mensaje de éxito
    print("Dataset de países descargado exitosamente")
except Exception as e:
    # Mensaje de error para depuración si la descarga falla
    print(f"Error descargando países: {e}")

# Línea en blanco para separar secciones en la salida por consola
print("\n=== LEYENDO DATASETS DESCARGADOS ===")

# Leer el CSV de aeropuertos con Spark:
# - .read crea un lector de DataFrame
# - .option("header", True) indica que la primera fila contiene nombres de columnas
# - .csv(path) lee el CSV desde la ruta local proporcionada
df_airports = spark.read.option("header", True).csv(airports_file)

# Leer el CSV de países con Spark (mismas opciones)
df_countries = spark.read.option("header", True).csv(countries_file)

# Mensaje indicando que vamos a mostrar el esquema (estructura de columnas) del dataset de aeropuertos
print("=== SCHEMA DE AEROPUERTOS (DESCARGADO) ===")

# printSchema() imprime en consola los nombres de columnas y sus tipos inferidos por Spark
df_airports.printSchema()

# Mensaje indicando que ahora mostraremos el esquema del dataset de países
print("\n=== SCHEMA DE PAÍSES (DESCARGADO) ===")

# Imprimir el esquema de países
df_countries.printSchema()

# Mensaje que indica que vamos a mostrar una muestra de los datos de aeropuertos
print("\n=== MUESTRA DE AEROPUERTOS ===")

# Seleccionar columnas relevantes del DataFrame de aeropuertos y mostrar 5 filas sin truncar
# - select(...) elige columnas
# - show(5, truncate=False) muestra 5 registros y no trunca texto largo
df_airports.select("ident", "name", "iso_country", "type").show(5, truncate=False)

# Mensaje que indica que vamos a mostrar una muestra del dataset de países
print("\n=== MUESTRA DE PAÍSES ===")

# Seleccionar columnas clave del DataFrame de países y mostrar 5 filas sin truncar
# Observa que el nombre de la columna "ISO3166-1-Alpha-2" incluye guiones; Spark lo acepta tal cual
df_countries.select("ISO3166-1-Alpha-2", "official_name_en", "Continent").show(5, truncate=False)

# Finalmente, cerrar la sesión de Spark para liberar recursos del cluster/local
spark.stop()


4. **Veracidad**

   * Garantizar que los datos sean **fiables y precisos**, evitando errores o duplicidades.
   * Ejemplo: Si un sensor de temperatura falla y envía lecturas incorrectas, podría afectar un sistema industrial automatizado.

**Caso empresarial:** asegurarse de que no entren datos corruptos al pipeline financiero.

In [None]:
# Importar SparkSession: punto de entrada para trabajar con DataFrames en PySpark
from pyspark.sql import SparkSession

# Importar funciones de conveniencia de pyspark.sql.functions
# - col: referencia columnas en expresiones
# - when, count, length, regexp_replace, upper, trim: funciones útiles para limpieza y validación
from pyspark.sql.functions import col, when, count, length, regexp_replace, upper, trim

# Importar tipos (no usado explícitamente en el script, pero disponible si necesitas esquemas)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Importar requests para descargar archivos desde una URL
import requests

# -------------------------
# Crear sesión de Spark
# -------------------------
# .builder: inicia la configuración de la SparkSession
# .appName(): nombre de la aplicación (aparece en UI)
# .getOrCreate(): crea la sesión o devuelve la existente
spark = SparkSession.builder.appName("Veracidad_Datos").getOrCreate()

# Mensaje informativo para la consola / presentación
print("=== SISTEMA DE VERACIDAD DE DATOS ===")
print("Garantizando datos fiables, precisos y sin errores")

# -------------------------
# Descargar dataset desde internet
# -------------------------
print("\n=== DESCARGANDO DATASET DESDE INTERNET ===")

# URL pública del dataset de aeropuertos (CSV alojado en GitHub)
url = "https://raw.githubusercontent.com/datasets/airport-codes/master/data/airport-codes.csv"

# Nombre de archivo local donde guardaremos el CSV descargado
archivo = "aeropuertos.csv"

# Mensaje indicando la URL que se va a descargar (útil para depuración)
print(f"Descargando dataset desde: {url}")

# Intento de descarga con manejo básico de excepciones
try:
    # Realiza la petición HTTP GET (sin timeout en este ejemplo; en producción añade timeout)
    response = requests.get(url, timeout=30)
    # Escribe el contenido de la respuesta en un archivo local (modo texto con encoding UTF-8)
    with open(archivo, 'w', encoding='utf-8') as f:
        f.write(response.text)
    # Mensaje de éxito
    print("Dataset descargado exitosamente")
except Exception as e:
    # Si ocurre cualquier problema (red, permisos, path), lo mostramos en consola
    print(f"Error descargando: {e}")

# -------------------------
# 1. DETECCIÓN DE DATOS CORRUPTOS
# -------------------------
print("\n=== 1. DETECCIÓN DE DATOS CORRUPTOS ===")

# Leer el CSV con Spark indicando que la primera fila es cabecera
df = spark.read.option("header", True).csv(archivo)

# Mostrar estado inicial: número de registros (count() fuerza ejecución distribuida)
print("ESTADO INICIAL DEL DATASET:")
print(f"Total de registros: {df.count()}")
print(f"Total de columnas: {len(df.columns)}")

# Detectar valores faltantes por columna (isNull detecta solo NULL, no strings vacíos)
print("\nVALORES FALTANTES POR COLUMNA:")
for col_name in df.columns:
    # Cada filter.count() ejecuta una acción; en datasets grandes puede ser costoso
    missing_count = df.filter(col(col_name).isNull()).count()
    if missing_count > 0:
        print(f"  {col_name}: {missing_count} valores faltantes")

# Detectar duplicados comparando todas las columnas (groupBy(df.columns).count())
# Nota: agrupar por todas las columnas puede ser muy costoso si el dataset es grande
duplicates = df.groupBy(df.columns).count().filter(col("count") > 1)
duplicate_count = duplicates.count()
print(f"\nDUPLICADOS ENCONTRADOS: {duplicate_count}")

# Si hay duplicados, mostramos ejemplos
if duplicate_count > 0:
    print("Ejemplos de duplicados:")
    duplicates.show(5, truncate=False)

# -------------------------
# 2. LIMPIEZA Y VALIDACIÓN DE DATOS
# -------------------------
print("\n=== 2. LIMPIEZA Y VALIDACIÓN DE DATOS ===")

# Eliminar filas que no tienen 'ident' o 'name' (clave mínima para identificar aeropuerto)
# .na.drop() elimina filas con NULLs en las columnas indicadas
df_clean = df.na.drop(subset=["ident", "name"])

# Validar formato de 'iso_country': filtramos solo códigos de longitud 2 (ej: 'US', 'CO')
df_clean = df_clean.filter(length(col("iso_country")) == 2)

# Limpiar el nombre: quitar espacios exteriores y convertir a mayúsculas => columna 'name_clean'
df_clean = df_clean.withColumn("name_clean", trim(upper(col("name"))))

# Validar coordenadas: mantener solo filas cuya columna 'coordinates' contiene una coma (lon,lat)
# Esto evita valores sin formato esperado
df_clean = df_clean.filter(col("coordinates").contains(","))

# Mostrar número de registros tras la limpieza
# (recalcular count -> acción costosa; en produccion considere cache() antes)
print(f"Registros después de limpieza: {df_clean.count()}")

# -------------------------
# 3. CONTROL DE CALIDAD (reglas)
# -------------------------
print("\n=== 3. CONTROL DE CALIDAD ===")

# Preparar lista para almacenar resultados de cada regla de validación
validation_results = []

# Regla 1: Todos los registros limpios deben tener 'ident' no nulo
ident_count = df_clean.filter(col("ident").isNotNull()).count()
total_count = df_clean.count()
validation_results.append(("Ident único", ident_count == total_count, f"{ident_count}/{total_count}"))

# Regla 2: Códigos de país válidos (longitud == 2)
valid_countries = df_clean.filter(length(col("iso_country")) == 2).count()
validation_results.append(("Códigos país válidos", valid_countries == total_count, f"{valid_countries}/{total_count}"))

# Regla 3: 'type' debe estar dentro de tipos esperados
valid_types = ["large_airport", "medium_airport", "small_airport", "heliport", "seaplane_base"]
valid_type_count = df_clean.filter(col("type").isin(valid_types)).count()
validation_results.append(("Tipos válidos", valid_type_count == total_count, f"{valid_type_count}/{total_count}"))

# Imprimir los resultados de las validaciones con estado claro
print("📋 RESULTADOS DE VALIDACIÓN:")
for rule, passed, details in validation_results:
    status = "PASÓ" if passed else "FALLÓ"
    print(f"  {rule}: {status} ({details})")

# -------------------------
# 4. DETECCIÓN DE ANOMALÍAS
# -------------------------
print("\n=== 4. DETECCIÓN DE ANOMALÍAS ===")

# Anomalía 1: contar aeropuertos sin coordenadas (NULL o cadena vacía)
sin_coordenadas = df_clean.filter(col("coordinates").isNull() | (col("coordinates") == "")).count()
print(f"Aeropuertos sin coordenadas: {sin_coordenadas}")

# Anomalía 2: nombres muy cortos (menos de 3 caracteres) — posible dato corrupto o abreviatura
nombres_cortos = df_clean.filter(length(col("name")) < 3).count()
print(f"Nombres muy cortos (<3 chars): {nombres_cortos}")

# Anomalía 3: tipos desconocidos (no están en la lista 'valid_types')
tipos_raros = df_clean.filter(~col("type").isin(valid_types)).count()
print(f"Tipos de aeropuerto desconocidos: {tipos_raros}")

# -------------------------
# 5. REPORTE FINAL DE CALIDAD
# -------------------------
print("\n=== 5. REPORTE FINAL DE CALIDAD ===")

# Calcular puntaje simple de calidad: porcentaje de reglas que pasaron
quality_score = sum(1 for _, passed, _ in validation_results if passed) / len(validation_results) * 100
print(f"PUNTAJE DE CALIDAD: {quality_score:.1f}%")

# Interpretación textual del puntaje
if quality_score >= 90:
    print("EXCELENTE: Los datos son altamente confiables")
elif quality_score >= 70:
    print("BUENO: Los datos son confiables con algunas excepciones")
else:
    print("ADVERTENCIA: Los datos requieren revisión manual")

# Resumen con números absolutos
print(f"\nRESUMEN FINAL:")
print(f"  - Registros originales: {df.count()}")
print(f"  - Registros después de limpieza: {df_clean.count()}")
print(f"  - Registros perdidos: {df.count() - df_clean.count()}")
print(f"  - Porcentaje de retención: {(df_clean.count() / df.count() * 100):.1f}%")

# Mostrar una muestra de 5 registros limpios y validados para inspección
print("\n=== MUESTRA DE DATOS VALIDADOS ===")
df_clean.select("ident", "name_clean", "iso_country", "type", "coordinates").show(5, truncate=False)

# Cerrar la sesión de Spark para liberar recursos
spark.stop()


5. **Valor**

   * No basta con almacenar datos: deben **traducirse en conocimiento útil** o en ventajas competitivas.
   * Ejemplo: Amazon usa datos de compras para recomendar productos relevantes y aumentar ventas.

**Caso empresarial:** usar los datos para priorizar productos más rentables.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, when
import requests

# === 1. INICIO DE SESIÓN SPARK ===
spark = SparkSession.builder.appName("Valor_Datos").getOrCreate()

print("=== EXTRACCIÓN DE VALOR DE DATOS ===")
print("Transformando datos en conocimiento útil")

# === 2. DESCARGA DEL DATASET REAL ===
url = "https://raw.githubusercontent.com/datasets/airport-codes/master/data/airport-codes.csv"
archivo = "aeropuertos.csv"

print(f"Descargando desde: {url}")
response = requests.get(url)
with open(archivo, 'w', encoding='utf-8') as f:
    f.write(response.text)
print("Dataset descargado")

# === 3. LECTURA DEL DATASET CON SPARK ===
df = spark.read.option("header", True).csv(archivo)
print(f"Total aeropuertos: {df.count()}")   # Contar filas totales

# === 4. ANÁLISIS ESTRATÉGICO ===
print("\n=== 1. TOP 5 PAÍSES CON MÁS AEROPUERTOS ===")
top_countries = (
    df.groupBy("iso_country")                # Agrupar por país
      .agg(count("*").alias("total"))        # Contar aeropuertos por país
      .orderBy(desc("total"))                # Ordenar de mayor a menor
      .limit(5)                              # Quedarse con top 5
)
top_countries.show(truncate=False)           # Mostrar resultado sin truncar

# === 5. SEGMENTACIÓN DE MERCADO ===
print("\n=== 2. DISTRIBUCIÓN POR TIPO ===")
market_segments = (
    df.groupBy("type")                       # Agrupar por tipo de aeropuerto
      .agg(count("*").alias("cantidad"))     # Contar por tipo
      .orderBy(desc("cantidad"))             # Ordenar descendente
)
market_segments.show(truncate=False)

# === 6. OPORTUNIDADES DE INVERSIÓN ===
print("\n=== 3. PAÍSES CON SOLO AEROPUERTOS PEQUEÑOS ===")
opportunities = (
    df.groupBy("iso_country")                                         # Agrupar por país
      .agg(
          count("*").alias("total"),                                  # Total de aeropuertos
          count(when(col("type") == "small_airport", True)).alias("pequeños"),  # Contar solo pequeños
          count(when(col("type") == "large_airport", True)).alias("grandes")   # Contar grandes
      )
      .filter((col("grandes") == 0) & (col("total") > 10))            # Países sin aeropuertos grandes pero con >10 aeropuertos
      .orderBy(desc("total"))                                         # Ordenar descendente
      .limit(5)                                                       # Limitar a 5
)
opportunities.show(truncate=False)

# === 7. REPORTE FINAL ===
print("\n=== REPORTE FINAL ===")
total_airports = df.count()
large_airports = df.filter(col("type") == "large_airport").count()
small_airports = df.filter(col("type") == "small_airport").count()

print("RESUMEN:")
print(f"  - Aeropuertos grandes: {large_airports} ({large_airports/total_airports*100:.1f}%)")
print(f"  - Aeropuertos pequeños: {small_airports} ({small_airports/total_airports*100:.1f}%)")

# Cálculo simple de valor estratégico ponderado
strategic_value = (large_airports * 0.4 + small_airports * 0.1) / total_airports * 100
print(f"VALOR ESTRATÉGICO: {strategic_value:.1f}%")

# Evaluación de resultados
if strategic_value >= 20:
    print("BUENO: Oportunidades claras identificadas")
else:
    print("MODERADO: Requiere análisis adicional")

print("\nAPLICACIONES:")
print("  - Identificar mercados objetivo")
print("  - Oportunidades de inversión")
print("  - Estrategias de expansión")

# === 8. CIERRE DE SESIÓN ===
spark.stop()


---

### **Extensiones frecuentes: 7V**

* **Variabilidad:** los datos no siempre tienen un comportamiento constante, pueden cambiar su estructura o significado con el tiempo.
* **Visibilidad:** la capacidad de entender qué datos son importantes y cuáles no.

---

### **Big Data ≠ Datos grandes**

**Big Data implica tres elementos clave:**

1. **Procesos:** cómo se capturan, almacenan, procesan y analizan los datos.
2. **Tecnología:** infraestructura necesaria (Hadoop, Spark, NoSQL, cloud computing).
3. **Personas:** especialistas capaces de transformar datos en decisiones (científicos de datos, ingenieros de datos, analistas).

---



### 1.2 Casos de uso (ejemplos reales)

* **Retail**: recomendaciones personalizadas y *forecasting* de demanda.
* **Salud**: analítica de cohortes y detección temprana de eventos adversos.
* **Servicios financieros**: *fraud detection* en tiempo casi real.
* **Transporte urbano**: análisis de flujos, tiempos de viaje, congestión y optimización de rutas.
* **Industria/IoT**: mantenimiento predictivo (sensores, *streams* de eventos).

---

### 1.3 Patrones de procesamiento

* **Batch** (lotes): grandes volúmenes, latencia alta; *data lakes*, ETL/ELT.
* **Streaming** (tiempo real o casi real): baja latencia; Kafka + Spark Structured Streaming.
* **Híbridos**: Lambda/Kappa architectures.

---


## 2) **Ecosistema y Arquitectura de Spark**

### 2.1 **Piezas del ecosistema (visión rápida)**

* **Almacenamiento**:

  * **HDFS, S3, Azure Blob, Google Cloud Storage, lagos de datos**: repositorios donde residen datos estructurados y no estructurados.
  * **Formatos Parquet/ORC (columnar, comprimido)**: (del inglés Apache Parquet) son un formato de archivo de almacenamiento de datos de código abierto, optimizado para el almacenamiento eficiente y el análisis de grandes volúmenes de datos en el ecosistema de big data.


    **Ejemplo:** una aseguradora almacena historiales de pólizas y reclamaciones en Parquet sobre S3 para reducir costos de almacenamiento y acelerar consultas que analizan el riesgo de clientes.


* **Capa de tablas**:

  * **Delta Lake, Apache Iceberg, Apache Hudi**: permiten transacciones ACID (Atomicidad, Consistencia, Aislamiento y Durabilidad), control de versiones (*time travel*) y evolución de esquemas.


    **Ejemplo:** un banco usa Delta Lake para auditar cambios en transacciones históricas: si hay una corrección contable, pueden “viajar en el tiempo” y recuperar el estado de la tabla antes del ajuste.


* **Cómputo**:

  * **Apache Spark (Batch/SQL/ML/Streaming)**: procesamiento masivo en lotes o en tiempo real.
  * **Flink (event-driven)**: orientado a flujos continuos y baja latencia.
  * **Trino/Presto (consulta interactiva)**: consultas ad-hoc rápidas sobre diferentes fuentes.


    **Ejemplo:** un retailer global usa Spark Streaming para detectar picos de demanda en tiempo real y ajustar inventario; mientras su equipo de BI usa Trino para consultas rápidas sin tener que mover datos.


* **Orquestación**:

  * **Airflow, Dagster**: coordinan y programan pipelines complejos.

  
    **Ejemplo:** una empresa de logística usa Airflow para automatizar procesos diarios: extraer datos de sensores de camiones, procesarlos con Spark y generar dashboards de eficiencia de rutas antes de las 7 am.

---


### 2.2 **Arquitectura de Spark**

* **Driver y Executors**:

  * El **Driver** coordina, planifica y crea el DAG de ejecución ((Gráfico Acíclico Dirigido) es un gráfico que representa un flujo de trabajo o pipeline de datos de manera visual); los **Executors** procesan los datos.

    **Ejemplo:** en una empresa de e-commerce, el driver envía tareas de limpieza y agregación de datos de usuarios hacia múltiples executors que corren en un clúster Kubernetes para calcular métricas de conversión.


* **Lazy evaluation**:

  * Las transformaciones no ejecutan de inmediato; Spark construye un DAG y solo realiza el cómputo cuando ocurre una **acción**.

    **Ejemplo:** una fintech aplica transformaciones (`filter`, `join`) sobre millones de transacciones fraudulentas potenciales, pero el cálculo real solo ocurre cuando llaman `count()` para saber cuántos casos sospechosos hay.


* **DataFrame API (Catalyst + Tungsten)**:

  * Preferida sobre RDD (conjunto de datos distribuidos resilientes (RDD): Colección inmutable y tolerante a fallos de elementos que se pueden distribuir en varios nodos de clúster para procesarlos en paralelo) porque aplica optimizaciones automáticas de consultas y memoria.

    **Ejemplo:** una telco usa Spark SQL para resumir registros de llamadas usando DataFrames en lugar de RDD, reduciendo el tiempo de procesamiento de horas a minutos.


* **Transformaciones vs Acciones**:

  * **Transformaciones**: `select`, `filter`, `withColumn` (crean un nuevo plan).
  * **Acciones**: `count`, `collect`, `write` (disparan la ejecución real).
  
    **Ejemplo:** una aerolínea transforma datos de reservas para calcular tarifas promedio (`withColumn` y `groupBy`), pero no procesa nada hasta que escribe el resultado (`write`) en una tabla analítica.


* **Dependencias *narrow* vs *wide***:

  * **Narrow**: cada partición de salida depende de una sola partición de entrada (rápido, sin *shuffle*).
  * **Wide**: requieren reorganización (*shuffle*) de datos entre nodos (más costoso).
    **Ejemplo:** una plataforma de streaming aplica un `map` (narrow) para limpiar registros de usuarios, pero al hacer un `groupBy` por región (wide) Spark debe redistribuir datos por toda la red, aumentando el tiempo de ejecución.

---




## **Paso a paso para configurar S3 y Spark**

### **Paso 1. Crear y configurar tu cuenta de AWS**

1. Ve a [https://aws.amazon.com](https://aws.amazon.com) y crea una cuenta (si no tienes).

2. Entra a la **Consola de AWS**.

3. Ve al servicio **S3** → crea un bucket:

   * Nombre: `aseguradora` (debe ser único en todo AWS).
   * Región: elige una cercana (ej. `us-east-1`).
   * Desactiva “Bloquear todo el acceso público” solo si necesitas probar sin restricciones (en producción, mantenlo seguro).
   * Crea el bucket.

4. **Sube un archivo Parquet**:

   * Dentro del bucket, crea la carpeta `polizas/2025/`.
   * Sube un archivo `*.parquet` dentro.

---

### **Paso 2. Configurar credenciales de AWS en tu PC**

1. Instala la CLI de AWS (si no la tienes):

   ```bash
   pip install awscli
   ```
2. Configura tus credenciales:

   ```bash
   aws configure
   ```

   * AWS Access Key ID → (copiar desde IAM)
   * AWS Secret Access Key → (copiar desde IAM)
   * Default region → `us-east-1` (o la región de tu bucket)
   * Default output format → `json`

> Esto guardará credenciales en `~/.aws/credentials`.

---

### **Paso 3. Instalar PySpark y dependencias para S3**

Necesitas PySpark con el conector de Hadoop para S3:

```bash
pip install pyspark
pip install boto3
```

> Spark usará Hadoop para acceder a S3. Con PySpark >= 3.0 ya incluye soporte para `s3a://` y `s3://`, pero a veces hay que forzarlo.

---

### **Paso 4. Configurar Spark para usar S3**

En tu script o notebook, debes pasar credenciales a Spark:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ReadFromS3") \
    .config("spark.hadoop.fs.s3a.access.key", "TU_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.secret.key", "TU_SECRET_KEY") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .getOrCreate()

# Lectura de datos
df_policies = spark.read.parquet("s3a://aseguradora/polizas/2025/")
df_policies.printSchema()
```

> **Importante:**
>
> * Usa `s3a://` en lugar de `s3://` para evitar errores de compatibilidad.
> * Nunca dejes las llaves reales en código público: en producción, Spark las toma automáticamente de `~/.aws/credentials`.

---

## **3. Verificación paso a paso**

1. Verifica que puedes listar el bucket:

   ```bash
   aws s3 ls s3://aseguradora/polizas/2025/
   ```

   Si ves el archivo → tu CLI funciona bien.

2. Ejecuta el script PySpark. Si todo está bien, Spark debe imprimir el esquema de tu Parquet.

---

## **4. Errores comunes y soluciones**

* **`No FileSystem for scheme: s3`** → te falta el conector Hadoop S3 (usa PySpark >= 3.0 o agrega `hadoop-aws` jar).
* **`Access Denied`** → revisa credenciales o permisos IAM (necesitas `s3:GetObject`).
* **`File not found`** → revisa la ruta exacta del archivo/carpeta.

---


EJERCICIO COMPLETO: Ecosistema y Arquitectura de Spark (PySpark)
================================================================

## Ecosistema y Arquitectura de Spark
-------------------------------------
2.1 Piezas del ecosistema (visión rápida)
- Almacenamiento: HDFS, S3, Azure Blob, Google Cloud Storage, lagos de datos. Usamos Parquet/ORC (columnar, comprimido).
- Capa de tablas: Delta Lake, Apache Iceberg, Apache Hudi (ACID, time travel, evolución de esquemas). Aquí usamos Delta si está disponible.
- Cómputo: Apache Spark (Batch/SQL/ML/Streaming), Flink (event-driven), Trino/Presto (consulta interactiva).
- Orquestación: Airflow, Dagster (coordinan pipelines). Incluimos un DAG de Airflow de ejemplo al final (comentado).

2.2 Arquitectura de Spark
- Driver y Executors: el Driver coordina y crea el DAG; los Executors procesan los datos en el clúster.
- Lazy evaluation: transformaciones construyen el plan; las acciones disparan la ejecución.
- DataFrame API (Catalyst + Tungsten): optimización automática; preferible a RDD.
- Transformaciones vs Acciones: select/filter/withColumn (plan), count/collect/write (ejecución).
- Dependencias narrow vs wide: narrow (sin shuffle), wide (con shuffle).

PASO A PASO S3
- Incluye guía para configurar AWS CLI, credenciales, y cómo leer s3a://...

MODO DE USO
-----------
1) LOCAL (recomendado para probar ya):
   - Requisitos: Python 3.9+, Java 8/11, Apache Spark 3.x instalado y en PATH.
   - (Opcional) Delta Lake: `pip install delta-spark`
   - Ejecuta:
       spark-submit ejercicio_spark_completo.py

2) S3 (opcional):
   - Configura AWS CLI: `pip install awscli && aws configure`
   - Crea bucket y sube Parquet como se detalla en el README/teoría.
   - En este script, establece USE_S3 = True y ajusta credenciales/paths.


In [None]:
import os
import sys
import time
from datetime import date
from typing import Tuple

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# ======================================================
# Parámetros del ejercicio
# ======================================================

# Cambiar a True si deseas leer desde S3 (requiere credenciales y datos existentes)
USE_S3 = False

# Rutas LOCAL
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
LOCAL_PARQUET_DIR = os.path.join(BASE_DIR, "data", "parquet", "polizas", "2025")
LOCAL_DELTA_DIR   = os.path.join(BASE_DIR, "data", "delta", "polizas")

# Rutas S3 (si USE_S3=True)
S3_INPUT = "s3a://aseguradora/polizas/2025/"
S3_DELTA = "s3a://aseguradora/delta/polizas/"  # si deseas escribir Delta en S3

# Credenciales S3 (solo si no usas ~/.aws/credentials). No pongas llaves reales en código público.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "TU_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "TU_SECRET_KEY")
AWS_ENDPOINT   = os.environ.get("AWS_ENDPOINT", "s3.amazonaws.com")


def build_spark() -> Tuple[SparkSession, bool]:
    """
    Crea la SparkSession. Si delta-spark está instalado, habilita Delta Lake (capa de tablas ACID + time travel).
    Devuelve (spark, delta_enabled).
    """
    delta_enabled = False

    # Builder base
    builder = (
        SparkSession.builder
        .appName("EjercicioSpark_Completo")
    )

    if USE_S3:
        # ----------------------------
        # ECOSISTEMA → Almacenamiento en S3 (HDFS/Blob/GCS alternativos)
        # ----------------------------
        # Preferir s3a:// para compatibilidad.
        builder = (
            builder
            .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
            .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
            .config("spark.hadoop.fs.s3a.endpoint", AWS_ENDPOINT)
        )

    # Intentar habilitar Delta Lake si está disponible
    try:
        # delta-spark permite configurar fácilmente la sesión con los JARs de Delta
        from delta import configure_spark_with_delta_pip
        builder = (
            builder
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        )
        spark = configure_spark_with_delta_pip(builder).getOrCreate()
        delta_enabled = True
    except Exception:
        # Si no hay delta-spark, seguimos con Spark "vanilla"
        spark = builder.getOrCreate()

    # Ajustes útiles para el ejercicio
    spark.conf.set("spark.sql.shuffle.partitions", "8")
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    return spark, delta_enabled


def ensure_local_dirs():
    os.makedirs(LOCAL_PARQUET_DIR, exist_ok=True)
    os.makedirs(LOCAL_DELTA_DIR, exist_ok=True)


def create_sample_data(spark: SparkSession):
    """
    Crea un DataFrame de pólizas sintético para LOCAL mode.
    (ECOSISTEMA - Almacenamiento → Parquet; FORMATO columnar): generaremos y guardaremos en Parquet.
    """
    schema = StructType([
        StructField("poliza_id",   StringType(), False),
        StructField("cliente_id",  StringType(), False),
        StructField("region",      StringType(), False),
        StructField("monto",       DoubleType(), False),
        StructField("prima",       DoubleType(), False),
        StructField("estado",      StringType(), False),
        StructField("fecha_alta",  DateType(),   False),
    ])

    rows = [
        ("P-0001", "C-100", "Norte",   950.0,  95.0,  "activa",  date(2025, 1, 10)),
        ("P-0002", "C-101", "Norte",  1200.0, 120.0,  "activa",  date(2025, 2,  5)),
        ("P-0003", "C-102", "Sur",     800.0,  80.0,  "vencida", date(2025, 2, 18)),
        ("P-0004", "C-103", "Centro", 2200.0, 220.0,  "activa",  date(2025, 3,  7)),
        ("P-0005", "C-104", "Sur",    1800.0, 180.0,  "activa",  date(2025, 3, 15)),
        ("P-0006", "C-105", "Norte",  3100.0, 310.0,  "activa",  date(2025, 3, 20)),
        ("P-0007", "C-106", "Centro",  500.0,  50.0,  "vencida", date(2025, 3, 25)),
        ("P-0008", "C-107", "Sur",    1300.0, 130.0,  "activa",  date(2025, 4,  2)),
        ("P-0009", "C-108", "Norte",  2600.0, 260.0,  "activa",  date(2025, 4, 12)),
        ("P-0010", "C-109", "Centro",  950.0,  95.0,  "activa",  date(2025, 4, 22)),
    ]

    df = spark.createDataFrame(rows, schema=schema)

    # Guardar en Parquet (FORMATO columnar comprimido)
    df.write.mode("overwrite").parquet(LOCAL_PARQUET_DIR)

    return df


def read_input(spark: SparkSession):
    """
    Lee datos desde Parquet LOCAL o desde S3 (según USE_S3).
    """
    if USE_S3:
        print(f"🔹 Leyendo desde S3 (Parquet): {S3_INPUT}")
        return spark.read.parquet(S3_INPUT)
    else:
        print(f"🔹 Leyendo desde LOCAL (Parquet): {LOCAL_PARQUET_DIR}")
        return spark.read.parquet(LOCAL_PARQUET_DIR)


def run_core_exercise(spark: SparkSession, df):
    """
    Ejecuta el ejercicio central cubriendo 2.2 Arquitectura:
    - Lazy evaluation (transformaciones no ejecutan hasta acción)
    - DataFrame API (Catalyst+Tungsten)
    - Transformaciones vs Acciones
    - Narrow vs Wide (sin y con shuffle)
    - SQL, caché, particionado
    """
    print("\n=== 2.2 ARQUITECTURA DE SPARK ===")
    print("Driver (este programa) construye el DAG; Executors procesan los datos al ejecutar acciones.")

    print("\n-- Esquema de entrada --")
    df.printSchema()

    # Lazy evaluation: transformaciones (no ejecuta)
    print("\n-- Transformaciones (lazy) --")
    df_trans = (df
                .filter(F.col("monto") > 1000)                 # narrow
                .withColumnRenamed("cliente_id", "id_cliente") # narrow
                .withColumn("ratio_prima", F.col("prima")/F.col("monto")))

    # Catalyst/Tungsten: el optimizador y motor físico actuarán al ejecutar acciones.
    print("\n-- Plan lógico/físico (explain) --")
    df_trans.explain(mode="formatted")

    # Acción: count() dispara ejecución
    print("\n-- Acción: count() --")
    n = df_trans.count()
    print(f"Registros con monto > 1000: {n}")

    # Narrow vs Wide: groupBy → shuffle (wide)
    print("\n-- Wide dependency: groupBy (shuffle) --")
    df_grp = df_trans.groupBy("region").agg(F.count("*").alias("polizas"),
                                            F.avg("monto").alias("monto_prom"),
                                            F.avg("prima").alias("prima_prom"))

    print("\n-- Acción: collect() --")
    for row in df_grp.collect():
        print(row)

    # SQL vs DataFrame API
    print("\n-- Spark SQL --")
    df_trans.createOrReplaceTempView("polizas_filtradas")
    sql_res = spark.sql("""
        SELECT region,
               COUNT(*)  AS polizas,
               ROUND(AVG(monto), 2) AS monto_prom,
               ROUND(AVG(prima), 2) AS prima_prom
        FROM polizas_filtradas
        GROUP BY region
        ORDER BY polizas DESC
    """)
    sql_res.show(truncate=False)

    # Caché (útil cuando reusaremos el mismo resultado varias veces)
    print("\n-- Cache / Persist --")
    df_cached = df_trans.cache()
    print("Cached count:", df_cached.count())

    # Particionado: coalesce (narrow) vs repartition (wide - shuffle)
    print("\n-- Particionado --")
    print("Particiones actuales:", df_cached.rdd.getNumPartitions())
    df_repart = df_cached.repartition(4, F.col("region"))  # wide (shuffle por 'region')
    print("Particiones tras repartition(4, 'region'):", df_repart.rdd.getNumPartitions())

    # Escritura particionada en Parquet (ECOSISTEMA - Almacenamiento)
    out_part_dir = os.path.join(os.path.dirname(LOCAL_PARQUET_DIR), "out_particionado")
    df_repart.write.mode("overwrite").partitionBy("region").parquet(out_part_dir)
    print(f"Escritura particionada en Parquet → {out_part_dir}")

    return df_trans


def try_delta_table(spark: SparkSession, df_trans, delta_enabled: bool):
    """
    Capa de tablas: Delta Lake (si está disponible).
    - ACID, Time Travel (versionAsOf), evolución de esquema.
    - Demostración: escribimos Delta, hacemos una actualización (UPDATE), consultamos estado anterior (time travel).
    """
    if not delta_enabled:
        print("\n(Delta Lake NO disponible; instala con `pip install delta-spark` para habilitar esta sección).")
        return

    print("\n=== Capa de Tablas: Delta Lake (ACID + Time Travel) ===")
    target_path = LOCAL_DELTA_DIR if not USE_S3 else S3_DELTA

    # Guardar como Delta
    (df_trans
        .write
        .format("delta")
        .mode("overwrite")
        .save(target_path))
    print(f"Delta escrito en: {target_path}")

    from delta.tables import DeltaTable
    dtab = DeltaTable.forPath(spark, target_path)

    # UPDATE (ACID): subir la prima un 10% a pólizas de región 'Norte'
    print("-- UPDATE en Delta (ACID) --")
    dtab.update(
        condition=F.expr("region = 'Norte'"),
        set={"prima": F.expr("prima * 1.10")}
    )

    # LEER estado actual
    print("-- Leer Delta (estado actual) --")
    spark.read.format("delta").load(target_path).groupBy("region").agg(
        F.round(F.avg("prima"), 2).alias("prima_prom")
    ).show()

    # TIME TRAVEL: leer versión anterior (antes del UPDATE)
    print("-- Time Travel (versionAsOf=0) --")
    spark.read.format("delta").option("versionAsOf", 0).load(target_path).groupBy("region").agg(
        F.round(F.avg("prima"), 2).alias("prima_prom_v0")
    ).show()


def demo_streaming(spark: SparkSession):
    """
    Cómputo en tiempo real (Streaming): usamos la fuente 'rate' para simular eventos.
    - Esto ilustra Spark Streaming (en el ecosistema, retailer detecta picos en tiempo real).
    - Usamos trigger(once=True) para un solo micro-batch y no quedar en ejecución indefinida.
    """
    print("\n=== Streaming: Structured Streaming con fuente 'rate' ===")
    rate_df = (spark.readStream
                    .format("rate")
                    .option("rowsPerSecond", 10)
                    .load())

    # Transformaciones en streaming (lazy)
    stream_trans = (rate_df
                    .withColumn("region",
                                F.when((F.col("value") % 3) == 0, F.lit("Norte"))
                                 .when((F.col("value") % 3) == 1, F.lit("Sur"))
                                 .otherwise(F.lit("Centro")))
                    .groupBy("region")
                    .count())  # wide

    # Escribimos a memoria para poder consultar luego con SQL
    query = (stream_trans.writeStream
             .format("memory")
             .queryName("demanda_regiones")
             .outputMode("complete")
             .trigger(once=True)   # ejecuta un micro-batch y termina
             .start())

    query.awaitTermination()
    print("-- Resultados en memoria (SQL sobre streaming) --")
    spark.sql("SELECT * FROM demanda_regiones ORDER BY count DESC").show()


def airflow_example():
    """
    ORQUESTACIÓN (Airflow): DAG de ejemplo (comentado) que ejecuta este script con SparkSubmitOperator.

    # Guarda como: dags/pipeline_polizas.py
    ----------------------------------------------------
    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from datetime import datetime

    with DAG(
        dag_id="pipeline_polizas",
        start_date=datetime(2025, 8, 1),
        schedule_interval="0 6 * * *",  # todos los días 06:00
        catchup=False,
        default_args={"owner": "data-eng"}
    ) as dag:

        tarea_spark = SparkSubmitOperator(
            task_id="procesar_polizas",
            application="/path/a/ejercicio_spark_completo.py",
            conn_id="spark_default",
            verbose=True
        )

        tarea_spark
    ----------------------------------------------------
    """
    pass


def s3_howto_notes():
    """
    PASO A PASO S3 (resumen práctico dentro del script):
    1) Crea cuenta en AWS y configura CLI:
        pip install awscli
        aws configure
    2) Crea bucket 'aseguradora' y sube Parquet a s3://aseguradora/polizas/2025/
    3) En este script, pon USE_S3=True y ajusta claves o usa ~/.aws/credentials
    4) Usa rutas s3a://... Ej.: spark.read.parquet("s3a://aseguradora/polizas/2025/")
    5) Errores comunes:
       - No FileSystem for scheme: s3 → falta hadoop-aws/versión.
       - Access Denied → permisos IAM (s3:GetObject).
       - File not found → ruta/bucket incorrectos.
    """
    pass


def main():
    if not USE_S3:
        ensure_local_dirs()

    spark, delta_enabled = build_spark()

    try:
        # En LOCAL creamos datos de ejemplo → guardamos en Parquet
        if not USE_S3:
            create_sample_data(spark)

        # Leemos la "capa de almacenamiento": Parquet local o S3
        df_in = read_input(spark)

        # Núcleo del ejercicio (Arquitectura: lazy, acciones, narrow/wide, SQL, caché, particionado)
        df_trans = run_core_exercise(spark, df_in)

        # Capa de tablas: Delta Lake (ACID + Time Travel), si está disponible
        try_delta_table(spark, df_trans, delta_enabled)

        # Cómputo en tiempo real: mini demo de Streaming (rate source)
        demo_streaming(spark)

        print("\n✅ EJERCICIO COMPLETO FINALIZADO CORRECTAMENTE.")
        print("   - Ecosistema (almacenamiento, tabla/Delta, cómputo, orquestación).")
        print("   - Arquitectura (Driver/Executors, lazy, Catalyst/Tungsten, transformaciones vs acciones, narrow/wide).")
        print("   - SQL, caché, particionado, Streaming, Delta (si disponible).")

    finally:
        spark.stop()


if __name__ == "__main__":
    main()
