SPARK NOTEBOOK SETUP

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, expr, to_timestamp, udf
from pyspark.sql.utils import AnalysisException

In [None]:
import os

# Connection settings. Each value below that calls os.environ.get() can be
# overridden through an environment variable of the same name; the fallback
# is the value that used to be hard-coded here.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")  # Or "kafka:9092" if using Docker
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "transactions")

POSTGRES_JDBC_URL = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/retail")
# Credentials: supply these through the environment rather than editing the
# notebook; the "admin" fallbacks are kept only for backward compatibility.
POSTGRES_USER = os.environ.get("POSTGRES_USER", "admin")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "admin")
POSTGRES_DRIVER = "org.postgresql.Driver"

# Target table for validated, enriched transactions.
FACT_TABLE = "fact_transactions"




In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session. "spark.jars.packages" lists the Maven
# coordinates of the Kafka source connector and the PostgreSQL JDBC driver.
spark = (
    SparkSession.builder
    .appName("KafkaToPostgresStreaming")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
        "org.postgresql:postgresql:42.6.0",
    )
    .getOrCreate()
)

# Limit Spark's own logging to warnings and errors.
spark.sparkContext.setLogLevel("WARN")


In [None]:
def _load_dimension(table_name):
    """Read one PostgreSQL table over JDBC and return it as a DataFrame."""
    reader = spark.read.format("jdbc")
    reader = reader.options(
        url=POSTGRES_JDBC_URL,
        dbtable=table_name,
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        driver=POSTGRES_DRIVER,
    )
    return reader.load()


# Static lookup tables used to enrich the transaction stream.
dim_products_df = _load_dimension("dim_products")
dim_location_df = _load_dimension("dim_location")


In [None]:
# Layout of the JSON payload carried in each Kafka message. Every field is
# declared nullable; "timestamp" is read as a plain string at this stage.
kafka_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("transaction_id", StringType()),
        ("product_id", IntegerType()),
        ("store_id", IntegerType()),
        ("quantity", IntegerType()),
        ("price", DoubleType()),
        ("payment_method", StringType()),
        ("timestamp", StringType()),
    )
])


In [None]:
# NOTE(review): this broker address differs from the KAFKA_BOOTSTRAP_SERVERS
# constant defined in the configuration cell — confirm which one is intended.
kafka_bootstrap_servers = "kafka:9092"

# Streaming source: subscribe to the configured topic (KAFKA_TOPIC instead of
# a repeated string literal) with startingOffsets set to "earliest".
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)



In [None]:
# Cast the Kafka record value to a string so it can be parsed as JSON.
value_df = raw_df.select(col("value").cast("string").alias("json_str"))

# Parse the JSON text against kafka_schema, then promote the parsed struct's
# fields to top-level columns.
json_df = (
    value_df
    .select(from_json(col("json_str"), kafka_schema).alias("data"))
    .select("data.*")
)

In [None]:
from pyspark.sql.functions import when

# Define validity condition.
# The explicit isNotNull() on price is required: with a NULL price the
# comparison `price >= 0` evaluates to NULL, and a NULL predicate is rejected
# by both filter(cond) and filter(~cond), so such rows would end up in
# neither the clean stream nor the bad-record stream.
valid_condition = (
    col("transaction_id").isNotNull() &
    col("quantity").isNotNull() &
    col("payment_method").isNotNull() &
    col("price").isNotNull() &
    (col("price") >= 0)
)

# Clean data: rows passing validation, with the string "timestamp" column
# replaced by a parsed "transaction_timestamp" column.
clean_df = json_df \
    .filter(valid_condition) \
    .withColumn("transaction_timestamp", to_timestamp("timestamp")) \
    .drop("timestamp")

# Bad records: everything that fails validation, kept in its parsed-but-raw
# form (original "timestamp" string included).
bad_df = json_df \
    .filter(~valid_condition)

In [None]:
# Enrich validated transactions with product and location attributes.
# Both joins are inner joins, so a transaction whose product_id or store_id
# has no match in the dimension tables is dropped from the result.
# The "price" columns are renamed on both sides to keep them distinguishable.
joined_df = (
    clean_df
    .withColumnRenamed("price", "total_price")
    .join(
        dim_products_df.withColumnRenamed("price", "unit_price"),
        on="product_id",
        how="inner",
    )
    .join(dim_location_df, on="store_id", how="inner")
    .select(
        "transaction_id",
        "product_id",
        col("name").alias("product_name"),
        col("category").alias("product_category"),
        "store_id",
        "city",
        "state",
        "country",
        "quantity",
        "unit_price",
        "total_price",
        "payment_method",
        "transaction_timestamp",
    )
)

In [None]:
# Persist rejected records as JSON files under /tmp/bad_data/transactions/.
# The query is started in append mode with its own checkpoint directory.
(
    bad_df.writeStream
    .format("json")
    .outputMode("append")
    .option("path", "/tmp/bad_data/transactions/")
    .option("checkpointLocation", "/tmp/checkpoints/bad_data")
    .start()
)


In [None]:
def write_bad_to_postgres(batch_df, epoch_id):
    """Append one micro-batch of rejected records to ``bad_transactions``.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        epoch_id: Batch identifier passed in by ``foreachBatch``; unused.

    Any error raised by the JDBC write propagates to the caller.
    """
    jdbc_writer = batch_df.write.format("jdbc")
    jdbc_writer = jdbc_writer.options(
        url=POSTGRES_JDBC_URL,
        dbtable="bad_transactions",
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        driver=POSTGRES_DRIVER,
    )
    jdbc_writer.mode("append").save()



In [None]:
def write_to_postgres(batch_df, epoch_id):
    """Append one micro-batch of enriched transactions to the fact table.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        epoch_id: Batch identifier passed in by ``foreachBatch``; unused.

    Raises:
        Exception: Whatever the JDBC write raised, re-raised after being
            reported on stdout.
    """
    try:
        batch_df.write \
            .format("jdbc") \
            .option("url", POSTGRES_JDBC_URL) \
            .option("dbtable", FACT_TABLE) \
            .option("user", POSTGRES_USER) \
            .option("password", POSTGRES_PASSWORD) \
            .option("driver", POSTGRES_DRIVER) \
            .mode("append") \
            .save()
    except Exception as e:
        print(f"[ERROR] Failed to write batch: {e}")
        # Re-raise: swallowing the error would let the streaming query treat
        # this batch as successfully processed, silently dropping its rows.
        raise

In [None]:
# Start clean data stream
clean_query = joined_df.writeStream \
    .foreachBatch(write_to_postgres) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/transactions_fact") \
    .start()

# Start bad data stream
bad_query = bad_df.writeStream \
    .foreachBatch(write_bad_to_postgres) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/bad_transactions") \
    .start()

# Wait for both to terminate
clean_query.awaitTermination()
bad_query.awaitTermination()
