In [3]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the SparkContext of the running session if there is one, else create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the Spark context; later cells use it for the S3 read and writes.
glueContext = GlueContext(sc)
# Plain SparkSession exposed by the Glue context.
spark = glueContext.spark_session
# Glue Job handle. NOTE(review): nothing visible in this notebook calls
# job.init() / job.commit() -- confirm whether that is needed when run as a job.
job = Job(glueContext)

You are already connected to a glueetl session d3382297-a6ce-41e4-bd3b-224619b77067.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session d3382297-a6ce-41e4-bd3b-224619b77067.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session d3382297-a6ce-41e4-bd3b-224619b77067.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session d3382297-a6ce-41e4-bd3b-224619b77067.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [7]:
# S3 prefix that holds the JSON input for this notebook.
s3_path = "s3://aws-spotify-etl-pipeline/raw_data/to_processed/"

# Load everything under that prefix as JSON into a Glue DynamicFrame, using the
# reader accessor that mirrors the `write_dynamic_frame.from_options` call used
# for output.
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
)




In [11]:
from pyspark.sql.functions import explode, col, to_date
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [8]:
# Convert the Glue DynamicFrame into a Spark DataFrame so the pyspark.sql
# column functions (explode, col, to_date) can be applied to it.
spotify_df = source_dyf.toDF()



In [12]:
# Quick look at the raw frame; the per-track data is nested inside the
# `items` array column that the process_* helpers explode.
spotify_df.show()

+--------------------+--------------------+-----+----+------+--------+-----+
|                href|               items|limit|next|offset|previous|total|
+--------------------+--------------------+-----+----+------+--------+-----+
|https://api.spoti...|[{2024-02-15T21:5...|  100|NULL|     0|    NULL|  100|
|https://api.spoti...|[{2024-02-15T21:5...|  100|NULL|     0|    NULL|  100|
+--------------------+--------------------+-----+----+------+--------+-----+


In [15]:
def process_albums(df):
    """Build the album table from the raw playlist frame.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            carry ``track.album`` (``id``, ``name``, ``release_date``,
            ``total_tracks``, ``external_urls.spotify``).

    Returns:
        Spark DataFrame with the columns ``album_id``, ``album_name``,
        ``release_date``, ``total_tracks`` and ``url``, holding one row per
        distinct, non-null ``album_id``.
    """
    # One row per playlist entry. Aliased as "item" (like process_artists and
    # process_songs) instead of overwriting the original "items" column.
    df_items_exploded = df.select(explode(col("items")).alias("item"))

    df_albums = df_items_exploded.select(
        col("item.track.album.id").alias("album_id"),
        col("item.track.album.name").alias("album_name"),
        col("item.track.album.release_date").alias("release_date"),
        col("item.track.album.total_tracks").alias("total_tracks"),
        col("item.track.album.external_urls.spotify").alias("url"),
    )

    # Nested field access on a NULL track/album yields a NULL id; drop those
    # rows first so the de-duplicated table never contains a NULL key.
    return df_albums.filter(col("album_id").isNotNull()).drop_duplicates(["album_id"])


def process_artists(df):
    """Build the artist table from the raw playlist frame.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            carry a ``track.artists`` array of structs (``id``, ``name``,
            ``external_urls.spotify``).

    Returns:
        Spark DataFrame with the columns ``artist_id``, ``artist_name`` and
        ``external_url``, holding one row per distinct, non-null
        ``artist_id``.
    """
    # First explode the playlist entries, one row per item.
    df_items_exploded = df.select(explode(col("items")).alias("item"))

    # Then explode each item's artists array, one row per (item, artist).
    df_artists_exploded = df_items_exploded.select(
        explode(col("item.track.artists")).alias("artist")
    )

    df_artists = df_artists_exploded.select(
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("external_url"),
    )

    # An artist struct without an id would otherwise survive de-duplication
    # as a single NULL-keyed row; filter it out before collapsing duplicates.
    return df_artists.filter(col("artist_id").isNotNull()).drop_duplicates(["artist_id"])


def process_songs(df):
    """Build the song table from the raw playlist frame.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            carry ``added_at`` and a ``track`` struct (``id``, ``name``,
            ``duration_ms``, ``external_urls.spotify``, ``popularity``,
            ``album.id``, ``artists``).

    Returns:
        Spark DataFrame with the columns ``song_id``, ``song_name``,
        ``duration_ms``, ``url``, ``popularity``, ``song_added`` (date),
        ``album_id`` and ``artist_id``, holding one row per distinct,
        non-null ``song_id``.
    """
    # One row per playlist entry.
    df_exploded = df.select(explode(col("items")).alias("item"))

    df_songs = df_exploded.select(
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.track.popularity").alias("popularity"),
        # Convert the `added_at` string to a date while projecting.
        to_date(col("item.added_at")).alias("song_added"),
        col("item.track.album.id").alias("album_id"),
        # Only the first listed artist is linked to the song.
        col("item.track.artists")[0]["id"].alias("artist_id"),
    )

    # Nested field access on a NULL track yields a NULL song_id; drop those
    # rows first so the de-duplicated table never contains a NULL key.
    return df_songs.filter(col("song_id").isNotNull()).drop_duplicates(["song_id"])




In [18]:
# Derive the album table from the raw frame and preview the first five rows.
album_df = process_albums(spotify_df)
album_df.show(5)

+--------------------+----------+------------+------------+--------------------+
|            album_id|album_name|release_date|total_tracks|                 url|
+--------------------+----------+------------+------------+--------------------+
|02h9kO2oLKnLtycgb...|      True|  2013-01-01|          12|https://open.spot...|
|02sEJTj1sye1Jaqxq...|    Encore|  2016-08-05|          14|https://open.spot...|
|05vYDLFFu5Frm6QUY...| The Raven|  2025-03-03|           1|https://open.spot...|
|07w0rG5TETcyihsEI...|       SOS|  2022-12-09|          23|https://open.spot...|
|09fggMHib4YkOtwQN...|   Starboy|  2016-11-25|          18|https://open.spot...|
+--------------------+----------+------------+------------+--------------------+
only showing top 5 rows


In [20]:
# Derive the artist and song tables from the same raw frame.
artist_df = process_artists(spotify_df)
song_df = process_songs(spotify_df)




In [22]:
# Preview the first five song rows.
song_df.show(5)

+--------------------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|             song_id|           song_name|duration_ms|                 url|popularity|song_added|            album_id|           artist_id|
+--------------------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|003vvx7Niy0yvhvHt...|      Mr. Brightside|     222973|https://open.spot...|        89|2024-02-15|4piJq7R3gjUOxnYs6...|0C0XlULifJtAgn6ZN...|
|0B7wvvmu9EISAwZnO...| When I Was Your Man|     213826|https://open.spot...|        52|2024-02-15|4xWulj18AGahlyuZP...|0du5cEVh5yTK9QJze...|
|0BuI0MtLtHLjPtNnb...|Welcome to My Wor...|     163217|https://open.spot...|        49|2025-07-02|1Tkg0Z50vPrhYvHrK...|4AnulnIsdMeFSGN8F...|
|0CcQNd8CINkwQfe1R...|            Believer|     204346|https://open.spot...|         6|2024-02-15|5GlPAy2PRJW06GVFh...|53XhwfbYqKCa1cC15...|
|0KKkJNfGyhkQ

In [24]:
def write_to_s3(df, path_suffix, format_type="csv", bucket="aws-spotify-etl-pipeline"):
    """Write a Spark DataFrame to S3 through the Glue DynamicFrame writer.

    The output lands under ``s3://<bucket>/transformed_data/<path_suffix>/``.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Key prefix below ``transformed_data/``. Leading and
            trailing slashes are stripped so the final path never contains
            an empty ``//`` segment.
        format_type: Output format handed to the Glue writer (default
            ``"csv"``).
        bucket: Destination bucket name. Defaults to the bucket this
            notebook has always written to, so existing calls are unchanged.

    Returns:
        None. The function is called for its side effect of writing to S3.
    """
    suffix = path_suffix.strip("/")
    target_path = f"s3://{bucket}/transformed_data/{suffix}/"

    # The Glue writer operates on DynamicFrames, so convert back first.
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )




In [26]:
# Persist the album table as CSV under a prefix stamped with today's date.
write_to_s3(
    album_df,
    f"album_data/album_transformed_{datetime.now():%Y-%m-%d}",
    "csv",
)




In [27]:
# Take the date stamp once: the second write only starts after the first one
# has finished, so separate datetime.now() calls could yield different
# suffixes when the run crosses midnight.
run_date = datetime.now().strftime("%Y-%m-%d")

# Persist the artist and song tables as CSV under date-stamped prefixes.
write_to_s3(artist_df, f"artist_data/artist_transformed_{run_date}", "csv")
write_to_s3(song_df, f"songs_data/songs_transformed_{run_date}", "csv")


