In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%additional_python_modules spotipy==2.19.0

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
  
# Reuse the active SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; it is used further down to read and
# write DynamicFrames.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job object bound to this context.
# NOTE(review): `spark` and `job` are not referenced again in the cells shown here.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Additional python modules to be included:
spotipy==2.19.0
pandas==1.3.3
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 1b727efa-3c3c-4a7e-adf0-94699e084d6d
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--additional-python-modules spotipy==2.19.0,pandas==1.3.3
Wa

In [85]:
from pyspark.sql.functions import explode, col, udf, when, size, concat_ws, collect_list, element_at, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from datetime import datetime
import os




In [3]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials




In [4]:
# Spotify API credentials are read from the environment rather than written
# into the notebook, so the secret never lands in source control or in shared
# copies of this file. SPOTIPY_CLIENT_ID / SPOTIPY_CLIENT_SECRET are the
# variable names spotipy itself looks for.
# NOTE(review): any key pair that was ever hardcoded in this cell should be
# treated as leaked and rotated in the Spotify developer dashboard.
_missing_credentials = [
    name
    for name in ("SPOTIPY_CLIENT_ID", "SPOTIPY_CLIENT_SECRET")
    if not os.environ.get(name)
]
if _missing_credentials:
    # Fail fast with an actionable message instead of an opaque auth error later.
    raise RuntimeError(
        "Missing Spotify credentials; set environment variable(s): "
        + ", ".join(_missing_credentials)
    )

client_credentials_manager = SpotifyClientCredentials(
    client_id=os.environ["SPOTIPY_CLIENT_ID"],
    client_secret=os.environ["SPOTIPY_CLIENT_SECRET"],
)

# Client used by get_artist_info below to query artist metadata.
spAPI = spotipy.Spotify(client_credentials_manager = client_credentials_manager)




In [5]:
# S3 location of the raw JSON input.
s3_path = "s3://spotify-etl-project-tqx/raw_data/to_processed/"
# Read the JSON data at that path into a Glue DynamicFrame.
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    connection_options = {"paths": [s3_path]},
    format="json"
)




In [6]:
# Convert the DynamicFrame to a Spark DataFrame so the pyspark.sql functions
# (explode, col, ...) can be applied to it.
spotify_df = source_dyf.toDF()



In [7]:
# Preview up to 5 rows of the raw frame to check its top-level columns.
spotify_df.show(5)

+--------------------+--------------------+-----+----+------+--------+-----+
|                href|               items|limit|next|offset|previous|total|
+--------------------+--------------------+-----+----+------+--------+-----+
|https://api.spoti...|[{2024-10-11T12:3...|  100|null|     0|    null|   50|
+--------------------+--------------------+-----+----+------+--------+-----+


In [8]:
# Second name bound to the same DataFrame object (no copy is made).
# NOTE(review): the cells shown below pass `spotify_df` explicitly, so this
# alias looks unused — confirm before removing.
df = spotify_df




In [140]:
def process_albums(df):
    """Build the album table with one row per distinct album id.

    The ``items`` array is exploded so that each element becomes its own row,
    the album fields are read from ``items.track.album`` and duplicate
    ``album_id`` values are dropped.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            expose ``track.album``.

    Returns:
        DataFrame with columns ``album_id``, ``album_name``, ``release_date``,
        ``total_tracks`` and ``url``.
    """
    album_columns = [
        col("items.track.album.id").alias("album_id"),
        col("items.track.album.name").alias("album_name"),
        col("items.track.album.release_date").alias("release_date"),
        col("items.track.album.total_tracks").alias("total_tracks"),
        col("items.track.album.external_urls.spotify").alias("url"),
    ]

    # Replace the array column by its elements: one row per item.
    items_exploded = df.withColumn("items", explode("items"))

    albums = items_exploded.select(*album_columns)
    return albums.drop_duplicates(["album_id"])




In [20]:
def get_artist_info(artist_id):
    """Look up an artist's genres, follower count and popularity.

    The data comes from ``spAPI.artist(artist_id)``.

    Args:
        artist_id: Artist id handed to ``spAPI.artist``. ``None`` or an empty
            string short-circuits to the fallback without calling the API.

    Returns:
        Tuple ``(genres, followers, popularity)`` where ``genres`` is the
        artist's genre list joined with ``", "``. When the id is missing or
        the lookup fails for any reason, the best-effort fallback
        ``("", 0, 0)`` is returned so one bad artist does not abort the
        whole job.
    """
    fallback = ("", 0, 0)

    if not artist_id:
        # Nothing to look up; avoid a pointless network round trip.
        return fallback

    try:
        artist_info = spAPI.artist(artist_id)
        genres = ", ".join(artist_info["genres"])
        followers = artist_info['followers']['total']
        popularity = artist_info['popularity']
        return genres, followers, popularity
    except Exception as exc:
        # `Exception` rather than a bare `except:` so KeyboardInterrupt and
        # SystemExit still propagate. The failure is reported on stderr so
        # that fallback rows can be told apart from genuine zero values.
        print(
            f"get_artist_info: lookup failed for artist {artist_id!r}: {exc!r}",
            file=sys.stderr,
        )
        return fallback

# Register the UDF with an explicit StructType return type so that Spark exposes the result as named, typed fields (genres, followers, popularity).
# The return type could be omitted by writing plain udf(get_artist_info), but udf() then defaults to StringType, so the returned tuple would not be available as a struct with separate fields. Declaring the return type explicitly is good practice: it avoids type-related errors downstream.
# Fields of the struct returned by artist_info_udf; every field is nullable.
_artist_info_fields = [
    StructField("genres", StringType(), True),
    StructField("followers", IntegerType(), True),
    StructField("popularity", IntegerType(), True),
]


@udf(returnType=StructType(_artist_info_fields))
def artist_info_udf(artist_id):
    """Spark UDF wrapper that delegates to ``get_artist_info``.

    Returns the ``(genres, followers, popularity)`` tuple, which Spark maps
    onto the struct fields declared in the return type.
    """
    return get_artist_info(artist_id)




In [141]:
def process_artists(df):
    """Build the artist table with one row per distinct artist id.

    Each element of ``items`` is exploded into a row, then each entry of
    ``item.track.artists`` is exploded again. Duplicate artist ids are
    dropped and the remaining rows are enriched through ``artist_info_udf``.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            expose ``track.artists``.

    Returns:
        DataFrame with columns ``artist_id``, ``artist_name``, ``url``,
        ``artist_genres``, ``artist_followers`` and ``popularity``.
    """
    # items -> one row per item -> one row per artist credited on that item.
    per_item = df.select(explode(col("items")).alias("item"))
    per_artist = per_item.select(explode(col("item.track.artists")).alias("artist"))

    unique_artists = per_artist.select(
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("url"),
    ).drop_duplicates(["artist_id"])

    # The UDF is attached after de-duplication, i.e. to the distinct artists only.
    enriched = unique_artists.withColumn(
        "artist_info", artist_info_udf(col("artist_id"))
    )

    # Flatten the struct produced by the UDF into top-level columns.
    return enriched.select(
        "artist_id",
        "artist_name",
        "url",
        col("artist_info.genres").alias("artist_genres"),
        col("artist_info.followers").alias("artist_followers"),
        col("artist_info.popularity").alias("popularity"),
    )




In [None]:
def process_songs(df):
    """Build the song table with one row per distinct song id.

    Each element of ``items`` becomes a row. Song attributes are taken from
    ``item.track`` and the ids of all artists credited on a song are joined
    into a single ``", "``-separated ``artist_ids`` string.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            expose ``added_at`` and ``track``.

    Returns:
        DataFrame with columns ``song_id``, ``song_name``, ``duration_ms``,
        ``popularity``, ``url``, ``song_added`` (converted with ``to_date``),
        ``album_id`` and ``artist_ids``. Because of the inner join, songs
        without any artist id are not part of the result.
    """
    df_items_exploded = df.select(explode(col("item_placeholder")).alias("item")) if False else df.select(explode(col("items")).alias("item"))

    # One row per (song, artist) pair. Pairs are de-duplicated before the
    # aggregation: the song rows below are de-duplicated by song_id, so a
    # song that occurs more than once in the input would otherwise get the
    # same artist id repeated in `artist_ids`.
    df_song_artist_pairs = df_items_exploded.select(
        col("item.track.id").alias("song_id"),
        explode(col("item.track.artists.id")).alias("artist_id"),
    ).drop_duplicates(["song_id", "artist_id"])

    df_song_artist = df_song_artist_pairs.groupBy("song_id") \
            .agg(concat_ws(", ", collect_list("artist_id")).alias("artist_ids"))

    df_songs = df_items_exploded.select(
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.popularity").alias("popularity"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.added_at").alias("song_added"),
        col("item.track.album.id").alias("album_id")
    ).drop_duplicates(["song_id"])

    # Keep only the date part of the added-at value.
    df_songs = df_songs.withColumn("song_added", to_date(col("song_added")))

    # Attach the aggregated artist ids to each song.
    final_df = df_songs.join(df_song_artist, on="song_id", how="inner")

    return final_df

In [142]:
# Build the album table from the raw frame and preview up to 5 rows.
album_df = process_albums(spotify_df)
album_df.show(5)

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|                 url|
+--------------------+--------------------+------------+------------+--------------------+
|0DLvFVIfwt3OHdK9k...|Where I've Been, ...|  2024-05-31|          12|https://open.spot...|
|0EiI8ylL0FmWWpgHV...|The Rise and Fall...|  2023-09-22|          14|https://open.spot...|
|0XA403JTounqFh2ow...|          Diet Pepsi|  2024-08-09|           1|https://open.spot...|
|0lgs2Sa82lyX89nBU...|      FERXXOCALIPSIS|  2023-12-01|          10|https://open.spot...|
|0mV9Pfr1GfoZLkp1E...|          Embrace It|  2024-08-23|           1|https://open.spot...|
+--------------------+--------------------+------------+------------+--------------------+
only showing top 5 rows


In [143]:
# Build the artist table from the raw frame and preview up to 5 rows.
# NOTE(review): artist_df is not cached, so the artist lookups done by the UDF
# presumably run again when the frame is written later — confirm.
artist_df = process_artists(spotify_df)
artist_df.show(5)

+--------------------+------------+--------------------+--------------------+----------------+----------+
|           artist_id| artist_name|                 url|       artist_genres|artist_followers|popularity|
+--------------------+------------+--------------------+--------------------+----------------+----------+
|06HL4z0CvFAxyc27G...|Taylor Swift|https://open.spot...|                 pop|       123615685|       100|
|0PCCGZ0wGLizHt2KZ...|     Artemas|https://open.spot...|                    |          956224|        80|
|0Y5tJX1MQlPlqiwlO...|Travis Scott|https://open.spot...|     rap, slap house|        33635798|        94|
|0du5cEVh5yTK9QJze...|  Bruno Mars|https://open.spot...|      dance pop, pop|        60240128|        94|
|12GqGscKJx3aE4t07...|  Peso Pluma|https://open.spot...|corridos tumbados...|        16635683|        93|
+--------------------+------------+--------------------+--------------------+----------------+----------+
only showing top 5 rows


In [144]:
# Build the song table from the raw frame and preview up to 5 rows.
# NOTE(review): the output recorded below shows `song_added` as a full
# timestamp although process_songs applies to_date; the definition cell is
# marked "In [None]", so this output appears to predate that change. Re-run
# the definition cell and this cell to refresh it.
song_df = process_songs(spotify_df)
song_df.show(5)

+--------------------+--------------------+-----------+----------+--------------------+--------------------+--------------------+--------------------+
|             song_id|           song_name|duration_ms|popularity|                 url|          song_added|            album_id|          artist_ids|
+--------------------+--------------------+-----------+----------+--------------------+--------------------+--------------------+--------------------+
|0IsIY8pfu1yaGkPUD...|Guess featuring b...|     143330|        70|https://open.spot...|2024-10-11T12:30:50Z|36P07bti6xD99o7S1...|25uiPmTg16RbhZWAq...|
|0Sr7ssScx54yxdM2o...|Q U E V A S H A C...|     224022|        87|https://open.spot...|2024-10-11T12:30:50Z|3C5uwdRE5QRoXSGPP...|3E12tRURRvPfHz0hA...|
|0WbMK4wrZ1wFSty9F...|    Good Luck, Babe!|     218423|        95|https://open.spot...|2024-10-11T12:30:50Z|1WAjjRMfZjEXtB0lQ...|7GlBOeep6PqTfFi59...|
|0io16MKpbeDIdYzmG...|          Embrace It|     104418|        88|https://open.spot...|2024-10

In [145]:
def write_to_s3(df, path_suffix, format_type="csv", bucket="spotify-etl-project-tqx"):
    """Write a Spark DataFrame to S3 through Glue.

    The DataFrame is converted to a DynamicFrame and written to
    ``s3://<bucket>/transformed_data/<path_suffix>/``.

    Args:
        df: Spark DataFrame to write.
        path_suffix: Path below ``transformed_data/`` (no leading or trailing
            slash), e.g. ``"album_data/album_transformed_20241011"``.
        format_type: Output format passed to the Glue writer; defaults to
            ``"csv"``.
        bucket: Target bucket name. Defaults to the bucket this notebook has
            always written to, so existing calls behave exactly as before.

    Returns:
        None.
    """
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    target_path = f"s3://{bucket}/transformed_data/{path_suffix}/"

    glueContext.write_dynamic_frame.from_options(
        frame = dynamic_frame,
        connection_type = "s3",
        connection_options = {"path": target_path},
        format = format_type
    )




In [146]:
# Write the album table as CSV under album_data/album_transformed_<YYYYMMDD>,
# where the date is taken from the clock at run time.
write_to_s3(album_df, "album_data/album_transformed_{}".format(datetime.now().strftime("%Y%m%d")), "csv")




In [None]:
# Write the artist table as CSV under artist_data/artist_transformed_<YYYYMMDD>,
# where the date is taken from the clock at run time.
write_to_s3(artist_df, "artist_data/artist_transformed_{}".format(datetime.now().strftime("%Y%m%d")), "csv")

In [1]:
# Write the song table as CSV under song_data/song_transformed_<YYYYMMDD>,
# where the date is taken from the clock at run time.
# NOTE: this cell depends on write_to_s3 and song_df from earlier cells. The
# NameError recorded below came from running it in a newly created session
# in which those cells had not been executed; re-run the notebook from the top.
write_to_s3(song_df, "song_data/song_transformed_{}".format(datetime.now().strftime("%Y%m%d")), "csv")

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: b9e3ad74-addc-4b98-a69c-888dc9a44516
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session b9e3ad74-addc-4b98-a69c-888dc9a44516 to get into ready status...
Session b9e3ad74-addc-4b98-a69c-888dc9a44516 has been created.
NameError: name 'write_to_s3' is not defined
