# Kafka music stream analysis

Let's first take a look at the topics the Kafka broker provides.

In [1]:
from confluent_kafka.admin import AdminClient

In [2]:
conf = {'bootstrap.servers': '172.29.16.101:9092'}

kadmin = AdminClient(conf)

In [3]:
kadmin.list_topics().topics

{'here-routes': TopicMetadata(here-routes, 1 partitions),
 'personenkraftwagen_data': TopicMetadata(personenkraftwagen_data, 1 partitions),
 'traffic_data_group7': TopicMetadata(traffic_data_group7, 1 partitions),
 'weather_topic3': TopicMetadata(weather_topic3, 1 partitions),
 'weather_topic2': TopicMetadata(weather_topic2, 1 partitions),
 'stock-prices': TopicMetadata(stock-prices, 1 partitions),
 'hello-world': TopicMetadata(hello-world, 1 partitions),
 'personenkraftwagen': TopicMetadata(personenkraftwagen, 1 partitions),
 'stocks': TopicMetadata(stocks, 1 partitions),
 'emissions_topic': TopicMetadata(emissions_topic, 1 partitions),
 'music': TopicMetadata(music, 1 partitions),
 'delhi_routes': TopicMetadata(delhi_routes, 1 partitions),
 'roulette': TopicMetadata(roulette, 1 partitions),
 'traffic-data': TopicMetadata(traffic-data, 1 partitions),
 'weather_topic4': TopicMetadata(weather_topic4, 1 partitions),
 'nyt_article_publishes': TopicMetadata(nyt_article_publishes, 1 partiti

Let's take a closer look at the `music` topic.

In [4]:
from confluent_kafka import Consumer

In [5]:
conf = {'bootstrap.servers': '172.29.16.101:9092',
        'group.id': 'music-stats'}

consumer = Consumer(conf)

In [10]:
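import sys

from confluent_kafka import KafkaError, KafkaException

consumer.subscribe(['music'])

# Poll until the first regular message arrives, then print it and stop.
while True:
    msg = consumer.poll(timeout=1.0)

    if msg is None:
        # Nothing arrived within the timeout
        print('No msg')
    elif msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event
            sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))
        else:
            raise KafkaException(msg.error())
    else:
        print(msg.value())
        break

# Leave the consumer group cleanly once the sample message has been read
consumer.close()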

b'{"ts": 1716379588091, "auth": "Logged In", "page": "NextSong", "song": "Loving You Sunday Morning", "level": "paid", "artist": "Scorpions", "gender": "M", "method": "PUT", "status": 200, "userId": "7", "lastName": "Freeman", "location": "Bakersfield, CA", "track_id": 3291, "firstName": "Colin", "sessionId": 784, "userAgent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0", "registration": 1715218288033, "itemInSession": 25}'
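
The payload is a UTF-8 encoded JSON document. As a minimal sketch in plain Python, independent of the consumer above, the bytes can be decoded with the standard `json` module. The `raw` literal is an abbreviated copy of the message above, `from_epoch_ms` is a hypothetical helper, and reading `ts` and `registration` as Unix epoch milliseconds is an assumption based on their magnitude.

```python
import json
from datetime import datetime, timezone

# Abbreviated copy of the message printed above (only a few fields kept).
raw = (b'{"ts": 1716379588091, "page": "NextSong", "level": "paid", '
       b'"artist": "Scorpions", "status": 200, "registration": 1715218288033}')

event = json.loads(raw)  # json.loads accepts UTF-8 encoded bytes directly


def from_epoch_ms(ms: int) -> datetime:
    """Convert epoch milliseconds (assumed unit) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


played_at = from_epoch_ms(event["ts"])
registered_at = from_epoch_ms(event["registration"])
print(event["artist"], event["level"], played_at.isoformat(), registered_at.isoformat())
```

Under that assumption, both values fall in May 2024, which is plausible for a replayed event log.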


## Spark Streaming and Kafka

Now that we know the structure of the streamed data, we can analyze the stream with Spark.

In [1]:
from pyspark.sql import SparkSession
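# Explicit imports avoid shadowing Python builtins such as sum, min and max
from pyspark.sql.functions import col, desc, from_json, window
from pyspark.sql.types import StringType, StructType, TimestampType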

spark = SparkSession.builder \
    .appName("PySpark Kafka Music Stats") \
    .master("spark://172.29.16.102:7077") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/28 10:18:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
music_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "172.29.16.101:9092") \
    .option("subscribe", "music") \
    .load()

In [18]:
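# Note: the sample message carries "registration" and "ts" as epoch
# milliseconds, whereas from_json reads a JSON number into TimestampType as
# epoch seconds, so the parsed timestamps land far in the future.
schema = StructType() \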
    .add("artist", StringType()) \
    .add("status", StringType()) \
    .add("level", StringType()) \
    .add("registration", TimestampType()) \
    .add("ts", TimestampType())

# Parse JSON messages and extract fields
music_df = music_stream \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

In [19]:
music_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- status: string (nullable = true)
 |-- level: string (nullable = true)
 |-- registration: timestamp (nullable = true)
 |-- ts: timestamp (nullable = true)



### Total Stream Count per Artist

In [30]:
artist_counts = music_df.groupBy("artist").count().sort(desc('count'))

In [34]:
query = artist_counts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

25/01/28 09:37:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-86687fc5-72de-4cdf-a5e6-2cabbc76d149. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/01/28 09:37:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/01/28 09:37:25 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|artist|count|
+------+-----+
+------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+
|              artist|count|
+--------------------+-----+
|         Iron Maiden|  213|
|                  U2|  135|
|        Led Zeppelin|  114|
|           Metallica|  112|
|                Lost|   92|
|         Deep Purple|   92|
|           Pearl Jam|   67|
|       Lenny Kravitz|   57|
|    Various artistss|   56|
|          The Office|   53|
|       Faith No More|   52|
|           Van Halen|   52|
|Os Paralamas Do S...|   49|
|        Eric Clapton|   48|
|Red Hot Chili Pep...|   48|
|               Queen|   45|
|        Foo Fighters|   44|
|      Guns N'' Roses|   42|
|              R.E.M.|   41|
|  The Rolling Stones|   41|
+--------------------+-----+
only showing top 20 rows
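
As a rough mental model, and not Spark's actual implementation, `complete` output mode keeps a running aggregate across micro-batches and re-emits the whole result table on every trigger. The sketch below mimics that with a `Counter`; the two batches are made-up values for illustration only.

```python
from collections import Counter

# Hypothetical micro-batches of artist names; the values are made up.
batches = [
    ["Iron Maiden", "U2", "Iron Maiden"],
    ["U2", "Metallica", "Iron Maiden"],
]

running_counts = Counter()  # aggregation state kept across micro-batches
snapshots = []

for batch_id, artists in enumerate(batches):
    running_counts.update(artists)
    # Complete mode: emit the full table, sorted by count in descending order
    table = running_counts.most_common()
    snapshots.append(table)
    print(f"Batch: {batch_id}", table)
```

Each printed table contains every artist seen so far, not only the ones in the latest batch, which is why the counts in the console output keep growing from batch to batch.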



In [35]:
query.stop()

25/01/28 09:37:43 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID 5712) (172.29.16.108 executor 1): TaskKilled (Stage cancelled: Job 30 cancelled part of cancelled job group 600017d8-90eb-4bf5-ac6a-cfab4a337171)


### Registration Level Count in 30-Day Windows

In [26]:
windowed_level_counts = music_df.groupBy(
    window(col('registration'), '30 days'),
    col('level')
).count()
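
To make the grouping concrete, here is a small plain-Python sketch of how an event time could be assigned to a 30-day tumbling window. It assumes windows aligned to the Unix epoch, which is our understanding of the default behaviour of Spark's `window` function, and it reads the sample `registration` value as epoch milliseconds. `tumbling_window` is a hypothetical helper, not a Spark API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(days=30)


def tumbling_window(event_time: datetime, size: timedelta = WINDOW):
    """Return (start, end) of the epoch-aligned window that contains event_time."""
    index = (event_time - EPOCH) // size  # whole windows elapsed since the epoch
    start = EPOCH + index * size
    return start, start + size


# Registration time of the sample message, read as epoch milliseconds (assumption).
registered_at = datetime.fromtimestamp(1715218288033 / 1000, tz=timezone.utc)
start, end = tumbling_window(registered_at)
print(start.isoformat(), end.isoformat())
```

Every event whose time falls in the half-open interval from `start` to `end` lands in the same bucket, and the count is then taken per bucket and `level`.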

In [27]:
query = windowed_level_counts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("status_stats") \
    .start()

25/01/28 10:26:04 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1f11d0b2-1d88-4e9f-b087-85c0cf385687. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/01/28 10:26:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/01/28 10:26:04 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

In [37]:
spark.sql("SELECT * FROM status_stats ORDER BY count DESC").show(20, False)



+----------------------------------------------+-----+-----+
|window                                        |level|count|
+----------------------------------------------+-----+-----+
|{+56355-10-05 00:00:00, +56355-11-04 00:00:00}|free |201  |
|{+56223-07-09 00:00:00, +56223-08-08 00:00:00}|paid |200  |
|{+56328-02-29 00:00:00, +56328-03-30 00:00:00}|paid |200  |
|{+56335-07-22 00:00:00, +56335-08-21 00:00:00}|paid |200  |
|{+56242-12-26 00:00:00, +56243-01-25 00:00:00}|paid |161  |
|{+56160-07-08 00:00:00, +56160-08-07 00:00:00}|paid |161  |
|{+56151-03-28 00:00:00, +56151-04-27 00:00:00}|paid |160  |
|{+56142-07-13 00:00:00, +56142-08-12 00:00:00}|free |160  |
|{+56102-06-13 00:00:00, +56102-07-13 00:00:00}|paid |160  |
|{+56347-11-16 00:00:00, +56347-12-16 00:00:00}|paid |160  |
|{+56274-06-11 00:00:00, +56274-07-11 00:00:00}|paid |160  |
|{+56190-12-28 00:00:00, +56191-01-27 00:00:00}|free |160  |
|{+56216-10-13 00:00:00, +56216-11-12 00:00:00}|free |160  |
|{+56297-02-10 00:00:00,
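
The window years around +56000 look like a unit mismatch rather than real data: the sample message appears to store `registration` as epoch milliseconds, while a JSON number parsed into `TimestampType` is read as epoch seconds. The plain-Python sketch below checks the arithmetic for the sample value. The average Gregorian year length is used only for a rough estimate.

```python
from datetime import datetime, timezone

registration = 1715218288033  # value taken from the sample message

SECONDS_PER_YEAR = 365.2425 * 24 * 3600  # average Gregorian year, rough estimate

# Misread as seconds: tens of thousands of years in the future
year_if_seconds = 1970 + registration / SECONDS_PER_YEAR

# Read as milliseconds (assumed to be the intended unit)
as_millis = datetime.fromtimestamp(registration / 1000, tz=timezone.utc)

print(f"read as seconds: year ~{year_if_seconds:.0f}")
print(f"read as milliseconds: {as_millis.isoformat()}")
```

If that reading is right, one possible fix in the Spark job is to declare `registration` and `ts` as `LongType()` in the schema and derive the timestamps with `(col("registration") / 1000).cast("timestamp")` before windowing.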

                                                                                

In [38]:
query.stop()

25/01/28 10:29:42 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 46, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553536ee] is aborting.
25/01/28 10:29:42 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 46, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553536ee] aborted.
25/01/28 10:29:43 WARN TaskSetManager: Lost task 0.0 in stage 240.0 (TID 22436) (172.29.16.108 executor 1): TaskKilled (Stage cancelled: Job 127 cancelled part of cancelled job group 8cafec77-8d91-42ff-b7e7-5d966fbf7bae)


<hr>

In [39]:
spark.stop()