# Przetwarzanie danych strumieniowych

1. Uruchom obraz dockera i sprawdź czy serwer Kafki posiada jakieś zdefiniowane topici:
    - uruchom terminal i w katalogu jupyterlab uruchom docker compose up
    - w dodatkowym oknie termianala wpisz polecenie:
    ```bash
        docker exec broker kafka-topics --list --bootstrap-server broker:9092
    ```

2. dodaj topic o nazwie `streaming`
```bash
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic streaming
```

3. sprawdź listę tematów ponownie upewniając się, że posiadasz temat `streaming`

4. Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu
```bash
docker exec --interactive --tty broker kafka-console-producer --bootstrap-server broker:9092 --topic streaming
```

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą consumenta: 

```bash
docker exec --interactive --tty broker kafka-console-consumer --bootstrap-server broker:9092 --topic streaming --from-beginning
```

## Uruchomienie kodu wysyłającego strumień automatycznie



Uzupełnij skrypt tak by generował następujące dane: 

1. utwórz zmienną `message` która będzie słownikiem zawierającym informacje pojedynczego eventu (klucz: wartość): 
    - "time" : aktualny czas + timedelta(seconds=random.randint(-15, 0))
    - "id" : wybierane losowo z listy ["a", "b", "c", "d", "e"]
    - "value: losowa wartość z zakresu 0 do 100

In [1]:
%%file stream.py

import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer

if __name__ == "__main__":
    SERVER = "broker:9092"

    producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(2, 7, 0),
    )
    
    try:
        while True:
            ##### 
            #    YOUR CODE HERE


            #####
            producer.send("streaming", value=message)
            sleep(1)
    except KeyboardInterrupt:
        producer.close()


Writing stream.py


2.  w terminalu jupyterlab uruchom plik `stream.py`
```bash
python stream.py
```

sprawdz w oknie consumenta czy wysyłane wiadomości przychodzą do Kafki.

Za uruchomienie importu kafka odpowiedzialna jest biblioteka `kafka-python`
którą możesz zainstalować poleceniem `pip install kafka-python`

## KOD APACHE SPARK 1

Przygotuj kod skryptu który pobierze informacje z przesyłanego strumienia danych. 

In [6]:
%%file raw_app.py

## LOAD SPARK SESSION object

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, IntegerType
###
import pyspark.sql.functions as f

# from pyspark.sql.functions import from_json, col

SERVER = "broker:9092"

if __name__ == "__main__":
    ## create spark variable
    #YOUR CODE HERE
    
    spark = SparkSession.builder.getOrCreate()
    
    
    ## 
    spark.sparkContext.setLogLevel("ERROR")
    
    
    json_schema = StructType(
        [
            StructField("time", TimestampType()),
            StructField("id", StringType()),
            StructField("value", IntegerType()),
        ]
    )
    
    ## load stream data from topic streaming
    # topic subscription
    
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "streaming")
        .load()
    )
    
    #parsed = raw.selectExpr("CAST(value AS STRING)")
    
    parsed = raw.select(
        "timestamp", f.from_json(raw.value.cast("string"), json_schema).alias("json")
    ).select(
        f.col("timestamp").alias("proc_time"),
        f.col("json").getField("time").alias("event_time"),
        f.col("json").getField("id").alias("id"),
        f.col("json").getField("value").alias("value"),
    )
    
    info = parsed.groupBy("id").count()
    
    # defining output as console with outputMode as append
    query =  (
        info.writeStream
        .outputMode("complete")
        .option("truncate", "false")
        .format("console")
        .start()
    )
    
    
    query.awaitTermination()
    query.stop()

Overwriting raw_app.py


uruchom pierwsze przetwarzanie strumienia: 
```bash
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 raw_app.py
```


Zmodyfikuj pragram `raw_app.py` dodając schemat:
```python
    json_schema = StructType(
        [
            StructField("time", TimestampType()),
            StructField("id", StringType()),
            StructField("value", IntegerType()),
        ]
    )
    
```
oraz sparsowany strumień 

```python
    parsed = raw.select(
        "timestamp", f.from_json(raw.value.cast("string"), json_schema).alias("json")
    ).select(
        f.col("timestamp").alias("proc_time"),
        f.col("json").getField("time").alias("event_time"),
        f.col("json").getField("id").alias("id"),
        f.col("json").getField("value").alias("value"),
    )

```

uruchom kod sprawdzając czy widzisz przychodzące eventy.

## zlicz ilość eventów ze względu na grupę ID 
