In [0]:
bronze_df=spark.read.format("csv").option("header", "true").option("inferSchema","true").load("/Volumes/workspace/default/spotify_data/spotify_recent.csv")
display(bronze_df)
bronze_df.printSchema()

played_at,track_name,artist,album,duration_ms
2025-10-03T07:59:24.960Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-10-03T07:55:53.286Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-10-03T07:46:47.291Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-10-03T07:43:30.130Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-10-03T07:38:24.067Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-10-03T07:34:24.492Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-10-03T07:15:58.905Z,Actually Romantic,Taylor Swift,The Life of a Showgirl,163699
2025-10-03T07:05:14.483Z,Ruin The Friendship,Taylor Swift,The Life of a Showgirl,220564
2025-10-03T07:00:56.012Z,Elizabeth Taylor,Taylor Swift,The Life of a Showgirl,208291
2025-10-03T06:57:27.481Z,The Fate of Ophelia,Taylor Swift,The Life of a Showgirl,226073


root
 |-- played_at: timestamp (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- album: string (nullable = true)
 |-- duration_ms: integer (nullable = true)



In [0]:
from pyspark.sql.functions import *

silver_df = bronze_df \
    .withColumnRenamed("track_name", "track") \
    .withColumn("track", trim(col("track"))) \
    .withColumn("artist", trim(col("artist"))) \
    .withColumn("album", trim(col("album"))) \
    .dropna() \
    .dropDuplicates()

display(silver_df)
silver_df.printSchema()


played_at,track,artist,album,duration_ms
2025-10-03T06:05:31.474Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498
2025-08-30T09:09:07.807Z,Red (Taylor's Version),Taylor Swift,Red (Taylor's Version),223093
2025-10-03T06:57:27.481Z,The Fate of Ophelia,Taylor Swift,The Life of a Showgirl,226073
2025-10-03T05:38:09.844Z,Actually Romantic,Taylor Swift,The Life of a Showgirl,163699
2025-08-27T16:19:26.298Z,Give Your Heart a Break,Demi Lovato,Unbroken,205346
2025-10-03T05:44:07.698Z,Wood,Taylor Swift,The Life of a Showgirl,150517
2025-08-28T15:10:33.923Z,I Like Me Better,Lauv,I met you when I was 18. (the playlist),197436
2025-10-03T04:11:18.865Z,Opalite,Taylor Swift,The Life of a Showgirl,235355
2025-08-28T14:56:14.532Z,Summertime Sadness,Lana Del Rey,Born To Die - The Paradise Edition,265427
2025-10-03T06:51:30.486Z,CANCELLED!,Taylor Swift,The Life of a Showgirl,211498


root
 |-- played_at: timestamp (nullable = true)
 |-- track: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- album: string (nullable = true)
 |-- duration_ms: integer (nullable = true)



**Top Artists by Play Count**

In [0]:
gold_top_artists = silver_df\
    .groupBy("artist").count()\
    .orderBy(col("count").desc())
display(gold_top_artists)
gold_top_artists.write.mode("overwrite").saveAsTable("gold_top_artists")


artist,count
Taylor Swift,38
Demi Lovato,3
Lana Del Rey,3
Justin Bieber,3
Lauv,1
Kesha,1
Cartoon,1


**Listening Trends Over Time**

In [0]:
from pyspark.sql.functions import to_date

gold_daily_trend = silver_df \
    .withColumn("date", to_date("played_at")) \
    .groupBy("date") \
    .count() \
    .orderBy("date")

display(gold_daily_trend)
gold_daily_trend.write.mode("overwrite").saveAsTable("gold_daily_trend")


date,count
2025-08-27,4
2025-08-28,5
2025-08-30,6
2025-08-31,1
2025-10-03,34


**Daily Pattern across Hours**

In [0]:
from pyspark.sql.functions import when

gold_daily_pattern = silver_df \
    .withColumn("hour", hour("played_at")) \
    .withColumn("time_of_day",
        when((col("hour") >= 5) & (col("hour") < 12), "Morning")
        .when((col("hour") >= 12) & (col("hour") < 17), "Afternoon")
        .when((col("hour") >= 17) & (col("hour") < 21), "Evening")
        .otherwise("Night")
    ) \
    .groupBy("time_of_day") \
    .count() \
    .orderBy("count", ascending=False)

display(gold_daily_pattern)
gold_daily_pattern.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("gold_daily_pattern")

time_of_day,count
Morning,35
Afternoon,9
Night,6


**Monday to Sunday Behavior**

In [0]:
from pyspark.sql.functions import hour, date_format, dayofweek, count

gold_daywise_trend = silver_df \
    .withColumn("hour", hour("played_at")) \
    .withColumn("day_of_week", date_format("played_at", "EEEE")) \
    .withColumn("day_num", dayofweek("played_at")) \
    .groupBy("day_num", "day_of_week", "hour") \
    .agg(count("*").alias("song_count")) \
    .orderBy("day_num", "hour") \
    .drop("day_num") #.drop hides it in final output

display(gold_daywise_trend)
gold_daywise_trend.write.mode("overwrite").saveAsTable("gold_daywise_trend")



day_of_week,hour,song_count
Sunday,8,1
Wednesday,16,4
Thursday,14,2
Thursday,15,3
Friday,0,1
Friday,4,5
Friday,5,9
Friday,6,10
Friday,7,9
Saturday,9,6


**Weekday vs Weekend Behavior**

In [0]:
from pyspark.sql.functions import hour, date_format, count, when

# Add hour and day_of_week
gold_weekday_weekend = silver_df \
    .withColumn("hour", hour("played_at")) \
    .withColumn("day_of_week", date_format("played_at", "EEEE")) \
    .withColumn(
        "day_type", 
        when(col("day_of_week").isin("Saturday", "Sunday"), "Weekend").otherwise("Weekday")
    ) \
    .groupBy("day_type", "hour") \
    .agg(count("*").alias("song_count")) \
    .orderBy("day_type", "hour")

display(gold_weekday_weekend)
gold_weekday_weekend.write.mode("overwrite").saveAsTable("gold_weekday_weekend")


day_type,hour,song_count
Weekday,0,1
Weekday,4,5
Weekday,5,9
Weekday,6,10
Weekday,7,9
Weekday,14,2
Weekday,15,3
Weekday,16,4
Weekend,8,1
Weekend,9,6


**Top Tracks**

In [0]:
from pyspark.sql.functions import count, col

gold_top_tracks = silver_df.groupBy("track") \
    .agg(count("*").alias("song_count")) \
    .orderBy(col("song_count").desc())

display(gold_top_tracks)

gold_top_tracks.write.mode("overwrite").saveAsTable("gold_top_tracks")

track,song_count
CANCELLED!,12
Actually Romantic,4
Red (Taylor's Version),3
Wood,3
Wi$h Li$t,3
The Fate of Ophelia,2
Give Your Heart a Break,2
Summertime Sadness,2
Ruin The Friendship,2
Father Figure,2


**Album Popularity**

In [0]:
from pyspark.sql.functions import count

gold_album_popularity = silver_df \
    .groupBy("album") \
    .agg(count("*").alias("song_count")) \
    .orderBy("song_count", ascending=False)

display(gold_album_popularity)
gold_album_popularity.write.mode("overwrite").saveAsTable("gold_album_popularity")


album,song_count
The Life of a Showgirl,33
Red (Taylor's Version),3
Unbroken,2
Born To Die - The Paradise Edition,2
Believe,2
I met you when I was 18. (the playlist),1
Demi,1
Animal (Expanded Edition),1
Young And Beautiful,1
SWAG,1
