In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as _sum
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

# 1. Start Spark session
spark = SparkSession.builder \
    .appName("KafkaToMySQL") \
    .config("spark.jars", "/opt/bitnami/spark/jars/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# 2. Define schema of incoming Kafka messages
schema = StructType() \
    .add("user_id", StringType()) \
    .add("event_type", StringType()) \
    .add("points", DoubleType()) \
    .add("timestamp", TimestampType())

# 3. Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .load()

# 4. Parse and select fields
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# 5. Calculate current user balances
user_balance_df = parsed_df.groupBy("user_id") \
    .agg(_sum("points").alias("current_balance"))

# 6. Create time-series checkpoint balance
time_series_df = parsed_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes"), "user_id") \
    .agg(_sum("points").alias("window_points")) \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "user_id",
        "window_points"
    )

# 7. Batch write functions
def write_to_mysql(df, epoch_id, table_name):
    df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://mysql:3306/datapipeline") \
        .option("dbtable", table_name) \
        .option("user", "sparkuser") \
        .option("password", "sparkpass") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .mode("append") \
        .save()

# 8. Write user balance to MySQL
user_balance_query = user_balance_df.writeStream \
    .foreachBatch(lambda df, epoch_id: write_to_mysql(df, epoch_id, "user_balance")) \
    .outputMode("complete") \
    .start()

# 9. Write time series data to MySQL
time_series_query = time_series_df.writeStream \
    .foreachBatch(lambda df, epoch_id: write_to_mysql(df, epoch_id, "balance_time_series")) \
    .outputMode("append") \
    .start()

# 10. Wait for completion
user_balance_query.awaitTermination()
time_series_query.awaitTermination()


25/05/09 08:14:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-84009992-f20a-410f-abc6-ce4176e9b495. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/09 08:14:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/09 08:14:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a69e4242-0ec4-4551-a2e4-8157e4476495. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/09 08:14:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not support

StreamingQueryException: [STREAM_FAILED] Query [id = cdae89c7-1cc6-4ce2-a9d8-48dcbc43e576, runId = 5975ebf2-7d14-4e53-90ef-cdee57905523] terminated with exception: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.