# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue interactive session.


#### Optional: Run this cell to see available notebook commands ("magics").


#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the interactive session's SparkContext (getOrCreate returns the existing one if present).
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and provides the DynamicFrame read/write APIs used in later cells.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle.
# NOTE(review): job.init()/job.commit() are not called in the visible cells, so job
# bookmarks would not take effect if this notebook is run as a job — confirm intent.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 26008ef0-6ac8-4760-a89d-7159f111b655
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 26008ef0-6ac8-4760-a89d-7159f111b655 to get into ready status...
Session 26008ef0-6ac8-4760-a89d-7159f111b655 ha

In [79]:
from pyspark.sql.functions import explode, col, to_date
from datetime import datetime

from awsglue.dynamicframe import DynamicFrame




In [4]:
# Read the raw Spotify JSON files staged under the "to_processed" prefix into a Glue DynamicFrame.
path = "s3://spotify-etl-project-mohan/raw_data/to_processed/"
source_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [path]},
    format="json"
)
# Inspect the schema Glue inferred for the frame just created.
source_dyf.printSchema()

root
|-- href: string
|-- items: array
|    |-- element: struct
|    |    |-- added_at: string
|    |    |-- added_by: struct
|    |    |    |-- external_urls: struct
|    |    |    |    |-- spotify: string
|    |    |    |-- href: string
|    |    |    |-- id: string
|    |    |    |-- type: string
|    |    |    |-- uri: string
|    |    |-- is_local: boolean
|    |    |-- primary_color: null
|    |    |-- track: struct
|    |    |    |-- preview_url: null
|    |    |    |-- available_markets: array
|    |    |    |    |-- element: string
|    |    |    |-- explicit: boolean
|    |    |    |-- type: string
|    |    |    |-- episode: boolean
|    |    |    |-- track: boolean
|    |    |    |-- album: struct
|    |    |    |    |-- available_markets: array
|    |    |    |    |    |-- element: string
|    |    |    |    |-- type: string
|    |    |    |    |-- album_type: string
|    |    |    |    |-- href: string
|    |    |    |    |-- id: string
|    |    |    |    |-- images: arr

In [38]:
# Switch from the Glue DynamicFrame to a Spark DataFrame so the pyspark.sql
# column API (explode, col, to_date) can be used by the transforms below.
spotify_df = source_dyf.toDF()
# Preview the first 20 rows with long values truncated (show()'s defaults).
spotify_df.show(n=20, truncate=True)

+--------------------+--------------------+-----+----+------+--------+-----+
|                href|               items|limit|next|offset|previous|total|
+--------------------+--------------------+-----+----+------+--------+-----+
|https://api.spoti...|[{2025-03-07T07:4...|  100|NULL|     0|    NULL|   75|
+--------------------+--------------------+-----+----+------+--------+-----+


In [87]:
def process_albums(df):
    """Build the album table from the raw playlist-tracks DataFrame.

    Explodes the ``items`` array (one row per playlist entry), projects the
    nested ``track.album`` fields and keeps one row per ``album_id``.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            carry a ``track.album`` struct.

    Returns:
        DataFrame with columns album_id, album_name, release_date (DateType),
        total_tracks and url.
    """
    album_df = (
        df.withColumn("items", explode("items"))
        .select(
            col("items.track.album.id").alias("album_id"),
            col("items.track.album.name").alias("album_name"),
            col("items.track.album.release_date").alias("release_date"),
            col("items.track.album.total_tracks").alias("total_tracks"),
            col("items.track.album.external_urls.spotify").alias("url"),
        )
        .drop_duplicates(["album_id"])
    )

    # DataFrames are immutable: withColumn returns a new frame, so the result
    # must be reassigned or the date conversion is silently lost.
    # NOTE(review): values that are not full yyyy-MM-dd dates (e.g. year-only)
    # do not match the pattern and typically become null — confirm acceptable.
    album_df = album_df.withColumn(
        "release_date", to_date(col("release_date"), "yyyy-MM-dd")
    )

    return album_df


def process_artists(df):
    """Build the artist table from the raw playlist-tracks DataFrame.

    Every playlist entry is expanded into one row per artist credited on its
    track, then reduced to a single row per ``artist_id``.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            carry a ``track.artists`` array.

    Returns:
        DataFrame with columns artist_id, artist_name and external_url.
    """
    # One row per playlist entry.
    entries = df.withColumn("items", explode("items"))
    # One row per (entry, credited artist) pair.
    credits = entries.withColumn("artists", explode("items.track.artists"))
    # NOTE(review): "href" is stored as external_url, whereas the album and song
    # tables take their url from external_urls.spotify — confirm which link is intended.
    projected = credits.select(
        col("artists.id").alias("artist_id"),
        col("artists.name").alias("artist_name"),
        col("artists.href").alias("external_url"),
    )
    return projected.drop_duplicates(["artist_id"])


def process_songs(df):
    """Build the song table from the raw playlist-tracks DataFrame.

    Explodes the ``items`` array (one row per playlist entry), projects track
    fields plus album/artist keys, keeps one row per ``song_id`` and converts
    the ``added_at`` value to a date.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            carry ``added_at`` and a ``track`` struct.

    Returns:
        DataFrame with columns song_id, song_name, duration_ms, url,
        popularity, song_added (DateType), album_id and artist_id.
    """
    song_df = (
        df.withColumn("item", explode("items"))
        .select(
            col("item.track.id").alias("song_id"),
            col("item.track.name").alias("song_name"),
            col("item.track.duration_ms").alias("duration_ms"),
            col("item.track.external_urls.spotify").alias("url"),
            col("item.track.popularity").alias("popularity"),
            col("item.added_at").alias("song_added"),
            col("item.track.album.id").alias("album_id"),
            # NOTE(review): this takes the first *album* artist; tracks also carry
            # their own artists array — confirm which one this key should reference.
            col("item.track.album.artists")[0]["id"].alias("artist_id"),
        )
        .drop_duplicates(["song_id"])
    )

    # added_at holds full timestamps (e.g. "2025-03-07T07:4..."). Calling
    # to_date without a pattern casts the string to DateType, keeping its
    # leading date part. This avoids the strict-parser mismatch a "yyyy-MM-dd"
    # pattern hits on such strings, so the session-wide
    # spark.sql.legacy.timeParserPolicy does not need to be changed.
    # The result is reassigned because withColumn returns a new DataFrame.
    song_df = song_df.withColumn("song_added", to_date(col("song_added")))

    return song_df




In [88]:
# Build the album, artist and song tables from the same source DataFrame.
album_df, artist_df, song_df = (
    process_albums(spotify_df),
    process_artists(spotify_df),
    process_songs(spotify_df),
)




In [75]:
def write_to_s3(df, path_suffix, format_type="csv",
                base_path="s3://spotify-etl-project-mohan/transformed_data"):
    """Write a Spark DataFrame to S3 through a Glue DynamicFrame.

    Args:
        df: Spark DataFrame to write.
        path_suffix: Sub-path appended to ``base_path``
            (e.g. "album_data/album_transformed_20250307").
        format_type: Output format passed to
            ``glueContext.write_dynamic_frame.from_options`` (default "csv").
        base_path: S3 prefix the suffix is appended to. It defaults to this
            project's transformed-data prefix, so existing calls are unchanged.

    NOTE(review): Glue's S3 sink adds new part files and does not clear an
    existing prefix. Re-running on the same day likely duplicates output —
    confirm whether the target should be purged first.
    """
    # Normalise slashes so callers may pass either "a/b" or "/a/b/".
    target_path = f"{base_path.rstrip('/')}/{path_suffix.strip('/')}/"

    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )




In [80]:
write_to_s3(album_df, f"album_data/album_transformed_{datetime.now():%Y%m%d}", format_type="csv")




In [81]:
write_to_s3(artist_df, f"artist_data/artist_transformed_{datetime.now():%Y%m%d}", format_type="csv")




In [89]:
write_to_s3(song_df, f"songs_data/songs_transformed_{datetime.now():%Y%m%d}", format_type="csv")


