# Lab 4 — Zarzadzanie zrodlami danych strumieniowych i segmentacja klientow

## 🔄 Wprowadzenie

W tym laboratorium zapoznasz sie z roznymi metodami zasilania danych strumieniowych w Apache Spark oraz zastosowaniem prostych transformacji, filtrowania i segmentacji klientow w czasie rzeczywistym.

## 💡 Pomocnicza funkcja do wyswietlania naszych danych strumieniowych

Funkcja ta pozwala wyświetlać batche strumieni w notatniku. Dodatkowo ograniczona została do realizacji 5 następujących po sobie elementów. 

In [None]:
batch_counter = {"count": 0}

def process_batch(df, batch_id):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % 5 == 0:
        spark.stop()

## 🔹 rate jako źródło kontrolowanego strumienia

### ✅ Zadanie 1

1. Przygotuj strumien danych z `format('rate')`, ustaw `rowsPerSecond` na 5.
2. Utworz kolumne `user_id`: `expr("concat('u', cast(rand()*100 as int))")`
3. Dodaj kolumne `event_type`: `expr("case when rand() > 0.7 then 'purchase' else 'view' end")`

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

rate_df = (spark.readStream
           .format("rate")
           .option("rowsPerSecond", 5)
           .load())

events = rate_df.withColumn("user_id", expr("concat('u', cast(rand()*100 as int))")) \
                 .withColumn("event_type", expr("case when rand() > 0.7 then 'purchase' else 'view' end"))

query = (events.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())

## 🔹 Źródło plikowe (JSON)

### ✅ Schemat danych:

```json
{
  "user_id": "u123",
  "event_type": "purchase", // albo "view", "cart", "click"
  "timestamp": "2025-05-09T15:24:00Z",
  "product_id": "p456",
  "category": "electronics",
  "price": 299.99
}
```

### ✅ Generator danych:

In [None]:
%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta

output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

event_types = ["view", "cart", "purchase"]
categories = ["electronics", "books", "fashion", "home", "sports"]

def generate_event():
    return {
        "user_id": f"u{random.randint(1, 50)}",
        "event_type": random.choices(event_types, weights=[0.6, 0.25, 0.15])[0],
        "timestamp": (datetime.utcnow() - timedelta(seconds=random.randint(0, 300))).isoformat(),
        "product_id": f"p{random.randint(100, 120)}",
        "category": random.choice(categories),
        "price": round(random.uniform(10, 1000), 2)
    }

# Simulate file-based streaming
while True:
    batch = [generate_event() for _ in range(50)]
    filename = f"{output_dir}/events_{int(time.time())}.json"
    with open(filename, "w") as f:
        for e in batch:
            f.write(json.dumps(e) + "\n")
    print(f"Wrote: {filename}")
    time.sleep(5)

### Schemat danych 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

from pyspark.sql.types import *

schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("product_id", StringType()),
    StructField("category", StringType()),
    StructField("price", DoubleType())
])

In [None]:
stream = (spark.readStream
          .schema(schema)
          .json("data/stream"))

# sprawdz opcje 
# .option("maxFilesPerTrigger", 2) 
# .option("header", true) for csv file 

query = (stream.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())



### Okna czasowe z wykorzystaniem Watermark

1. Tumbling window
2. Sliding window

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

from pyspark.sql.types import *

schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("product_id", StringType()),
    StructField("category", StringType()),
    StructField("price", DoubleType())
])

stream = (spark.readStream
          .schema(schema)
          .json("data/stream"))

windowed = (stream.withWatermark("timestamp", "1 minute") 
    .groupBy(window("timestamp", "5 minutes", "1 minute"), "event_type")
    .count())

query = (windowed.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .format("console")
         .start())

## przykład - segmentacja klientów 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("product_id", StringType()),
    StructField("category", StringType()),
    StructField("price", DoubleType())
])

stream = (spark.readStream
          .schema(schema)
          .json("data/stream"))


user_events = (stream
    .withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "10 seconds"), "user_id")
    .agg(collect_set("event_type").alias("events")))

segmented = user_events.withColumn("segment", expr("""
    CASE
        WHEN array_contains(events, 'purchase') THEN 'Buyer'
        WHEN array_contains(events, 'cart') THEN 'Cart abandoner'
        WHEN array_contains(events, 'view') THEN 'Lurker'
        ELSE 'Other'
    END
"""))

query = (segmented.writeStream
         .outputMode("append")
         .format("console")
         .foreachBatch(process_batch)
         .start())

### przesylanie strumienia przez socket 

```bash
nc -lk 9998


ps | grep 9998
kill -9 <p_id>
```

In [None]:
%%file start_stream.py

from socket import *
import time

rdd = list()
with open("MobyDick.txt", 'r') as ad:
    for line in ad:
        rdd.append(line)

HOST = 'localhost'
PORT = 9998
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)


while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except:
            break
    c.close()
    print('disconnected')

In [None]:
batch_counter = {"count": 0}

def process_batch(df, batch_id):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % 5 == 0:
        spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Stream_DF").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    lines = (spark
         .readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9998)
         .load())

    words = lines.select(explode(split(lines.value, " ")).alias("words"))
    word_counts = words.groupBy("words").count()

    streamingQuery = (word_counts
         .writeStream
         .format("console")
         .outputMode("complete") # .outputMode("update")
#         .trigger(processingTime="5 second")
         .start())

    streamingQuery.awaitTermination()

# Przetwarzanie danych strumieniowych z Apache Kafka i Spark

1. Sprawdź czy serwer Kafki posiada jakieś zdefiniowane topici:
    - w dodatkowym oknie termianala wpisz polecenie:
    ```bash
    cd ~ 
    kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
    ```
2. dodaj topic o nazwie `streamXX` Gdzie za `X` wstaw nr grupy a za `xx` nr swojego serwera
```bash
cd ~ 
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streamXX
```

3. sprawdź listę tematów ponownie upewniając się, że posiadasz temat `streamXX`

4. Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu
```bash
cd ~ 
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic stream
```

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta: 

```bash
cd ~ 
kafka/bin/kafka-console-consumer.sh  --bootstrap-server broker:9092 --topic streamXX 
```
Zweryfikuj, że przesyłanie danych działa. 

Zamknij okno producenta. Okno konsumenta zostaw otwarte - przyda się do weryfikacji automatu generującego dane. 

## Uruchomienie kodu wysyłającego strumień

Uzupełnij skrypt tak by generował następujące dane: 

1. utwórz zmienną `message` która będzie słownikiem zawierającym informacje pojedynczego eventu (klucz: wartość): 
    - "time" : aktualny czas w postaci stringu datetime.now()
    - "id" : wybierane losowo z listy ["a", "b", "c", "d", "e"]
    - "value: losowa wartość z zakresu 0 do 100
  


In [None]:
%%file stream.py

import json
import random
import sys
from datetime import datetime
from time import sleep
from kafka import KafkaProducer


KAFKA_SERVER = "broker:9092"
TOPIC = 'stream'
LAG = 2

def create_producer(server):
    return KafkaProducer(
        bootstrap_servers=[server],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )

if __name__ == "__main__":
    
    producer = create_producer(KAFKA_SERVER)
    try:
        while True:

            message = {
                "time" : str(datetime.now() )  ,
                "id" : random.choice(["a", "b", "c", "d", "e"])     ,
                "temperatura" : random.randint(-100,100)  ,
                "cisnienie" :  random.randint(0,50)   ,
            }
  
            producer.send(TOPIC, value=message)
            sleep(LAG)
    except KeyboardInterrupt:
        producer.close()

2.  w terminalu jupyterlab uruchom plik `stream.py`
```bash
python stream.py
```

sprawdz w oknie consumenta czy wysyłane wiadomości przychodzą do Kafki.

Za uruchomienie importu kafka odpowiedzialna jest biblioteka `kafka-python`
którą możesz zainstalować poleceniem `pip install kafka-python`

## APACHE SPARK 

Przygotuj kod skryptu który pobierze informacje z przesyłanego strumienia danych. 

In [None]:
%%file app.py

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app.py

from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *


SERVER = "broker:9092"
TOPIC = 'stream'

schema = StructType(
        [
            StructField("time", TimestampType()),
            StructField("id", StringType()),
            StructField("temperatura", IntegerType()),
            StructField("cisnienie", IntegerType()),
        ]
    )

SCHEMA = """time Timestamp, id String, temperatura Int, cisnienie Int """ # DDL string


if __name__ == "__main__":
    
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
     
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", SERVER)
        .option("subscribe", TOPIC)
        .load()
    # .select(col("value").cast("string"))
    )
    # query =  (
    #     raw.writeStream
    #     .outputMode("append")
    #     .format("console")
    #     .option("truncate", False)
    #     .start()
    # )
    parsed = (raw.select("timestamp", from_json(decode(col("value"), "utf-8"), SCHEMA).alias("moje_dane"))
                .select("timestamp", "moje_dane.*")
             )
    # query =  (
    #     parsed.writeStream
    #     .outputMode("append")
    #     .format("console")
    #     .option("truncate", False)
    #     .start()
    # )
    # gr = parsed.agg(avg("temperatura"), avg("cisnienie"))
    # query =  (gr.writeStream
    #     .outputMode("update")
    #     .format("console")
    #     .option("truncate", False)
    #     .start()
    # )
    gr = (parsed.withWatermark("timestamp", "5 seconds")
    .groupBy(window("timestamp", "10 seconds", "7 seconds"))
    .agg(avg("temperatura"), avg("cisnienie"))
         )
    query =  (gr.writeStream
        .outputMode("complete")
        .format("console")
        .option("truncate", False)
        .start()
    )
    query.awaitTermination()