<a href="https://colab.research.google.com/github/robertoarturomc/ProgramacionConcurrente/blob/main/30_Structured_Streaming_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Programación Concurrente
## 30. Structured Streaming con Spark.

## Structured Streaming con una API Pública (USGS – Terremotos)

### ¿Qué es Structured Streaming?

**Apache Spark Structured Streaming** es un motor de procesamiento de datos en tiempo real que funciona como si estuviéramos ejecutando consultas de Spark *continuamente*.

Su idea principal es escribir código de análisis de datos como si fuera batch, pero Spark lo ejecuta continuamente cada vez que llegan nuevos datos.

### Conceptos clave:

**Flujo de datos (stream)**  
Es una fuente que *produce datos gradualmente*

API, sensores, Kafka, archivos que llegan, logs, etc.

**Micro-batches**  
Spark no procesa cada evento uno por uno, sino pequeños “mini batchs” que agrupan los nuevos datos que llegaron desde la última ejecución.

**Fuentes de streaming (streaming sources)**  
Ejemplos: Kafka, sockets, carpetas donde llegan archivos, APIs, streams internos de Spark.

**Sinks (destinos)**  
Dónde escribimos los resultados: consola, archivos, Kafka, bases de datos, etc.

**Event time vs Processing time**
- *Processing time:* cuándo Spark procesa los datos.  
- *Event time:* cuándo *ocurrieron* realmente los eventos.  
Structured Streaming puede agrupar por ventanas usando *event time*.

**Watermark**  
Un mecanismo para manejar datos que llegan tarde.

### ¿Qué haremos en este notebook?

1. Llamaremos repetidamente a una **API pública real** (USGS Earthquakes).  
2. Cada llamada guardará un archivo NDJSON con varios sismos.  
3. Spark Structured Streaming vigilará una carpeta e irá leyendo cada archivo conforme aparece.  
4. Procesaremos los datos en *tiempo casi real*:  
- Contando número de sismos cada ventana de 10 minutos  
- Calculando magnitud media y máxima  
5. Veremos los resultados aparecer en la consola conforme llegan archivos nuevos.

Es un ejemplo **muy realista** de cómo se integran:
- APIs → ingestión (landing layer)  
- Procesamiento streaming → agregaciones con ventanas  
- Tolerancia a retrasos → watermark  
- Resultados incrementalmente actualizados  

In [1]:
import os
import requests
import json
import time

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, DoubleType
)
from pyspark.sql.functions import col, from_unixtime, to_timestamp
from pyspark.sql.functions import window, count, avg, max as spark_max


In [2]:
spark = (SparkSession.builder
         .appName("QuakeStructuredStream")
         .master("local[*]")
         .getOrCreate())
spark

1. Crear carpetas para simular el streaming
- `stream_data/` → Aquí Spark estará escuchando por nuevos archivos NDJSON.

Nuestro proceso de Python (más adelante) irá guardando nuevos archivos ahí.


In [3]:
base_dir = "/content/earthquakes"
stream_dir = os.path.join(base_dir, "stream_data")

os.makedirs(stream_dir, exist_ok=True)

print("Carpeta para streaming:", stream_dir)


Carpeta para streaming: /content/earthquakes/stream_data


2. Proceso de ingesta desde la API pública

Usaremos la API de USGS:

https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson

Esta API devuelve **todos los sismos de la última hora** (actualizados continuamente).

Nuestra función:
- Llama a la API varias veces.
- Crea un archivo .ndjson por cada llamada.
- Spark verá estos archivos como "nuevos eventos".


In [4]:
def fetch_and_save_earthquakes(output_dir, iterations=3, sleep_seconds=15):
    """
    Llama a la API de USGS varias veces y guarda archivos NDJSON en output_dir.
    """
    url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson"

    for i in range(iterations):
        print(f"[{datetime.utcnow().isoformat()}] Llamando a la API ({i+1}/{iterations})...")
        resp = requests.get(url)
        data = resp.json()

        features = data.get("features", [])
        print(f"  → {len(features)} eventos recibidos")

        ts = int(time.time())
        out_path = os.path.join(output_dir, f"earthquakes_{ts}.ndjson")

        with open(out_path, "w") as f:
            for feat in features:
                props = feat.get("properties", {})
                geom = feat.get("geometry", {})

                record = {
                    "time": props.get("time"),      # epoch ms
                    "place": props.get("place"),
                    "mag": props.get("mag"),
                    "alert": props.get("alert"),
                    "tsunami": props.get("tsunami"),
                    "longitude": geom.get("coordinates", [None])[0],
                    "latitude":  geom.get("coordinates", [None])[1],
                    "depth":     geom.get("coordinates", [None])[2],
                }
                f.write(json.dumps(record) + "\n")

        print(f"  → Archivo escrito: {out_path}\n")
        if i < iterations - 1:
            time.sleep(sleep_seconds)


3. Definir el stream de lectura en Spark

Ahora que ya sabemos que aparecerán archivos .ndjson en `stream_data/`,
configuramos Spark para:

- Leer archivos JSON de esa carpeta.  
- Convertir el campo `time` (ms desde epoch) a un timestamp de Spark.  
- Preparar los datos para agregaciones de ventanas.


In [5]:
schema = StructType([
    StructField("time", LongType(), True),
    StructField("place", StringType(), True),
    StructField("mag", DoubleType(), True),
    StructField("alert", StringType(), True),
    StructField("tsunami", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("depth", DoubleType(), True),
])

raw_stream_df = (
    spark.readStream
         .format("json")
         .schema(schema)
         .load(stream_dir)
)

stream_df = raw_stream_df.withColumn(
    "event_time",
    to_timestamp(from_unixtime(col("time") / 1000))
)

stream_df.printSchema()


root
 |-- time: long (nullable = true)
 |-- place: string (nullable = true)
 |-- mag: double (nullable = true)
 |-- alert: string (nullable = true)
 |-- tsunami: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- event_time: timestamp (nullable = true)



4. Agregaciones en tiempo real

Queremos responder preguntas como:

**“¿Cuántos terremotos hubo cada 10 minutos y cuál fue su magnitud promedio y máxima?”**

Para eso:
- Usamos `groupBy(window(event_time, "10 minutes"))`  
- Añadimos un `watermark` para tolerar datos tardíos  
- Calculamos `quake_count`, `avg_mag`, `max_mag`


In [8]:
agg_df = (
    stream_df
    .withWatermark("event_time", "30 minutes")
    .groupBy(window(col("event_time"), "10 minutes"))
    .agg(
        count("*").alias("quake_count"),
        avg("mag").alias("avg_mag"),
        spark_max("mag").alias("max_mag")
    )
)


agg_df.printSchema()



root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- quake_count: long (nullable = false)
 |-- avg_mag: double (nullable = true)
 |-- max_mag: double (nullable = true)



5. Iniciar el streaming query

Esto iniciará un *job continuo* que:
- Observa la carpeta `stream_data/`
- Espera a que aparezcan archivos nuevos
- Procesa datos cuando llegan
- Actualiza las agregaciones

**Nota: Deja esta celda corriendo.**  
En otra celda correremos la función que llama a la API.


In [17]:
spark.conf.set("spark.sql.streaming.noDataMicroBatch.enabled", "true")

query = (
    agg_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/content/earthquakes/output")
    .option("checkpointLocation", "/content/earthquakes/checkpoint")
    .start()
)


In [18]:
fetch_and_save_earthquakes(stream_dir, iterations=3, sleep_seconds=20)


  print(f"[{datetime.utcnow().isoformat()}] Llamando a la API ({i+1}/{iterations})...")


[2025-11-26T00:58:22.615023] Llamando a la API (1/3)...
  → 7 eventos recibidos
  → Archivo escrito: /content/earthquakes/stream_data/earthquakes_1764118702.ndjson

[2025-11-26T00:58:42.796682] Llamando a la API (2/3)...
  → 7 eventos recibidos
  → Archivo escrito: /content/earthquakes/stream_data/earthquakes_1764118722.ndjson

[2025-11-26T00:59:02.934239] Llamando a la API (3/3)...
  → 7 eventos recibidos
  → Archivo escrito: /content/earthquakes/stream_data/earthquakes_1764118743.ndjson



In [19]:
query.stop()


In [20]:
df_results = spark.read.parquet("/content/earthquakes/output")
df_results.show()

AnalysisException: Unable to infer schema for Parquet at . It must be specified manually.