# 09 - Structured Streaming

Przetwarzanie danych w czasie rzeczywistym z Spark Structured Streaming.

**Tematy:**
- Koncepcja Structured Streaming - "nieskończona tabela"
- Źródła danych: rate, socket, file, Kafka
- Output modes: append, complete, update
- Okna czasowe (tumbling, sliding)
- Watermarki - obsługa spóźnionych danych
- Streaming joins
- Symulacja real-time ratingu filmów
- foreachBatch - zapis wyników do PostgreSQL

## 1. Setup

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("09_Structured_Streaming") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.1") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "7g") \
    .config("spark.driver.host", "recommender-jupyter") \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://postgres:5432/recommender"
jdbc_properties = {
    "user": "recommender",
    "password": "recommender",
    "driver": "org.postgresql.Driver"
}

## 2. Koncepcja Structured Streaming

Structured Streaming traktuje strumień danych jako **nieskończoną tabelę**, do której ciągle dochodzą nowe wiersze.

```
Batch:     [dane] → przetworzenie → [wynik]
Streaming: [dane...dane...dane...] → ciągłe przetwarzanie → [wynik aktualizowany]
```

**Kluczowa zaleta:** ten sam kod DataFrame/SQL działa w batch i streaming!

### Output modes:
- **append** - tylko nowe wiersze (domyślny, nie działa z agregacjami bez watermarki)
- **complete** - cała tabela wyników (dla agregacji)
- **update** - tylko zmienione wiersze

## 3. Źródło `rate` - generator danych testowych

Najprostrze źródło do nauki - generuje wiersze z autoinkrementowanym `value` i `timestamp`.

In [None]:
# Źródło rate - generuje N wierszy/sekundę
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# To jest streaming DataFrame - wygląda jak zwykły ale jest "leniwy"
rate_stream.printSchema()
print(f"Is streaming: {rate_stream.isStreaming}")

In [None]:
# Przetwórz strumień - dodaj kolumny
processed = rate_stream \
    .withColumn("value_squared", col("value") * col("value")) \
    .withColumn("is_even", col("value") % 2 == 0)

# Uruchom query z outputem do konsoli (pamięci)
query = processed.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("rate_test") \
    .start()

# Poczekaj chwilę na dane
import time
time.sleep(5)

# Odczytaj wyniki z pamięci
spark.sql("SELECT * FROM rate_test ORDER BY timestamp DESC LIMIT 10").show(truncate=False)

query.stop()

## 4. Symulacja strumienia ocen filmów

Zapiszemy oceny jako pliki JSON, a Spark będzie je odczytywał jako strumień (file source).

In [None]:
import json
import os
import random
from datetime import datetime, timedelta

# Katalog na symulowane dane streamingowe
STREAM_DIR = "/tmp/rating_stream"
CHECKPOINT_DIR = "/tmp/rating_checkpoint"

os.makedirs(STREAM_DIR, exist_ok=True)
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# Wyczyść stare dane
for f in os.listdir(STREAM_DIR):
    os.remove(os.path.join(STREAM_DIR, f))

def generate_rating_batch(batch_id, n=50):
    """Generuj plik JSON z losowymi ocenami."""
    ratings = []
    base_time = datetime.now()
    for i in range(n):
        ratings.append({
            "user_id": random.randint(1, 1000),
            "movie_id": random.choice([1, 2, 50, 110, 260, 296, 318, 356, 480, 527, 589, 593, 2571, 4993, 58559]),
            "rating": random.choice([0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]),
            "timestamp": (base_time + timedelta(seconds=random.randint(-30, 0))).isoformat()
        })
    
    path = os.path.join(STREAM_DIR, f"batch_{batch_id}.json")
    with open(path, 'w') as f:
        for r in ratings:
            f.write(json.dumps(r) + '\n')
    return path

# Wygeneruj pierwszy batch
generate_rating_batch(0)
print("Pierwszy batch wygenerowany")

In [None]:
# Schemat strumienia - MUSI być zdefiniowany z góry (streaming nie może go wywnioskować)
rating_schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("movie_id", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", StringType())
])

# Odczytaj strumień z katalogu
rating_stream = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR)

# Dodaj event time jako timestamp
rating_stream = rating_stream \
    .withColumn("event_time", to_timestamp(col("timestamp")))

print(f"Is streaming: {rating_stream.isStreaming}")
rating_stream.printSchema()

## 5. Agregacje w strumieniu - output mode `complete`

In [None]:
# Średnia ocena per film - aktualizowana w czasie rzeczywistym
movie_avg_stream = rating_stream \
    .groupBy("movie_id") \
    .agg(
        count("*").alias("num_ratings"),
        round(avg("rating"), 2).alias("avg_rating"),
        max("event_time").alias("last_rating_time")
    )

# Uruchom z complete mode (cała tabela wyników)
query_avg = movie_avg_stream.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("movie_averages") \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/avg") \
    .start()

time.sleep(3)
spark.sql("SELECT * FROM movie_averages ORDER BY num_ratings DESC").show()

In [None]:
# Dodaj więcej danych - obserwuj jak wyniki się aktualizują
for i in range(1, 4):
    generate_rating_batch(i, n=100)
    time.sleep(3)
    print(f"\n=== Po batch {i} ===")
    spark.sql("SELECT * FROM movie_averages ORDER BY num_ratings DESC LIMIT 5").show()

In [None]:
# Status query
print(query_avg.status)
print(f"\nLast progress:")
if query_avg.lastProgress:
    progress = query_avg.lastProgress
    print(f"  Input rows: {progress.get('numInputRows', 'N/A')}")
    print(f"  Processing time: {progress.get('batchDuration', 'N/A')}ms")

query_avg.stop()

## 6. Okna czasowe (Window Operations)

### Tumbling window
Okno stałe, nieprzekrywające się: `[00:00-00:05), [00:05-00:10), ...`

### Sliding window
Okno przesuwane: slide co 1 min, okno 5 min: `[00:00-00:05), [00:01-00:06), ...`

In [None]:
# Wyczyść checkpointy
import shutil
for d in os.listdir(CHECKPOINT_DIR):
    shutil.rmtree(os.path.join(CHECKPOINT_DIR, d), ignore_errors=True)

# Odczytaj strumień od nowa
rating_stream2 = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR) \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# Tumbling window - 10 sekundowe okna
windowed_ratings = rating_stream2 \
    .groupBy(
        window(col("event_time"), "10 seconds"),  # tumbling window
        col("movie_id")
    ) \
    .agg(
        count("*").alias("num_ratings"),
        round(avg("rating"), 2).alias("avg_rating")
    )

query_window = windowed_ratings.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("windowed_ratings") \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/window") \
    .start()

time.sleep(5)
spark.sql("""
    SELECT window.start, window.end, movie_id, num_ratings, avg_rating
    FROM windowed_ratings
    ORDER BY window.start DESC, num_ratings DESC
    LIMIT 20
""").show(truncate=False)

query_window.stop()

In [None]:
# Wyczyść checkpointy
for d in os.listdir(CHECKPOINT_DIR):
    shutil.rmtree(os.path.join(CHECKPOINT_DIR, d), ignore_errors=True)

# Odczytaj strumień od nowa
rating_stream3 = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR) \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# Sliding window - okno 30s, slide co 10s
sliding_ratings = rating_stream3 \
    .groupBy(
        window(col("event_time"), "30 seconds", "10 seconds"),  # sliding
    ) \
    .agg(
        count("*").alias("total_ratings"),
        round(avg("rating"), 2).alias("avg_rating"),
        countDistinct("user_id").alias("unique_users")
    )

query_sliding = sliding_ratings.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sliding_ratings") \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/sliding") \
    .start()

# Dodaj nowe dane
generate_rating_batch(10, n=200)
time.sleep(5)

spark.sql("""
    SELECT window.start, window.end, total_ratings, avg_rating, unique_users
    FROM sliding_ratings
    ORDER BY window.start DESC
    LIMIT 10
""").show(truncate=False)

query_sliding.stop()

## 7. Watermarki - obsługa spóźnionych danych

W rzeczywistym systemie dane mogą docierać z opóźnieniem. Watermark mówi Sparkowi:
"akceptuj dane spóźnione maksymalnie o X czasu".

Po przekroczeniu watermarki, Spark:
- Nie aktualizuje starych okien
- Może zwolnić pamięć stanu (state)
- Umożliwia `append` mode z agregacjami

In [None]:
# Wyczyść checkpointy
for d in os.listdir(CHECKPOINT_DIR):
    shutil.rmtree(os.path.join(CHECKPOINT_DIR, d), ignore_errors=True)

# Odczytaj strumień od nowa
rating_stream4 = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR) \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# Z watermarkiem - akceptuj do 30 sekund spóźnienia
watermarked = rating_stream4 \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(
        window(col("event_time"), "10 seconds"),
        col("movie_id")
    ) \
    .agg(
        count("*").alias("cnt"),
        round(avg("rating"), 2).alias("avg_rating")
    )

# Z watermarkiem można użyć APPEND mode!
# Okna są emitowane dopiero po zamknięciu (watermark minął)
query_wm = watermarked.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("watermarked_ratings") \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/watermark") \
    .start()

# Generuj dane
for i in range(20, 25):
    generate_rating_batch(i, n=50)
    time.sleep(2)

time.sleep(10)
spark.sql("SELECT * FROM watermarked_ratings ORDER BY window.start DESC").show(truncate=False)

query_wm.stop()

### Zadanie 1
Stwórz streaming query z watermarkiem, który:
1. Liczy liczbę ocen per użytkownik w oknach 1-minutowych
2. Akceptuje spóźnienia do 1 minuty
3. Używa append mode

Generuj batche i obserwuj wyniki.

In [None]:
# Twoje rozwiązanie:


## 8. Stream-Static Join

Łączenie strumienia z statycznym DataFrame (np. strumień ocen + tabela filmów).

In [None]:
# Wyczyść checkpointy
for d in os.listdir(CHECKPOINT_DIR):
    shutil.rmtree(os.path.join(CHECKPOINT_DIR, d), ignore_errors=True)

# Załaduj statyczną tabelę filmów
movies_static = spark.read.jdbc(jdbc_url, "movielens.movies", properties=jdbc_properties)

# Odczytaj strumień od nowa
rating_stream5 = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR) \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# Stream-static join - wzbogacamy strumień o tytuły filmów
enriched_stream = rating_stream5.join(movies_static, "movie_id")

# Agregacja per gatunek w czasie rzeczywistym
genre_stats = enriched_stream \
    .groupBy("genres") \
    .agg(
        count("*").alias("num_ratings"),
        round(avg("rating"), 2).alias("avg_rating")
    )

query_genre = genre_stats.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("genre_stats") \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/genre") \
    .start()

generate_rating_batch(30, n=200)
time.sleep(5)

spark.sql("""
    SELECT genres, num_ratings, avg_rating
    FROM genre_stats
    ORDER BY num_ratings DESC
""").show(truncate=False)

query_genre.stop()

## 9. foreachBatch - zapis wyników do zewnętrznego systemu

`foreachBatch` daje pełną kontrolę nad tym co się dzieje z każdym micro-batchem.

In [None]:
# Wyczyść checkpointy
for d in os.listdir(CHECKPOINT_DIR):
    shutil.rmtree(os.path.join(CHECKPOINT_DIR, d), ignore_errors=True)

# Odczytaj strumień od nowa
rating_stream6 = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR) \
    .withColumn("event_time", to_timestamp(col("timestamp")))

def write_to_postgres(batch_df, batch_id):
    """Zapisz micro-batch do PostgreSQL."""
    if batch_df.count() == 0:
        return
    
    # Agregacja per film w batchu
    movie_stats = batch_df.groupBy("movie_id").agg(
        count("*").alias("batch_ratings"),
        round(avg("rating"), 2).alias("batch_avg_rating"),
        max("event_time").alias("last_update")
    ).withColumn("batch_id", lit(batch_id))
    
    # Zapisz do PostgreSQL
    movie_stats.write \
        .mode("append") \
        .jdbc(jdbc_url, "movielens.streaming_stats", properties=jdbc_properties)
    
    print(f"Batch {batch_id}: zapisano {movie_stats.count()} wierszy do PostgreSQL")

# Uwaga: tabela streaming_stats musi istnieć w PostgreSQL
# Możesz ją stworzyć ręcznie lub użyć trybu "overwrite" na pierwszym batchu

# Alternatywnie - zapisz do Parquet (nie wymaga tabeli w DB)
def write_to_parquet(batch_df, batch_id):
    """Zapisz micro-batch do Parquet."""
    if batch_df.count() == 0:
        return
    batch_df.write \
        .mode("append") \
        .parquet("/tmp/streaming_output")
    print(f"Batch {batch_id}: zapisano {batch_df.count()} wierszy")

query_foreach = rating_stream6.writeStream \
    .foreachBatch(write_to_parquet) \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/foreach") \
    .start()

generate_rating_batch(40, n=100)
time.sleep(5)
generate_rating_batch(41, n=100)
time.sleep(5)

query_foreach.stop()

# Sprawdź wyniki
spark.read.parquet("/tmp/streaming_output").show(5)

## 10. Trending Movies - wykrywanie trendów

Znajdź filmy z nagłym wzrostem popularności.

In [None]:
# Wyczyść checkpointy
for d in os.listdir(CHECKPOINT_DIR):
    shutil.rmtree(os.path.join(CHECKPOINT_DIR, d), ignore_errors=True)

# Odczytaj strumień od nowa
rating_stream7 = spark.readStream \
    .format("json") \
    .schema(rating_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(STREAM_DIR) \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# Policz oceny w 15-sekundowych oknach z watermarkiem
trending = rating_stream7 \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(
        window(col("event_time"), "15 seconds"),
        col("movie_id")
    ) \
    .agg(
        count("*").alias("rating_count"),
        round(avg("rating"), 2).alias("avg_rating")
    )

query_trending = trending.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("trending_movies") \
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/trending") \
    .start()

# Generuj sporo danych z jednym "trendującym" filmem
def generate_trending_batch(batch_id, trending_movie_id=318, n=100):
    """Generuj batch z jednym filmem dominującym (trending)."""
    ratings = []
    base_time = datetime.now()
    for i in range(n):
        # 70% ocen dla trending movie
        mid = trending_movie_id if random.random() < 0.7 else random.choice([1, 50, 260, 2571])
        ratings.append({
            "user_id": random.randint(1, 1000),
            "movie_id": mid,
            "rating": random.choice([3.5, 4.0, 4.5, 5.0]) if mid == trending_movie_id else random.choice([1.0, 2.0, 3.0, 4.0, 5.0]),
            "timestamp": (base_time + timedelta(seconds=random.randint(-10, 0))).isoformat()
        })
    path = os.path.join(STREAM_DIR, f"trending_{batch_id}.json")
    with open(path, 'w') as f:
        for r in ratings:
            f.write(json.dumps(r) + '\n')

for i in range(50, 55):
    generate_trending_batch(i)
    time.sleep(2)

time.sleep(5)

# Który film trenduje?
movies_static = spark.read.jdbc(jdbc_url, "movielens.movies", properties=jdbc_properties)
spark.sql("""
    SELECT movie_id, SUM(rating_count) as total, ROUND(AVG(avg_rating), 2) as avg
    FROM trending_movies
    GROUP BY movie_id
    ORDER BY total DESC
""").join(movies_static, "movie_id").select("title", "total", "avg").show(truncate=False)

query_trending.stop()

## Zadanie końcowe

Zbuduj **"Real-time Movie Dashboard"** streaming pipeline:

1. Źródło: file stream z symulowanymi ocenami
2. Wzbogacenie: stream-static join z movies (tytuły, gatunki)
3. Metryki w oknie 30s z watermarkiem 1 min:
   - Top 5 filmów po liczbie ocen (trending)
   - Średnia ocena per gatunek
   - Liczba unikalnych użytkowników
4. Zapis: foreachBatch do plików Parquet
5. Generuj batche i obserwuj jak metryki się zmieniają

In [None]:
# Twoje rozwiązanie:


In [None]:
# Cleanup
import shutil
shutil.rmtree(STREAM_DIR, ignore_errors=True)
shutil.rmtree(CHECKPOINT_DIR, ignore_errors=True)
shutil.rmtree("/tmp/streaming_output", ignore_errors=True)

spark.stop()