In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; later cells use it to read and write DynamicFrames.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Job object bound to this GlueContext.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 0de4aacb-5822-48df-938e-2c1614357e54
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 0de4aacb-5822-48df-938e-2c1614357e54 to get into ready status...
Session 0de4aacb-5822-48df-938e-2c1614357e54 ha

In [5]:
# S3 prefix the raw JSON input is read from.
s3_path = "s3://spotify-etl-project-shivasai/raw_data/to_processed/"




In [63]:
from pyspark.sql.functions import explode, col,to_date
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [6]:
# Read the JSON data stored under s3_path into a Glue DynamicFrame.
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
)




In [8]:
# Convert the Glue DynamicFrame to a Spark DataFrame so the column functions below can be used.
spotify_df = source_dyf.toDF()



In [55]:
def process_album(df):
    """Build the album table from the raw playlist DataFrame.

    Args:
        df: Spark DataFrame with an ``items`` array column; each element is
            read through ``track.album`` (id, name, release_date,
            total_tracks, external_urls.spotify).

    Returns:
        Spark DataFrame with columns album_id, album_name, release_date,
        total_tracks and album_url, holding one row per distinct,
        non-null album_id.
    """
    # One row per playlist item, using the same explode pattern as the
    # artist and song processors.
    items = df.select(explode(col("items")).alias("item"))

    albums = items.select(
        col("item.track.album.id").alias("album_id"),
        col("item.track.album.name").alias("album_name"),
        col("item.track.album.release_date").alias("release_date"),
        col("item.track.album.total_tracks").alias("total_tracks"),
        col("item.track.album.external_urls.spotify").alias("album_url"),
    )

    # An item whose track or album struct is missing resolves to a null
    # album_id; drop those so the table never contains a row without a key.
    return albums.filter(col("album_id").isNotNull()).dropDuplicates(["album_id"])

def process_artist(df):
    """Build the artist table from the raw playlist DataFrame.

    Args:
        df: Spark DataFrame with an ``items`` array column; each element is
            read through ``track.artists``, an array of artist structs
            (id, name, external_urls.spotify).

    Returns:
        Spark DataFrame with columns artist_id, artist_name and artist_url,
        holding one row per distinct, non-null artist_id.
    """
    # First explode: one row per playlist item.
    items = df.select(explode(col("items")).alias("item"))

    # Second explode: one row per artist credited on each item's track.
    artists = items.select(explode(col("item.track.artists")).alias("artist"))

    artist_columns = artists.select(
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("artist_url"),
    )

    # Artist entries without an id would otherwise collapse into a single
    # null-keyed row; remove them before de-duplicating.
    return artist_columns.filter(col("artist_id").isNotNull()).dropDuplicates(["artist_id"])

def process_songs(df):
    """Build the song table from the raw playlist DataFrame.

    Args:
        df: Spark DataFrame with an ``items`` array column; each element is
            read through ``track`` (id, name, duration_ms,
            external_urls.spotify, popularity, album.id, artists) and
            ``added_at``.

    Returns:
        Spark DataFrame with columns song_id, song_name, duration_ms, url,
        popularity, song_added, album_id and artist_id, holding one row per
        distinct, non-null song_id. ``song_added`` is converted with
        ``to_date``; ``artist_id`` is the id of the first entry in the
        track's artists array.
    """
    # One row per playlist item.
    items = df.select(explode(col("items")).alias("item"))

    songs = items.select(
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.track.popularity").alias("popularity"),
        col("item.added_at").alias("song_added"),
        col("item.track.album.id").alias("album_id"),
        # Only the first listed artist is linked to the song.
        col("item.track.artists")[0]["id"].alias("artist_id"),
    )

    # An item whose track struct is missing resolves to a null song_id;
    # drop those so the table never contains a row without a key.
    songs = songs.filter(col("song_id").isNotNull()).dropDuplicates(["song_id"])

    # Turn the string timestamp in song_added into a real date column.
    return songs.withColumn("song_added", to_date(col("song_added")))
    
                                                           
    




In [58]:
# Derive the three output tables from the same raw playlist DataFrame.
album_df = process_album(spotify_df)
artist_df = process_artist(spotify_df)
song_df = process_songs(spotify_df)




In [59]:
# Preview the first five album rows.
album_df.show(n=5)

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|           album_url|
+--------------------+--------------------+------------+------------+--------------------+
|0DLvFVIfwt3OHdK9k...|Where I've Been, ...|  2024-05-31|          12|https://open.spot...|
|0EiI8ylL0FmWWpgHV...|The Rise and Fall...|  2023-09-22|          14|https://open.spot...|
|0Wmt50XH9EZvSuML0...|Neva Play (feat. ...|  2024-09-06|           1|https://open.spot...|
|10FLjwfpbxLmW8c25...|    Die With A Smile|  2024-08-16|           1|https://open.spot...|
|15XcLhiVMlSOipUdd...|                MUSE|  2024-07-19|           7|https://open.spot...|
+--------------------+--------------------+------------+------------+--------------------+
only showing top 5 rows


In [60]:
# Preview the first five artist rows.
artist_df.show(n=5)

+--------------------+------------+--------------------+
|           artist_id| artist_name|          artist_url|
+--------------------+------------+--------------------+
|06HL4z0CvFAxyc27G...|Taylor Swift|https://open.spot...|
|0PCCGZ0wGLizHt2KZ...|     Artemas|https://open.spot...|
|0Y5tJX1MQlPlqiwlO...|Travis Scott|https://open.spot...|
|0du5cEVh5yTK9QJze...|  Bruno Mars|https://open.spot...|
|12GqGscKJx3aE4t07...|  Peso Pluma|https://open.spot...|
+--------------------+------------+--------------------+
only showing top 5 rows


In [61]:
# Preview the first five song rows.
song_df.show(n=5)

+--------------------+-------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|             song_id|          song_name|duration_ms|                 url|popularity|song_added|            album_id|           artist_id|
+--------------------+-------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|0OA00aPt3BV10qeMI...|          Big Dawgs|     190666|https://open.spot...|        93|2024-09-20|6Yw4204wbgmpsGTzj...|4nVa6XlBFlIkF6msW...|
|0UYnhUfnUj5adChuA...|        Sailor Song|     211978|https://open.spot...|        89|2024-09-20|4DWrYvfGXRE8ko5Zx...|1iCnM8foFssWlPRLf...|
|0WbMK4wrZ1wFSty9F...|   Good Luck, Babe!|     218423|https://open.spot...|        97|2024-09-20|1WAjjRMfZjEXtB0lQ...|7GlBOeep6PqTfFi59...|
|0nJW01T7XtvILxQgC...|When I Was Your Man|     213826|https://open.spot...|        89|2024-09-20|58ufpQsJ1DS5kq4hh...|0du5cEVh5yTK9QJze...|
|17phhZDn6oGtzMe56..

In [64]:
def write_to_s3(df, path_suffix, format_type="csv",
                base_path="s3://spotify-etl-project-shivasai/transformed_data"):
    """Write a Spark DataFrame to S3 through a Glue DynamicFrame.

    Args:
        df: Spark DataFrame to write.
        path_suffix: Folder path appended to ``base_path``; the data is
            written under ``<base_path>/<path_suffix>/``.
        format_type: Output format passed to the Glue writer (default "csv").
        base_path: S3 prefix the suffix is appended to. The default keeps the
            destination this notebook has always used; pass another prefix to
            write to a different bucket or environment.

    Returns:
        None. The only effect is the write performed by the Glue writer.
    """
    # The Glue writer takes a DynamicFrame, so wrap the Spark DataFrame first.
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    # Tolerate a trailing slash on base_path so the final path never
    # contains a double slash.
    target_path = f"{base_path.rstrip('/')}/{path_suffix}/"

    # Write the DynamicFrame to the target path in the requested format.
    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )




In [66]:
# Albums are written to a folder stamped with the current date (YYYY-MM-DD).
run_date = datetime.now().strftime('%Y-%m-%d')
write_to_s3(album_df, f"albums_data/album_transformed_{run_date}", "csv")




In [67]:
# Take the date stamp once so both tables land in folders carrying the same
# date, even when the first write finishes after midnight.
run_date = datetime.now().strftime('%Y-%m-%d')
write_to_s3(artist_df, f"artists_data/artist_transformed_{run_date}", "csv")
write_to_s3(song_df, f"songs_data/songs_transformed_{run_date}", "csv")


