## Assignment: Real-Time Music Trends Dashboard with Spark & ClickHouse

**Goal:**
Build a real-time dashboard using Spark Structured Streaming and ClickHouse to analyze user listening behavior and identify live music trends. The final result will be a Grafana dashboard that displays insights derived from the continuous stream of song play events.


1. **Identify the Most Trending Song (Live Counter):**

   * Implement a streaming aggregation in Spark that calculates the number of plays per song title.
   * Group by song title, order by play count, and write the result continuously to ClickHouse.
   * Display this data in Grafana as a leaderboard or bar chart that updates in real time.

2. **Determine the Most Popular Genre:**

   * Use a genre mapping to assign each played track to its genre.
   * Count play events per genre in real time.
   * Write and visualize the trending genre distribution in Grafana.

3. **Live User Location Heatmap:**

   * Extract the `location` field and geocode it, i.e. map each location string (e.g., `Bakersfield, CA` or `Boston-Cambridge-Newton, MA-NH`) to coordinates.
   * Count active users by location within a moving time window (e.g. 5–30 min, your choice).
   * Output results to ClickHouse and visualize them on a geographic heatmap in Grafana.


4. **User Repetition Rate:**

   * Measure how often users repeat the same song within a session or time window.
   * Analyze repeat play behavior and show the top repeated songs per user.
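
The repetition metric from task 4 can be prototyped outside Spark before the streaming query is written. The following is a minimal pure-Python sketch, not the required implementation: the function names `repeat_plays` and `repetition_rate`, the tuple layout `(user_id, session_id, song)`, and the sample data are assumptions made for illustration.

```python
from collections import Counter


def repeat_plays(events):
    """Count plays per (user, session, song) and keep only repeats (count > 1)."""
    counts = Counter(events)
    return {key: n for key, n in counts.items() if n > 1}


def repetition_rate(events):
    """Share of plays that repeat a song already played in the same session."""
    if not events:
        return 0.0
    counts = Counter(events)
    # Every play beyond the first one of a (user, session, song) key is a repeat
    repeated = sum(n - 1 for n in counts.values())
    return repeated / len(events)


# Hypothetical sample events: (user_id, session_id, song)
sample = [
    ("u1", 10, "Song A"),
    ("u1", 10, "Song A"),
    ("u1", 10, "Song B"),
    ("u2", 20, "Song A"),
    ("u1", 10, "Song A"),
]

print(repeat_plays(sample))
print(repetition_rate(sample))
```

In a streaming job the same idea maps to a grouped count per user and song within a window, where a count above 1 marks a repeat.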


**Technical Requirements:**

* Use Spark Structured Streaming and `writeStream.foreachBatch` to write into ClickHouse
* Use appropriate windowing and watermarking where necessary
* Store data in ClickHouse using JDBC and a well-structured schema
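
One possible shape for a "well-structured schema" is sketched below. It generates a `CREATE TABLE` statement for the trending-songs table written later in this notebook. The helper `create_table_sql`, the column types, the sorting key, and the choice of `ReplacingMergeTree` are assumptions for illustration, and the sketch assumes `song` and `artist` are never null.

```python
def create_table_sql(table, columns, order_by, version_column):
    """Build a CREATE TABLE statement for a ReplacingMergeTree table."""
    column_lines = ",\n".join(f"    {name} {ch_type}" for name, ch_type in columns)
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n"
        f"{column_lines}\n"
        f") ENGINE = ReplacingMergeTree({version_column})\n"
        f"ORDER BY ({', '.join(order_by)})"
    )


# Column names follow the DataFrame written by the streaming job;
# the ClickHouse types are assumed, not taken from the assignment.
trending_songs_ddl = create_table_sql(
    table="bda_g8_trending_songs",
    columns=[
        ("time_start", "DateTime"),
        ("time_end", "DateTime"),
        ("song", "String"),
        ("artist", "String"),
        ("play_count", "UInt64"),
    ],
    order_by=["time_start", "song", "artist"],
    version_column="play_count",
)

print(trending_songs_ddl)
```

With `play_count` as the version column, `ReplacingMergeTree` keeps the row with the highest count for each sorting key. Deduplication happens during background merges, so dashboard queries may still need `FINAL` or an explicit `max(play_count)`.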

**Grafana Setup:**

* All metrics must be clearly labeled and refreshed automatically
* Dashboards will be reviewed for correctness, creativity, and clarity of visual presentation

**Deliverables:**

* Spark Streaming Job Code (.py or .ipynb)
* ClickHouse Table Schemas
* Link to Grafana Dashboard
* Short write-up (max. 1 page) describing your approach and insights


In [1]:
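import os

from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python built-ins (e.g. sum, max) through a wildcard import
from pyspark.sql.functions import approx_count_distinct, col, count, expr, from_json, window
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType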

In [2]:
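url = "jdbc:postgresql://fhtw-big-data.postgres.database.azure.com/music_store"
postgres_options = {
    "url": url,
    "user": "student",
    # Read the password from the environment instead of hard-coding it
    # (PG_PASSWORD is an assumed variable name)
    "password": os.environ["PG_PASSWORD"],
    "driver": "org.postgresql.Driver"
}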

In [3]:
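# Create the Spark session and pull in the Kafka, ClickHouse and PostgreSQL connectors.
# Note: the Kafka connector version should normally match the installed Spark version.
spark = SparkSession.builder \
    .appName("KafkaSparkStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,com.clickhouse:clickhouse-jdbc:0.4.6,org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

# Subscribe to the "music" topic; each Kafka record carries one JSON-encoded event
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "172.29.16.101:9092") \
    .option("subscribe", "music") \
    .load()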


In [4]:
# Define the schema for the JSON data from Kafka
json_schema = StructType([
    StructField("ts", LongType(), True),
    StructField("auth", StringType(), True),
    StructField("page", StringType(), True),
    StructField("song", StringType(), True),
    StructField("level", StringType(), True),
    StructField("artist", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("method", StringType(), True),
    StructField("status", IntegerType(), True),
    StructField("userId", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("location", StringType(), True),
    StructField("track_id", IntegerType(), True),
    StructField("firstName", StringType(), True),
    StructField("sessionId", IntegerType(), True),
    StructField("userAgent", StringType(), True),
    StructField("registration", LongType(), True),
    StructField("itemInSession", IntegerType(), True)
])


# Decode the Kafka value as a string, parse the JSON and flatten it into columns
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), json_schema).alias("data")) \
    .select("data.*")
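
Spark's `from_json` returns null for a string it cannot parse, so malformed Kafka messages surface as rows whose fields are all null. The sketch below mirrors that behavior in plain Python for a single message. The function name `parse_event` and the sample payloads are hypothetical and only illustrate the idea.

```python
import json


def parse_event(raw):
    """Parse one JSON-encoded event; return None for malformed or non-object input."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # An event must be a JSON object, not a list or a scalar
    return event if isinstance(event, dict) else None


# Hypothetical payloads
good = parse_event('{"song": "Song A", "userId": "42"}')
bad = parse_event("not json")

print(good)
print(bad)
```

In the streaming job, a filter such as `col("song").isNotNull()` is one way to keep such empty rows out of the aggregations.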

In [5]:
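# Count plays per song and artist in 1-minute tumbling windows.
# Events without a song title are skipped; the watermark lets Spark discard
# the state of windows that ended more than 2 minutes ago.
play_count_df = parsed_df \
    .filter(col("song").isNotNull()) \
    .withColumn(
        # Processing time of the micro-batch, shifted by two hours
        "timestamp", expr("current_timestamp() + INTERVAL 2 HOURS")
    ) \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute").alias("time_window"),
        col("song"),
        col("artist")
    ).agg(
        count("*").alias("play_count")
    )

# Flatten the window struct into plain start/end columns for ClickHouse
Trending_Song_df = play_count_df \
    .withColumn("time_start", col("time_window.start")) \
    .withColumn("time_end", col("time_window.end")) \
    .select("time_start", "time_end", "song", "artist", "play_count")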
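
The `window(col("timestamp"), "1 minute")` call above assigns each event to exactly one fixed-size (tumbling) window aligned to the Unix epoch, with an inclusive start and an exclusive end. As a minimal sketch of that assignment in plain Python (the helper name `tumbling_window` and the sample timestamp are made up for illustration):

```python
from datetime import datetime, timedelta, timezone


def tumbling_window(ts, size):
    """Return the (start, end) of the epoch-aligned tumbling window containing ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Time elapsed since the current window started
    offset = (ts - epoch) % size
    start = ts - offset
    return start, start + size


# Hypothetical event time
event_time = datetime(2025, 7, 1, 22, 30, 37, tzinfo=timezone.utc)
start, end = tumbling_window(event_time, timedelta(minutes=1))

print(start.isoformat(), end.isoformat())
```

All events whose timestamps fall between `start` (inclusive) and `end` (exclusive) share one window and are counted together.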


In [6]:
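def create_clickhouse_writer(dbtable):
    """Return a foreachBatch handler that appends each micro-batch to `dbtable`."""
    def write_to_clickhouse(df, epoch_id):
        # Skip empty micro-batches so no JDBC connection is opened for nothing
        if df.isEmpty():
            return
        # The password is read from the environment instead of being hard-coded
        # (CLICKHOUSE_PASSWORD is an assumed variable name)
        df.write \
            .format("jdbc") \
            .option("url", "jdbc:clickhouse://t29sbjd0bd.europe-west4.gcp.clickhouse.cloud:8443?ssl=true&jdbcCompliant=false") \
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
            .option("user", "default") \
            .option("password", os.environ["CLICKHOUSE_PASSWORD"]) \
            .option("ssl", "true") \
            .option("dbtable", dbtable) \
            .mode("append") \
            .save()
    return write_to_clickhouse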
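
The streaming queries in this notebook combine `outputMode("update")` with `mode("append")`. Each micro-batch therefore appends a fresh row for every aggregate that changed, and a table can hold several rows for the same window and song. A dashboard query has to reduce these to one value per key. The sketch below shows that reduction in plain Python; the function name `latest_counts` and the sample rows are hypothetical.

```python
def latest_counts(rows):
    """Keep the highest play_count seen for each (time_start, song, artist) key."""
    latest = {}
    for time_start, song, artist, play_count in rows:
        key = (time_start, song, artist)
        # Counts only grow within a window, so the maximum is the latest value
        latest[key] = max(play_count, latest.get(key, 0))
    return latest


# Hypothetical rows as they might accumulate in the table
rows = [
    ("22:30", "Song A", "Artist X", 1),
    ("22:30", "Song A", "Artist X", 3),
    ("22:30", "Song B", "Artist Y", 2),
    ("22:31", "Song A", "Artist X", 1),
]
result = latest_counts(rows)

print(result)
```

In SQL the same reduction corresponds to `max(play_count)` grouped by the window and song columns.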

In [10]:
query1 = Trending_Song_df.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_trending_songs")) \
    .start()

25/07/01 22:30:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-06406148-648a-4b9b-92cf-851e4fb8a7e4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/07/01 22:30:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/07/01 22:30:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/07/01 22:30:37 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported

In [11]:
query1.stop()

25/07/01 22:30:52 WARN TaskSetManager: Lost task 35.0 in stage 39.0 (TID 1157) (172.29.16.104 executor driver): TaskKilled (Stage cancelled: Job 19 cancelled part of cancelled job group d0fdf26e-def4-4871-a74a-957384b81c4f)


In [7]:
# Static lookup table (track_id -> genre), read once from PostgreSQL
genre_query = """(
    SELECT tracks.id AS track_id, g.name AS genre
    FROM tracks
    JOIN public.genres g ON g.id = tracks.genre_id
) AS genre_data"""
df_genres = spark.read.jdbc(url=url, table=genre_query, properties=postgres_options)

In [8]:
# Stream-static join: attach the genre to each play event via track_id.
# The inner join drops events whose track has no genre mapping.
df_joined = parsed_df.join(df_genres, parsed_df.track_id == df_genres.track_id, "inner").drop(df_genres.track_id)

# Count plays per genre in 1-minute tumbling windows
popular_genres_df = df_joined \
    .withColumn("timestamp", expr("current_timestamp() + INTERVAL 2 HOURS")) \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute").alias("time_window"),
        col("genre")
    ).agg(
        count("*").alias("play_count")
    ) \
    .withColumn("time_start", col("time_window.start")) \
    .withColumn("time_end", col("time_window.end")) \
    .select("time_start", "time_end", "genre", "play_count")

In [16]:
query2 = popular_genres_df.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_popular_genres")) \
    .start()


In [17]:
query2.stop()



In [9]:
# Static lookup table: location string -> approximate coordinates
locations = spark.createDataFrame([
    ("Bakersfield, CA",                     35.3733, -119.0187),
    ("Boston-Cambridge-Newton, MA-NH",      42.3601,  -71.0589),
    ("Houston, TX",                         29.7604,  -95.3698),
    ("Los Angeles, CA",                     34.0522, -118.2437),
    ("Tokyo, Japan",                        35.6762,  139.6503),
    ("Paris, France",                       48.8566,    2.3522),
    ("Philadelphia, PA",                    39.9526,  -75.1652)
], ["location", "lat", "lon"])

# Group events by location in 5-minute tumbling windows
listeners_locations = parsed_df \
    .withColumn("timestamp", expr("current_timestamp() + INTERVAL 2 HOURS")) \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes").alias("time_window"),
        col("location")
    )

# Approximate the number of distinct users per location and attach coordinates;
# the left join keeps locations that are missing from the lookup table
active_listeners_locations = listeners_locations \
    .agg(approx_count_distinct("userId").alias("active_users")) \
    .withColumn("window_start", col("time_window.start")) \
    .withColumn("window_end", col("time_window.end")) \
    .select("window_start", "window_end", "location", "active_users") \
    .join(
        locations,
        on=["location"],
        how="left"
    )
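
The query above uses tumbling 5-minute windows. If a moving window is wanted instead, Spark's `window` function accepts a slide duration as a third argument, for example `window(col("timestamp"), "5 minutes", "1 minute")`, and each event then belongs to several overlapping windows. The plain-Python sketch below shows which windows one event falls into; the helper name `sliding_windows` and the sample timestamp are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(ts, size, slide):
    """Return all epoch-aligned (start, end) windows of length size that contain ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Start of the most recent window that begins at or before ts
    start = ts - ((ts - epoch) % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers ts
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


# Hypothetical event time
ts = datetime(2025, 7, 1, 22, 57, 30, tzinfo=timezone.utc)
wins = sliding_windows(ts, timedelta(minutes=5), timedelta(minutes=1))

for w_start, w_end in wins:
    print(w_start.time(), w_end.time())
```

With a 5-minute window sliding every minute, each event is counted in five windows, so the heatmap can refresh every minute while still covering the last five minutes.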


In [34]:
query3 = active_listeners_locations.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_active_users_location")) \
    .start()


In [12]:
query3.stop()


In [10]:
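# Count plays per user and song in 10-minute tumbling windows;
# a play_count above 1 means the user repeated the song within the window.
repetition_df = parsed_df \
    .filter(col("song").isNotNull()) \
    .withColumn("timestamp", expr("current_timestamp() + INTERVAL 2 HOURS")) \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "10 minutes").alias("time_window"),
        col("song"),
        col("userId")
    ).agg(
        count("*").alias("play_count")
    ) \
    .withColumn("time_start", col("time_window.start")) \
    .withColumn("time_end", col("time_window.end")) \
    .withColumnRenamed("userId", "user_id") \
    .select("time_start", "time_end", "song", "user_id", "play_count")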

In [39]:
query4 = repetition_df.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_repetitions")) \
    .start()


In [40]:
query4.stop()



In [13]:
query1 = Trending_Song_df.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_trending_songs")) \
    .start()

query2 = popular_genres_df.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_popular_genres")) \
    .start()

query3 = active_listeners_locations.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_active_users_location")) \
    .start()

query4 = repetition_df.writeStream \
    .outputMode("update") \
    .foreachBatch(create_clickhouse_writer("bda_g8_repetitions")) \
    .start()


In [None]:
query1.stop()
query2.stop()
query3.stop()
query4.stop()

In [14]:
spark.stop()
