### Create a streaming analytics job

In [None]:
import os, sys

print("Kernel Python:", sys.executable)

# Make PySpark launch both its Python workers and its driver with the very
# interpreter that runs this Jupyter kernel.
_pyspark_python_vars = ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON")
for _env_name in _pyspark_python_vars:
    os.environ[_env_name] = sys.executable

for _env_name in _pyspark_python_vars:
    print(f"Set {_env_name}:", os.environ[_env_name])

In [None]:
import redis
import mariadb

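# For production systems, use a class-based ForeachWriter (open/process/close)
# instead of these per-row functions, which open a new connection for every row:
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreach.html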
def write_to_redis(row):
    """Add one visit's duration to its country's score in Redis.

    Per-row ``foreach`` sink. ``row`` must support item access for
    ``"country"`` and ``"duration"`` (e.g. a pyspark ``Row``). The member
    ``row["country"]`` of the sorted set ``last-action-stats`` has its score
    incremented by ``row["duration"]``.

    Rows with a null country or duration are skipped: redis-py rejects
    ``None`` arguments, and raising here would fail the streaming query.
    A connection is opened and closed on every call.
    """
    country, duration = row["country"], row["duration"]
    if country is None or duration is None:
        return

    stats_key = "last-action-stats"
    redis_conn = redis.Redis(host="localhost", port=6379, decode_responses=True)
    try:
        redis_conn.zincrby(stats_key, duration, country)
    finally:
        # close() releases the connection even if ZINCRBY fails. It replaces
        # quit(), whose QUIT command is deprecated as of Redis 7.2.
        redis_conn.close()

def write_to_mariadb(row):
    """Insert one windowed-summary row into ``website_stats.visit_stats``.

    Per-row ``foreach`` sink. ``row`` must support ``row["window"]["start"]``,
    ``row["last_action"]`` and ``row["duration"]`` (e.g. a pyspark ``Row`` from
    the windowed aggregation). A new connection is opened and closed on every
    call.
    """
    from contextlib import closing

    # Values are bound as parameters instead of being formatted into the SQL
    # text: last_action originates from Kafka payloads, so string
    # interpolation allowed SQL injection and broke on values containing '.
    summary_sql = (
        "INSERT INTO `website_stats`.`visit_stats` "
        "(INTERVAL_TIMESTAMP, LAST_ACTION, DURATION) "
        "VALUES (?, ?, ?)"
    )
    params = (row["window"]["start"], row["last_action"], row["duration"])

    # closing() releases cursor and connection even when execute() raises;
    # previously neither was ever closed explicitly.
    with closing(mariadb.connect(
            user="spark",
            password="spark",
            host="127.0.0.1",
            port=3306,
            database="website_stats",
            autocommit=True,
    )) as summary_conn, closing(summary_conn.cursor()) as summary_cursor:
        summary_cursor.execute(summary_sql, params)
    



In [None]:
# =========================
# Streaming Website Analytics (Corrected)
# - Fix visit_date parsing (string -> timestamp)
# - Replace per-row foreach sinks with foreachBatch (stable + production-like)
# - Add checkpointLocation for ALL streaming queries
# - Keep your Windows + venv Python stability configs
# =========================

import os, sys
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

print("*************Starting Streaming Analytics for Website visits*****************")

# ---- Expected JSON payload of each Kafka message.
# The producer sends visit_date as a "YYYY-MM-DD HH:MM:SS" string, so it is
# declared as a string here and converted to a timestamp explicitly after
# parsing (see visits_df below).
schema = (StructType()
    .add("country", StringType())
    .add("last_action", StringType())
    .add("visit_date", StringType())
    .add("duration", IntegerType())
)

# ---- Spark session (Windows + venv safe)
# (key, value) settings applied to the builder in this order.
_session_settings = (
    # Force IPv4 loopback (Windows)
    ("spark.driver.host", "127.0.0.1"),
    ("spark.driver.bindAddress", "127.0.0.1"),
    # Run Python workers and driver with the Jupyter kernel's interpreter
    ("spark.pyspark.python", sys.executable),
    ("spark.pyspark.driver.python", sys.executable),
    # Windows stability for Python workers: no daemon, no worker reuse
    ("spark.python.use.daemon", "false"),
    ("spark.python.worker.reuse", "false"),
    # Small local parallelism plus streaming / output-committer settings
    ("spark.sql.shuffle.partitions", 2),
    ("spark.default.parallelism", 2),
    ("spark.sql.streaming.forceDeleteTempCheckpointLocation", True),
    ("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2"),
    # Connector jars (Kafka + a MySQL JDBC driver) loaded from ./jars
    ("spark.jars", ",".join([
        "jars/mysql-connector-j-8.4.0.jar",
        "jars/commons-pool2-2.12.0.jar",
        "jars/kafka-clients-3.6.0.jar",
        "jars/spark-sql-kafka-0-10_2.12-3.5.1.jar",
        "jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar",
        "jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar",
    ])),
    ("spark.driver.extraClassPath", "jars/*"),
)

_builder = SparkSession.builder.appName("StreamingWebsiteAnalyticsJob").master("local[2]")
for _setting_key, _setting_value in _session_settings:
    _builder = _builder.config(_setting_key, _setting_value)
streaming_spark = _builder.getOrCreate()

streaming_spark.sparkContext.setLogLevel("WARN")

# Echo the interpreters Spark actually picked up.
_active_conf = streaming_spark.sparkContext.getConf()
print("Spark pyspark python:", _active_conf.get("spark.pyspark.python"))
print("Spark driver python:", _active_conf.get("spark.pyspark.driver.python"))

# -------------------------
# Read from Kafka
# -------------------------
print("Reading from Kafka...")
_kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "spark.streaming.website.visits",
    "startingOffsets": "latest",
}
raw_visits_df = (streaming_spark.readStream
    .format("kafka")
    .options(**_kafka_source_options)
    .load()
)

# Decode the Kafka value as a JSON string, flatten it into columns, then turn
# the visit_date string into a timestamp. Rows whose visit_date is null after
# that conversion (bad or missing dates) are dropped.
_json_payload = F.from_json(F.col("value").cast("string"), schema)
visits_df = (raw_visits_df
    .select(_json_payload.alias("visits"))
    .select("visits.*")
    .withColumn("visit_date", F.to_timestamp(F.col("visit_date"), "yyyy-MM-dd HH:mm:ss"))
    .where(F.col("visit_date").isNotNull())
)

# -------------------------
# 1) Abandoned shopping carts -> Kafka topic "spark.streaming.carts.abandoned".
#    Each record's value is a CSV string: country,last_action,visit_date,duration
# -------------------------
shopping_cart_df = visits_df.where(F.col("last_action") == "ShoppingCart")

_cart_value = (F.format_string("%s,%s,%s,%d",
                               "country", "last_action", "visit_date", "duration")
               .cast("string")
               .alias("value"))

q_carts = (shopping_cart_df
    .select(_cart_value)
    .writeStream
    .format("kafka")
    .options(**{
        "checkpointLocation": "tmp/cp-shoppingcart2",
        "kafka.bootstrap.servers": "localhost:9092",
        "topic": "spark.streaming.carts.abandoned",
    })
    .start()
)

# -------------------------
# 2) Redis sink (use foreachBatch, not per-row foreach)
# -------------------------
def write_redis_batch(batch_df, batch_id: int):
    """Add each visit's duration to its country's score in a Redis sorted set.

    foreachBatch callback: collects the micro-batch (columns ``country`` and
    ``duration``), sums durations per country, and applies one ZINCRBY per
    country to the sorted set ``last-action-stats``. Summing first gives the
    same final scores as one ZINCRBY per row, with fewer commands.

    Rows with a null country or duration are skipped. Both fields are nullable
    in the parsed schema; previously ``int(None)`` raised TypeError (or redis-py
    rejected the ``None`` member), and that failed the whole streaming query.

    NOTE: Spark documents foreachBatch as at-least-once. A replayed batch
    increments the scores again; ``batch_id`` is not used for deduplication.
    """
    import redis
    from collections import defaultdict

    # collect() pulls the micro-batch to the driver. That is fine for small
    # learning workloads; for production, use partition-wise writes instead.
    totals = defaultdict(int)
    for row in batch_df.collect():
        country, duration = row["country"], row["duration"]
        if country is None or duration is None:
            continue
        totals[country] += int(duration)

    if not totals:
        return  # nothing to write; don't open a connection

    stats_key = "last-action-stats"
    r = redis.Redis(host="localhost", port=6379, decode_responses=True)
    try:
        pipe = r.pipeline()
        for country, total in totals.items():
            pipe.zincrby(stats_key, total, country)
        pipe.execute()
    finally:
        # Release the connection even if the pipeline fails.
        r.close()

# Only the two columns write_redis_batch reads are fed to the Redis sink.
_redis_input = visits_df.select(F.col("country"), F.col("duration"))
q_redis = (_redis_input.writeStream
    .option("checkpointLocation", "tmp/cp-redis")
    .foreachBatch(write_redis_batch)
    .start()
)

# -------------------------
# 3) Windowed summary -> MariaDB (use foreachBatch)
# -------------------------
# Total duration per (5-second event-time window, last_action). Event time is
# the record's parsed visit_date, not current_timestamp(). Records that arrive
# later than the 10-second watermark allows may be dropped.
_five_second_window = F.window("visit_date", "5 seconds")
windowed_summary = (visits_df
    .withWatermark("visit_date", "10 seconds")
    .groupBy(_five_second_window, "last_action")
    .agg(F.sum("duration").alias("duration"))
)

def write_mariadb_batch(batch_df, batch_id: int):
    """Insert one micro-batch of windowed summaries into ``website_stats.visit_stats``.

    foreachBatch callback for the windowed aggregation. Each collected row
    provides ``window.start``, ``last_action`` and the summed ``duration``,
    written as (INTERVAL_TIMESTAMP, LAST_ACTION, DURATION).

    Rows whose summed duration is null are skipped. That happens when every
    duration in the group was null, and ``int(None)`` used to raise TypeError
    and fail the streaming query.

    NOTE: Spark documents foreachBatch as at-least-once. A replayed batch
    inserts its rows again; ``batch_id`` is not used for deduplication.
    """
    import mariadb
    from contextlib import closing

    # collect() brings the (presumably small, already aggregated) micro-batch
    # to the driver.
    data = [
        (r["window"]["start"], r["last_action"], int(r["duration"]))
        for r in batch_df.collect()
        if r["duration"] is not None
    ]
    if not data:
        return  # nothing to insert; don't open a DB connection

    # closing() releases cursor and connection even if executemany() raises.
    # Previously an insert error leaked both.
    with closing(mariadb.connect(
        user="spark",
        password="spark",
        host="127.0.0.1",
        port=3306,
        database="website_stats",
        autocommit=True,
    )) as conn, closing(conn.cursor()) as cur:
        cur.executemany(
            "INSERT INTO website_stats.visit_stats (INTERVAL_TIMESTAMP, LAST_ACTION, DURATION) VALUES (?, ?, ?)",
            data,
        )

q_db = (windowed_summary
    .writeStream
    .foreachBatch(write_mariadb_batch)
    .option("checkpointLocation", "tmp/cp-mariadb")
    .start()
)

# Block until ANY active streaming query (carts, Redis, MariaDB) terminates.
# Waiting on q_db alone hid failures: the Kafka or Redis sink could stop while
# this cell kept waiting. awaitAnyTermination() returns when a query stops and
# re-raises that query's exception if it failed.
streaming_spark.streams.awaitAnyTermination()