In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, sum, to_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from datetime import datetime
import psycopg2
import os

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("PurchaseEventStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.0") \
    .config("spark.streaming.kafka.consumer.cache.enabled", "false") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ef693dfe-945e-4c3c-9dbf-d3fd95dce410;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found com.github.luben#zstd-jni;1.4.9-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.1 in central
	found org.slf4j#slf4j-api;1.7.30 in central
:: resolution report :: resolve 162ms :: artifacts dl 6ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.9-1 from central in [default]


In [5]:
# Define schema for purchase events
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("product_id", IntegerType()),
    StructField("quantity", IntegerType()),
    StructField("price", DoubleType())
])

In [6]:
kafka_broker = os.environ.get('KAFKA_BROKER')
kafka_topic = "purchase_event"

# PostgreSQL connection details
pg_host = os.environ.get('POSTGRES_HOST')
pg_db = os.environ.get('POSTGRES_DB')
pg_user = os.environ.get('POSTGRES_USER')
pg_password = os.environ.get('POSTGRES_PASSWORD')

In [7]:
def save_to_postgres(df, epoch_id):
    # Create a connection to PostgreSQL
    conn = psycopg2.connect(host=pg_host, database=pg_db, user=pg_user, password=pg_password)
    cur = conn.cursor()

    # Create table if not exists
    cur.execute("""
    CREATE TABLE IF NOT EXISTS running_total (
        timestamp TIMESTAMP PRIMARY KEY,
        running_total DOUBLE PRECISION
    )
    """)

    # Insert or update data
    print(f"\n--- Running Total Updated at {datetime.now()} ---")
    print("Timestamp | Running Total")
    print("-----------+---------------")
    for row in df.collect():
        window_end = row.window_end
        running_total = row.running_total
        
        if window_end is not None and running_total is not None:
            cur.execute("""
            INSERT INTO running_total (timestamp, running_total)
            VALUES (%s, %s)
            ON CONFLICT (timestamp) DO UPDATE
            SET running_total = EXCLUDED.running_total
            """, (window_end, running_total))
            
            print(f"{window_end} | {running_total:.2f}")
        else:
            print(f"Skipping row due to None values: window_end={window_end}, running_total={running_total}")

    # Commit and close
    conn.commit()
    cur.close()
    conn.close()
    
    print("\n")

In [8]:
# Read streaming data from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "PLAINTEXT") \
    .load()

# Parse JSON from value and process
parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

NameError: name 'kafka_broker' is not defined

In [9]:
# Convert timestamp to proper format and calculate running total
sales_df = parsed_df \
    .withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("sales", col("quantity") * col("price")) \
    .groupBy(window("timestamp", "1 day")) \
    .agg(sum("sales").alias("running_total")) \
    .select(
        col("window.end").alias("window_end"),
        col("running_total")
    )

NameError: name 'parsed_df' is not defined

In [10]:
# Start the streaming query
query = sales_df.writeStream \
    .outputMode("complete") \
    .foreachBatch(save_to_postgres) \
    .trigger(processingTime='10 seconds') \
    .start()

query.awaitTermination()

NameError: name 'sales_df' is not defined