In [1]:
# Glue interactive-session configuration for this notebook.
# Idle timeout for the session, in minutes (2880 min = 48 h).
%idle_timeout 2880
# AWS Glue version the session is created with.
%glue_version 5.0
# Worker type used by the session.
%worker_type G.1X
# Number of workers requested for the session.
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
  
# Reuse the active SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext built on that SparkContext; used below for DynamicFrame reads and writes.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job object bound to this context; not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: ce9a2fee-acc0-4766-99c4-bd4007e94f49
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session ce9a2fee-acc0-4766-99c4-bd4007e94f49 to get into ready status...
Session ce9a2fee-acc0-4766-99c4-bd4007e94f49 ha

In [2]:
from pyspark.sql.functions import explode, col, to_date
from datetime import datetime




In [3]:
# Location of the raw Spotify JSON export to process.
s3_path = 's3://soptify-etl-project-kkk/raw_data/to_be_processed/spotify_raw_2025-01-05 16:22:15.181294.json'

# Read the JSON object into a DynamicFrame through the reader interface.
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={'paths': [s3_path]},
    format='json',
)

# Switch to a Spark DataFrame for the column-level transforms, then preview it.
spotify_df = source_dyf.toDF()
spotify_df.show()

+--------------------+--------------------+-----+----+------+--------+-----+
|                href|               items|limit|next|offset|previous|total|
+--------------------+--------------------+-----+----+------+--------+-----+
|https://api.spoti...|[{2024-02-23T14:5...|  100|NULL|     0|    NULL|   62|
+--------------------+--------------------+-----+----+------+--------+-----+



In [30]:
def process_albums(spotify_df):
    """Build the album table from the raw playlist DataFrame.

    Args:
        spotify_df: Spark DataFrame with an ``items`` array column whose
            elements expose a ``track.album`` struct.

    Returns:
        Spark DataFrame with the columns ``album_id``, ``album_name``,
        ``release_date``, ``total_tracks`` and ``url``, holding one row per
        distinct, non-null ``album_id``.
    """
    album = 'item.track.album'
    df_album = (
        spotify_df
        # One row per playlist entry.
        .select(explode(col('items')).alias('item'))
        .select(
            col(f'{album}.id').alias('album_id'),
            col(f'{album}.name').alias('album_name'),
            col(f'{album}.release_date').alias('release_date'),
            col(f'{album}.total_tracks').alias('total_tracks'),
            col(f'{album}.external_urls.spotify').alias('url'),
        )
        # An entry whose track/album id is null would otherwise survive
        # de-duplication as a single row with a null key.
        # NOTE(review): presumably happens for removed or local tracks - confirm.
        .filter(col('album_id').isNotNull())
        .drop_duplicates(['album_id'])
    )
    return df_album

def process_artists(spotify_df):
    """Build the artist table from the raw playlist DataFrame.

    Args:
        spotify_df: Spark DataFrame with an ``items`` array column whose
            elements expose a ``track.artists`` array of artist structs.

    Returns:
        Spark DataFrame with the columns ``artist_id``, ``artist_name`` and
        ``external_url``, holding one row per distinct, non-null
        ``artist_id``.
    """
    df_artists = (
        spotify_df
        # One row per playlist entry ...
        .select(explode(col('items')).alias('item'))
        # ... then one row per credited artist on that entry's track.
        .select(explode(col('item.track.artists')).alias('artist'))
        .select(
            col('artist.id').alias('artist_id'),
            col('artist.name').alias('artist_name'),
            col('artist.external_urls.spotify').alias('external_url'),
        )
        # Artists without an id would otherwise be kept as one null-key row.
        # NOTE(review): presumably happens for local-file tracks - confirm.
        .filter(col('artist_id').isNotNull())
        .drop_duplicates(['artist_id'])
    )
    return df_artists

def process_songs(spotify_df):
    """Build the song table from the raw playlist DataFrame.

    Args:
        spotify_df: Spark DataFrame with an ``items`` array column whose
            elements expose ``added_at`` and a ``track`` struct.

    Returns:
        Spark DataFrame with the columns ``song_id``, ``song_name``,
        ``duration_ms``, ``url``, ``popularity``, ``song_added`` (converted
        with ``to_date``), ``album_id`` and ``artist_id``, holding one row
        per distinct, non-null ``song_id``. ``artist_id`` is the id of the
        first entry in the track's ``artists`` array only.
    """
    df_songs = (
        spotify_df
        # One row per playlist entry.
        .select(explode(col('items')).alias('item'))
        .select(
            col('item.track.id').alias('song_id'),
            col('item.track.name').alias('song_name'),
            col('item.track.duration_ms').alias('duration_ms'),
            col('item.track.external_urls.spotify').alias('url'),
            col('item.track.popularity').alias('popularity'),
            # Keep only the calendar date the entry was added.
            to_date(col('item.added_at')).alias('song_added'),
            col('item.track.album.id').alias('album_id'),
            col('item.track.artists')[0]['id'].alias('artist_id'),
        )
        # Entries with a null track/id would otherwise be kept as one
        # null-key row after de-duplication.
        # NOTE(review): presumably happens for removed or local tracks - confirm.
        .filter(col('song_id').isNotNull())
        .drop_duplicates(['song_id'])
    )
    return df_songs





In [18]:
# Derive the album table and preview its first five rows.
album_df = process_albums(spotify_df)
album_df.show(n=5)

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|                 url|
+--------------------+--------------------+------------+------------+--------------------+
|03GXBC2T6YdaBZo75...|Montagem Sonic Ri...|  2024-05-15|           1|https://open.spot...|
|06a7H7nusNMvM7yL8...|                AURA|  2024-07-12|           3|https://open.spot...|
|09xRsRc7pX399wCGE...|      BRUXO FANTASMA|  2024-08-16|           4|https://open.spot...|
|0TwAp1jAUFp1PQtdD...|              GHOST!|  2022-12-02|           1|https://open.spot...|
|0ZE5fMneYtdl93X1G...|            BYE BYE!|  2024-01-12|           3|https://open.spot...|
+--------------------+--------------------+------------+------------+--------------------+
only showing top 5 rows


In [7]:
# Derive the artist table and preview its first five rows.
artist_df = process_artists(spotify_df)
artist_df.show(n=5)

+--------------------+------------+--------------------+
|           artist_id| artist_name|        external_url|
+--------------------+------------+--------------------+
|07MaeHw0cyfmgQ9D9...|DJ JEEAN 011|https://open.spot...|
|09cKncAQn28NqTUOR...|       Ariis|https://open.spot...|
|0H942IkjXv9bjx5Ox...|      jnhygs|https://open.spot...|
|0T82thOoh3ksJmLJv...|     dawnicy|https://open.spot...|
|0ayuHZiwRQ5jXBaGG...|      DJ RIO|https://open.spot...|
+--------------------+------------+--------------------+
only showing top 5 rows


In [31]:
# Derive the song table and preview its first five rows.
song_df = process_songs(spotify_df)
song_df.show(n=5)

+--------------------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|             song_id|           song_name|duration_ms|                 url|popularity|song_added|            album_id|           artist_id|
+--------------------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|02WKo37gebkkQKor2...|Beeper Funk - Slowed|     103375|https://open.spot...|        71|2024-10-10|5eEfzNcFaXPsBXJzk...|648ZoUpTM6iIMTCgo...|
|02oUvHd9QhVUO4YYb...|METAMORPHOSIS - S...|     172219|https://open.spot...|        63|2024-10-07|4z0nL9bblVphvhmt8...|5hKGLu4Ik88FzWcTP...|
|0G17UriYHMjXnZE2O...|SUNRISE (Slowed +...|     138857|https://open.spot...|        67|2025-01-01|2tiHE58yMuZI1wiyg...|2rgcNuLkn8pPBdKZh...|
|0RgKtaVv27Nff2y29...|           ICEWHORE!|      79052|https://open.spot...|        68|2024-05-15|5J00ADHG1jlJiLjQD...|1TTHC3GlNDaE5eVoC...|
|0iaa1DkqOki4

In [22]:
def write_to_s3(df, path_suffix, format_type='csv',
                base_path='s3://soptify-etl-project-kkk/transformed_data'):
    """Write a Spark DataFrame to S3 through a Glue DynamicFrame.

    Args:
        df: Spark DataFrame to write.
        path_suffix: Key prefix below ``base_path`` (for example
            ``'album/album_transformed_2025-01-05'``). Leading and trailing
            slashes are ignored so the final path never contains ``//``.
        format_type: Output format handed to the Glue writer. Defaults to
            ``'csv'``.
        base_path: S3 URI under which ``path_suffix`` is placed. Defaults to
            the location this notebook has always written to.

    Raises:
        ValueError: If ``path_suffix`` is empty once slashes are stripped.
    """
    suffix = path_suffix.strip('/')
    if not suffix:
        raise ValueError('path_suffix must be a non-empty S3 key prefix')

    # Exactly one slash between the parts and one trailing slash.
    target_path = f"{base_path.rstrip('/')}/{suffix}/"

    # The Glue writer operates on DynamicFrames, so convert first.
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, 'dynamic_frame')

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type='s3',
        connection_options={'path': target_path},
        format=format_type,
    )




In [20]:
# Write the album table under a prefix stamped with today's date (YYYY-MM-DD).
write_to_s3(album_df, f'album/album_transformed_{datetime.now():%Y-%m-%d}', 'csv')




In [23]:
# Write the artist table under a prefix stamped with today's date (YYYY-MM-DD).
write_to_s3(artist_df, f'artist/artist_transformed_{datetime.now():%Y-%m-%d}', 'csv')




In [32]:
# Write the song table under a prefix stamped with today's date (YYYY-MM-DD).
write_to_s3(song_df, f'song/song_transformed_{datetime.now():%Y-%m-%d}', 'csv')




In [5]:
#album_df.printSchema()