In [4]:
#!pip3 install pyspark==3.5.5

In [19]:
import os
os.environ["SPARK_HOME"] = "/Applications/spark"
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_PYTHON"] = "python"

In [20]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark-ml") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()

In [72]:
import os
from dotenv import load_dotenv
load_dotenv()

os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get("AWS_ACCESS_KEY_ID")
os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get("AWS_SECRET_ACCESS_KEY")
os.environ['AWS_DEFAULT_REGION'] = os.environ.get("AWS_DEFAULT_REGION")


import logging

# Pyspark - related libraries
import pyspark.sql.functions as F
from pyspark.sql.types import (StructType, 
                                StructField,
                                FloatType, 
                                IntegerType, 
                                StringType)
from pyspark.sql.functions import lit
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

In [22]:
# Test Spark is working or not

data = [
    ("Product A", 5000), 
    ("Product B", 6000), 
    ("Product C", 7000)
]

columns = ["Name", "Price"]
df = spark.createDataFrame(data, columns)
df.show()

+---------+-----+
|     Name|Price|
+---------+-----+
|Product A| 5000|
|Product B| 6000|
|Product C| 7000|
+---------+-----+



#### 1. Load the Required files

In [93]:
songs = spark.read.csv("../data/songs.csv", inferSchema = True, header = True)
users = spark.read.csv("../data/users.csv", inferSchema = True, header = True)
streams = spark.read.csv("../data/streams/", inferSchema = True, header = True)


print(f"Shape of the songs dataframe : {songs.count(), len(songs.columns)}")
print(f"Shape of the users dataframe : {users.count(), len(users.columns)}")
print(f"Shape of the streams dataframe : {streams.count(), len(streams.columns)}")

Shape of the songs dataframe : (89741, 21)
Shape of the users dataframe : (50000, 5)
Shape of the streams dataframe : (34038, 3)


#### 2. Check if the columns that we are gonna use having null values or not

In [94]:
# Extract the date from listen time to use a report_date
streams = (streams.withColumn(
    "report_date", 
    F.to_date(F.col("listen_time"))
))

# Ensure there are no null keys where joins are going to happen
try:
    assert(streams.filter(streams["track_id"].isNotNull()).count() == streams.count())
except Exception as e:
    logging.error(f"==== FAILURE - NO NULL VAUES IN TRACK_ID COLUMN ====")
    raise ValueError(f"==== FAILURE - NO NULL VAUES IN TRACK_ID COLUMN ==== {e}")
else:
    streams = streams.filter(streams["track_id"].isNotNull())
    logging.info(f"==== SUCCESS - CHECK NULL VAUES IN TRACK_ID COLUMN ====") 


# Similarly check if there are any null values for the songs table as well 
try:
    assert(songs.filter(songs["track_id"].isNotNull()).count() == songs.count())
except Exception as e:
    logging.error(f"==== FAILURE - CHECK IF NULL VALUES IN SONGS DATAFRAME ====")
    raise ValueError(f"==== FAILURE - CHECK IF NULL VALUES IN SONGS DATAFRAME ====")
else:
    songs = songs.filter(songs["track_id"].isNotNull())
    logging.info(f"==== SUCCESS - CHECK IF NULL VALUES IN SONGS DATAFRAME")


#### 3. Calculate KPIs for each songs on a daily basis

In [95]:
songs_kpi_df = streams.groupBy("track_id", "report_date") \
        .agg(
            F.count("*").alias("total_listens"), 
            F.approx_count_distinct("user_id").alias("unique_users"), 
            F.sum(F.expr("unix_timestamp(listen_time)")).alias("total_listening_time"), 
            F.avg(F.expr("unix_timestamp(listen_time)")).alias("avg_listening_time_per_user")
        )

# Join with songs data to get genre
songs_kpi_with_details_df = songs_kpi_df.join(
    songs, 
    (songs_kpi_df["track_id"] == songs["track_id"]),
    how = "inner"
).drop(songs["track_id"])

songs_kpi_with_details_df.show()

+--------------------+-----------+-------------+------------+--------------------+---------------------------+---+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|            track_id|report_date|total_listens|unique_users|total_listening_time|avg_listening_time_per_user| id|             artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|
+--------------------+-----------+-------------+------------+--------------------+---------------------------+---+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--

#### 4. Compute Window functions

In [96]:
# Rank Songs within each genre by total_listens
windowSpec = Window.partitionBy("report_date", "track_genre").orderBy(F.desc("total_listens"))
ranked_songs_df = songs_kpi_with_details_df.withColumn("rank", F.rank().over(windowSpec))

# Now, let's filter out the songs where the rank is less than 3
top_songs_per_genre = ranked_songs_df.filter(ranked_songs_df["rank"] <= 3)
top_songs_per_genre.show(5)

+--------------------+-----------+-------------+------------+--------------------+---------------------------+-----+--------------------+--------------------+--------------------+-----------+--------------------+--------+------------+-------+----------+--------+-------+-----------+------------+----------------+--------+-------+------+--------------+-----------+----+
|            track_id|report_date|total_listens|unique_users|total_listening_time|avg_listening_time_per_user|   id|             artists|          album_name|          track_name| popularity|         duration_ms|explicit|danceability| energy|       key|loudness|   mode|speechiness|acousticness|instrumentalness|liveness|valence| tempo|time_signature|track_genre|rank|
+--------------------+-----------+-------------+------------+--------------------+---------------------------+-----+--------------------+--------------------+--------------------+-----------+--------------------+--------+------------+-------+----------+--------+

In [97]:
# Find out the top 5 genres based on total listeners across all songs and all days
genre_window = Window.partitionBy("report_date").orderBy(F.desc("total_listens"))
top_genres = top_songs_per_genre.withColumn("genre_rank", F.rank().over(genre_window)).filter(F.col("genre_rank") <= 5)
top_genres.show(5)

+--------------------+-----------+-------------+------------+--------------------+---------------------------+------+--------------------+-----------------------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+----+----------+
|            track_id|report_date|total_listens|unique_users|total_listening_time|avg_listening_time_per_user|    id|             artists|                         album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|rank|genre_rank|
+--------------------+-----------+-------------+------------+--------------------+---------------------------+------+--------------------+-----------------------------------+--------------------+----------+-----------+--------+------------+------+-

In [99]:
final_df = top_genres.select(
    F.col("report_date"),
    F.col("track_id"), 
    F.col("track_name"), 
    F.col("artists"), 
    F.col("track_genre"), 
    F.col("total_listens"), 
    F.col("unique_users"), 
    F.col("total_listening_time"), 
    F.col("avg_listening_time_per_user")
)

In [100]:
final_df.show()

+-----------+--------------------+--------------------+--------------------+-------------+-------------+------------+--------------------+---------------------------+
|report_date|            track_id|          track_name|             artists|  track_genre|total_listens|unique_users|total_listening_time|avg_listening_time_per_user|
+-----------+--------------------+--------------------+--------------------+-------------+-------------+------------+--------------------+---------------------------+
| 2024-06-25|0BtD9k8XjGliQJMS2...|      Varias Queixas|  Filipe Escandurras|       pagode|            5|           5|          8596706764|             1.7193413528E9|
| 2024-06-25|3u2jm2FZ4x5p8k3Xy...|Little Christmas ...|     Michael Jackson|         soul|            5|           5|          8596712116|             1.7193424232E9|
| 2024-06-25|4gDajIG4yNgBtym4z...|          自閉円頓裹|        桶狭間ありさ|        anime|            4|           4|          6877285438|             1.7193213595E9|
| 2024-06