# Apache Spark Streaming: Real-Time Processing

## Learning Objectives
- Master Spark Structured Streaming
- Implement window processing and aggregations
- Integrate with Kafka and other streaming sources
- Manage state and checkpointing
- Optimize streaming performance

## Requirements
- PySpark 3.x (some examples need 3.3+ or 3.4+, noted inline)
- Python 3.8+
- Kafka (optional)
- Delta Lake (optional)

In [None]:
# Install dependencies (pyarrow is needed by applyInPandasWithState)
import sys
!{sys.executable} -m pip install pyspark pandas numpy pyarrow -q

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, window, session_window, count, approx_count_distinct,
    sum as spark_sum, avg, max as spark_max,
    current_timestamp, unix_timestamp, to_json, from_json, struct, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    DoubleType, TimestampType
)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time

print("Libraries imported successfully")

### 🌊 **Spark Structured Streaming: Architecture and Micro-batches**

**Evolution of Streaming in Spark:**

```
Gen 1: Spark Streaming (DStreams) - 2013
├── RDDs in micro-batches
├── Low-level API
└── ❌ Complex, not unified with batch

Gen 2: Structured Streaming - 2016
├── Unified DataFrames/Datasets
├── Event-time processing
├── Exactly-once semantics (with replayable sources and idempotent sinks)
└── ✅ Declarative API, fault tolerance
```

**Micro-batch vs Continuous Processing:**

| Aspect | Micro-batch (default) | Continuous (experimental) |
|---------|----------------------|---------------------------|
| **Latency** | ~100 ms - 1 s | ~1 ms |
| **Throughput** | ✅ High | ⚠️ Medium |
| **Guarantees** | ✅ Exactly-once | ⚠️ At-least-once |
| **State** | ✅ Full support (aggregations, joins) | ❌ Not supported (map-like operations only) |
| **Use case** | Analytics, aggregations | Ultra-low-latency alerts |

**Micro-batch Architecture:**

```
# Streaming query = unbounded ("infinite") input table
# Each batch processes the rows that arrived since the previous batch

┌──────────────────────────────────────────┐
│  Input Source (Kafka, Kinesis, Files)   │
└──────────────┬───────────────────────────┘
               │ Batch 0: Records [1-100]
               ▼
┌──────────────────────────────────────────┐
│  Incremental Query Engine                │
│  ┌────────────────────────────────────┐  │
│  │ Batch 0: Transform + Aggregate     │  │
│  └────────────────────────────────────┘  │
└──────────────┬───────────────────────────┘
               │ Write Batch 0 Results
               ▼
┌──────────────────────────────────────────┐
│  Output Sink (Delta, Kafka, Console)    │
└──────────────────────────────────────────┘
               │
               │ Trigger interval (e.g., 10s)
               ▼
         (Repeat Batch 1, 2, 3...)
```

**Triggers (Execution Frequency):**

```python
# 1. ProcessingTime: start a micro-batch every N seconds
query = df.writeStream \
    .trigger(processingTime='10 seconds') \
    .start()

# 2. Once: run a single micro-batch and stop (useful for testing/backfills;
#    deprecated since Spark 3.4 in favor of availableNow)
query = df.writeStream \
    .trigger(once=True) \
    .start()

# 3. Continuous: ultra-low latency (~1 ms); '1 second' is the checkpoint interval, not the latency
query = df.writeStream \
    .trigger(continuous='1 second') \
    .start()

# 4. AvailableNow: process all data available at start (possibly in several batches), then stop (Spark 3.3+)
query = df.writeStream \
    .trigger(availableNow=True) \
    .start()
```
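The scheduling behavior of `processingTime` can be illustrated with a simplified pure-Python model. It assumes the trigger works as follows: the next batch is due at the next multiple of the interval after the current batch started, and a batch that overruns the interval is followed immediately by the next one (Spark logs a "falling behind" warning in that case). The function `schedule_batches` is hypothetical and only models timing, not data.

```python
from typing import List


def schedule_batches(durations_s: List[float], interval_s: float) -> List[float]:
    """Return the start time of each micro-batch under a processing-time trigger.

    Simplified model: the next trigger is the next multiple of the interval after
    the current batch started; if the batch overran that instant, the next batch
    starts as soon as the current one finishes.
    """
    starts = []
    now = 0.0
    for duration in durations_s:
        starts.append(now)
        next_trigger = (now // interval_s) * interval_s + interval_s
        finished = now + duration
        if duration > interval_s:
            print(f"batch at t={now}s is falling behind ({duration}s > {interval_s}s)")
        now = max(next_trigger, finished)
    return starts


# 10-second trigger where the second batch takes 12 seconds
print(schedule_batches([3, 12, 4], interval_s=10))
```

With a 10-second interval, a 12-second batch pushes the following batch to t=22 s instead of t=20 s, so an interval shorter than the typical batch duration adds no throughput.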

**Output Modes:**

```python
# 1. Append: only new rows (default)
# ✅ Use: raw logs, events without aggregations (or aggregations with a watermark)
# ❌ Limitation: rows are never updated or deleted once written
df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("checkpointLocation", "/checkpoints/events") \
    .start("/data/events")

# 2. Complete: the whole result table is rewritten on every trigger
# ✅ Use: small aggregations, dashboards
# ❌ Limitation: does not scale; only allowed with aggregations
df.groupBy("category").count() \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("category_counts") \
    .start()

# 3. Update: only rows that changed since the last trigger
# ✅ Use: aggregations with a watermark, upserts
# ⚡ Middle ground between append and complete
# ⚠️ The Delta sink supports only append/complete; for upserts into Delta use foreachBatch + MERGE
df.withWatermark("timestamp", "1 hour") \
    .groupBy(window("timestamp", "1 hour"), "user_id") \
    .agg(count("*").alias("events")) \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
```
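The difference between the output modes shows up clearly on a running count per key. The sketch below is plain Python that mimics what each mode would emit after every micro-batch for `groupBy(key).count()`; it illustrates the semantics and is not Spark code. Append mode is left out because Spark rejects it for an aggregation without a watermark. `run_count_query` is a hypothetical name.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def run_count_query(batches: Iterable[List[str]]) -> List[Tuple[Dict[str, int], Dict[str, int]]]:
    """For each micro-batch, return (complete_output, update_output) of a running count."""
    state: Counter = Counter()  # plays the role of the state store: key -> running count
    outputs = []
    for batch in batches:
        changed = Counter(batch)                  # counts contributed by this batch only
        state.update(changed)                     # merge them into the running state
        complete = dict(state)                    # complete: the whole result table
        update = {k: state[k] for k in changed}   # update: only rows that changed
        outputs.append((complete, update))
    return outputs


for i, (complete, update) in enumerate(run_count_query([["a", "b"], ["a"]])):
    print(f"batch {i}: complete={complete} update={update}")
```

In the second batch only key `a` changes, so update mode emits one row while complete mode re-emits the whole table.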

**Event-time vs Processing-time:**

```python
from pyspark.sql.functions import current_timestamp

# ❌ Processing time: when Spark processes the event
df_processing_time = df.withColumn("processed_at", current_timestamp())
# Problem: late data lands in the wrong time window

# ✅ Event time: timestamp of the original event
df_event_time = df.select("event_timestamp", "user_id", "action")
# Benefit: correct temporal aggregation even when events arrive late

# Example: in-app click at 10:00 AM
# - Device offline until 10:30 AM
# - Processing time: 10:30 AM ❌ (wrong window)
# - Event time: 10:00 AM ✅ (correct window)
```

**Backpressure and Rate Limiting:**

```python
# Cap ingestion per micro-batch to avoid overwhelming the cluster
rate_limited = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("maxOffsetsPerTrigger", 10000)  # at most 10K records per batch
    .option("minPartitions", 4)             # minimum number of Spark input partitions
    .load()
)

# Note: spark.streaming.backpressure.* settings apply only to the legacy DStreams API.
# Structured Streaming has no equivalent automatic backpressure; ingestion is bounded with
# source options such as maxOffsetsPerTrigger (Kafka) or maxFilesPerTrigger (files).
```
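To get a feel for `maxOffsetsPerTrigger`: the limit is shared across partitions roughly in proportion to each partition's backlog. The sketch below is a simplified approximation (Spark's exact rounding may differ slightly, and data arriving while the backlog drains is ignored); the function names are hypothetical.

```python
import math
from typing import Dict


def split_limit(lag_per_partition: Dict[int, int], max_offsets: int) -> Dict[int, int]:
    """Approximate offsets read per partition in one batch under maxOffsetsPerTrigger."""
    total = sum(lag_per_partition.values())
    if total <= max_offsets:
        return dict(lag_per_partition)  # the whole backlog fits in one batch
    return {
        p: min(lag, math.floor(max_offsets * lag / total))  # proportional share
        for p, lag in lag_per_partition.items()
    }


def batches_to_drain(total_lag: int, max_offsets: int) -> int:
    """Minimum number of micro-batches needed to consume a fixed backlog."""
    return math.ceil(total_lag / max_offsets)


lags = {0: 30_000, 1: 10_000}
print(split_limit(lags, max_offsets=10_000))
print(batches_to_drain(sum(lags.values()), 10_000))
```

Multiplying the number of batches by the batch duration gives a rough catch-up time after an outage.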

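**Real Example: E-commerce Click Stream**

```python
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import approx_count_distinct, col, count, from_json, window

# Assumed JSON payload schema for the click events
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("event_timestamp", TimestampType()),
])

# Source: Kafka topic with click events
clicks = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "user-clicks")
    .option("startingOffsets", "latest")
    .load()
)

# Parse the JSON payload
clicks_parsed = clicks.select(
    from_json(col("value").cast("string"), click_schema).alias("data")
).select("data.*")

# 5-minute windows; the watermark lets Spark finalize windows and drop old state.
# countDistinct is not supported on streaming DataFrames, so use the approximate version.
clicks_per_5min = (
    clicks_parsed
    .withWatermark("event_timestamp", "10 minutes")
    .groupBy(window("event_timestamp", "5 minutes"), "product_id")
    .agg(
        count("*").alias("clicks"),
        approx_count_distinct("user_id").alias("unique_users"),
    )
)

# Write to Delta Lake (ACID). The Delta sink supports append/complete, so use append:
# each window is written once, after the watermark passes its end.
query = (
    clicks_per_5min.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/clicks-5min")
    .trigger(processingTime="30 seconds")
    .start("/delta/clicks_aggregated")
)

# Real-time monitoring
print(f"Query ID: {query.id}")
print(f"Status: {query.status}")
print(f"Last Progress: {query.lastProgress}")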
```

**Performance Metrics:**

```python
# Stream metrics (lastProgress is a dict, or None before the first batch completes)
progress = query.lastProgress

if progress:
    key_metrics = {
        "batchId": progress["batchId"],
        "inputRowsPerSecond": progress["inputRowsPerSecond"],
        "processedRowsPerSecond": progress["processedRowsPerSecond"],
        "batchDuration": progress.get("batchDuration"),  # ms
        "numInputRows": progress["numInputRows"],
        "stateOperators": progress["stateOperators"],  # accumulated state per operator
    }

    # Alert if processing is slower than ingestion
    if progress["inputRowsPerSecond"] > progress["processedRowsPerSecond"]:
        print("⚠️ WARNING: Falling behind! Increase parallelism")
```
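One batch in which input outpaces processing can be noise. The sketch below only alerts when several consecutive progress reports fall behind. It assumes the dict shape returned by `query.lastProgress` and `query.recentProgress` (keys `inputRowsPerSecond` and `processedRowsPerSecond`); the helper names and the three-batch threshold are illustrative choices.

```python
from typing import List, Optional


def is_behind(progress: Optional[dict]) -> bool:
    """True if one micro-batch processed rows more slowly than they arrived."""
    if not progress:
        return False  # no batch has completed yet
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    return input_rate > 0 and processed_rate < input_rate


def sustained_lag(recent: List[dict], min_batches: int = 3) -> bool:
    """True only if each of the last `min_batches` progress entries was behind."""
    tail = recent[-min_batches:]
    return len(tail) == min_batches and all(is_behind(p) for p in tail)


# Usage with a running query: sustained_lag(query.recentProgress)
```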

**Comparison with Flink:**

| Feature | Spark Structured Streaming | Apache Flink |
|----------------|-----------------|--------------|
| **Model** | Micro-batch | True streaming (record-at-a-time) |
| **Latency** | 100 ms - 1 s | 10 ms - 100 ms |
| **Throughput** | ✅ Very high | ✅ High |
| **State** | HDFS-backed (in-memory) or RocksDB | Heap or RocksDB (native) |
| **SQL Support** | ✅ Excellent | ✅ Good |
| **Ecosystem** | ✅ Spark ML, Delta | ⚠️ More limited |
| **Learning Curve** | ⚡ Easy (if you know Spark) | ⚠️ Steeper |
| **Best For** | Analytics, ML, unified batch + stream | Ultra-low latency, complex CEP |

---
**Author:** Luis J. Raigoso V. (LJRV)

### ⏰ **Watermarking: Handling Late Data and Event-time Windows**

**Problem: Late-Arriving Data**

```
Timeline:
10:00 AM: User clicks product (event_timestamp = 10:00)
10:15 AM: Network issues, event buffered
10:30 AM: Event arrives at Spark (processing_time = 10:30)

Processing-time windows:
- Event counted in window 10:30-10:35 ❌ (wrong)

Event-time windows without a watermark:
- Event counted in window 10:00-10:05 ✅
- But the state of every window is kept forever (unbounded growth)

Event-time windows with a watermark:
- Event counted in 10:00-10:05 ✅ if its delay is within the threshold
- Old window state is cleaned up automatically
```

**Watermark = "How much delay to tolerate"**

```python
from pyspark.sql.functions import window, col

# 10-minute watermark: events more than 10 minutes behind the latest event time may be dropped
df_with_watermark = df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window("event_timestamp", "5 minutes"),
        "user_id"
    ) \
    .count()

# How it works:
# 1. Spark tracks max(event_timestamp) seen so far
# 2. Watermark = max(event_timestamp) - threshold (10 min), computed at the end of
#    each micro-batch and applied from the next one
# 3. Rows older than the watermark may be dropped (rows within the threshold never are)
# 4. State for windows whose end is at or before the watermark is finalized and removed

# Numeric example (5-minute windows):
# Batch 1: max_event_time = 10:12 → watermark = 10:02
#   → No window closed yet; keeps [10:00-10:05), [10:05-10:10), [10:10-10:15)
# Batch 2: max_event_time = 10:25 → watermark = 10:15
#   → Removes [10:00-10:05), [10:05-10:10), [10:10-10:15); keeps windows from [10:15-10:20) onward
```
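The watermark progression can be replayed with a small simulation. It assumes the semantics described above: the watermark is computed at the end of each micro-batch as max(event_time) minus the threshold (over every row seen, including late ones) and is applied from the next batch on, and rows older than the current watermark are treated as dropped. Spark only guarantees not to drop rows within the threshold, so real behavior near the boundary may differ. Times are integers (minutes after 10:00); `simulate_watermark` is a hypothetical name.

```python
from typing import List, Tuple


def simulate_watermark(batches: List[List[int]], delay: int) -> List[Tuple[float, List[int], List[int]]]:
    """Return (watermark_in_effect, accepted, dropped) for each micro-batch."""
    watermark = float("-inf")      # no watermark before the first batch
    max_event_time = float("-inf")
    result = []
    for events in batches:
        accepted = [t for t in events if t >= watermark]
        dropped = [t for t in events if t < watermark]
        result.append((watermark, accepted, dropped))
        # Advance the watermark after the batch; it never moves backwards
        if events:
            max_event_time = max(max_event_time, max(events))
            watermark = max(watermark, max_event_time - delay)
    return result


for row in simulate_watermark([[12, 5], [25, 3], [14]], delay=10):
    print(row)
```

The event at 10:14 in the third batch arrives after the watermark reached 10:15, so it is treated as too late.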

**Window Types:**

```python
from pyspark.sql.functions import window, session_window

# 1. TUMBLING WINDOW (no overlap)
# Use: metrics every N minutes without double counting
tumbling = df.groupBy(
    window("timestamp", "10 minutes")  # [00:00-00:10), [00:10-00:20)
).count()

# 2. SLIDING WINDOW (overlapping)
# Use: moving averages, trends
sliding = df.groupBy(
    window("timestamp", "10 minutes", "5 minutes")
    # [00:00-00:10), [00:05-00:15), [00:10-00:20)
).count()

# 3. SESSION WINDOW (gap-based, Spark 3.2+)
# Use: user sessions, continuous activity
session = df.groupBy(
    "user_id",
    session_window("timestamp", "30 minutes")  # inactivity gap
).count()
# If a user is active at 00:00, 00:05, 00:40 → 2 sessions:
#   Session 1: [00:00-00:35) (last activity 00:05 + 30-minute gap)
#   Session 2: [00:40-...)
```
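Window assignment can be checked by hand with a small model of the rules above. It assumes windows aligned to the epoch (Spark's default when no `startTime` is given) and session windows that end `gap` after the last event, with a new event extending the open session if it arrives before that end. Times are plain non-negative integers (minutes here); the function names are hypothetical.

```python
from typing import List, Tuple

Interval = Tuple[int, int]  # [start, end)


def sliding_windows(ts: int, size: int, slide: int) -> List[Interval]:
    """All windows [start, start + size) containing ts, with starts aligned to multiples of slide.

    A tumbling window is the special case slide == size.
    """
    start = ts - ts % slide            # latest aligned start <= ts
    windows = []
    while start > ts - size:           # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: List[int], gap: int) -> List[Interval]:
    """Group activity timestamps into sessions closed after `gap` of inactivity."""
    sessions: List[Interval] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:            # inside the open session
            sessions[-1] = (sessions[-1][0], ts + gap)   # extend its end
        else:
            sessions.append((ts, ts + gap))              # start a new session
    return sessions


print(sliding_windows(7, size=10, slide=10))   # tumbling
print(sliding_windows(7, size=10, slide=5))    # sliding
print(session_windows([0, 5, 40], gap=30))
```

The session output reproduces the two sessions of the comment above: one ending at 00:35 and one starting at 00:40.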

**Optimal Watermark Configuration:**

```python
# ❌ Very short watermark (1 minute)
df.withWatermark("timestamp", "1 minute")
# Problem: late data is dropped aggressively
# Use: only if network latency is guaranteed to be < 1 min

# ✅ Balanced watermark (10-30 minutes)
df.withWatermark("timestamp", "10 minutes")
# Benefit: tolerates typical delays while keeping state bounded

# ⚠️ Very long watermark (2 hours)
df.withWatermark("timestamp", "2 hours")
# Problem: much more state to keep (risk of OutOfMemory with the in-memory state store)
# Use: only if delays really exceed 1 hour

# 🎯 Rule of thumb:
# Watermark = P99 latency of your data + buffer
# Example: if 99% of events arrive in < 5 min → 10-minute watermark
```
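The rule of thumb can be turned into a small helper once latency samples have been collected (for example with the diagnostic columns shown under "Debugging Late Data"). This sketch uses a nearest-rank percentile and a safety factor of 2, matching the "2-3x P99" buffer in the best practices; both choices and the function names are assumptions.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], q: float) -> float:
    """Nearest-rank percentile, q in (0, 100]."""
    ordered = sorted(values)
    rank = math.ceil(q * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def suggest_watermark_seconds(latencies_s: Sequence[float], factor: float = 2.0) -> int:
    """Heuristic: P99 event latency multiplied by a safety factor."""
    return math.ceil(percentile(latencies_s, 99) * factor)


latencies = [30] * 95 + [200] * 4 + [600]   # illustrative samples, in seconds
print(percentile(latencies, 99), suggest_watermark_seconds(latencies))
```

A single extreme outlier (600 s here) does not inflate the suggestion; those rare events are accepted as potential drops.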

**Handling Late Data with Output Modes:**

```python
# How output mode and watermark interact:

# 1. APPEND + Watermark
# ✅ Emits each window once, after the watermark passes its end
# ⚡ Best practice: immutable output, no updates
df.withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .count() \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
# Window [10:00-10:05) is emitted once the watermark reaches 10:05

# 2. UPDATE + Watermark
# ✅ Emits the windows whose counts changed in each batch
# ⚡ Use: dashboards that need updates
df.withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .count() \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
# Window [10:00-10:05) can keep changing until the watermark passes its end

# 3. COMPLETE (the watermark does not clean up state)
# ❌ Rewrites the whole result table on every batch
# Use: small aggregations (<1M rows)
```
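The interaction between watermark and output mode can be replayed with a toy model of a tumbling-window count. It assumes the semantics described above: in each micro-batch, rows whose window already ended at or before the current watermark are dropped, update mode emits the windows touched by the batch, append mode emits (and evicts) windows whose end is at or before the watermark, and the watermark advances after the batch. Spark may run an extra "no-data" micro-batch to flush closed windows; passing an empty batch models that. Times are integers (minutes); `windowed_counts` is a hypothetical name.

```python
from typing import Dict, Iterator, List, Tuple

Interval = Tuple[int, int]  # [start, end)


def windowed_counts(
    batches: List[List[int]], size: int, delay: int
) -> Iterator[Tuple[float, Dict[Interval, int], Dict[Interval, int]]]:
    """Yield (watermark_in_effect, update_rows, append_rows) per micro-batch."""
    state: Dict[Interval, int] = {}
    watermark = float("-inf")
    max_seen = float("-inf")
    for events in batches:
        changed = set()
        for t in events:
            start = t - t % size
            window = (start, start + size)
            if window[1] <= watermark:
                continue  # the window is already closed: late row dropped
            state[window] = state.get(window, 0) + 1
            changed.add(window)
        update_rows = {w: state[w] for w in sorted(changed)}
        closed = sorted(w for w in state if w[1] <= watermark)
        append_rows = {w: state.pop(w) for w in closed}  # finalize and evict
        yield watermark, update_rows, append_rows
        if events:
            max_seen = max(max_seen, max(events))
            watermark = max(watermark, max_seen - delay)


for row in windowed_counts([[1, 3, 7], [16], [2]], size=5, delay=10):
    print(row)
```

Window [0, 5) shows up in update mode as soon as it changes, but in append mode only in the batch where the watermark (6) has passed its end, and the late row at minute 2 is ignored.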

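**Real Case: IoT Device Metrics**

```python
# Devices send telemetry every minute
# The cellular network can add variable latency (1 s - 5 min)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import avg, col, count, from_json, struct, to_json, window
from pyspark.sql.functions import max as spark_max

# Assumed telemetry schema
iot_schema = StructType([
    StructField("device_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("event_timestamp", TimestampType()),
])

iot_metrics = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")  # required by the Kafka source
    .option("subscribe", "iot-telemetry")
    .load()
    .select(from_json(col("value").cast("string"), iot_schema).alias("data"))
    .select("data.*")
)

# Aggregate per device over 5-minute windows
device_stats = (
    iot_metrics
    .withWatermark("event_timestamp", "10 minutes")  # tolerate up to 10 min of delay
    .groupBy(window("event_timestamp", "5 minutes"), "device_id")
    .agg(
        avg("temperature").alias("avg_temp"),
        spark_max("temperature").alias("max_temp"),  # Spark's max, not Python's builtin
        count("*").alias("num_readings"),
    )
    .filter(col("max_temp") > 80)  # overheating alerts
)

# Write alerts to Kafka for immediate action; the Kafka sink needs a "value" column
alerts_query = (
    device_stats
    .select(col("device_id").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "device-alerts")
    .option("checkpointLocation", "/checkpoints/iot-alerts")
    .outputMode("update")
    .start()
)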
```

**Debugging Late Data:**

```python
# Add diagnostic columns
from pyspark.sql.functions import avg, current_timestamp, expr, unix_timestamp, window
from pyspark.sql.functions import max as spark_max

df_debug = df \
    .withColumn("processing_time", current_timestamp()) \
    .withColumn("latency_seconds",
        unix_timestamp("processing_time") - unix_timestamp("event_timestamp")
    )

# Analyze the latency distribution
latency_stats = df_debug.groupBy(
    window("processing_time", "1 minute")
).agg(
    avg("latency_seconds").alias("avg_latency"),
    expr("percentile_approx(latency_seconds, 0.95)").alias("p95_latency"),
    expr("percentile_approx(latency_seconds, 0.99)").alias("p99_latency"),
    spark_max("latency_seconds").alias("max_latency")
)

# If p99_latency = 300 s (5 min) → the watermark should be ≥ 10 min
```

**Watermark with Joins:**

```python
# Inner stream-stream joins run without watermarks, but then their state is never cleaned up;
# outer joins require watermarks plus an event-time constraint
clicks = clicks_stream.withWatermark("click_time", "5 minutes")
impressions = impressions_stream.withWatermark("impression_time", "10 minutes")

# Inner join with a time constraint
joined = clicks.join(
    impressions,
    expr("""
        click_user_id = impression_user_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """)
)

# Global watermark = the minimum of the per-input watermarks (default policy
# spark.sql.streaming.multipleWatermarkPolicy = "min"), i.e. the slowest input wins
# Impression state is kept for about 1 hour (time constraint) + the watermark delay
```
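The comments above can be made concrete with a simplified model: the global watermark is the minimum over inputs of (max event time minus delay), and an impression can only match clicks up to `impression_time + 1 hour`, so its state becomes removable once the global watermark passes that point. Spark derives the exact state watermark from the join condition; this is an approximation, and the helper names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict


def global_watermark(max_event_time: Dict[str, datetime], delay: Dict[str, timedelta]) -> datetime:
    """Global watermark under the default 'min' policy: the slowest input wins."""
    return min(max_event_time[name] - delay[name] for name in max_event_time)


def impression_state_removable(
    impression_time: datetime,
    watermark: datetime,
    join_window: timedelta = timedelta(hours=1),
) -> bool:
    """An impression can no longer match once the watermark passes impression_time + join_window."""
    return watermark > impression_time + join_window


wm = global_watermark(
    {"clicks": datetime(2025, 1, 1, 12, 0), "impressions": datetime(2025, 1, 1, 12, 2)},
    {"clicks": timedelta(minutes=5), "impressions": timedelta(minutes=10)},
)
print(wm)  # impressions (12:02 - 10 min) lag behind clicks (12:00 - 5 min)
print(impression_state_removable(datetime(2025, 1, 1, 10, 50), wm))
```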

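**Watermark Monitoring:**

```python
from datetime import datetime, timedelta, timezone

# The current watermark lives under "eventTime" in the query progress
progress = query.lastProgress
event_time = (progress or {}).get("eventTime", {})
print(f"Current watermark: {event_time.get('watermark')}")

# Example output:
# {
#   "eventTime": {
#     "avg": "2025-10-30T14:25:00.000Z",
#     "max": "2025-10-30T14:30:00.000Z",
#     "min": "2025-10-30T14:20:00.000Z",
#     "watermark": "2025-10-30T14:20:00.000Z"  # max - 10 min
#   }
# }

# Alert if the watermark lags far behind (it is an ISO-8601 string in UTC)
if "watermark" in event_time:
    watermark = datetime.strptime(event_time["watermark"], "%Y-%m-%dT%H:%M:%S.%fZ")
    watermark = watermark.replace(tzinfo=timezone.utc)
    if watermark < datetime.now(timezone.utc) - timedelta(hours=1):
        print("⚠️ Watermark lagging! Check the data source")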
```
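The same check packaged as a testable function, a sketch that takes "now" as a parameter so it can be exercised with the example progress above. It assumes the `eventTime.watermark` string format shown there; before any watermark has been set the progress may report the epoch (1970-01-01), which would show up as a huge lag. `watermark_lag` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def watermark_lag(progress: Optional[dict], now: datetime) -> Optional[timedelta]:
    """How far the query's watermark trails `now`, or None if it is not available."""
    if not progress:
        return None
    watermark = progress.get("eventTime", {}).get("watermark")
    if watermark is None:
        return None
    # Progress timestamps look like "2025-10-30T14:20:00.000Z" (UTC)
    parsed = datetime.strptime(watermark, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    return now - parsed


example = {"eventTime": {"watermark": "2025-10-30T14:20:00.000Z"}}
print(watermark_lag(example, now=datetime(2025, 10, 30, 15, 30, tzinfo=timezone.utc)))
```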

**Best Practices:**

1. ✅ **Always use a watermark** with event-time aggregations
2. ✅ **Measure the P99 latency** of your data before configuring it
3. ✅ **Append mode** with a watermark for immutable output
4. ✅ **Session windows** for behavioral analysis
5. ⚠️ **Sliding windows** hold more state (each event belongs to several windows)
6. ❌ **Never run event-time aggregations without a watermark** (state is never cleaned up)
7. ⚡ **Buffer of 2-3x P99** to avoid dropping late events

---

### 💾 **State Management and Checkpointing: Fault Tolerance**

**What Is State?**

```
Stateless processing:
Input: {"user": "A", "action": "click"}
Output: {"user": "A", "action": "click", "processed": true}
✅ Each event is independent

Stateful processing:
Input Batch 1: {"user": "A", "action": "click"}
State: {A: 1 click}
Input Batch 2: {"user": "A", "action": "purchase"}
State: {A: 1 click, 1 purchase}
Output: {A: total_events=2, conversion_rate=1.0}
✅ Memory across batches
```

**Stateful Operations in Spark:**

```python
# 1. Aggregations (groupBy)
# State: accumulated values per key
user_stats = df.groupBy("user_id") \
    .agg(
        count("*").alias("total_events"),
        spark_sum("revenue").alias("total_revenue")
    )
# Stored state: {user_123: {count: 45, revenue: 1200.50}}

# 2. Windowed aggregations
# State: values per window + key
window_stats = df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes"),
        "product_id"
    ) \
    .count()
# State: {(window_10:00-10:05, product_42): 150}

# 3. Stream-stream joins
# State: buffered rows from both streams
clicks = clicks_stream.withWatermark("click_time", "5 minutes")
views = views_stream.withWatermark("view_time", "5 minutes")
joined = clicks.join(views, "session_id")
# ⚠️ Without an event-time range condition in the join, this state is never purged

# 4. Deduplication
# State: unique keys seen so far
deduplicated = df \
    .withWatermark("timestamp", "1 hour") \
    .dropDuplicates(["event_id", "timestamp"])
# Including the event-time column (assumes duplicates share the same timestamp) lets the
# watermark purge old keys; Spark 3.5+ also offers dropDuplicatesWithinWatermark(["event_id"])
# State: {event_456, event_789, ...}

# 5. Arbitrary stateful processing
# Scala/Java: mapGroupsWithState / flatMapGroupsWithState; PySpark (3.4+): applyInPandasWithState
# State: custom per key (e.g., complex sessions, state machines)
```
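Why deduplication needs the event-time column in its key: the sketch keeps a set of seen `(event_id, event_time)` pairs and evicts those older than the watermark, which is only possible because the time is part of the key. It is a plain-Python model under the same watermark assumptions as the earlier simulation (rows older than the watermark are treated as late and dropped); `dedup_stream` is a hypothetical name.

```python
from typing import List, Set, Tuple

Event = Tuple[str, int]  # (event_id, event_time)


def dedup_stream(batches: List[List[Event]], delay: int) -> Tuple[List[List[Event]], Set[Event]]:
    """Return the rows emitted per batch and the dedup state left at the end."""
    seen: Set[Event] = set()
    watermark = float("-inf")
    max_seen = float("-inf")
    outputs = []
    for batch in batches:
        out = []
        for event in batch:
            if event[1] < watermark or event in seen:
                continue  # late row or duplicate
            seen.add(event)
            out.append(event)
        outputs.append(out)
        seen = {e for e in seen if e[1] >= watermark}  # evict keys behind the watermark
        if batch:
            max_seen = max(max_seen, max(t for _, t in batch))
            watermark = max(watermark, max_seen - delay)
    return outputs, seen


emitted, state = dedup_stream(
    [[("a", 1), ("a", 1), ("b", 2)], [("a", 1), ("c", 20)], [("d", 25)], [("a", 1)]],
    delay=10,
)
print(emitted, state)
```

The key `("a", 1)` is evicted once the watermark passes minute 10, yet its late replay in the last batch is not re-emitted because the row itself is behind the watermark.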

**State Store Backends:**

```python
# 1. HDFS-backed provider (default): state kept in executor (JVM) memory,
#    with deltas/snapshots persisted to the checkpoint directory
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"
)
# ⚠️ Limited by executor memory
# Use: local testing, small to moderate state

# 2. RocksDB (Spark 3.2+, production)
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")  # Spark 3.4+
# ✅ State kept in native memory and on local disk, outside the JVM heap; handles much larger state
# Use: production, large state
# ⚠️ Choose the provider before the first run; an existing checkpoint keeps the provider it was created with

# 3. Custom providers
# Any implementation of Spark's StateStoreProvider interface can be plugged in
```

**Checkpointing: Write-Ahead Log (WAL)**

```
Checkpoint directory structure:
/checkpoints/my-query/
├── metadata                 # Query id
├── commits/
│   ├── 0                    # Batch 0 completed (commit marker)
│   ├── 1
│   └── 2
├── offsets/
│   ├── 0                    # Offsets planned for batch 0 (e.g., Kafka)
│   ├── 1
│   └── 2
├── sources/
│   └── 0/
│       └── 0               # Source info
└── state/
    ├── 0/                   # Stateful operator id
    │   ├── 0/               # State partition id
    │   │   ├── 1.delta     # State changes for version 1
    │   │   └── 1.snapshot  # Full state snapshot
    │   └── 1/
    └── 1/
```
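On restart, Spark compares the latest entry in `offsets/` with the latest entry in `commits/`: if that batch was committed it plans the next one, otherwise it re-runs the same batch with the same offsets. A simplified sketch of that decision, assuming the batch ids are the numeric file names listed in each directory (the helper name is hypothetical):

```python
from typing import Iterable, Tuple


def recovery_plan(offset_ids: Iterable[int], commit_ids: Iterable[int]) -> Tuple[str, int]:
    """Return (action, batch_id) for a query restarted from a checkpoint."""
    offsets = sorted(offset_ids)
    commits = set(commit_ids)
    if not offsets:
        return ("start-fresh", 0)        # nothing planned yet: startingOffsets apply
    last = offsets[-1]
    if last in commits:
        return ("run-next", last + 1)    # the last batch finished
    return ("re-run", last)              # planned but not committed: replay the same offsets


print(recovery_plan([0, 1, 2], [0, 1]))
```

In practice the ids could come from something like `[int(n) for n in os.listdir(path) if n.isdigit()]` on a local copy of the checkpoint.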

**Checkpointing Configuration:**

```python
# A checkpoint is required for stateful queries (and for most sinks)
query = (
    df.writeStream
    .format("delta")
    .outputMode("append")  # the Delta sink supports append/complete
    .option("checkpointLocation", "/checkpoints/user-stats")  # REQUIRED
    .start("/delta/user_stats")
)

# Without a checkpoint → error:
# "checkpointLocation must be specified either through option("checkpointLocation", ...)
# or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)"

# Global configuration (not recommended): each query gets a subdirectory named after
# its queryName (or a random UUID if it has none)
spark.conf.set("spark.sql.streaming.checkpointLocation", "/checkpoints/default")
```

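**Recovery Process:**

```python
# Scenario: the query (driver) fails during batch 5
# (a single failed executor is handled by Spark's task retries)

# 1. The query is restarted with the same checkpoint
# 2. It reads the last checkpointed progress (batch 4 committed, batch 5 planned)
#    - Offsets: Kafka partition 0 offset 1000, partition 1 offset 850
#    - State: user A → 45 events, user B → 23 events
# 3. Batch 5 is replayed
#    - Reads from offsets 1000 and 850 in Kafka
#    - Restores the state as of batch 4
#    - Processes batch 5 with the correct state
# 4. Processing continues normally

# Guarantee: exactly-once state updates. End-to-end exactly-once also needs a replayable
# source (e.g., Kafka) and an idempotent or transactional sink (e.g., Delta, files);
# offsets, state and output are not written in one atomic step, and the Kafka sink is at-least-once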
```
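Why a replayed batch does not duplicate output when the sink is idempotent: the toy sink below remembers the last batch id it committed and skips anything at or below it. This is the same idea `foreachBatch` users apply with the `batch_id` argument (recent Delta Lake versions expose it through the `txnAppId`/`txnVersion` writer options). Plain Python, hypothetical class.

```python
from typing import Dict, List


class IdempotentSink:
    """Toy sink that ignores a micro-batch it has already committed."""

    def __init__(self) -> None:
        self.rows: List[Dict] = []
        self.last_committed_batch = -1

    def write_batch(self, batch_id: int, rows: List[Dict]) -> bool:
        if batch_id <= self.last_committed_batch:
            return False                       # replay after a failure: skip
        # A real sink must commit the rows and the batch id atomically
        self.rows.extend(rows)
        self.last_committed_batch = batch_id
        return True


sink = IdempotentSink()
sink.write_batch(4, [{"user": "A"}])
sink.write_batch(5, [{"user": "B"}])  # the driver crashes before committing batch 5...
sink.write_batch(5, [{"user": "B"}])  # ...so batch 5 is replayed on restart
print(len(sink.rows))
```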

**Example: Session Analytics with Custom State**

```python
from datetime import datetime
from typing import Iterator, Optional, Tuple

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

SESSION_GAP_MS = 30 * 60 * 1000  # 30 minutes of inactivity closes a session

# State and output schemas as DDL strings (the state is a tuple with these fields)
session_state_schema = (
    "start_time TIMESTAMP, last_activity TIMESTAMP, num_events LONG, total_revenue DOUBLE"
)
session_output_schema = (
    "user_id STRING, session_duration_s DOUBLE, num_events LONG, total_revenue DOUBLE"
)


def update_session_state(
    key: Tuple[str],
    events: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    """
    Custom state management for sessions (Spark 3.4+, needs pyarrow).
    Assumes input columns user_id, timestamp and, optionally, revenue.
    """
    user_id = key[0]

    # The processing-time timeout fired: no events for this user within the gap
    if state.hasTimedOut:
        start_time, last_activity, num_events, total_revenue = state.get
        state.remove()  # clean up the state
        # Emit the finished session
        yield pd.DataFrame([{
            "user_id": user_id,
            "session_duration_s": (last_activity - start_time).total_seconds(),
            "num_events": num_events,
            "total_revenue": total_revenue,
        }])
        return

    # Read the previous state (a tuple matching session_state_schema)
    start_time: Optional[datetime] = None
    last_activity: Optional[datetime] = None
    num_events, total_revenue = 0, 0.0
    if state.exists:
        start_time, last_activity, num_events, total_revenue = state.get

    # Process the events of this micro-batch
    for pdf in events:
        if pdf.empty:
            continue
        batch_start = pdf["timestamp"].min().to_pydatetime()
        batch_end = pdf["timestamp"].max().to_pydatetime()
        start_time = batch_start if start_time is None else min(start_time, batch_start)
        last_activity = batch_end if last_activity is None else max(last_activity, batch_end)
        num_events += len(pdf)
        if "revenue" in pdf.columns:
            total_revenue += float(pdf["revenue"].fillna(0.0).sum())

    # Save the updated state and restart the inactivity timer; nothing is emitted yet
    state.update((start_time, last_activity, num_events, total_revenue))
    state.setTimeoutDuration(SESSION_GAP_MS)  # PySpark expects milliseconds


# Apply the stateful function per user
sessions = (
    df.groupBy("user_id")
    .applyInPandasWithState(
        update_session_state,
        outputStructType=session_output_schema,
        stateStructType=session_state_schema,
        outputMode="append",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
    )
)
```

**State Size Monitoring:**

```python
# State metrics in lastProgress (one entry per stateful operator)
progress = query.lastProgress

state_info = progress["stateOperators"][0]  # first stateful operator
print(f"Num state rows: {state_info['numRowsTotal']}")
print(f"State memory (MB): {state_info['memoryUsedBytes'] / 1024 / 1024:.1f}")
print(f"Custom metrics: {state_info['customMetrics']}")

# Example output:
# {
#   "numRowsTotal": 1234567,
#   "numRowsUpdated": 5678,
#   "memoryUsedBytes": 524288000,  # ~500 MB
#   "customMetrics": {
#     "loadedMapCacheHitCount": 1000,
#     "loadedMapCacheMissCount": 50,
#     "stateOnCurrentVersionSizeBytes": 450000000
#   }
# }

# Alert if state grows out of control
if state_info['numRowsTotal'] > 10_000_000:
    print("⚠️ WARNING: State size >10M rows, consider:")
    print("  1. Reduce the watermark threshold")
    print("  2. Use more shuffle partitions (only takes effect with a new checkpoint)")
    print("  3. Expire custom state with timeouts")
```
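The check above looks only at the first operator and fails when a query has none. A sketch that summarizes every stateful operator in a `lastProgress` dict, assuming the `stateOperators` fields shown in the example output (`numRowsTotal`, `memoryUsedBytes`); `state_report` is a hypothetical name.

```python
from typing import List, Optional


def state_report(progress: Optional[dict], max_rows: int = 10_000_000) -> List[str]:
    """One line per stateful operator, flagging operators above `max_rows`."""
    if not progress:
        return []
    report = []
    for i, op in enumerate(progress.get("stateOperators", [])):
        rows = op.get("numRowsTotal", 0)
        mb = op.get("memoryUsedBytes", 0) / 1024 / 1024
        flag = " (over limit)" if rows > max_rows else ""
        report.append(f"operator {i}: {rows} rows, {mb:.1f} MB{flag}")
    return report


sample = {"stateOperators": [{"numRowsTotal": 1234567, "memoryUsedBytes": 524288000}]}
print("\n".join(state_report(sample)))
```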

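**State Cleanup Strategies:**

```python
# 1. Watermark-based (automatic)
df.withWatermark("timestamp", "1 hour") \
    .groupBy(window("timestamp", "10 minutes")) \
    .count()
# State is removed once the window end falls behind the watermark

# 2. TTL: built-in operators (aggregations, joins, deduplication) have no generic TTL
#    setting; they rely on the watermark. Custom state expires with timeouts (next item),
#    and the transformWithState API in Spark 4.0+ adds per-state TTL.

# 3. Manual cleanup with timeouts (applyInPandasWithState in PySpark;
#    mapGroupsWithState/flatMapGroupsWithState in Scala/Java)
def cleanup_old_state(key, events, state):
    if state.hasTimedOut:
        state.remove()  # explicit cleanup
        return iter([])
    # ... process events, update the state and set a new timeout
    return iter([])

# 4. Balance state across partitions: the number of state partitions equals
#    spark.sql.shuffle.partitions at the query's first run and is then fixed by the checkpoint
spark.conf.set("spark.sql.shuffle.partitions", "200")  # set before the first start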
```

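**Checkpoint Management:**

```python
# ❌ NEVER point a production query at a different, empty checkpoint location
# A new checkpoint = lost state + the source restarts from startingOffsets
# (Kafka's default "latest" skips the backlog; "earliest" reprocesses everything)

# ✅ Migration process:
# 1. Stop the query gracefully
query.stop()

# 2. Back up the checkpoint
# hdfs dfs -cp /checkpoints/old /checkpoints/backup

# 3. Restart with a checkpoint that contains the old progress: the original path or a copy,
#    e.g. hdfs dfs -cp /checkpoints/old /checkpoints/new
#    (only compatible query changes are allowed on an existing checkpoint)
query_new = df.writeStream \
    .option("checkpointLocation", "/checkpoints/new") \
    .start()

# 4. Validate that the outputs are correct

# 5. Clean up the old checkpoint (after days/weeks)
# hdfs dfs -rm -r /checkpoints/old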
```

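**Real Case: Fraud Detection with State**

```python
from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.functions import col, struct, to_json
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

WINDOW_MS = 10 * 60 * 1000              # the rules look at the last 10 minutes
STATE_TIMEOUT_MS = 24 * 60 * 60 * 1000  # forget users inactive for 24 h

# Per-user state: recent transactions as parallel arrays (epoch ms, amount, country)
user_fraud_state_schema = "times ARRAY<LONG>, amounts ARRAY<DOUBLE>, countries ARRAY<STRING>"
fraud_alert_schema = (
    "user_id STRING, alert_type STRING, num_txns LONG, total_amount DOUBLE, "
    "num_countries LONG, transaction_time TIMESTAMP"
)


# Detect multiple suspicious transactions from the same user.
# Defined before it is used; assumes columns transaction_time, amount, country.
def detect_fraud_pattern(
    key: Tuple[str], events: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    user_id = key[0]

    if state.hasTimedOut:  # no activity for 24 h: drop the history
        state.remove()
        return

    # Restore the state (transaction history) or start empty
    times, amounts, countries = [], [], []
    if state.exists:
        times, amounts, countries = (list(values) for values in state.get)

    alerts = []
    for pdf in events:
        for txn in pdf.sort_values("transaction_time").itertuples(index=False):
            now_ms = int(txn.transaction_time.timestamp() * 1000)
            times.append(now_ms)
            amounts.append(float(txn.amount))
            countries.append(txn.country)

            # Keep only transactions from the last 10 minutes
            keep = [i for i, t in enumerate(times) if now_ms - t <= WINDOW_MS]
            times = [times[i] for i in keep]
            amounts = [amounts[i] for i in keep]
            countries = [countries[i] for i in keep]

            # Suspicious patterns: >5 transactions, >$10,000 in total, or >2 countries
            if len(times) > 5 or sum(amounts) > 10_000 or len(set(countries)) > 2:
                alerts.append({
                    "user_id": user_id,
                    "alert_type": "FRAUD_SUSPECTED",
                    "num_txns": len(times),
                    "total_amount": sum(amounts),
                    "num_countries": len(set(countries)),
                    "transaction_time": txn.transaction_time,
                })

    # Always save the state (not only when no alert fired) and reset the 24 h timeout
    state.update((times, amounts, countries))
    state.setTimeoutDuration(STATE_TIMEOUT_MS)
    if alerts:
        yield pd.DataFrame(alerts)


fraud_detection = (
    transactions
    .withWatermark("transaction_time", "15 minutes")
    .groupBy("user_id")
    .applyInPandasWithState(
        detect_fraud_pattern,
        outputStructType=fraud_alert_schema,
        stateStructType=user_fraud_state_schema,
        outputMode="append",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
    )
)

# Write alerts to Kafka for immediate action (the Kafka sink needs a "value" column)
fraud_alerts = (
    fraud_detection
    .select(col("user_id").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "fraud-alerts")
    .option("checkpointLocation", "/checkpoints/fraud-detection")
    .outputMode("append")
    .start()
)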
```
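The rule inside `detect_fraud_pattern` can be tested without Spark by factoring it into a pure function. The sketch below mirrors the thresholds used above (more than 5 transactions, more than $10,000, or more than 2 countries within 10 minutes); `check_window` and the tuple layout are hypothetical.

```python
from typing import List, Tuple

Txn = Tuple[int, float, str]  # (epoch_ms, amount, country)
WINDOW_MS = 10 * 60 * 1000


def check_window(history: List[Txn], txn: Txn) -> Tuple[List[Txn], bool]:
    """Add txn to the history, keep the last 10 minutes, and report whether a rule fired."""
    recent = [t for t in history + [txn] if txn[0] - t[0] <= WINDOW_MS]
    total = sum(amount for _, amount, _ in recent)
    countries = {country for _, _, country in recent}
    suspicious = len(recent) > 5 or total > 10_000 or len(countries) > 2
    return recent, suspicious


history = [(0, 100.0, "CO"), (60_000, 200.0, "US")]
print(check_window(history, (120_000, 50.0, "MX")))  # third country within 2 minutes
```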

**Best Practices:**

1. ✅ **Always configure a checkpoint** for stateful queries
2. ✅ **RocksDB in production** for large state (the default provider keeps state on the JVM heap)
3. ✅ **Monitor state size** with metrics
4. ✅ **Watermark** for automatic cleanup
5. ✅ **Back up checkpoints** before upgrades
6. ⚠️ **Timeouts for custom state** to avoid unbounded growth
7. ❌ **Never change the checkpoint location** without a migration plan
8. ⚡ **Size spark.sql.shuffle.partitions** before the first run to balance state

---

### 🏗️ **Delta Lake Integration and Performance Optimization**

**Spark Streaming + Delta Lake = Streaming Lakehouse**

```
Advantages of Delta as a sink:
✅ ACID transactions: exactly-once writes without duplicates (together with the checkpoint)
✅ Schema evolution: new columns can be added to the table (mergeSchema)
✅ Time travel: roll back after incorrect processing
✅ Upserts/merges: CDC (Change Data Capture) via foreachBatch + MERGE
✅ Performance: Z-ordering (OPTIMIZE), optimized writes, automatic compaction
```

**Writing a Stream to Delta:**

```python
# Basic configuration
query = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/events-delta") \
    .option("path", "/delta/events") \
    .start()

# Advanced configuration
query = (
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/events-delta")
    .option("mergeSchema", "true")      # automatic schema evolution
    .option("optimizeWrite", "true")    # optimized writes / bin-packing (recent Delta versions)
    .partitionBy("date", "hour")        # partitioning
    .trigger(processingTime="30 seconds")
    .start("/delta/events")
)
# Auto compaction is usually enabled through the table property delta.autoOptimize.autoCompact
# (or spark.databricks.delta.autoCompact.enabled); availability depends on the Delta version

# AvailableNow trigger (Spark 3.3+): process the whole backlog and stop
query = (
    df.writeStream
    .format("delta")
    .trigger(availableNow=True)         # process everything, then terminate
    .option("checkpointLocation", "/checkpoints/backfill")
    .start("/delta/events")
)
```

**Streaming Upserts (Merge):**

```python
from delta.tables import DeltaTable

# Function that MERGEs each micro-batch
def upsert_to_delta(batch_df, batch_id):
    """
    Merge a batch into the Delta table (UPSERT)
    """
    # MERGE fails if several source rows match the same target row, so keep one row per key
    # (which duplicate survives is arbitrary here; order by a timestamp to keep the latest)
    batch_df = batch_df.dropDuplicates(["user_id"])
    delta_table = DeltaTable.forPath(spark, "/delta/user_profiles")

    (
        delta_table.alias("target")
        .merge(batch_df.alias("source"), "target.user_id = source.user_id")
        .whenMatchedUpdateAll()     # update if it exists
        .whenNotMatchedInsertAll()  # insert if it does not
        .execute()
    )

    print(f"Batch {batch_id} merged successfully")

# Apply it to every batch
query = user_updates.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/checkpoints/user-upserts") \
    .start()

# Example: user profile updates
# Batch 1: user_123 {name: "John", purchases: 5} → INSERT
# Batch 2: user_123 {name: "John", purchases: 6} → MERGE updates the row
#          (updateAll copies every column, so the source must carry the full row)
# Batch 3: user_456 {name: "Jane"} → INSERT new user
```

**Change Data Capture (CDC) Streaming:**

```python
from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import col, from_json, row_number

# Read CDC from Kafka (Debezium-style events). Assumption: cdc_schema describes a
# flattened event with the target table's columns plus op (c/u/d/r) and ts_ms.
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "mysql.prod.users")
    .load()
    .select(from_json(col("value").cast("string"), cdc_schema).alias("cdc"))
    .select("cdc.*")
)


# Apply the changes to Delta
def apply_cdc_changes(batch_df, batch_id):
    """
    Process CDC events: INSERT (c/r), UPDATE (u), DELETE (d)
    """
    # Keep only the most recent change per key in this micro-batch, so that, e.g.,
    # an insert followed by a delete of the same user ends with the row deleted
    latest = (
        batch_df
        .withColumn(
            "rn",
            row_number().over(Window.partitionBy("user_id").orderBy(col("ts_ms").desc())),
        )
        .filter(col("rn") == 1)
        .drop("rn")
    )

    delta_table = DeltaTable.forPath(spark, "/delta/users")

    # A single MERGE handles deletes and upserts together
    (
        delta_table.alias("t")
        .merge(latest.alias("s"), "t.user_id = s.user_id")
        .whenMatchedDelete(condition="s.op = 'd'")
        .whenMatchedUpdateAll(condition="s.op != 'd'")
        .whenNotMatchedInsertAll(condition="s.op != 'd'")
        .execute()
    )


query = (
    cdc_stream.writeStream
    .foreachBatch(apply_cdc_changes)
    .option("checkpointLocation", "/checkpoints/cdc")
    .start()
)
```
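The ordering problem that the latest-change-per-key step guards against can be seen in a plain-Python model of applying one micro-batch of CDC events to a keyed table. The `op` codes follow Debezium (`c`, `u`, `d`, `r`); the flattened event shape and the `ts_ms` field are assumptions, and `apply_cdc_batch` is a hypothetical name.

```python
from typing import Dict, List


def apply_cdc_batch(table: Dict[str, dict], events: List[dict]) -> Dict[str, dict]:
    """Apply the latest change per key: deletes remove the row, anything else upserts it."""
    latest: Dict[str, dict] = {}
    for e in events:
        key = e["user_id"]
        if key not in latest or e["ts_ms"] > latest[key]["ts_ms"]:
            latest[key] = e
    result = dict(table)  # leave the input untouched
    for key, e in latest.items():
        if e["op"] == "d":
            result.pop(key, None)
        else:
            result[key] = {k: v for k, v in e.items() if k not in ("op", "ts_ms")}
    return result


table = {"1": {"user_id": "1", "name": "Ann"}}
events = [
    {"user_id": "2", "name": "Bob", "op": "c", "ts_ms": 1},
    {"user_id": "2", "name": "Bob", "op": "d", "ts_ms": 2},
    {"user_id": "1", "name": "Anna", "op": "u", "ts_ms": 3},
]
print(apply_cdc_batch(table, events))
```

Applying all deletes first and all upserts afterwards would wrongly leave user 2 in the table, because its insert would be applied after its delete.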

**Reading a Stream from Delta (Change Data Feed):**

```python
# Enable CDF on the Delta table
spark.sql("""
    ALTER TABLE user_profiles
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the changes as a stream
changes_stream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 10)  # from a specific version
    .table("user_profiles")
)

# Extra CDF columns:
# _change_type: insert, update_preimage, update_postimage, delete
# _commit_version: Delta version
# _commit_timestamp: when the change happened

# Process only updates (post-update values)
updates_only = changes_stream.filter(col("_change_type") == "update_postimage")

# Materialize into another table
query = updates_only.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/cdf-consumer") \
    .start("/delta/user_updates_log")
```

**Performance Optimization: Kafka Source**

```python
# Tuned Kafka source configuration
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")    # latest, earliest, or JSON such as {"topic":{"0":23,"1":-1}}
    .option("maxOffsetsPerTrigger", 50000)  # cap ingestion per batch
    .option("minPartitions", 10)            # minimum Spark input partitions
    .option("kafka.max.poll.records", 500)  # records per consumer poll
    .option("kafka.session.timeout.ms", 30000)
    .option("kafka.request.timeout.ms", 40000)
    .option("failOnDataLoss", "false")      # tolerate lost offsets (dev only)
    .load()
)

# Offsets and consumer groups:
# - Spark tracks offsets in the checkpoint and does not commit them to Kafka
# - Each query gets an auto-generated group.id (spark-kafka-source-...) unless
#   kafka.group.id or groupIdPrefix is set
# - startingOffsets only applies to a new query, so a new checkpoint starts from it again
```
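`startingOffsets` also accepts a JSON string with per-partition offsets, where -2 means earliest and -1 means latest. A small helper for building that string from Python dicts, so partition numbers and topic names are not hand-typed into JSON (the function name is hypothetical):

```python
import json
from typing import Dict

EARLIEST, LATEST = -2, -1  # special offsets understood by the Kafka source


def starting_offsets(topic_offsets: Dict[str, Dict[int, int]]) -> str:
    """Build the JSON value for .option("startingOffsets", ...)."""
    return json.dumps({
        topic: {str(partition): offset for partition, offset in parts.items()}
        for topic, parts in topic_offsets.items()
    })


print(starting_offsets({"events": {0: 23, 1: LATEST}}))
```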

**Partitioning Strategies:**

```python
# ❌ Anti-pattern: over-partitioning
# (format, checkpointLocation and path omitted for brevity)
(df.writeStream
    .partitionBy("year", "month", "day", "hour", "user_id")  # millions of partitions!
    .start())

# ✅ Balanced partitioning
(df.writeStream
    .partitionBy("date")  # ~30 partitions per month
    .start())

# ⚡ Dynamic partition overwrite (batch mode)
df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("date") \
    .save("/delta/events")

# Z-ordering on frequently filtered columns
spark.sql("""
    OPTIMIZE events
    ZORDER BY (user_id, product_id)
""")
```
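
To see why the first layout explodes, count the distinct partition directories each scheme produces; every micro-batch writes at least one file into each directory it touches. A pure-Python sketch with hypothetical events (one per minute for two days, 1,000 rotating user IDs):

```python
from datetime import datetime, timedelta


def partition_dirs(events, columns):
    """Set of partition directory paths a write with partitionBy(*columns) would touch."""
    return {"/".join(f"{c}={e[c]}" for c in columns) for e in events}


start = datetime(2024, 1, 1)
events = []
for i in range(2 * 24 * 60):  # one event per minute for 2 days
    ts = start + timedelta(minutes=i)
    events.append({
        "year": ts.year, "month": ts.month, "day": ts.day, "hour": ts.hour,
        "date": ts.date().isoformat(), "user_id": f"user_{i % 1000}",
    })

fine = partition_dirs(events, ["year", "month", "day", "hour", "user_id"])
coarse = partition_dirs(events, ["date"])
print(len(fine), len(coarse))  # 2880 2
```

With a high-cardinality column such as `user_id` in the partition spec, nearly every event gets its own directory.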

**Auto Compaction:**

```python
# Problem: streaming creates many small files
# Every micro-batch writes new files, usually far smaller than the sizes readers prefer
# Result: very large file counts after days or weeks

# Solution 1: optimized writes + auto compaction as defaults for new tables
# (sets default table properties; the autoOptimize properties need Databricks
# or a recent Delta Lake release, so check your version)
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")

# Solution 2: scheduled compaction job
from delta.tables import DeltaTable

def compact_table(table_path):
    """
    Compact small files in the recent partitions of a Delta table
    """
    delta_table = DeltaTable.forPath(spark, table_path)

    # OPTIMIZE: bin-pack small files within partitions
    # (where() only accepts predicates on partition columns; `date` is assumed to be one)
    (delta_table.optimize()
        .where("date >= current_date() - interval 7 days")  # last 7 days only
        .executeCompaction())

    # VACUUM: delete unreferenced files older than the retention period
    delta_table.vacuum(retentionHours=168)  # 7 days

# Run daily, e.g. from an Airflow DAG (requires apache-airflow):
# from airflow import DAG
# optimize_dag = DAG("delta_optimize", schedule_interval="@daily", start_date=<your start date>)

# Solution 3: bin-packing at write time (optimized writes)
(df.writeStream
    .format("delta")
    .option("optimizeWrite", "true")  # bin-pack data before writing
    .option("checkpointLocation", "/checkpoints/events")
    .start("/delta/events"))
```
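
Compaction is essentially bin-packing: group small files into output files close to a target size. A minimal first-fit-decreasing sketch in plain Python; the file sizes and the 128 MB target are hypothetical, and Delta's actual OPTIMIZE planner is not reproduced here.

```python
from typing import List


def plan_compaction(file_sizes_mb: List[float], target_mb: float = 128.0) -> List[List[float]]:
    """Group files into bins whose total stays <= target_mb (first-fit decreasing).

    Files already at or above the target are left alone.
    """
    bins: List[List[float]] = []
    totals: List[float] = []
    for size in sorted((s for s in file_sizes_mb if s < target_mb), reverse=True):
        for i, total in enumerate(totals):
            if total + size <= target_mb:  # first bin with room
                bins[i].append(size)
                totals[i] += size
                break
        else:  # no bin had room: open a new one
            bins.append([size])
            totals.append(size)
    return bins


small_files = [40, 30, 90, 5, 60, 20, 200, 10]  # sizes in MB
plan = plan_compaction(small_files)
print(len(plan), plan)  # 3 [[90, 30, 5], [60, 40, 20], [10]]
```

Seven small files become three, and the 200 MB file is not rewritten.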

**Monitoring and Alerts:**

```python
# Key metrics to monitor
def extract_stream_metrics(query):
    """
    Extract metrics to export to Prometheus/Datadog
    """
    progress = query.lastProgress
    
    if progress is None:
        return {}
    
    metrics = {
        # Throughput
        "input_rows_per_second": progress.get("inputRowsPerSecond", 0),
        "processed_rows_per_second": progress.get("processedRowsPerSecond", 0),
        
        # Latency
        "batch_duration_ms": progress.get("durationMs", {}).get("triggerExecution", 0),
        "batch_id": progress.get("batchId", 0),
        
        # State
        "num_state_rows": 0,
        "state_memory_mb": 0,
        
        # Volume of the last micro-batch (not consumer lag)
        "input_rows": progress.get("numInputRows", 0),
    }
    
    # State metrics, if the query has stateful operators (first operator only)
    if "stateOperators" in progress and len(progress["stateOperators"]) > 0:
        state_op = progress["stateOperators"][0]
        metrics["num_state_rows"] = state_op.get("numRowsTotal", 0)
        metrics["state_memory_mb"] = state_op.get("memoryUsedBytes", 0) / 1024 / 1024
    
    return metrics

# Alerts
def check_alerts(metrics):
    """
    Generate alerts when metrics look abnormal
    """
    alerts = []
    
    # Alert 1: processing is falling behind the input rate
    if metrics["input_rows_per_second"] > metrics["processed_rows_per_second"] * 1.2:
        alerts.append({
            "severity": "WARNING",
            "message": f"Processing falling behind: {metrics['input_rows_per_second']:.0f} in/s vs {metrics['processed_rows_per_second']:.0f} out/s"
        })
    
    # Alert 2: high batch latency
    if metrics["batch_duration_ms"] > 60000:  # > 1 minute
        alerts.append({
            "severity": "ERROR",
            "message": f"High batch latency: {metrics['batch_duration_ms']/1000:.1f}s"
        })
    
    # Alert 3: state growing without bound
    if metrics["num_state_rows"] > 50_000_000:  # > 50M rows
        alerts.append({
            "severity": "WARNING",
            "message": f"Large state size: {metrics['num_state_rows']:,} rows, {metrics['state_memory_mb']:.1f} MB"
        })
    
    return alerts

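# Monitoring loop
# push_to_prometheus() and send_alert() are placeholders for your own
# exporter/notification code; `query` is a running StreamingQuery
import time

while query.isActive:
    time.sleep(60)  # check once a minute
    metrics = extract_stream_metrics(query)
    if not metrics:  # no micro-batch has completed yet
        continue
    alerts = check_alerts(metrics)

    # Send metrics to Prometheus
    push_to_prometheus(metrics)

    # Send alerts to Slack/PagerDuty
    for alert in alerts:
        send_alert(alert)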
```
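
A possible refinement (an assumption, not part of the setup above): one slow micro-batch can trip the falling-behind alert on its own. This pure-Python sketch only fires after N consecutive bad checks; `SustainedAlert` is a hypothetical helper and the metric samples are made up.

```python
from collections import deque


class SustainedAlert:
    """Fire only when a condition holds for `window` consecutive checks."""

    def __init__(self, window: int = 3):
        self.window = window
        self.history = deque(maxlen=window)

    def update(self, condition: bool) -> bool:
        self.history.append(condition)
        return len(self.history) == self.window and all(self.history)


def falling_behind(metrics: dict, tolerance: float = 1.2) -> bool:
    """Same rule as check_alerts: input rate exceeds processing rate by more than 20%."""
    return metrics["input_rows_per_second"] > metrics["processed_rows_per_second"] * tolerance


# Hypothetical per-minute samples: one slow batch, then a sustained backlog
samples = [
    {"input_rows_per_second": 1000, "processed_rows_per_second": 700},
    {"input_rows_per_second": 1000, "processed_rows_per_second": 1100},
    {"input_rows_per_second": 1000, "processed_rows_per_second": 600},
    {"input_rows_per_second": 1000, "processed_rows_per_second": 650},
    {"input_rows_per_second": 1000, "processed_rows_per_second": 700},
]
alert = SustainedAlert(window=3)
fired = [alert.update(falling_behind(m)) for m in samples]
print(fired)  # [False, False, False, False, True]
```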

**Real-World Case: Real-Time Analytics Dashboard**

```python
# Full pipeline: Kafka → Spark Streaming → Delta → BI tool
# Assumes `event_schema` (JSON schema of the events) and `users_dim`
# (a static user dimension DataFrame) are defined elsewhere
from pyspark.sql.functions import approx_count_distinct

# 1. Read events from Kafka
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "user-events")
    .option("maxOffsetsPerTrigger", 10000)
    .load()
    .select(from_json(col("value").cast("string"), event_schema).alias("e"))
    .select("e.*")
)

# 2. Transformations: stream-static left join for enrichment
events_enriched = (
    events
    .withWatermark("event_timestamp", "10 minutes")
    .alias("e")
    .join(users_dim.alias("u"), col("e.user_id") == col("u.user_id"), "left")
    .select(
        col("e.event_id"),
        col("e.event_timestamp"),
        col("e.user_id"),
        col("u.user_segment"),  # enrichment
        col("e.event_type"),
        col("e.revenue"),
    )
)

# 3. Windowed aggregations
# countDistinct is not supported on streaming DataFrames; approx_count_distinct is
dashboard_metrics = (
    events_enriched
    .groupBy(window("event_timestamp", "5 minutes"), "user_segment")
    .agg(
        count("*").alias("total_events"),
        approx_count_distinct("user_id").alias("unique_users"),
        spark_sum("revenue").alias("total_revenue"),
        avg("revenue").alias("avg_revenue"),
    )
)

# 4. Write to Delta with optimized writes
# The Delta sink supports append/complete (not update): in append mode each
# window is written once the watermark passes its end (window end + 10 minutes).
# Auto compaction is configured separately (see Auto Compaction above).
query = (
    dashboard_metrics.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/dashboard")
    .option("optimizeWrite", "true")
    .trigger(processingTime="30 seconds")
    .start("/delta/dashboard_metrics")
)

# 5. The BI tool reads from Delta (Tableau, Power BI, Looker)
# SELECT * FROM delta.`/delta/dashboard_metrics`
# WHERE window.start >= current_timestamp() - interval 1 hour

# 6. Z-order to speed up filtered reads
spark.sql("""
    OPTIMIZE delta.`/delta/dashboard_metrics`
    ZORDER BY (window, user_segment)
""")

# 7. Vacuum old files (weekly)
spark.sql("""
    VACUUM delta.`/delta/dashboard_metrics`
    RETAIN 168 HOURS
""")
```
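
When a windowed aggregation like the one above runs in append mode, each window is emitted once, after the watermark (latest event time seen minus the 10-minute delay) passes the window end; rows arriving later for a closed window are dropped. A simplified pure-Python model of that rule, using seconds as timestamps; the exact boundary comparisons and batch timing in Spark may differ, and the event times are hypothetical.

```python
WINDOW_S = 300   # window("event_timestamp", "5 minutes")
DELAY_S = 600    # withWatermark("event_timestamp", "10 minutes")


def window_start(ts: int) -> int:
    """Start of the tumbling window containing ts (seconds since epoch)."""
    return ts - ts % WINDOW_S


def simulate_append(batches):
    """Return (window_start, count) rows in the order append mode emits them."""
    counts = {}        # open windows: the aggregation state
    emitted = []
    watermark = None   # no watermark before the first batch
    max_event = None
    for batch in batches:
        for ts in batch:
            start = window_start(ts)
            # Rows for a window that has already closed are dropped as too late
            if watermark is not None and start + WINDOW_S <= watermark:
                continue
            counts[start] = counts.get(start, 0) + 1
            max_event = ts if max_event is None else max(max_event, ts)
        if max_event is None:
            continue
        watermark = max_event - DELAY_S
        # Windows ending at or before the watermark are final: emit and drop state
        for start in sorted(s for s in counts if s + WINDOW_S <= watermark):
            emitted.append((start, counts.pop(start)))
    return emitted


batches = [[10, 200, 310], [620, 905], [250, 1250]]
print(simulate_append(batches))  # [(0, 2), (300, 1)]
```

The late event at t=250 is dropped because window [0, 300) was already emitted, and windows still inside the 10-minute delay stay in state.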

**Best Practices:**

1. ✅ **Delta as the primary sink** in production (ACID + performance)
2. ✅ **foreachBatch for custom logic** (upserts, alerts, etc.)
3. ✅ **Auto-compaction** or scheduled OPTIMIZE
4. ✅ **Z-ordering** on frequently filtered columns
5. ✅ **Partitioning by date** (balances granularity against partition count)
6. ✅ **maxOffsetsPerTrigger** to control ingestion
7. ✅ **Monitor metrics** continuously (Prometheus, Datadog)
8. ⚠️ **Change Data Feed** for downstream consumers
9. ⚠️ **failOnDataLoss=false** only in dev (keep the default `true` in prod)
10. ❌ **Don't over-partition** (avoid millions of partitions)

**Performance Benchmarks:**

```
Scenario: 1M events/s, 10 KB/event, 100 partitions

Kafka → Spark Streaming → Parquet:
- Latency: ~30s (trigger interval + processing)
- Small files: 1000+ files/hour
- OPTIMIZE needed: Daily
- Cost: $$

Kafka → Spark Streaming → Delta (optimized):
- Latency: ~30s
- Small files: Auto-compacted
- OPTIMIZE needed: Weekly
- Cost: $$ (slightly higher for optimization)
- Benefit: ACID, time travel, upserts

Kafka → Flink → Delta:
- Latency: ~5s
- More complex setup
- Cost: $$$
- Benefit: Lower latency, event-time watermarks native
```
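
The file counts in these scenarios follow from simple arithmetic: if each trigger writes roughly one file per output partition, files per hour ≈ (3600 / trigger seconds) × partitions. A back-of-the-envelope helper using the scenario's parameters; the one-file-per-partition-per-batch assumption is a simplification.

```python
def streaming_file_stats(events_per_s: float, event_kb: float,
                         partitions: int, trigger_s: float) -> dict:
    """Rough output stats, assuming one file per partition per micro-batch."""
    batches_per_hour = 3600 / trigger_s
    mb_per_batch = events_per_s * event_kb * trigger_s / 1000  # KB -> MB (decimal)
    return {
        "files_per_hour": batches_per_hour * partitions,
        "avg_file_mb": mb_per_batch / partitions,
    }


print(streaming_file_stats(1_000_000, 10, 100, 30))  # {'files_per_hour': 12000.0, 'avg_file_mb': 3000.0}
print(streaming_file_stats(1_000, 10, 100, 30))      # {'files_per_hour': 12000.0, 'avg_file_mb': 3.0}
```

The file count depends only on trigger interval and partitions, while file size depends on volume: the same 12,000 files per hour are large at 1M events/s but become the small files compaction targets at lower throughput.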

---
**Author:** Luis J. Raigoso V. (LJRV)

## 1. Initialize the Spark Session

In [None]:
# Create a Spark session configured for streaming
spark = SparkSession.builder \
    .appName("SparkStreamingAdvanced") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()
# Note: spark.streaming.stopGracefullyOnShutdown only affects the legacy
# DStreams API; Structured Streaming queries are stopped with query.stop()

# Set the logging level
spark.sparkContext.setLogLevel("WARN")

print(f"Spark Version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

## 2. Define Streaming Schemas

In [None]:
# E-commerce event schema
ecommerce_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", IntegerType(), True)
])

# Application log schema
log_schema = StructType([
    StructField("timestamp", TimestampType(), False),
    StructField("level", StringType(), False),
    StructField("service", StringType(), False),
    StructField("message", StringType(), True),
    StructField("error_code", IntegerType(), True),
    StructField("user_id", StringType(), True)
])

print("Schemas defined")
print("\nE-commerce schema:")
print(ecommerce_schema)

## 3. Simulating a Streaming Source

In [None]:
# Create sample data to simulate a stream
def generate_sample_data(n_records=1000):
    """
    Generate sample e-commerce events.
    NumPy scalars are converted to plain Python types, which
    PySpark's schema verification expects (str, int, float).
    """
    np.random.seed(42)
    
    base_time = datetime.now()
    
    data = []
    for i in range(n_records):
        event = {
            'event_id': f'evt_{i:06d}',
            'timestamp': base_time + timedelta(seconds=i),
            'user_id': f'user_{np.random.randint(1, 101)}',
            'event_type': str(np.random.choice(['view', 'add_to_cart', 'purchase', 'remove'], p=[0.5, 0.25, 0.15, 0.1])),
            'product_id': f'prod_{np.random.randint(1, 51)}',
            'product_name': str(np.random.choice(['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'])),
            'category': str(np.random.choice(['Electronics', 'Accessories', 'Computers'])),
            'price': float(round(np.random.uniform(10, 2000), 2)),
            'quantity': int(np.random.randint(1, 5))
        }
        data.append(event)
    
    return data


# Generate the data
sample_data = generate_sample_data(1000)
df_sample = spark.createDataFrame(sample_data, schema=ecommerce_schema)

print(f"Generated {df_sample.count()} sample records")
df_sample.show(5, truncate=False)

## 4. Streaming with the Rate Source (Simulation)

In [None]:
# Simulated stream from the rate source (columns: timestamp, value)
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .option("numPartitions", 2) \
    .load()

# Enrich with simulated fields
from pyspark.sql.functions import rand, when, lit

# One random draw per row ("r") so the thresholds yield the intended
# 50% / 25% / 15% / 10% split across event types
enriched_stream = rate_stream \
    .withColumn("user_id", (rand() * 100).cast("int")) \
    .withColumn("r", rand()) \
    .withColumn("event_type",
        when(col("r") < 0.5, "view")
        .when(col("r") < 0.75, "add_to_cart")
        .when(col("r") < 0.9, "purchase")
        .otherwise("remove")
    ) \
    .drop("r") \
    .withColumn("product_id", (rand() * 50).cast("int")) \
    .withColumn("price", (rand() * 1990 + 10).cast("double")) \
    .withColumn("quantity", (rand() * 4 + 1).cast("int"))

print("Enriched stream created")
print("Schema:")
enriched_stream.printSchema()

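Why one random draw per row matters for chained `when` conditions: if every branch calls `rand()` again, each test uses an independent number, so the branch probabilities multiply instead of following the cumulative thresholds. An exact calculation with `fractions` (pure Python, no Spark), using the 0.5 / 0.75 / 0.9 thresholds from the cell above:

```python
from fractions import Fraction as F

thresholds = [F(1, 2), F(3, 4), F(9, 10)]   # 0.5, 0.75, 0.9
labels = ["view", "add_to_cart", "purchase", "remove"]


def single_draw(thresholds):
    """One uniform draw compared against cumulative thresholds."""
    probs, prev = [], F(0)
    for t in thresholds:
        probs.append(t - prev)
        prev = t
    probs.append(1 - prev)
    return probs


def independent_draws(thresholds):
    """A fresh uniform draw per branch: P(branch k) = t_k * prod(1 - t_j for j < k)."""
    probs, reach = [], F(1)
    for t in thresholds:
        probs.append(reach * t)
        reach *= 1 - t
    probs.append(reach)
    return probs


for name, p1, p2 in zip(labels, single_draw(thresholds), independent_draws(thresholds)):
    print(f"{name:12s} single={float(p1):.4f} independent={float(p2):.4f}")
```

With independent draws, `add_to_cart` rises to 37.5% and `remove` drops to 1.25%, instead of the 25% and 10% the thresholds suggest.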
## 5. Time-Window Aggregations

In [None]:
# Aggregations over sliding windows (30-second windows, every 10 seconds)
windowed_counts = enriched_stream \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(
        window(col("timestamp"), "30 seconds", "10 seconds"),
        col("event_type")
    ) \
    .agg(
        count("*").alias("event_count"),
        spark_sum(expr("price * quantity")).alias("total_revenue"),
        avg("price").alias("avg_price")
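    )
# No orderBy here: sorting a streaming DataFrame is only supported after an
# aggregation in complete output mode, so sort downstream instead

print("Windowed aggregation query configured")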

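With a 30-second window sliding every 10 seconds, each event falls into three overlapping windows. A pure-Python sketch of the assignment rule for epoch-aligned windows (assuming the default `startTime` of 0), with timestamps in seconds:

```python
from typing import List, Tuple


def sliding_windows(ts: int, size: int = 30, slide: int = 10) -> List[Tuple[int, int]]:
    """Every [start, end) window of length `size`, sliding by `slide`, that contains ts."""
    start = ts - ts % slide        # latest window start <= ts
    windows = []
    while start > ts - size:       # window [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(sliding_windows(35))          # [(10, 40), (20, 50), (30, 60)]
print(sliding_windows(35, 30, 30))  # tumbling window: [(30, 60)]
```

Each event is counted once per window it belongs to, so sliding windows multiply the aggregation state by size / slide (3 here).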
## 6. Stateful Processing

In [None]:
# Keep running aggregates per user (no watermark: the state keeps one row per user_id indefinitely)
user_aggregations = enriched_stream \
    .groupBy("user_id") \
    .agg(
        count("*").alias("total_events"),
        spark_sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        spark_sum(when(col("event_type") == "purchase", 
                      expr("price * quantity")).otherwise(0)).alias("total_spent"),
        spark_max("timestamp").alias("last_activity")
    )

print("Per-user aggregation query configured")

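Conceptually, a streaming `groupBy().agg()` keeps one state row per key and merges each micro-batch's rows into it; without a watermark nothing is evicted. A minimal pure-Python model of that merge for the per-user counts above (a simplification of what the state store does; the event tuples are hypothetical):

```python
from typing import Dict, Iterable, Tuple

Event = Tuple[int, str, float]  # (user_id, event_type, amount)


def merge_batch(state: Dict[int, dict], batch: Iterable[Event]) -> Dict[int, dict]:
    """Update per-user running aggregates with one micro-batch; return the rows that changed."""
    updated = {}
    for user_id, event_type, amount in batch:
        row = state.setdefault(user_id, {"total_events": 0, "purchases": 0, "total_spent": 0.0})
        row["total_events"] += 1
        if event_type == "purchase":
            row["purchases"] += 1
            row["total_spent"] += amount
        updated[user_id] = row
    return updated  # what "update" output mode would emit for this batch


state: Dict[int, dict] = {}
merge_batch(state, [(1, "view", 0.0), (1, "purchase", 50.0), (2, "view", 0.0)])
changed = merge_batch(state, [(1, "purchase", 25.0)])
print(changed)     # {1: {'total_events': 3, 'purchases': 2, 'total_spent': 75.0}}
print(len(state))  # 2: user 2 stays in state even with no new events
```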
## 7. Real-Time Pattern Detection

In [None]:
# Detect users with anomalous behavior
anomaly_detection = enriched_stream \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("events_per_minute"),
        spark_sum(when(col("event_type") == "purchase", 
                      expr("price * quantity")).otherwise(0)).alias("spend_per_minute")
    ) \
    .filter(
        (col("events_per_minute") > 50) |  # more than 50 events per minute
        (col("spend_per_minute") > 10000)   # more than $10,000 per minute
    )

print("Anomaly detection query configured")

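The filter runs after the per-minute aggregation, so the thresholds apply to each (minute, user) bucket rather than to individual events. A pure-Python sketch of the same logic on a few hypothetical events (timestamps in seconds):

```python
from collections import defaultdict


def flag_anomalies(events, max_events=50, max_spend=10_000.0):
    """events: iterable of (ts_seconds, user_id, event_type, amount).

    Groups into 1-minute tumbling windows per user and returns the buckets
    that exceed either threshold, like the streaming filter above.
    """
    buckets = defaultdict(lambda: {"events": 0, "spend": 0.0})
    for ts, user, event_type, amount in events:
        b = buckets[(ts - ts % 60, user)]
        b["events"] += 1
        if event_type == "purchase":
            b["spend"] += amount
    return {
        key: b for key, b in buckets.items()
        if b["events"] > max_events or b["spend"] > max_spend
    }


events = [(i, "bot", "view", 0.0) for i in range(55)]   # 55 views in one minute
events += [(70, "whale", "purchase", 12_000.0)]          # one large purchase
events += [(75, "normal", "purchase", 300.0)]
print(sorted(flag_anomalies(events)))  # [(0, 'bot'), (60, 'whale')]
```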
## 8. Example Queries with Full Output

In [None]:
# Show results on the console (batch mode for the demo)
# NOTE: in production you would use .writeStream instead of show()

print("\n=== BATCH ANALYSIS OF SAMPLE DATA ===")
print("\n1. Events by type:")
df_sample.groupBy("event_type").count().orderBy("count", ascending=False).show()

print("\n2. Revenue by category:")
df_sample.filter(col("event_type") == "purchase") \
    .groupBy("category") \
    .agg(
        count("*").alias("num_purchases"),
        spark_sum(expr("price * quantity")).alias("total_revenue"),
        avg("price").alias("avg_price")
    ) \
    .orderBy("total_revenue", ascending=False) \
    .show()

print("\n3. Top 10 most active users:")
df_sample.groupBy("user_id") \
    .agg(
        count("*").alias("total_events"),
        spark_sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases")
    ) \
    .orderBy("total_events", ascending=False) \
    .limit(10) \
    .show()

print("\n4. Conversion rate by product:")
conversion_rate = df_sample.groupBy("product_name") \
    .agg(
        count("*").alias("total_events"),
        spark_sum(when(col("event_type") == "view", 1).otherwise(0)).alias("views"),
        spark_sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases")
    )

conversion_rate.withColumn(
    "conversion_rate",
    (col("purchases") / col("views") * 100)
).orderBy("conversion_rate", ascending=False).show()

## 9. Writing a Stream to Different Sinks

In [None]:
# Example sink configurations, kept as plain dicts (no query is started here)

# 1. Console (development/debugging)
console_query_config = {
    'outputMode': 'complete',  # complete, append, update
    'format': 'console',
    'trigger': {'processingTime': '10 seconds'},
    'options': {
        'truncate': False,
        'numRows': 20
    }
}

# 2. Parquet (data lake)
parquet_query_config = {
    'outputMode': 'append',
    'format': 'parquet',
    'path': '/path/to/output',
    'checkpointLocation': '/path/to/checkpoint',
    'trigger': {'processingTime': '1 minute'},
    'partitionBy': ['date'],  # applied with .partitionBy(), not as a writer option
    'options': {
        'compression': 'snappy'
    }
}

# 3. Kafka (the output must have a `value` column; `key` is optional)
kafka_query_config = {
    'outputMode': 'append',
    'format': 'kafka',
    'options': {
        'kafka.bootstrap.servers': 'localhost:9092',
        'topic': 'processed-events',
        'checkpointLocation': '/path/to/checkpoint'
    }
}

# 4. Delta Lake
delta_query_config = {
    'outputMode': 'append',
    'format': 'delta',
    'path': '/path/to/delta',
    'checkpointLocation': '/path/to/checkpoint',
    'options': {
        'mergeSchema': True,
        'optimizeWrite': True
    }
}

print("Sink configurations defined (see the code for details)")

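The dictionaries above are only data; something has to turn them into `DataStreamWriter` calls. A sketch of such a helper, `start_with_config` (hypothetical, not part of PySpark): it uses only `writeStream`, `format`, `outputMode`, `option`, `partitionBy`, `trigger(processingTime=...)` and `start(path)`, and treats a `partitionBy` entry, wherever it appears, as a `partitionBy()` call rather than an option. Because it only chains methods, it can be exercised with a recording stub instead of Spark.

```python
def start_with_config(df, cfg: dict):
    """Start a streaming query from a sink config dict (hypothetical helper).

    Recognized keys: format, outputMode, path, checkpointLocation,
    trigger (e.g. {'processingTime': '1 minute'}), options, partitionBy.
    """
    options = dict(cfg.get("options", {}))
    # partitionBy is a DataStreamWriter method, not an option: accept it in either place
    partition_cols = options.pop("partitionBy", None)
    partition_cols = cfg.get("partitionBy", partition_cols)
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]

    writer = df.writeStream.format(cfg["format"]).outputMode(cfg.get("outputMode", "append"))
    if "checkpointLocation" in cfg:
        writer = writer.option("checkpointLocation", cfg["checkpointLocation"])
    for key, value in options.items():
        writer = writer.option(key, value)  # PySpark converts bools/numbers to strings
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    if "trigger" in cfg:
        writer = writer.trigger(**cfg["trigger"])
    return writer.start(cfg["path"]) if "path" in cfg else writer.start()


# Usage without Spark: a stub that records every chained call
class RecordingWriter:
    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def method(*args, **kwargs):
            self.calls.append((name, args, kwargs))
            return self
        return method


class StubDataFrame:
    def __init__(self):
        self.writeStream = RecordingWriter()


demo_df = StubDataFrame()
start_with_config(demo_df, {
    "outputMode": "append",
    "format": "parquet",
    "path": "/path/to/output",
    "checkpointLocation": "/path/to/checkpoint",
    "trigger": {"processingTime": "1 minute"},
    "options": {"compression": "snappy", "partitionBy": "date"},
})
for call in demo_df.writeStream.calls:
    print(call)
```

With a real streaming DataFrame the same call returns a `StreamingQuery`.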
## 10. Metrics and Monitoring

In [None]:
# Monitor the state of a streaming query
def monitor_stream_metrics(query):
    """
    Extract metrics from a streaming query.
    query.status only has message/isDataAvailable/isTriggerActive;
    the rates come from the last progress report.
    """
    status = query.status
    
    metrics = {
        'isDataAvailable': status['isDataAvailable'],
        'isTriggerActive': status['isTriggerActive'],
        'message': status['message']
    }
    
    progress = query.lastProgress  # None until the first micro-batch completes
    if progress:
        metrics['inputRowsPerSecond'] = progress.get('inputRowsPerSecond')
        metrics['processedRowsPerSecond'] = progress.get('processedRowsPerSecond')
    
    return metrics


# Key metrics to monitor
print("\n=== KEY MONITORING METRICS ===")
print("""
1. Input rate: events received per second
2. Processing rate: events processed per second
3. Batch duration: processing time per batch
4. Trigger interval: time between executions
5. Watermark: current late-data threshold in event time
6. Query status: active, inactive, error
7. Checkpoint location: used for failure recovery
""")

## 11. Performance Optimization

In [None]:
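# Optimization settings (set them when building the session or via spark-submit;
# executor/driver memory cannot be changed on a running session)
optimization_configs = {
    # Partitioning
    'spark.sql.shuffle.partitions': '200',  # shuffle partitions (fixed by the checkpoint for stateful queries)
    'spark.default.parallelism': '200',     # default RDD parallelism
    
    # Memory
    'spark.executor.memory': '4g',
    'spark.driver.memory': '2g',
    'spark.memory.fraction': '0.8',
    
    # Streaming-specific
    'spark.sql.streaming.minBatchesToRetain': '100',
    # Default provider; RocksDBStateStoreProvider (Spark 3.2+) suits large state
    'spark.sql.streaming.stateStore.providerClass': 'org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider',
    
    # Adaptive Query Execution: helps batch jobs (e.g. compaction); Spark disables it for streaming queries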
    'spark.sql.adaptive.enabled': 'true',
    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
}

print("\n=== OPTIMIZATION BEST PRACTICES ===")
print("""
1. Use watermarks to clean up old state
2. Partition output data by key columns
3. Tune spark.sql.shuffle.partitions before a stateful query's first run
4. Use processing-time triggers to control batch frequency
5. Enable checkpointing for recovery
6. Monitor metrics continuously
7. Use Delta Lake for ACID transactions
8. Compact small files
9. Use explicit schemas with specific types instead of generic ones
10. Weigh micro-batch against continuous processing
""")

## Summary and Enterprise Architecture

### Typical Streaming Architecture:
```
Data Sources          Ingestion         Processing            Storage              Consumption
────────────          ─────────         ──────────            ───────              ───────────
Kafka/Kinesis    →    Spark       →     Transformations   →   Delta Lake      →    BI Tools
IoT Devices      →    Streaming   →     Aggregations      →   Data Lake       →    ML Models
APIs             →                →     Joins             →   Warehouse       →    Dashboards
Logs             →                →     Windows           →   Cache (Redis)   →    Alerts
```

### Advanced Patterns:

#### 1. Lambda Architecture
- **Batch Layer**: full processing of historical data
- **Speed Layer**: real-time processing
- **Serving Layer**: combines both views

#### 2. Kappa Architecture
- Streaming layer only
- All processing happens in real time
- Reprocessing by replaying the stream from the beginning

#### 3. Delta Architecture
- Built on Delta Lake
- ACID transactions
- Time travel
- Schema evolution

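A Lambda serving layer typically answers a query by combining the batch view (complete up to some cutoff) with the speed view (everything after it). A minimal pure-Python sketch with hypothetical hourly counts; the cutoff-based merge is one common approach, not the only one:

```python
from typing import Dict


def serve(batch_view: Dict[int, int], speed_view: Dict[int, int], batch_cutoff: int) -> Dict[int, int]:
    """Merge hourly counts: batch results before the cutoff hour, speed-layer results from it on."""
    merged = {h: c for h, c in batch_view.items() if h < batch_cutoff}
    merged.update({h: c for h, c in speed_view.items() if h >= batch_cutoff})
    return dict(sorted(merged.items()))


batch_view = {0: 120, 1: 98, 2: 105}   # recomputed from the full history
speed_view = {2: 101, 3: 87, 4: 40}    # incremental, recent hours only
print(serve(batch_view, speed_view, batch_cutoff=3))  # {0: 120, 1: 98, 2: 105, 3: 87, 4: 40}
```

When the next batch run finishes, the cutoff advances and the speed layer's older results are discarded. In a Kappa setup there is no merge step: one streaming job produces the view, and reprocessing means replaying the log into a new checkpoint.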
### Enterprise Use Cases:

1. **Real-Time Fraud Detection**
   - Analysis of suspicious patterns
   - Machine learning on streaming data
   - Automated alerts

2. **Personalized Recommendations**
   - Real-time behavior tracking
   - User profile updates
   - Dynamic A/B testing

3. **Infrastructure Monitoring**
   - Real-time logs and metrics
   - Anomaly detection
   - Load-based auto-scaling

4. **IoT and Telemetry**
   - Sensor data processing
   - Predictive maintenance
   - Operations optimization

### Production Considerations:

- **High Availability**: cluster mode, multiple workers
- **Fault Tolerance**: checkpointing, write-ahead logs
- **Scalability**: auto-scaling, dynamic allocation
- **Security**: Kerberos, SSL/TLS, encryption at rest
- **Monitoring**: Prometheus, Grafana, CloudWatch
- **Testing**: unit tests, integration tests, chaos engineering

### Additional Resources:
- [Spark Structured Streaming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Delta Lake Documentation](https://docs.delta.io/latest/index.html)
- [Databricks Streaming Best Practices](https://docs.databricks.com/structured-streaming/index.html)

In [None]:
# Clean up resources
spark.stop()
print("Spark session stopped")