# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


### Optional: Run this cell to see available notebook commands ("magics").


In [18]:
# %help

####  Run this cell to set up and start your interactive session.


In [58]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Reuse the SparkContext already attached to this session, or create one if none exists.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext; the Glue read/write calls in later cells go through it.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job object bound to this context (no init/commit call appears in the visible cells).
job = Job(glueContext)

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import explode, col

from datetime import datetime

You are already connected to a glueetl session e2aa1560-6c13-4c84-bfc2-973d27da9c92.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session e2aa1560-6c13-4c84-bfc2-973d27da9c92.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session e2aa1560-6c13-4c84-bfc2-973d27da9c92.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session e2aa1560-6c13-4c84-bfc2-973d27da9c92.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



### Create a DynamicFrame from the raw JSON files in S3 and convert it to a Spark DataFrame


In [45]:
# S3 prefix holding the raw JSON files that have not been processed yet.
raw_source_options = {
    "paths": ["s3://s2passi-spotipy-project/raw-data/to-be-processed/"],
}

# Read everything under that prefix as JSON into a Glue DynamicFrame.
dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options=raw_source_options,
    format="json",
)

# The transformation functions in later cells operate on a Spark DataFrame,
# so convert the DynamicFrame once here.
df = dyf.toDF()




In [54]:
def process_albums(df):
    """Build the album table: one row per distinct album id found in ``items``.

    Args:
        df: Spark DataFrame with an ``items`` column that can be exploded and
            whose elements expose ``track.album.*`` fields (presumably Spotify
            playlist-item responses -- confirm against the raw data).

    Returns:
        A DataFrame with the columns ``album_id``, ``album_name``,
        ``album_release_date``, ``album_total_tracks``, ``album_url`` and
        ``album_href``, de-duplicated on ``album_id``.
    """
    # Output column name -> path inside each exploded item.
    album_fields = {
        "album_id": "items.track.album.id",
        "album_name": "items.track.album.name",
        "album_release_date": "items.track.album.release_date",
        "album_total_tracks": "items.track.album.total_tracks",
        "album_url": "items.track.album.external_urls.spotify",
        "album_href": "items.track.album.href",
    }

    # One row per element of the ``items`` array.
    per_item = df.withColumn("items", explode("items"))
    albums = per_item.select(
        *[col(path).alias(name) for name, path in album_fields.items()]
    )
    # The same album shows up once per track that belongs to it; keep one row per id.
    return albums.drop_duplicates(subset=["album_id"])

def process_artists(df):
    """Build the artist table: one row per distinct artist id found in ``items``.

    Args:
        df: Spark DataFrame with an ``items`` column that can be exploded and
            whose elements expose a ``track.artists`` collection (presumably
            Spotify playlist-item responses -- confirm against the raw data).

    Returns:
        A DataFrame with the columns ``artist_id``, ``artist_name``,
        ``artist_uri``, ``artist_spotify_url`` and ``artist_href``,
        de-duplicated on ``artist_id``.
    """
    # Output column name -> path inside each exploded artist entry.
    artist_fields = {
        "artist_id": "artists.id",
        "artist_name": "artists.name",
        "artist_uri": "artists.uri",
        "artist_spotify_url": "artists.external_urls.spotify",
        "artist_href": "artists.href",
    }

    # First explode: one row per item, keeping only that item's artist collection.
    per_item = df.withColumn("items", explode("items"))
    artist_lists = per_item.select(col("items.track.artists").alias("artists"))

    # Second explode: one row per individual artist.
    per_artist = artist_lists.withColumn("artists", explode("artists"))
    artists = per_artist.select(
        *[col(path).alias(name) for name, path in artist_fields.items()]
    )
    # An artist credited on several tracks appears several times; keep one row per id.
    return artists.drop_duplicates(subset=["artist_id"])

def process_tracks(df):
    """Build the track table: one row per distinct track id found in ``items``.

    The result is de-duplicated on ``track_id`` so that it follows the same
    one-row-per-id convention as ``process_albums`` and ``process_artists``.
    Without this, a track that occurs in more than one item (for example in
    several input files under the same prefix) would be emitted repeatedly.

    Args:
        df: Spark DataFrame with an ``items`` column that can be exploded and
            whose elements expose ``track.*`` fields (presumably Spotify
            playlist-item responses -- confirm against the raw data).

    Returns:
        A DataFrame with the columns ``track_id``, ``track_name``,
        ``track_popularity``, ``track_uri``, ``track_duration_ms``,
        ``track_url`` and ``track_href``, de-duplicated on ``track_id``.
    """
    # One row per element of the ``items`` array.
    per_item = df.withColumn("items", explode("items"))

    tracks = per_item.select(
        col("items.track.id").alias("track_id"),
        col("items.track.name").alias("track_name"),
        col("items.track.popularity").alias("track_popularity"),
        col("items.track.uri").alias("track_uri"),
        col("items.track.duration_ms").alias("track_duration_ms"),
        col("items.track.external_urls.spotify").alias("track_url"),
        col("items.track.href").alias("track_href"),
    )

    # Keep a single row per track id, matching the album and artist tables.
    return tracks.drop_duplicates(subset=["track_id"])





In [60]:
def write_to_s3(df, path_suffix, format_type="csv"):
    """Write a Spark DataFrame below the project's ``transformed-data/`` S3 prefix.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Key prefix appended after ``transformed-data/``; a trailing
            ``/`` is added by this function.
        format_type: Output format passed straight to the Glue writer
            (defaults to ``"csv"``).

    Returns:
        None. The only effect is the write performed through ``glueContext``.
    """
    # The Glue writer takes a DynamicFrame, so wrap the DataFrame first.
    frame = DynamicFrame.fromDF(df, glueContext, "dyf")
    sink_options = {
        "path": f"s3://s2passi-spotipy-project/transformed-data/{path_suffix}/"
    }
    glueContext.write_dynamic_frame.from_options(
        frame=frame,
        connection_type="s3",
        connection_options=sink_options,
        format=format_type,
    )




In [61]:
# Transform the raw frame into the album table.
albums_transformed = process_albums(df)
# Stamp the output prefix with the current date as reported by the session clock.
album_prefix = "album-data/album_transformed_{}".format(
    datetime.now().strftime("%Y-%m-%d")
)
write_to_s3(albums_transformed, album_prefix, "csv")


