In [None]:
# Helper used as the foreachBatch sink to display our streaming data.
# The counter lives in a mutable dict so the callback can update it in place.
batch_counter = {"count": 0}


def process_batch(df, batch_id):
    """Print one micro-batch and shut Spark down on every 7th call.

    Args:
        df: the micro-batch DataFrame handed over by ``foreachBatch``.
        batch_id: identifier of the micro-batch, echoed before the rows.

    Side effects:
        Increments ``batch_counter["count"]``; when the running total is a
        multiple of 7 the global ``spark`` session is stopped, which ends
        the demo stream (and everything else using that session).
    """
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % 7 != 0:
        return
    # `spark` is resolved at call time from the notebook's global namespace.
    spark.stop()

In [None]:
# Task 1
# Build a streaming source with format('rate') and rowsPerSecond set to 5.
# Create the user_id column: expr("concat('u', cast(rand()*100 as int))")
# Add the event_type column: expr("case when rand() > 0.7 then 'purchase' else 'view' end")
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Synthetic source configured for 5 rows per second.
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Keep the source timestamp and attach a random user id and event type.
events = rate_df.select(
    "timestamp",
    expr("concat('u', cast(rand()*100 as int))").alias("user_id"),
    expr("case when rand() > 0.7 then 'purchase' else 'view' end").alias("event_type"),
)

# process_batch prints each micro-batch; keep foreachBatch after format().
query = (
    events.writeStream
    .format("console")
    .foreachBatch(process_batch)
    .start()
)

In [None]:
# Reuse the data from the previous task.
# Keep only the 'purchase' events.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Synthetic source configured for 5 rows per second.
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Keep the source timestamp and attach a random user id and event type.
events = rate_df.select(
    "timestamp",
    expr("concat('u', cast(rand()*100 as int))").alias("user_id"),
    expr("case when rand() > 0.7 then 'purchase' else 'view' end").alias("event_type"),
)

# Only rows whose event_type is 'purchase' reach the sink.
purchases = events.where("event_type = 'purchase'")

# process_batch prints each micro-batch; keep foreachBatch after format().
query = (
    purchases.writeStream
    .format("console")
    .foreachBatch(process_batch)
    .start()
)

In [1]:
%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta

# Directory the generator drops its JSON files into; created on start-up.
output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

# Value pools the random events are drawn from.
event_types = ["view", "cart", "purchase"]
categories = ["electronics", "books", "fashion", "home", "sports"]


def generate_event():
    """Build one random e-commerce event as a JSON-serialisable dict.

    Returns:
        dict with keys ``user_id`` ("u1".."u50"), ``event_type`` (drawn from
        ``event_types`` with weights 0.6 / 0.25 / 0.15), ``timestamp`` (naive
        UTC ISO-8601 string, 0-300 seconds in the past), ``product_id``
        ("p100".."p120"), ``category`` (uniform over ``categories``) and
        ``price`` (float in [10, 1000], rounded to 2 decimals).
    """
    user_no = random.randint(1, 50)
    kind = random.choices(event_types, weights=[0.6, 0.25, 0.15])[0]
    now = datetime.utcnow()
    # Back-date the event by up to five minutes.
    lag = timedelta(seconds=random.randint(0, 300))
    product_no = random.randint(100, 120)
    category = random.choice(categories)
    price = round(random.uniform(10, 1000), 2)
    return dict(
        user_id=f"u{user_no}",
        event_type=kind,
        timestamp=(now - lag).isoformat(),
        product_id=f"p{product_no}",
        category=category,
        price=price,
    )

# Simulate file-based streaming.
# Each batch is written to a staging directory that sits next to (not inside)
# output_dir and is then moved into place with os.replace. A streaming reader
# watching output_dir therefore only ever sees complete files, never one that
# is still being written.
staging_dir = f"{output_dir}.tmp"
os.makedirs(staging_dir, exist_ok=True)

while True:
    # 50 events per file, one JSON object per line.
    batch = [generate_event() for _ in range(50)]
    basename = f"events_{int(time.time())}.json"
    filename = f"{output_dir}/{basename}"
    staged = f"{staging_dir}/{basename}"
    with open(staged, "w") as f:
        f.writelines(json.dumps(e) + "\n" for e in batch)
    # Publish the finished file in a single rename.
    os.replace(staged, filename)
    print(f"Wrote: {filename}")
    # One new file every 5 seconds.
    time.sleep(5)

Writing generator.py


In [None]:
# Create the `schema` variable describing the layout of our data frame.
# Uses StringType(), TimestampType() and DoubleType().

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("RealTimeEcommerce").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# One field per key of the generated JSON events.
schema = (
    StructType()
    .add("user_id", StringType())
    .add("event_type", StringType())
    .add("timestamp", TimestampType())
    .add("product_id", StringType())
    .add("category", StringType())
    .add("price", DoubleType())
)

In [None]:
# Read the JSON files from the directory as a stream, using the explicit schema.
stream = spark.readStream.schema(schema).format("json").load("data/stream")

# process_batch prints each micro-batch; keep foreachBatch after format().
query = (
    stream.writeStream
    .format("console")
    .foreachBatch(process_batch)
    .start()
)

In [None]:
# Build `agg1`: the number of events in each event_type group.
agg1 = stream.groupBy("event_type").count()

# Remember: this aggregation is written out in "complete" output mode.
# Keep foreachBatch after format() so process_batch receives each micro-batch.
query = (
    agg1.writeStream
    .format("console")
    .outputMode("complete")
    .foreachBatch(process_batch)
    .start()
)

In [None]:
# Group the event types in a tumbling window of 5 minutes
# and add a watermark set to 1 minute on the event timestamp.
windowed = (
    stream
    .withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "5 minutes"), "event_type")
    .count()
)

# The sink is decided by whichever of format() / foreachBatch() is called
# last. The previous order (foreachBatch, then format("console")) selected the
# console sink, so process_batch was never invoked and the stream never hit
# its stop condition. foreachBatch now comes last, as in the other cells.
query = (
    windowed.writeStream
    .outputMode("append")
    .format("console")
    .foreachBatch(process_batch)
    .start()
)

In [None]:
# Switch from a tumbling window to a sliding window: 5 minutes wide,
# with a new window starting every 1 minute.
windowed = (
    stream
    .withWatermark("timestamp", "1 minutes")
    .groupBy(window("timestamp", "5 minutes", "1 minutes"), "event_type")
    .count()
)

# NOTE(review): in "complete" mode the whole result table is re-emitted every
# batch, so the watermark presumably does not bound the kept state - confirm
# against the Structured Streaming guide if state size matters.
# Keep foreachBatch after format() so process_batch receives each micro-batch.
query = (
    windowed.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", False)
    .foreachBatch(process_batch)
    .start()
)

In [None]:
######## User segmentation (per user and time window)
##### if there was a purchase -> "Buyer"
##### if there was a cart event but no purchase -> "Cart abandoner"
##### if there were only views -> "Lurker"
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Fresh counter for this cell; a mutable dict so the callback can update it in place.
batch_counter = {"count": 0}


def process_batch(df, batch_id):
    """Print one micro-batch and shut Spark down on every 7th call.

    Args:
        df: the micro-batch DataFrame handed over by ``foreachBatch``.
        batch_id: identifier of the micro-batch, echoed before the rows.

    Side effects:
        Increments ``batch_counter["count"]``; when the running total is a
        multiple of 7 the global ``spark`` session is stopped, which ends
        the demo stream (and everything else using that session).
    """
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % 7 != 0:
        return
    # `spark` is resolved at call time from the notebook's global namespace.
    spark.stop()

# (Re)create the session for this self-contained cell.
spark = SparkSession.builder.appName("RealTimeEcommerce").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# One field per key of the generated JSON events.
schema = (
    StructType()
    .add("user_id", StringType())
    .add("event_type", StringType())
    .add("timestamp", TimestampType())
    .add("product_id", StringType())
    .add("category", StringType())
    .add("price", DoubleType())
)

# Stream the JSON files dropped into data/stream, parsed with the schema above.
stream = spark.readStream.schema(schema).json("data/stream")

# Per user and 5-minute window: collect the distinct event types seen, then
# map that set to a segment label. The first matching CASE branch wins, so a
# purchase outranks a cart event, which outranks views only.
windowed = (
    stream
    .withWatermark("timestamp", "1 minutes")
    .groupBy(window("timestamp", "5 minutes"), "user_id")
    .agg(collect_set("event_type").alias("event_type"))
    .select(
        "window",
        "user_id",
        expr("""CASE
                                            WHEN array_contains(event_type, 'purchase') THEN 'Buyer'
                                            WHEN array_contains(event_type, 'cart') THEN 'Cart abandoner'
                                            ELSE 'Lurker'
                                            END""").alias("segmentacja"),
    )
)

# Keep foreachBatch after format() so process_batch receives each micro-batch.
query = (
    windowed.writeStream
    .format("console")
    .option("truncate", False)
    .outputMode("update")
    .foreachBatch(process_batch)
    .start()
)
