In [45]:
import os
from datetime import datetime

import spotipy
from pyspark.sql.functions import to_date
from pyspark.sql.types import DateType
from spotipy.oauth2 import SpotifyClientCredentials

In [46]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that every later cell depends on.
# appName() already sets 'spark.app.name', so no separate .config() call is needed.
try:
    spark = (
        SparkSession.builder
        .appName('SpotifyETL-dev-20230911')
        .getOrCreate()
    )
except Exception as e:
    # Report the failure, then re-raise: continuing would leave `spark` unbound
    # and the real cause would only resurface later as a NameError.
    print(f"Spark Session builder failed {e}")
    raise


In [47]:
# Spotify client credentials.
# Read from the environment instead of being hardcoded in the notebook; the
# variable names follow Spotipy's own convention. Any client secret that was
# previously committed in this notebook should be treated as leaked and rotated.
client_id = os.environ.get("SPOTIPY_CLIENT_ID")
client_secret = os.environ.get("SPOTIPY_CLIENT_SECRET")

# Fail fast with an actionable message when a credential is missing.
missing_credentials = [
    env_name
    for env_name, env_value in (
        ("SPOTIPY_CLIENT_ID", client_id),
        ("SPOTIPY_CLIENT_SECRET", client_secret),
    )
    if not env_value
]
if missing_credentials:
    raise RuntimeError(
        "Missing Spotify credentials; set environment variable(s): "
        + ", ".join(missing_credentials)
    )

try:
    # Initialize the Spotify client used by the following cells.
    credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
    spotify_client = spotipy.Spotify(client_credentials_manager=credentials_manager)
except spotipy.SpotifyException as e:
    # Spotipy-specific failure: report it, then re-raise so `spotify_client`
    # is never silently left unbound.
    print(f"Spotipy Error: {e}")
    raise
except Exception as e:
    # Any other failure: same policy, report and re-raise.
    print(f"An error occurred: {e}")
    raise


In [48]:
# Define the Spotify playlist link
playlist_link = "https://open.spotify.com/playlist/37i9dQZEVXbLiRSasKsNU9"

# Extract the playlist ID: the last path segment of the link, ignoring any
# "?si=..." share query string and a trailing slash. str.split() never returns
# an empty list, so an empty segment (not IndexError) is the failure to detect.
playlist_uri = playlist_link.split("?")[0].rstrip("/").split("/")[-1] or None

if not playlist_uri:
    raise ValueError("No valid playlist URI found.")

# Fetch playlist tracks using the Spotify API.
# The client is bound to `spotify_client` in the credentials cell.
# NOTE(review): this fetches a single page of results; playlists longer than
# the page limit would need pagination - confirm whether that matters here.
try:
    data = spotify_client.playlist_tracks(playlist_uri)
except spotipy.SpotifyException as e:
    # Report Spotipy-specific errors, then re-raise: the following cells cannot
    # run without `data`, and a stale `data` from an earlier run must not be reused.
    print(f"Spotipy Error: {e}")
    raise
except Exception as e:
    print(f"An error occurred while fetching playlist tracks: {e}")
    raise


In [49]:
# Initialize an empty list to store album information
album_list = []


def extract_album(item):
    """Return the album fields of one playlist item as a flat dict.

    Args:
        item: One element of data['items'] (a dict with a 'track' entry).

    Returns:
        A dict with keys 'id', 'name', 'release_date', 'total_tracks' and
        'external_urls', or None when the item's 'track' is None.

    Raises:
        KeyError: If an expected key is missing from the item.
    """
    track = item['track']
    if track is None:
        # The API can return a null track for playlist entries that are no
        # longer available; there is no album to extract from those.
        return None
    album = track['album']
    return {
        'id': album['id'],
        'name': album['name'],
        'release_date': album['release_date'],
        'total_tracks': album['total_tracks'],
        'external_urls': album['external_urls']['spotify'],
    }


try:
    # Iterate over the 'items' in the 'data' object
    for item in data['items']:
        try:
            album_element = extract_album(item)
        except KeyError as e:
            # Skip only the malformed item; the remaining items are still processed.
            print(f"KeyError occurred while extracting album information: {e}")
            continue
        if album_element is not None:
            album_list.append(album_element)
except Exception as e:
    # Handle other exceptions that may occur during the iteration
    print(f"An error occurred during the iteration: {e}")

# 'album_list' now contains the extracted album information
type(album_list)


list

In [50]:
# Initialize an empty list for artist information
artist_list = []

# Extract artist data from the 'data' object: one entry per (track, artist) pair.
for item in data['items']:
    track = item.get('track')
    if not track:
        # Items without track data (missing or null 'track') carry no artists.
        continue
    for artist in track['artists']:
        artist_list.append({
            'artist_id': artist['id'],
            'artist_name': artist['name'],
            # Public open.spotify.com link, consistent with the album and song
            # records; 'href' is the Web API endpoint, not an external URL.
            'external_url': artist['external_urls']['spotify'],
        })

# 'artist_list' now contains artist information (duplicates are removed later)


In [51]:
# Initialize an empty list to store song information
song_list = []

# Build one record per playlist item that has track data.
for row in data['items']:
    track = row.get('track')
    if not track:
        # Items without track data (missing or null 'track') cannot be described.
        continue

    track_artists = track['artists']
    song_list.append({
        'song_id': track['id'],
        'song_name': track['name'],
        'song_duration': track['duration_ms'],
        'song_url': track['external_urls']['spotify'],
        'song_popularity': track['popularity'],
        'song_added': row['added_at'],
        'album_id': track['album']['id'],
        # Use the track's first artist rather than the album's: the artist
        # table is built from track['artists'], so this id is guaranteed to
        # exist there (an album-level artist may not).
        'artist_id': track_artists[0]['id'] if track_artists else None,
    })


In [52]:
# Preview step: build a DataFrame from the album dicts and print the schema
# Spark inferred for it.
album_df = spark.createDataFrame(data=album_list)
album_df.printSchema()

root
 |-- external_urls: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- total_tracks: long (nullable = true)



In [53]:
def _deduplicated_frame(records, key_column):
    """Create a Spark DataFrame from a list of dicts, keeping one row per key value."""
    frame = spark.createDataFrame(records)
    return frame.dropDuplicates(subset=[key_column])


# One DataFrame per entity, de-duplicated on its identifier column.
album_df = _deduplicated_frame(album_list, 'id')
song_df = _deduplicated_frame(song_list, 'song_id')
artist_df = _deduplicated_frame(artist_list, 'artist_id')

In [54]:
# Convert the string date columns to DateType.
# to_date() is called without a pattern so Spark applies its string-to-date
# cast rules, which accept 'yyyy', 'yyyy-MM', 'yyyy-MM-dd' and ISO timestamps
# such as 'yyyy-MM-ddTHH:mm:ssZ' (the time part is ignored).
# A fixed "yyyy-MM-dd" pattern does not match:
#   * release_date values given with year or month precision, which become null;
#   * the full 'added_at' timestamp, which Spark 3's parser rejects (null or an
#     error, depending on spark.sql.legacy.timeParserPolicy).
# to_date() already yields a DateType column, so no extra cast is required.
album_df = album_df.withColumn("release_date", to_date(album_df["release_date"]))
song_df = song_df.withColumn("song_added", to_date(song_df["song_added"]))
