In [1]:
!pip install numpy pandas faker pyngrok fastavro kafka-python TDigest

Collecting faker
  Downloading Faker-24.9.0-py3-none-any.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyngrok
  Downloading pyngrok-7.1.6-py3-none-any.whl (22 kB)
Collecting fastavro
  Downloading fastavro-1.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m23.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting TDigest
  Downloading tdigest-0.5.2.2-py3-none-any.whl (9.4 kB)
Collecting accumulation-tree (from TDigest)
  Downloading accumulation_tree-0.6.2.tar.gz (12 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyudorandom (from TDigest)
  Downloading

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
%%writefile environment.sh
#!/usr/bin/bash
export KAFKA_BINARY_VERSION='3.7.0'
export SCALA_BINARY_VERSION='2.13'
export KAFKA_BINARY_VERSION=$KAFKA_BINARY_VERSION
export SCALA_BINARY_VERSION=$SCALA_BINARY_VERSION
export PATH=$PATH:$PWD/kafka_$SCALA_BINARY_VERSION-$KAFKA_BINARY_VERSION/bin

Writing environment.sh


In [4]:
%%writefile kafka_setup.sh

source ./environment.sh
echo kafka_$SCALA_BINARY_VERSION-$KAFKA_BINARY_VERSION
echo $PATH

# Java Setup
wget -O- https://apt.corretto.aws/corretto.key | sudo apt-key add -
sudo add-apt-repository 'deb https://apt.corretto.aws stable main' -y
sudo apt-get -y update; sudo apt-get install -y java-11-amazon-corretto-jdk

# Kafka Setup
wget https://downloads.apache.org/kafka/${KAFKA_BINARY_VERSION}/kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}.tgz
tar xzf kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}.tgz

UUID=$(./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/bin/kafka-storage.sh random-uuid)
echo "export UUID=$UUID" >> ./environment.sh
cat environment.sh

# Start Kafka Broker

echo kafka_$SCALA_BINARY_VERSION-$KAFKA_BINARY_VERSION

# offsets.retention.minutes determines how long Kafka retains the commit offsets for consumer groups.
echo "offsets.retention.minutes=300" >> ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/config/kraft/server.properties

./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/bin/kafka-storage.sh format -t ${UUID} -c ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/config/kraft/server.properties
nohup ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/bin/kafka-server-start.sh ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/config/kraft/server.properties > kafka_server.log &

Writing kafka_setup.sh


In [5]:
%%shell
source kafka_setup.sh
sleep 10
tail -20 kafka_server.log

kafka_2.13-3.7.0
/opt/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/content/kafka_2.13-3.7.0/bin
--2024-04-16 01:44:37--  https://apt.corretto.aws/corretto.key
Resolving apt.corretto.aws (apt.corretto.aws)... 52.84.18.14, 52.84.18.105, 52.84.18.24, ...
Connecting to apt.corretto.aws (apt.corretto.aws)|52.84.18.14|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1695 (1.7K) [binary/octet-stream]
Saving to: ‘STDOUT’


2024-04-16 01:44:37 (835 MB/s) - written to stdout [1695/1695]

OK
Repository: 'deb https://apt.corretto.aws stable main'
Description:
Archive for codename: stable components: main
More info: https://apt.corretto.aws
Adding repository.
Adding deb entry to /etc/apt/sources.list.d/archive_uri-https_apt_corretto_aws-jammy.list
Adding disabled deb-src entry to /etc/apt/sources.list.d/archive_uri-https_apt_corretto_aws-jammy.list
Get:1 http://securit



In [6]:
%%shell

source ./environment.sh

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic spotify --create --partitions 10 --replication-factor 1

Created topic spotify.




In [7]:
%%writefile check_kafka_consumers.sh
#!/usr/bin/env bash
source ./environment.sh

echo "Active Consumer Groups"
while true
do
date
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
sleep 1
done


Writing check_kafka_consumers.sh


In [8]:
%%shell
chmod +x ./check_kafka_consumers.sh
nohup ./check_kafka_consumers.sh > kafka_consumers.log &

nohup: redirecting stderr to stdout




In [9]:
!nohup python drive/MyDrive/stream/src/data_generator.py > avro_producer.log &

nohup: redirecting stderr to stdout


In [10]:
!sleep 5
!tail -20 avro_producer.log

In [11]:
%%writefile avro_consumer.py

from kafka import KafkaConsumer
import fastavro
import io

# Define the schema for the Spotify records
schema = {
    "doc": "Spotify Wrapped Data Feed - User Event",
    "name": "UserEvent",
    "namespace": "com.spotify.wrapped",
    "type": "record",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "timestamp", "type": "string", "logicalType": "timestamp-millis"},
        {"name": "track_id", "type": "string"},
        {"name": "user_id", "type": "long"},
        {"name": "listening_time", "type": "long"}
    ]
}

# Parse the schema
parsed_schema = fastavro.parse_schema(schema)

# Create a Kafka consumer with value deserializer
def deserialize(message):
    schemaless_bytes_reader = io.BytesIO(message)
    try:
        record = fastavro.schemaless_reader(schemaless_bytes_reader, parsed_schema)
        return record
    except Exception as ex:
        print(ex)
        return None  # Return None for failed deserialization

# Create a Kafka consumer with value deserializer
consumer = KafkaConsumer(
    'spotify',  # Adjust topic name as needed
    bootstrap_servers=['localhost:9092'],  # Adjust bootstrap servers if needed
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='Python_AVRO_Consumer',  # Adjust consumer group ID if needed
    value_deserializer=deserialize
)

# Consume messages from the topic and print them
for message in consumer:
    print("=" * 10)
    print(message.value)
print('hi')

Writing avro_consumer.py


In [12]:
!nohup python avro_consumer.py > avro_consumer.log &

nohup: redirecting stderr to stdout


In [13]:
!sleep 5
!tail -20 avro_consumer.log

In [14]:
!ps -ef |grep avro

root        5547       1  5 01:45 ?        00:00:00 python3 avro_consumer.py
root        5579     221  0 01:45 ?        00:00:00 /bin/bash -c ps -ef |grep avro
root        5581    5579  0 01:45 ?        00:00:00 grep avro


In [15]:
%%shell
source ./environment.sh
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Python_AVRO_Consumer




In [16]:
%%shell

source ./environment.sh
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group Python_AVRO_Consumer


GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
Python_AVRO_Consumer spotify         1          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         9          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         6          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         8          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         2          0               0        



In [17]:
!tail -20 kafka_consumers.log

Python_AVRO_Consumer spotify         2          -               0               -               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         4          -               0               -               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         5          -               0               -               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         7          -               0               -               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         0          -               0               -               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         3          -               0

In [18]:
%%shell

source ./environment.sh
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups


GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
Python_AVRO_Consumer spotify         1          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         9          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         6          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         8          0               0               0               kafka-python-2.0.2-aa379e5e-270b-44d0-8ff6-0be07f7b7be0 /127.0.0.1      kafka-python-2.0.2
Python_AVRO_Consumer spotify         2          0               0        



In [1]:
spark_release='spark-3.5.1'
hadoop_version='hadoop3'

import os, time
start=time.time()
os.environ['SPARK_RELEASE']=spark_release
os.environ['HADOOP_VERSION']=hadoop_version
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_release}-bin-{hadoop_version}"

In [2]:
# Run below commands in google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # download spark-3.3.X
!tar xf ${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # unzip it

!pip install -q findspark # install findspark
# findspark find your Spark Distribution and sets necessary environment variables

import findspark
findspark.init()

# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.5.1


In [3]:
kafka_brokers="127.0.0.1:9092" # Can be a comma-separated list of brokers
topic_name="spotify"

# Define the AVRO schema as a string
schema = """
{
        "doc": "Spotify Wrapped Data Feed - User Event",
        "name": "UserEvent",
        "namespace": "com.spotify.wrapped",
        "type": "record",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "timestamp", "type": "string",
                "logicalType": "timestamp-millis"},
            {"name": "track_id", "type": "string"},
            {"name": "user_id", "type": "long"},
            {"name": "listening_time", "type":"long"}
        ]
    }
"""

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("StreamingAVROFromKafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

In [5]:
# Kafka Configuration for reading from Kafka/Event Hub
# Kafka source will create a unique group id for each query automatically. The user can set the prefix of the automatically
# generated group.id’s via the optional source option groupIdPrefix, default value is “spark-kafka-source”.
kafkaConf = {
    "kafka.bootstrap.servers": kafka_brokers,
    # Below settins required if kafka is secured:
    # "kafka.sasl.mechanism": "PLAIN",
    # "kafka.security.protocol": "SASL_SSL",
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://eventhubname.servicebus.windows.net/;SharedAccessKeyName=listenpolicyforspark;SharedAccessKey=ckNkSjcyXKGN8FCIRIS3qtkKvW+AEhB6QPaM=;EntityPath=instructortest";',
    "subscribe": topic_name, # to read from specific partitions use option: "assign": {topic_name:[0,1]})
    "startingOffsets": "latest", # "earliest", "latest"
    "enable.auto.commit": "true ",
    "groupIdPrefix": "Stream_Analytics_",
    "auto.commit.interval.ms": "5000"
}


# Read from Event Hub using Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafkaConf)

In [6]:
df = df.load()  # Start reading data from the specified Kafka topic

In [7]:
# Deserialize the AVRO messages from the value column
df = df.select(from_avro(df.value, schema).alias("spotify"))

# Print the schema of the DataFrame
df.printSchema()

root
 |-- spotify: struct (nullable = true)
 |    |-- id: long (nullable = false)
 |    |-- timestamp: string (nullable = false)
 |    |-- track_id: string (nullable = false)
 |    |-- user_id: long (nullable = false)
 |    |-- listening_time: long (nullable = false)



In [8]:
from pyspark.sql.functions import col
from pyspark.sql.functions import  to_timestamp

# Update DataFrame to flatten the AVRO record
df = df.select(
    col("spotify.id").alias("event_id"),
    to_timestamp(col("spotify.timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS").alias("event_timestamp"),
    col("spotify.track_id"),
    col("spotify.user_id"),
    col("spotify.listening_time")
)

# Print the schema of the updated DataFrame
df.printSchema()

root
 |-- event_id: long (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- track_id: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- listening_time: long (nullable = true)



In [9]:
!mkdir checkpoint

mkdir: cannot create directory ‘checkpoint’: File exists


In [10]:
# Reading the other avro files as spark df:
df_tracks = spark.read.format("avro").load("/content/drive/MyDrive/stream/data/tracks.avro")
df_users = spark.read.format("avro").load("/content/drive/MyDrive/stream/data/users.avro")
df_artists = spark.read.format("avro").load("/content/drive/MyDrive/stream/data/artists.avro")

# 1. PERSONALITY

## 1.1 F-E SCORE

### 1.1.1 GET TOTAL LISTENS OF USER

In [11]:
from pyspark.sql.functions import session_window
from pyspark.sql import functions as F

In [12]:
gap_duration = "20 minutes"

session_window_df_total_listens = df \
    .withWatermark("event_timestamp", "5 minutes") \
    .groupBy('user_id', session_window("event_timestamp", gap_duration).alias("session_window"))\
    .agg(
        F.count("*").alias("track_count")
    )

In [13]:
# QUERY TO GET TOTAL LISTENS
query_name='user_total_listens'
query=session_window_df_total_listens.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName(query_name) \
    .trigger(processingTime='2 seconds')\
    .start()

In [14]:
df_user_total_listens = spark.sql(f"SELECT * FROM {query_name}")

In [15]:
df_user_total_listens.show()

+-------+--------------+-----------+
|user_id|session_window|track_count|
+-------+--------------+-----------+
+-------+--------------+-----------+



In [16]:
def get_user_total_listens(user_id):
  df_filtered = df_user_total_listens.filter(col("user_id") == user_id)
  total_listens = df_filtered.groupBy("user_id").agg(sum("track_count").alias("total_track_count")).collect()[0]

  return total_listens["total_track_count"]

### 1.1.2 GET TOTAL OF UNIQUE SONGS

In [17]:
def get_user_listens_unique_artist(user_id):
  df_filtered = df_user_total_listens.filter(col("user_id") == user_id)
  df_filtered = df_filtered.filter(df_filtered["track_count"] == 1)

  return df_filtered.count()

## 1.2 T-N score

### 1.2.1 GET THE AVERAGE RELEASE DATE OF SONGS IN THE DATASET

In [18]:
from pyspark.sql.functions import col

In [19]:
# QUERY TO GET THE AVERAGE RELEASE DATE OF SONGS IN THE DATASET
joined_df = df.join(df_tracks, df.track_id == df_tracks.track_id, "inner")
joined_df = joined_df.withColumn("release_year", col("release_date").cast("int"))
average_release_year = joined_df.selectExpr("avg(release_year) as avg_release_year")

query_name = "average_release_year"
query = average_release_year.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName(query_name) \
    .start()

In [20]:
query_name = "average_release_year"
df_average_release_year = spark.sql(f"SELECT * FROM {query_name}")

In [21]:
df_average_release_year.show()

+----------------+
|avg_release_year|
+----------------+
+----------------+



In [22]:
def get_average_release_date():
  return df_average_release_year.collect()[0]["avg_release_year"]

### 1.2.2 GET THE AVERAGE RELEASE DATE OF SONGS FOR A GIVEN USER

In [23]:
from pyspark.sql.functions import count, avg

In [24]:
joined_df = df.join(df_tracks, df.track_id == df_tracks.track_id, "inner")
joined_df = joined_df.withColumn("release_year", col("release_date").cast("int"))
average_release_date_df = joined_df.groupBy("user_id").agg(avg("release_date").alias("average_release_date"))

In [25]:
query_name = "average_release_date_df"
query = average_release_date_df.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName(query_name) \
    .start()

In [26]:
query_name = "average_release_date_df"
df_average_release_date_by_user = spark.sql(f"SELECT * FROM {query_name}")

In [27]:
df_average_release_date_by_user.show()

+-------+--------------------+
|user_id|average_release_date|
+-------+--------------------+
+-------+--------------------+



In [28]:
def get_user_average_release_date(user_id):
  df_filtered = df_average_release_date_by_user.filter(col("user_id") == user_id).collect()[0]

  return df_filtered["average_release_date"]

## 1.3 L-V score

### LOYALTY_COUNTER
For every user, for every listen. If listen is of the same artist than the previous listen, add 1 to the counter.

In [29]:
def get_user_loyalty_counter(user_id):
    df_user = df.filter(df.user_id == str(user_id))
    joined_df = df_user.join(df_tracks, df_user.track_id == df_tracks.track_id, "inner")

    query_name=f'user{str(user_id)}_loyalty_counter'
    try:
      streaming_df = joined_df.writeStream \
        .outputMode("append") \
        .format("memory") \
        .queryName(query_name) \
        .start()
    except Exception as e:
      print(f"Can't create query because exception: {e}")

    query = f"""
        SELECT count(*) AS count
        FROM (
            SELECT event_timestamp, artist,
                  lag(artist) OVER (ORDER BY event_timestamp) AS previous_artist
            FROM {query_name}
        ) WHERE artist = previous_artist
    """

    output = spark.sql(query)
    loyalty_counter = int(output.first()["count"])

    return loyalty_counter

## 1.4 C-U score

### GET NUMBER OF LISTENS TO MAINSTREAM SONGS BY USER

In [30]:
joined_df = df.join(df_tracks, df.track_id == df_tracks.track_id, "inner")

In [31]:
streaming_df = joined_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("artists") \
    .start()

In [32]:
query_name = "artists"
df_mainstream_artists = spark.sql(f"SELECT * FROM {query_name}")

In [33]:
df_mainstream_artists.show()

+--------+---------------+--------+-------+--------------+--------+--------+------+----+----------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+----------+-----------+
|event_id|event_timestamp|track_id|user_id|listening_time|track_id|duration|artist|name|popularity|release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|time_signature|album_name|track_genre|
+--------+---------------+--------+-------+--------------+--------+--------+------+----+----------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+----------+-----------+
+--------+---------------+--------+-------+--------------+--------+--------+------+----+----------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+--------

In [34]:
def get_user_mainstream_counter(user_id):
  df_filtered = df_mainstream_artists.filter(col("user_id") == user_id)

  return df_filtered.filter(col("popularity") > 50).count()

# 2. ANALYTICS

## SETUP

In [35]:
from pyspark.sql.functions import window, col, min, max, count, unix_timestamp, collect_list, map_from_arrays
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")


query_name = 'df'
query = df.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName(query_name) \
    .start()

df_final = spark.sql("SELECT * FROM df")
# Apply transformations to create session_window_df
session_window_df = df_final \
    .withWatermark("event_timestamp", "5 minutes") \
    .groupBy('user_id', session_window("event_timestamp", "20 minutes").alias("session_window"), "track_id") \
    .agg(
        min("event_timestamp").alias("window_start"),
        max("event_timestamp").alias("window_end"),
        (unix_timestamp(max("event_timestamp")) - unix_timestamp(min("event_timestamp"))).alias("session_duration"),
        count("*").alias("track_count")
    )

# Aggregate track counts into a map-type column
session_window_dict_df = session_window_df \
    .groupBy('user_id', 'session_window') \
    .agg(
        map_from_arrays(collect_list('track_id'), collect_list('track_count')).alias('track_count_dict'),
        min('window_start').alias('window_start'),
        max('window_end').alias('window_end'),
        max('session_duration').alias('session_duration')
    )

# Write the DataFrame to a memory table
session_window_dict_df.createOrReplaceTempView("spotify_query_with_tracks_dict")

# Query the memory table
result_df = spark.sql("SELECT * FROM spotify_query_with_tracks_dict")


In [36]:
result_df.count()

0

In [37]:
query_name_parquet = "df_parquet"
query_parquet = df \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "checkpoint3") \
    .option("path", "/content/events_df") \
    .queryName(query_name_parquet) \
    .start()

In [None]:
parquet_df = spark.read.parquet('/content/events_df')
parquet_df.show(50, truncate=False)

## 2.0 GENERAL ANALYTICS

### 2.0.1  Top 5 most streamed songs per week

In [38]:
from pyspark.sql import Window

In [39]:
from pyspark.sql.functions import explode, weekofyear, collect_list, rank, desc

In [40]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df = result_df.select("user_id", "session_window", explode("track_count_dict").alias("track_id", "count"))

# Extract week from session_window
exploded_df = exploded_df.withColumn("week", weekofyear("session_window.start"))

# Group by week and track_id to calculate the total count of each track for each week
week_grouped_df = exploded_df.groupby("week", "track_id").agg({"count": "sum"})

# Rank the songs for each week
window_spec = Window.partitionBy("week").orderBy(desc("sum(count)"))
ranked_songs_df = week_grouped_df.withColumn("rank", rank().over(window_spec))

# Filter to select the top 5 songs for each week
top_5_songs_df = ranked_songs_df.filter(col("rank") <= 5)

# Collect the top 5 songs for each week into a list
top_5_songs_list_per_week = top_5_songs_df.groupby("week").agg(collect_list("track_id").alias("top_5_songs"))

top_5_songs_with_names_df = top_5_songs_df.join(df_tracks, "track_id", "left")

# Select the necessary columns
top_5_songs_names_df = top_5_songs_with_names_df.select("week", "name")

# Group by week and collect the names into a list
top_5_songs_list_per_week_with_names = top_5_songs_names_df.groupby("week") \
    .agg(collect_list("name").alias("top_5_song_names"))

# Show the result
top_5_songs_list_per_week_with_names.show(truncate=False)

+----+---------------------------------+
|week|top_5_song_names                 |
+----+---------------------------------+
|6   |[How Can You Mend A Broken Heart]|
+----+---------------------------------+



In [41]:
def get_top_5_streamed_songs_current_week():
  max_week = top_5_songs_list_per_week_with_names.select(max("week")).collect()[0][0]
  latest_row = top_5_songs_list_per_week_with_names.filter(col("week") == max_week).first()

  return latest_row["top_5_song_names"]

### 2.0.2 Top 5 most streamed artists per week

In [42]:
from pyspark.sql.functions import explode, weekofyear, collect_list, rank, desc

In [43]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df = result_df.select("user_id", "session_window", explode("track_count_dict").alias("track_id", "count"))

# Extract week from session_window
exploded_df = exploded_df.withColumn("week", weekofyear("session_window.start"))

# Join with df_tracks to get artist information
joined_df = exploded_df.join(df_tracks, "track_id", "left")

# Group by week and artist to calculate the total count of each artist for each week
week_artist_grouped_df = joined_df.groupby("week", "artist").agg({"count": "sum"})

# Rank the artists for each week
window_spec = Window.partitionBy("week").orderBy(desc("sum(count)"))
ranked_artists_df = week_artist_grouped_df.withColumn("rank", rank().over(window_spec))

# Filter to select the top 5 artists for each week
top_5_artists_df = ranked_artists_df.filter(col("rank") <= 5)

# Collect the top 5 artists for each week into a list
top_5_artists_list_per_week = top_5_artists_df.groupby("week").agg(collect_list("artist").alias("top_5_artists"))

# Show the result
top_5_artists_list_per_week.show(truncate=False)

+----+------------------------------------------------------------------------------------------------------------------------+
|week|top_5_artists                                                                                                           |
+----+------------------------------------------------------------------------------------------------------------------------+
|6   |[['McCoy Tyner'], ['Belle and the Nursery Rhymes Band'], ['Unnikrishnan', 'Sujatha'], ['Fred Åkerström'], ['Jazzanova']]|
+----+------------------------------------------------------------------------------------------------------------------------+



In [44]:
def get_top_5_streamed_artists_current_week():
  max_week = top_5_artists_list_per_week.select(max("week")).collect()[0][0]
  latest_row = top_5_artists_list_per_week.filter(col("week") == max_week).first()

  return latest_row["top_5_artists"]

### 2.0.3 Top 5 songs streamed per month

In [45]:
from pyspark.sql.functions import month, dense_rank

In [46]:
# Extract month from session_window
exploded_df = exploded_df.withColumn("month", month("session_window.start"))

# Group by month and track_id to calculate the total count of each track for each month
month_grouped_df = exploded_df.groupby("month", "track_id").agg({"count": "sum"})

# Rank the songs for each month
window_spec_month = Window.partitionBy("month").orderBy(desc("sum(count)"))
ranked_songs_month_df = month_grouped_df.withColumn("rank", dense_rank().over(window_spec_month))

# Filter to select the top 5 songs for each month
top_5_songs_month_df = ranked_songs_month_df.filter(col("rank") <= 5)

# Join with df_tracks to get the names of the top 5 songs for each month
top_5_songs_with_names_month_df = top_5_songs_month_df.join(df_tracks, "track_id", "left")

# Select the necessary columns
top_5_songs_names_month_df = top_5_songs_with_names_month_df.select("month", "name")

# Group by month and collect the names into a list
top_5_songs_list_per_month_with_names = top_5_songs_names_month_df.groupby("month") \
    .agg(collect_list("name").alias("top_5_song_names"))

# Show the result
top_5_songs_list_per_month_with_names.show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------+
|month|top_5_song_names                                                                                   |
+-----+---------------------------------------------------------------------------------------------------+
|2    |[Contemplation, Reykjavik, Esmer, Us Against The World (Da Tweekaz Remix), Loving Strangers, Kukla]|
+-----+---------------------------------------------------------------------------------------------------+



In [47]:
def get_top_5_streamed_songs_current_month():
  max_month = top_5_songs_list_per_month_with_names.select(max("month")).collect()[0][0]
  latest_row = top_5_songs_list_per_month_with_names.filter(col("month") == max_month).first()

  return latest_row["top_5_song_names"]

### 2.0.4 Top 5 streamed artist per month

In [48]:
from pyspark.sql.functions import explode, month, collect_list, rank, desc

In [49]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df = result_df.select("user_id", "session_window", explode("track_count_dict").alias("track_id", "count"))

# Extract month from session_window
exploded_df = exploded_df.withColumn("month", month("session_window.start"))

# Join with df_tracks to get artist information
joined_df = exploded_df.join(df_tracks, "track_id", "left")

# Group by month and artist to calculate the total count of each artist for each month
month_artist_grouped_df = joined_df.groupby("month", "artist").agg({"count": "sum"})

# Rank the artists for each month
window_spec = Window.partitionBy("month").orderBy(desc("sum(count)"))
ranked_artists_df = month_artist_grouped_df.withColumn("rank", rank().over(window_spec))

# Filter to select the top 5 artists for each month
top_5_artists_df = ranked_artists_df.filter(col("rank") <= 5)

# Collect the top 5 artists for each month into a list
top_5_artists_list_per_month = top_5_artists_df.groupby("month").agg(collect_list("artist").alias("top_5_artists"))

# Show the result
top_5_artists_list_per_month.show(truncate=False)

+-----+-------------------------------------------------------------------------------------------------------------------+
|month|top_5_artists                                                                                                      |
+-----+-------------------------------------------------------------------------------------------------------------------+
|2    |[['Ceylan Ertem'], ['McCoy Tyner'], ['Old Harp Singers of Eastern Tennessee'], ['Gunnar Þórðarson'], ['Clockartz']]|
+-----+-------------------------------------------------------------------------------------------------------------------+



In [50]:
def get_top_5_streamed_artists_current_month():
  max_month = top_5_artists_list_per_month.select(max("month")).collect()[0][0]
  latest_row = top_5_artists_list_per_month.filter(col("month") == max_month).first()

  return latest_row["top_5_artists"]

## 2.1 Weekly User Queries

### 2.1.1 Top 5 songs per user per week

In [51]:
from pyspark.sql.functions import explode, row_number, col, weekofyear, max
from pyspark.sql.window import Window

In [52]:
def get_user_latest_week(df, user_id):
  filtered_df = df.filter(col("user_id") == user_id)
  max_week = filtered_df.select(max("week")).collect()[0][0]
  latest_row = filtered_df.filter(col("week") == max_week).first()
  return latest_row

In [53]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df = result_df.withColumn("track_id", explode(F.map_keys("track_count_dict"))) \
                      .withColumn("count", explode(F.map_values("track_count_dict")))

# Define a window specification to partition by user_id and week and order by the count descending
window_spec = Window.partitionBy("user_id", weekofyear("session_window.start")).orderBy(F.desc("count"))

# Rank the top 5 songs for each user and each week
ranked_songs_df = exploded_df.withColumn("rank", row_number().over(window_spec)) \
                             .filter(F.col("rank") <= 5)

# Join with the DataFrame containing song information to get the song name
top_songs_with_names_df = ranked_songs_df.join(df_tracks, "track_id", "left")

# Aggregate the counts of the top 5 songs for each user and each week
top_songs_count_df = top_songs_with_names_df.groupBy("user_id", weekofyear("session_window.start").alias("week")) \
                                             .agg(F.collect_list("name").alias("top_songs"),
                                                  F.collect_list("count").alias("count"))

# Show the result
top_songs_count_df.show(truncate=False)

+-------+----+----------------------------------------------------------------------------------------------------------------+--------------------+
|user_id|week|top_songs                                                                                                       |count               |
+-------+----+----------------------------------------------------------------------------------------------------------------+--------------------+
|0      |6   |[Amazing Grace, Amazing Grace, Dónde Estás, Once I Caught a Fish Alive, Acid Thunder]                           |[8, 8, 8, 7, 7]     |
|1      |6   |[Deep Waters, Smiling Faces, Bullet, I'm Not the Man I'm Supposed to Be, Serenata]                              |[5, 5, 5, 5, 5]     |
|2      |6   |[Delirium, Us Against The World (Da Tweekaz Remix), Delirium, Us Against The World (Da Tweekaz Remix), Delirium]|[13, 8, 7, 5, 5]    |
|3      |6   |[Loving Strangers, Kamini, Rockin' Around the Christmas Tree, Heading Home, Loving Strangers

In [54]:
def get_user_top_5_songs_current_week(user_id):
  latest_row = get_user_latest_week(top_songs_count_df, user_id)

  return latest_row["top_songs"]

### 2.1.2 Weekly listening time / count of songs

In [55]:
from pyspark.sql.functions import explode, weekofyear, sum

In [56]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df_week = result_df.withColumn("track_id", explode(F.map_keys("track_count_dict"))) \
                            .withColumn("count", explode(F.map_values("track_count_dict")))

# Define a window specification to partition by user_id and week
window_spec_week = Window.partitionBy("user_id", weekofyear("session_window.start"))

# Aggregate the listening time and total count of all songs for each user and week
aggregated_df_week = exploded_df_week.groupBy("user_id", weekofyear("session_window.start").alias("week")) \
                                     .agg(sum("session_duration").alias("listening_time"),
                                          sum("count").alias("total_song_count"))

# Show the result
aggregated_df_week.show()

+-------+----+--------------+----------------+
|user_id|week|listening_time|total_song_count|
+-------+----+--------------+----------------+
|      0|   6|        105127|             314|
|      1|   6|        219848|             435|
|      2|   6|         33977|             197|
|      3|   6|         33823|             104|
|      4|   6|         82678|             264|
|      5|   6|        415880|             423|
|      6|   6|         81734|             215|
|      7|   6|        138808|             327|
|      8|   6|        346019|             562|
|      9|   6|         96448|             275|
+-------+----+--------------+----------------+



In [57]:
def get_user_listening_time_current_week(user_id):
  latest_row = get_user_latest_week(aggregated_df_week, user_id)

  return latest_row["listening_time"]

In [58]:
def get_user_total_listens_current_week(user_id):
  latest_row = get_user_latest_week(aggregated_df_week, user_id)

  return latest_row["total_song_count"]

### 2.1.3 Percentile Analysis

In [59]:
from pyspark.sql.functions import explode, col, weekofyear, when ,expr
from pyspark.sql.window import Window
from tdigest import TDigest

In [60]:
# Define a function to compute the percentile using T-Digest
def compute_percentile(counts):
    tdigest = TDigest()
    for count in counts:
        tdigest.update(count)
    return tdigest.percentile(0.99)

# UDF to apply the percentile function
compute_percentile_udf = F.udf(compute_percentile)

# Explode the track_count_dict map into separate rows for each track and its count
exploded_df = result_df.withColumn("track_id", explode(F.map_keys("track_count_dict"))) \
                        .withColumn("count", explode(F.map_values("track_count_dict")))

# Define a window specification to partition by user_id and week and order by the count descending
window_spec = Window.partitionBy("user_id", weekofyear("session_window.start")).orderBy(col("count").desc())

# Rank the songs for each user and each week
ranked_songs_df = exploded_df.withColumn("rank", F.row_number().over(window_spec))

# Select the top 1 song for each user and each week
top_songs_df = ranked_songs_df.filter(col("rank") == 1)

# Aggregate the counts of the top 1 song for each user, week, and track
top_songs_count_df = top_songs_df.groupBy("user_id", weekofyear("session_window.start").alias("week"), "track_id") \
                                  .agg(F.sum("count").alias("count"))

top_songs_with_names_df = top_songs_count_df.join(df_tracks, "track_id", "left") \
                                             .withColumnRenamed("name", "song_name") \
                                             .select("user_id", "week", "song_name", "count")

# Compute the 95th percentile for each user, week, and track
percentile_df = top_songs_with_names_df.groupBy("user_id", "week", "song_name").agg(F.collect_list("count").alias("counts")) \
                                  .withColumn("percentile", compute_percentile_udf("counts").cast("double"))


# Add a new column indicating whether the count is equal to the percentile
results_df = percentile_df.withColumn("top_percentile",
                                     expr("transform(counts, x -> if(x = percentile, 'Top 1%', 'Not Top 1%'))"))

# Show the result with all required columns
results_df.show(truncate=False)

+-------+----+--------------------+------+----------+--------------+
|user_id|week|song_name           |counts|percentile|top_percentile|
+-------+----+--------------------+------+----------+--------------+
|0      |6   |Dónde Estás         |[9]   |9.0       |[Top 1%]      |
|1      |6   |Twilight            |[8]   |8.0       |[Top 1%]      |
|2      |6   |Delirium            |[13]  |13.0      |[Top 1%]      |
|3      |6   |Escrito Nas Estrelas|[13]  |13.0      |[Top 1%]      |
|4      |6   |Chantilly Lace      |[14]  |14.0      |[Top 1%]      |
|5      |6   |Macarena            |[15]  |15.0      |[Top 1%]      |
|6      |6   |Poovukkul           |[13]  |13.0      |[Top 1%]      |
|7      |6   |Contemplation       |[18]  |18.0      |[Top 1%]      |
|8      |6   |Tie                 |[19]  |19.0      |[Top 1%]      |
|9      |6   |Reykjavik           |[14]  |14.0      |[Top 1%]      |
+-------+----+--------------------+------+----------+--------------+



In [61]:
def get_user_top_song_percentile_current_week(user_id):
  latest_row = get_user_latest_week(results_df, user_id)

  return latest_row["song_name"], latest_row["top_percentile"][0]


### 2.1.4 Engagement Metrics user_id vs all users

In [None]:
from pyspark.sql.functions import weekofyear, col
import matplotlib.pyplot as plt

In [None]:
# Input the user_id
user_id = "1"

In [None]:
result_df = result_df.withColumn("session_duration_minutes", col("session_duration") / 60)

In [None]:
result_df

DataFrame[user_id: bigint, session_window: struct<start:timestamp,end:timestamp>, track_count_dict: map<string,bigint>, window_start: timestamp, window_end: timestamp, session_duration: bigint, session_duration_minutes: double]

In [None]:
# Filter result_df for the specific user_id
user_result_df = result_df.filter(col("user_id") == user_id)

# Calculate session duration for the specific user
user_result_df = user_result_df.withColumn("session_duration_minutes", (col("session_window.end").cast("long") - col("session_window.start").cast("long")) / 60)

# Extract week from session window for the specific user
user_result_df = user_result_df.withColumn("week", weekofyear("session_window.start"))

# Group by week and calculate average session duration for the specific user
average_session_duration_per_week_user = user_result_df.groupby("week").agg({"session_duration_minutes": "avg"})

# Order by week for the specific user
average_session_duration_per_week_user = average_session_duration_per_week_user.orderBy("week")

# Convert to Pandas DataFrame for plotting for the specific user
average_session_duration_per_week_pd_user = average_session_duration_per_week_user.toPandas()

# Group by week and calculate average session duration for all users
average_session_duration_per_week_all = result_df.groupby("week").agg({"session_duration_minutes": "avg"})

# Order by week for all users
average_session_duration_per_week_all = average_session_duration_per_week_all.orderBy("week")

# Convert to Pandas DataFrame for plotting for all users
average_session_duration_per_week_pd_all = average_session_duration_per_week_all.toPandas()

# Plot both lines in one plot
plt.figure(figsize=(10, 6))

# Plot average session duration per week for all users
plt.plot(average_session_duration_per_week_pd_all["week"], average_session_duration_per_week_pd_all["avg(session_duration_minutes)"], marker='o', label="All Users")

# Plot average session duration per week for the specific user
plt.plot(average_session_duration_per_week_pd_user["week"], average_session_duration_per_week_pd_user["avg(session_duration_minutes)"], marker='s', label=f"User {user_id}")

plt.xlabel("Week")
plt.ylabel("Average Session Duration (minutes)")
plt.title("Average Session Duration Per Week")
plt.xticks(average_session_duration_per_week_pd_all["week"])
plt.legend()
plt.grid(True)
plt.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `week` cannot be resolved. Did you mean one of the following? [`session_duration_minutes`, `spotify_query_with_tracks_dict`.`user_id`, `spotify_query_with_tracks_dict`.`window_end`, `spotify_query_with_tracks_dict`.`window_start`, `spotify_query_with_tracks_dict`.`session_window`].;
'Aggregate ['week], ['week, avg(session_duration_minutes#188212) AS avg(session_duration_minutes)#188911]
+- Project [user_id#1507L, session_window#1520, track_count_dict#1569, window_start#1571, window_end#1573, session_duration#1575L, (cast(session_duration#1575L as double) / cast(60 as double)) AS session_duration_minutes#188212]
   +- Project [user_id#1507L, session_window#1520, track_count_dict#1569, window_start#1571, window_end#1573, session_duration#1575L]
      +- SubqueryAlias spotify_query_with_tracks_dict
         +- View (`spotify_query_with_tracks_dict`, [user_id#1507L,session_window#1520,track_count_dict#1569,window_start#1571,window_end#1573,session_duration#1575L])
            +- Aggregate [user_id#1507L, session_window#1520], [user_id#1507L, session_window#1520, map_from_arrays(collect_list(track_id#1506, 0, 0), collect_list(track_count#1534L, 0, 0)) AS track_count_dict#1569, min(window_start#1527) AS window_start#1571, max(window_end#1529) AS window_end#1573, max(session_duration#1532L) AS session_duration#1575L]
               +- Aggregate [user_id#1507L, session_window#1535, track_id#1506], [user_id#1507L, session_window#1535 AS session_window#1520, track_id#1506, min(event_timestamp#1505) AS window_start#1527, max(event_timestamp#1505) AS window_end#1529, (unix_timestamp(max(event_timestamp#1505), yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false) - unix_timestamp(min(event_timestamp#1505), yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false)) AS session_duration#1532L, count(1) AS track_count#1534L]
                  +- Filter isnotnull(event_timestamp#1505)
                     +- Project [named_struct(start, precisetimestampconversion(precisetimestampconversion(event_timestamp#1505, TimestampType, LongType), LongType, TimestampType), end, knownnullable(precisetimestampconversion(precisetimestampconversion(cast(event_timestamp#1505 + cast(20 minutes as interval) as timestamp), TimestampType, LongType), LongType, TimestampType))) AS session_window#1535, event_id#1504L, event_timestamp#1505, track_id#1506, user_id#1507L, listening_time#1508L]
                        +- Project [event_id#1504L, event_timestamp#1505, track_id#1506, user_id#1507L, listening_time#1508L]
                           +- SubqueryAlias df
                              +- View (`df`, [event_id#1504L,event_timestamp#1505,track_id#1506,user_id#1507L,listening_time#1508L])
                                 +- MemoryPlan MemorySink, [event_id#1504L, event_timestamp#1505, track_id#1506, user_id#1507L, listening_time#1508L]


## 2.2 Monthly Queries

### 2.2.1 Top 5 songs per user per week

In [62]:
from pyspark.sql.functions import explode, row_number, month

In [63]:
def get_user_latest_month(df, user_id):
  filtered_df = df.filter(col("user_id") == user_id)
  max_month = filtered_df.select(max("month")).collect()[0][0]
  latest_row = filtered_df.filter(col("month") == max_month).first()
  return latest_row

In [64]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df_month = result_df.withColumn("track_id", explode(F.map_keys("track_count_dict"))) \
                      .withColumn("count", explode(F.map_values("track_count_dict")))

# Define a window specification to partition by user_id and month and order by the count descending
window_spec_month = Window.partitionBy("user_id", month("session_window.start")).orderBy(F.desc("count"))

# Rank the top 5 songs for each user and each month
ranked_songs_df_month = exploded_df_month.withColumn("rank", row_number().over(window_spec_month)) \
                             .filter(F.col("rank") <= 5)

# Join with the DataFrame containing song information to get the song name
top_songs_with_names_df_month = ranked_songs_df_month.join(df_tracks, "track_id", "left")

# Aggregate the counts of the top 5 songs for each user and each month
top_songs_count_df_month = top_songs_with_names_df_month.groupBy("user_id", month("session_window.start").alias("month")) \
                                             .agg(F.collect_list("name").alias("top_songs"),
                                                  F.collect_list("count").alias("count"))

# Show the result
top_songs_count_df_month.show(truncate=False)

+-------+-----+-----------------------------------------------------------------------------------------------------------------+--------------------+
|user_id|month|top_songs                                                                                                        |count               |
+-------+-----+-----------------------------------------------------------------------------------------------------------------+--------------------+
|0      |2    |[Dónde Estás, Amazing Grace, Amazing Grace, Dónde Estás, Dónde Estás]                                            |[9, 8, 8, 8, 8]     |
|1      |2    |[The Humbling River, Twilight, For You, The Humbling River, Twilight]                                            |[11, 8, 8, 8, 7]    |
|2      |2    |[Delirium, Us Against The World (Da Tweekaz Remix), Delirium, Sinto Falta Dela, 情歌]                            |[13, 8, 7, 6, 6]    |
|3      |2    |[Escrito Nas Estrelas, Loving Strangers, Kamini, Rockin' Around the Christmas Tre

In [65]:
def get_user_top_5_songs_current_month(user_id):
  latest_row = get_user_latest_month(top_songs_count_df_month, user_id)

  return latest_row["top_songs"]

### 2.2.2 Monthly listening time / count of songs

In [66]:
from pyspark.sql.functions import explode, month, sum, count

In [67]:
# Explode the track_count_dict map into separate rows for each track and its count
exploded_df_month = result_df.withColumn("track_id", explode(F.map_keys("track_count_dict"))) \
                             .withColumn("count", explode(F.map_values("track_count_dict")))

# Define a window specification to partition by user_id and month
window_spec_month = Window.partitionBy("user_id", month("session_window.start"))

# Aggregate the listening time and number of songs for each user and month
aggregated_df_month = exploded_df_month.groupBy("user_id", month("session_window.start").alias("month")) \
                                       .agg(sum("session_duration").alias("listening_time"),
                                            count("track_id").alias("number_of_songs"))

# Show the result
aggregated_df_month.show()

+-------+-----+--------------+---------------+
|user_id|month|listening_time|number_of_songs|
+-------+-----+--------------+---------------+
|      0|    2|        142371|            192|
|      1|    2|        236656|            284|
|      2|    2|        105531|            250|
|      3|    2|         75711|             59|
|      4|    2|        137931|            338|
|      5|    2|        489995|            408|
|      6|    2|        119323|            151|
|      7|    2|        232783|            192|
|      8|    2|        376129|            364|
|      9|    2|        114491|            146|
+-------+-----+--------------+---------------+



In [68]:
def get_user_listening_time_current_month(user_id):
  latest_row = get_user_latest_month(aggregated_df_month, user_id)

  return latest_row["listening_time"]

In [69]:
def get_user_total_listens_current_month(user_id):
  latest_row = get_user_latest_month(aggregated_df_month, user_id)

  return latest_row["number_of_songs"]

### 2.2.3 Percentile Analysis

In [70]:
from pyspark.sql.functions import explode, col, month, expr
from pyspark.sql.window import Window
from tdigest import TDigest

In [71]:
# Define a function to compute the percentile using T-Digest
def compute_percentile(counts):
    tdigest = TDigest()
    for count in counts:
        tdigest.update(count)
    return tdigest.percentile(0.99)  # Change the percentile as needed

# UDF to apply the percentile function
compute_percentile_udf = F.udf(compute_percentile)

# Explode the track_count_dict map into separate rows for each track and its count
exploded_df = result_df.withColumn("track_id", explode(F.map_keys("track_count_dict"))) \
                        .withColumn("count", explode(F.map_values("track_count_dict")))

window_spec = Window.partitionBy("user_id", month("session_window.start")).orderBy(col("count").desc())

ranked_songs_df = exploded_df.withColumn("rank", F.row_number().over(window_spec))

top_songs_df = ranked_songs_df.filter(col("rank") == 1)

top_songs_count_df = top_songs_df.groupBy("user_id", month("session_window.start").alias("month"), "track_id") \
                                  .agg(F.sum("count").alias("count"))

top_songs_with_names_df = top_songs_count_df.join(df_tracks, "track_id", "left") \
                                             .withColumnRenamed("name", "song_name") \
                                             .select("user_id", "month", "song_name", "count")

percentile_df = top_songs_with_names_df.groupBy("user_id", "month", "song_name").agg(F.collect_list("count").alias("counts")) \
                                  .withColumn("percentile", compute_percentile_udf("counts").cast("double"))

results_df_month = percentile_df.withColumn("top_percentile",
                                     expr("transform(counts, x -> if(x = percentile, 'Top 1%', 'Not Top 5%'))"))

# Show the result with all required columns
results_df_month.show(truncate=False)


+-------+-----+--------------------+------+----------+--------------+
|user_id|month|song_name           |counts|percentile|top_percentile|
+-------+-----+--------------------+------+----------+--------------+
|0      |2    |Dónde Estás         |[9]   |9.0       |[Top 1%]      |
|1      |2    |The Humbling River  |[11]  |11.0      |[Top 1%]      |
|2      |2    |Delirium            |[13]  |13.0      |[Top 1%]      |
|3      |2    |Escrito Nas Estrelas|[13]  |13.0      |[Top 1%]      |
|4      |2    |Chantilly Lace      |[14]  |14.0      |[Top 1%]      |
|5      |2    |Macarena            |[15]  |15.0      |[Top 1%]      |
|6      |2    |Poovukkul           |[13]  |13.0      |[Top 1%]      |
|7      |2    |Contemplation       |[18]  |18.0      |[Top 1%]      |
|8      |2    |Tie                 |[19]  |19.0      |[Top 1%]      |
|9      |2    |Reykjavik           |[14]  |14.0      |[Top 1%]      |
+-------+-----+--------------------+------+----------+--------------+



In [72]:
def get_user_top_song_percentile_current_month(user_id):
  latest_row = get_user_latest_month(results_df_month, user_id)

  return latest_row["song_name"], latest_row["top_percentile"][0]

### 2.2.4 User engagement

In [None]:
from pyspark.sql.functions import month, col
import matplotlib.pyplot as plt

In [None]:
# Input the user_id
user_id = 1

In [None]:
# Filter result_df for the specific user_id
user_result_df = result_df.filter(col("user_id") == user_id)

# Calculate session duration for the specific user
user_result_df = user_result_df.withColumn("session_duration_minutes", (col("session_window.end").cast("long") - col("session_window.start").cast("long")) / 60)

# Extract month from session window for the specific user
user_result_df = user_result_df.withColumn("month", month("session_window.start"))

# Group by month and calculate average session duration for the specific user
average_session_duration_per_month_user = user_result_df.groupby("month").agg({"session_duration_minutes": "avg"})

# Order by month for the specific user
average_session_duration_per_month_user = average_session_duration_per_month_user.orderBy("month")

# Convert to Pandas DataFrame for plotting for the specific user
average_session_duration_per_month_pd_user = average_session_duration_per_month_user.toPandas()

# Group by month and calculate average session duration for all users
average_session_duration_per_month_all = result_df.groupby(month("session_window.start").alias("month")).agg({"session_duration_minutes": "avg"})

# Order by month for all users
average_session_duration_per_month_all = average_session_duration_per_month_all.orderBy("month")

# Convert to Pandas DataFrame for plotting for all users
average_session_duration_per_month_pd_all = average_session_duration_per_month_all.toPandas()

# Plot both lines in one plot
plt.figure(figsize=(10, 6))

# Plot average session duration per month for all users
plt.plot(average_session_duration_per_month_pd_all["month"], average_session_duration_per_month_pd_all["avg(session_duration_minutes)"], marker='o', label="All Users")

# Plot average session duration per month for the specific user
plt.plot(average_session_duration_per_month_pd_user["month"], average_session_duration_per_month_pd_user["avg(session_duration_minutes)"], marker='s', label=f"User {user_id}")

plt.xlabel("Month")
plt.ylabel("Average Session Duration (minutes)")
plt.title("Average Session Duration Per Month")
plt.xticks(average_session_duration_per_month_pd_all["month"])
plt.legend()
plt.grid(True)
plt.show()


# 3. SONG RECOMMENDATIONS

In [None]:
# Read the Avro file and rename the column track_id to song_id
df_songs = spark.read.format("avro").load("/content/drive/MyDrive/stream/data/tracks.avro") \
    .withColumnRenamed("track_id", "song_id")

In [None]:
# Import necessary functions
from pyspark.sql.functions import col

# Join the streaming DataFrame df with df_tracks on the track_id column
joined_df = df.join(df_songs, df["track_id"] == df_songs["song_id"], "inner")

In [None]:
query_merged = 'merged_df'
query = joined_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName(query_merged) \
    .start()
df_merged = spark.sql("SELECT * FROM merged_df")

In [None]:
from pyspark.sql.functions import col, rank
from pyspark.sql import Window


# Group by user_id and song_id, count the occurrences of each song for each user
user_song_counts = df_merged.groupBy("user_id", "song_id").count()

# Assign a rank to each song within each user group based on the count, ordered by count in descending order
window_spec = Window.partitionBy("user_id").orderBy(col("count").desc())
user_song_rank = user_song_counts.withColumn("rank", rank().over(window_spec))

# Filter to keep only the rows where the rank is 1, indicating the most played song for each user
most_played_per_user = user_song_rank.filter(col("rank") == 1).drop("rank")

# Join most_played_per_user with df_tracks on song_id
most_played_with_characteristics = most_played_per_user.join(df_songs, "song_id", "left")

# Select the desired columns from df_tracks and keep the user_id and song_id
selected_columns = [
    "user_id", "song_id", "duration", "artist", "name", "popularity", "release_date",
    "danceability", "energy", "key", "loudness", "mode",
    "speechiness", "acousticness", "instrumentalness", "liveness",
    "valence", "tempo", "time_signature", "album_name", "track_genre"
]

# Show the result
best_song = most_played_with_characteristics.select(selected_columns)
best_song.show()

In [None]:
df_songs_pd = df_songs.toPandas()
best_song_pd = best_song.toPandas()

In [None]:
import numpy as np
import pandas as pd

def get_top_similar_songs(song_id, df_songs_pd):
    # Define vector_columns
    vector_columns = ["duration", "popularity", "danceability", "energy", "key",
                      "loudness", "mode", "speechiness", "acousticness", "instrumentalness",
                      "liveness", "valence", "tempo", "time_signature"]

    # Extract the features of the target song
    target_features = df_songs_pd.loc[df_songs_pd['song_id'] == song_id, vector_columns].values
    if len(target_features) == 0:
        return []  # Return an empty list if the song_id is not found

    target_features = target_features[0]

    # Calculate cosine similarity between the target song and all other songs
    other_features = df_songs_pd.loc[df_songs_pd['song_id'] != song_id, vector_columns].values
    dot_products = np.dot(other_features, target_features)
    target_norm = np.linalg.norm(target_features)
    other_norms = np.linalg.norm(other_features, axis=1)
    similarities = dot_products / (target_norm * other_norms)

    # Get indices of top 5 most similar songs
    top_indices = np.argsort(similarities)[::-1][:5]

    # Get the names of the top similar songs
    top_similar_songs = df_songs_pd.loc[df_songs_pd.index[top_indices], 'name'].tolist()

    return top_similar_songs

# Apply the function to the DataFrame
best_song_pd['recommendations'] = best_song_pd['song_id'].apply(lambda x: get_top_similar_songs(x, df_songs_pd))

# Display the DataFrame with recommendations
print(best_song_pd)

In [None]:
!pip install azure-storage-queue

In [None]:
username= 'yassine02' # this can be any string identifier, for example your email username
account_name= 'ysnspotify' # the name of the Azure Storage Account you want to use
account_key= 'ewdKctJnob2ion3ihNKmTCqQzqnUv2/tTG4xcPpYqYEoK7P7IQJPxrnorYBa2QKQi7k0nEsoPv1Z+AStyDW4qA=='
connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"


connection_string

In [None]:
def create_user_queue(user_id):
    queue_name = f"{username}-{user_id}-queue"
    try:
        queue_service_client.create_queue(queue_name)
        print(f"Queue '{queue_name}' created successfully for user {user_id}.")
    except ResourceExistsError:
        print(f"Queue '{queue_name}' already exists for user {user_id}.")
    except Exception as e:
        print(f"An error occurred while creating queue for user {user_id}: {e}")

# Function to send a message to the queue without retries
def send_to_queue(user_id, recommendation_rank, recommendation_name):
    queue_name = f"{username}-{user_id}-queue"
    queue_client = queue_service_client.get_queue_client(queue_name)
    message = f"User ID: {user_id}, Recommendation Rank: {recommendation_rank}, Recommendation Name: {recommendation_name}"

    try:
        queue_client.send_message(message)
        print(f"Recommendation for user {user_id} (Rank: {recommendation_rank}) sent to queue.")
    except Exception as e:
        print(f"An error occurred while sending message to queue {queue_name}: {e}")

# Iterate over each user and send recommendations to their respective queues
for user_id, row in best_song_pd.iterrows():
    create_user_queue(user_id)  # Create queue for the user
    recommendations = row['recommendations']
    for rank, recommendation in enumerate(recommendations, start=1):
        send_to_queue(user_id, rank, recommendation)

In [None]:
def check_and_alert(user_id, recommendations):
    if len(recommendations) < 5:
        alert_record = {'user_id': user_id, 'message': f'Recommendations for user {user_id} are less than 5'}
        send_alert(alert_record)
    # You can add more conditions here based on your requirements, such as checking if recommendations are sent to the wrong user.
    # For example, you could compare the user_id in the recommendation with the actual user_id.

# Iterate over each user and send recommendations to their respective queues
for user_id, row in best_song_pd.iterrows():
    create_user_queue(user_id)  # Create queue for the user
    recommendations = row['recommendations']
    for rank, recommendation in enumerate(recommendations, start=1):
        send_to_queue_with_retry(user_id, rank, recommendation)

    # Check for alerts
    check_and_alert(user_id, recommendations)

# FLASK APP

In [73]:
!kill -9 "$(pgrep ngrok)"

/bin/bash: line 1: kill: `': not a pid or valid job spec


In [74]:
!pwd

/content


In [75]:
import os
os.chdir('drive/MyDrive/stream/src')

In [76]:
def get_familiarity_exploration_score(total_listens, listens_with_unique_artist):
    return (total_listens - listens_with_unique_artist) / total_listens

def get_timelessness_newness_score(average_user_release_date, average_dataset_release_date):
    return average_user_release_date / average_dataset_release_date

def get_loyalty_variety_score(loyalty_counter, total_listens):
    return loyalty_counter / total_listens

def get_commonality_uniqueness_score(mainstream_counter, total_listens):
    return mainstream_counter / total_listens


def get_personality_traits(user_id):
    traits = ""
    traits_list = []

    total_listens = get_user_total_listens(user_id)
    listens_with_unique_artist = get_user_listens_unique_artist(user_id)
    average_user_release_date = get_user_average_release_date(user_id)
    average_dataset_release_date = get_average_release_date()
    loyalty_counter = get_user_loyalty_counter(user_id)
    mainstream_counter = get_user_mainstream_counter(user_id)

    try:
      fe_score = get_familiarity_exploration_score(total_listens, listens_with_unique_artist)
    except ZeroDivisionError:
      fe_score = 1
    if fe_score > 0.5:
        traits += "F"
        traits_list.append("Familiarity")
    else:
        traits += "E"
        traits_list.append("Exploration")
    try:
      te_score = get_timelessness_newness_score(average_user_release_date, average_dataset_release_date)
    except ZeroDivisionError:
      te_score = 1
    if te_score >= 1:
        traits += "N"
        traits_list.append("Newness")
    else:
        traits += "T"
        traits_list.append("Timelessness")
    try:
      lv_score = get_loyalty_variety_score(loyalty_counter, total_listens)
    except ZeroDivisionError:
      lv_score = 1
    if lv_score > 0.5:
        traits += "L"
        traits_list.append("Loyalty")
    else:
        traits += "V"
        traits_list.append("Variety")
    try:
      cu_score = get_commonality_uniqueness_score(mainstream_counter, total_listens)
    except ZeroDivisionError:
      cu_score = 1
    if cu_score > 0.5:
        traits += "C"
        traits_list.append("Commonality")
    else:
        traits += "U"
        traits_list.append("Uniqueness")

    return traits, traits_list


def get_personality_type(user_id):
    personality_map = {
        "ENVC": 'The Early Adopter',
        "ENLU": 'The Nomad',
        "FNVU": 'The Specialist',
        "FNLC": 'The Enthusiast',
        "FTLC": 'The Connoisseur',
        "FTVU": 'The Deep Diver',
        "FNVC": 'The Fanclubber',
        "ETLC": 'The Top Charter',
        "FTLU": 'The Replayer',
        "FTVC": 'The Jukeboxer',
        "ENLC": "The Voyager",
        "FNLU": "The Devotee",
        "ETLU": "The Maverick",
        "ETVU": "The Time Traveler",
        "ETVC": "The Musicologist",
        "ENVU": "The Adventurer"
    }
    traits, _ = get_personality_traits(user_id)

    return personality_map[traits]

In [77]:
import getpass
import os
import threading

from flask import Flask, render_template, request, redirect, url_for
from pyngrok import ngrok, conf


conf.get_default().auth_token = "2f3FZdCmboHeGLduyiPd3PuvG5h_2kiQwA9e3XyG76J1PAiLj"

app = Flask(__name__)

public_url = ngrok.connect(5000).public_url
print(" * ngrok tunnel \"{}\" -> \"http://127.0.0.1:{}/\"".format(public_url, 5000))

app.config["BASE_URL"] = public_url


@app.route('/', methods=['GET', 'POST'])
def index():
  if request.method == 'POST':
    user_input = request.form['user_input']
    return redirect(url_for('wrapped', user_id=user_input))

  context = {
      "top_songs_week": get_top_5_streamed_songs_current_week(),
      "top_songs_month": get_top_5_streamed_songs_current_month(),
      "top_artists_week": get_top_5_streamed_artists_current_week(),
      "top_artists_month": get_top_5_streamed_artists_current_month(),
  }

  return render_template('index.html', **context)


@app.route('/wrapped/<user_id>')
def wrapped(user_id):
    _, traits_list = get_personality_traits(user_id)
    top_song_week, percentile_week = get_user_top_song_percentile_current_week(user_id)
    top_song_month, percentile_month = get_user_top_song_percentile_current_month(user_id)

    context = {
       "user_id": user_id,
       "personality_type": get_personality_type(user_id),
       "personality_traits": traits_list,
       "total_listens_week": get_user_total_listens_current_week(user_id),
       "total_listening_time_week": get_user_listening_time_current_week(user_id) / 3600,
       "total_listens_month": get_user_total_listens_current_month(user_id),
       "total_listening_time_month": get_user_listening_time_current_month(user_id) / 3600,
       "top_song_week": top_song_week,
       "percentile_week": percentile_week,
       "top_song_month": top_song_month,
       "percentile_month": percentile_month,
       "top_5_week": get_user_top_5_songs_current_week(user_id),
       "top_5_month": get_user_top_5_songs_current_month(user_id)
    }

    return render_template('wrapped.html', **context)

threading.Thread(target=app.run, kwargs={"debug": True,"use_reloader": False}).start()



 * ngrok tunnel "https://5e40-34-71-59-192.ngrok-free.app" -> "http://127.0.0.1:5000/"
 * Serving Flask app '__main__'
 * Debug mode: on
