In [None]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import re
import time
import os
from dotenv import load_dotenv
import json
import pandas as pd
from tqdm import tqdm
from datetime import datetime
from tqdm import tqdm
import calendar
import random
import numpy as np
import base64
import requests

In [None]:
# Pull the variables defined in a local .env file into the process environment.
load_dotenv()

# Folder that holds the Spotify account-data export (EXPORT_PATH in the environment).
data_directory = os.environ.get('EXPORT_PATH')

# All intermediate pickle files go into one sub-folder of the working directory.
pickle_directory = os.path.join(os.getcwd(), 'pickle_files')
os.makedirs(pickle_directory, exist_ok=True)

## Extract information from the Spotify account data export

### Identify streaming history music files

In [None]:
def find_streaming_history_files(directory):
    """Return the names of the music streaming-history files in `directory`.

    A file qualifies when its name contains 'StreamingHistory_music'.

    Args:
        directory: Path of the folder to scan.

    Returns:
        A sorted list of matching file names (names only, not full paths).
        An empty list is returned, after printing a message, when no directory
        is given or the directory cannot be read.
    """
    # os.listdir(None) would silently list the current working directory,
    # so a missing/empty path is reported explicitly instead.
    if not directory:
        print("Error: No directory was provided.")
        return []

    try:
        # os.listdir returns entries in arbitrary order; sort for reproducible runs.
        return sorted(
            name for name in os.listdir(directory)
            if 'StreamingHistory_music' in name
        )

    except FileNotFoundError:
        print(f"Error: The directory '{directory}' was not found.")
        return []

    except PermissionError:
        print(f"Error: You do not have permission to access the directory '{directory}'.")
        return []

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return []

In [None]:
# Names of the music streaming-history files found in the export folder.
file_names = find_streaming_history_files(directory=data_directory)

### Load and join streaming data

In [None]:
def load_and_join_streaming_data(directory, file_names):
    """Load several JSON files and concatenate their contents into one list.

    Args:
        directory: Folder that contains the files.
        file_names: Iterable of file names inside `directory`.

    Returns:
        A list with the combined contents. A file whose top-level JSON value is
        a list contributes its elements; any other value is appended as a single
        item. Files that cannot be read or parsed are reported and skipped.
    """
    streaming_data = []

    for file_name in file_names:
        try:
            file_path = os.path.join(directory, file_name)
            # JSON text is UTF-8 by specification (RFC 8259); do not rely on the
            # platform default encoding, which differs between systems.
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except FileNotFoundError:
            print(f"Error: The file '{file_name}' was not found in the directory '{directory}'.")
        except json.JSONDecodeError:
            print(f"Error: Failed to decode JSON in the file '{file_name}'. The file might be corrupted.")
        except Exception as e:
            print(f"An unexpected error occurred while loading '{file_name}': {e}")
        else:
            if isinstance(data, list):
                streaming_data.extend(data)
            else:
                streaming_data.append(data)
            print(f"Appended data from {file_name}")

    return streaming_data

In [None]:
# Combine the contents of all discovered history files into one list.
streaming_data = load_and_join_streaming_data(directory=data_directory, file_names=file_names)

### Convert data from list to df

In [None]:
def create_streaming_data_df(streaming_data):
    """Turn the raw streaming records into a DataFrame and pickle it.

    Args:
        streaming_data: Iterable of dict-like records, each providing the keys
            'endTime', 'artistName', 'trackName' and 'msPlayed'.

    Returns:
        A DataFrame with those four columns, where 'endTime' is parsed with the
        format '%Y-%m-%d %H:%M'. The frame is also written to
        'streaming_data_df.pkl' inside `pickle_directory`.

    Raises:
        KeyError: If a record lacks one of the four keys.
    """
    columns = ['endTime', 'artistName', 'trackName', 'msPlayed']

    rows = [{column: entry[column] for column in columns} for entry in streaming_data]

    # Passing `columns` keeps the schema even when there are no rows; without it
    # an empty input yields a column-less frame and the 'endTime' lookup below fails.
    streaming_data_df = pd.DataFrame(rows, columns=columns)

    streaming_data_df['endTime'] = pd.to_datetime(streaming_data_df['endTime'], format='%Y-%m-%d %H:%M')

    output_pickle_path = os.path.join(pickle_directory, 'streaming_data_df.pkl')

    streaming_data_df.to_pickle(output_pickle_path)
    print(f"DataFrame saved as pickle file at: {output_pickle_path}")

    return streaming_data_df

In [None]:
# Build the streaming DataFrame, pickle it, and display the result.
create_streaming_data_df(streaming_data=streaming_data)

## Get additional information using the Spotify Web API

### Create Spotify Web API instance.

In [None]:
def get_spotify_instance():
    """Create an authenticated Spotify Web API client.

    The client id, client secret and redirect URI are read from the environment
    variables SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET and SPOTIPY_REDIRECT_URI
    (after loading a .env file). Tokens are cached in '.spotipyoauthcache'.

    Returns:
        A `spotipy.Spotify` client, or None (after printing the error) when no
        access token could be obtained.
    """
    load_dotenv()

    sp_oauth = SpotifyOAuth(
        client_id=os.getenv('SPOTIPY_CLIENT_ID'),
        client_secret=os.getenv('SPOTIPY_CLIENT_SECRET'),
        redirect_uri=os.getenv('SPOTIPY_REDIRECT_URI'),
        scope='ugc-image-upload playlist-modify-public playlist-modify-private playlist-read-private',
        cache_path='.spotipyoauthcache'
    )

    try:
        # Obtain a token up front so authorisation problems surface here rather
        # than on the first API call. With as_dict=False the return value is the
        # token string itself (not a dict), so it must not be subscripted.
        sp_oauth.get_access_token(as_dict=False)
    except Exception as e:
        print(f"Error retrieving token: {e}")
        return None

    # Hand the auth manager (not a fixed token string) to the client so that an
    # expired token can be refreshed during long-running query loops.
    return spotipy.Spotify(auth_manager=sp_oauth)

### Create a df with the unique songs from streaming_data_df in order to make the API query less time consuming.

In [None]:
# Reload the pickled streaming history.
streaming_data_df = pd.read_pickle(os.path.join(pickle_directory, 'streaming_data_df.pkl'))

# One row per distinct track title (the first occurrence is kept).
# NOTE: titles shared by different artists collapse into a single row here.
# .copy() makes song_data_df an independent frame, so adding columns to it
# later does not raise pandas' SettingWithCopyWarning.
song_data_df = streaming_data_df.drop_duplicates(subset=['trackName']).copy()
song_data_df.to_pickle(os.path.join(pickle_directory, 'song_data_df.pkl'))

### Query Spotify and update song_data_df with the additional information.

In [None]:
def query_track_info(df, artist_col='artistName', track_col='trackName'):
    """Look up every row's track on Spotify and record its duration and id.

    For each row a search for "track:<title> artist:<artist>" is issued and the
    first hit (if any) is used.

    Args:
        df: DataFrame with one row per track to look up. It is modified in
            place: the columns 'duration' (milliseconds, from 'duration_ms')
            and 'spotify_id' are added; rows without a hit get None.
        artist_col: Name of the column holding the artist name.
        track_col: Name of the column holding the track title.

    Returns:
        The same DataFrame, which is also written to 'final_song_data_df.pkl'
        inside `pickle_directory`.

    Raises:
        RuntimeError: If no Spotify client could be created.
    """
    sp = get_spotify_instance()
    # get_spotify_instance() signals failure by returning None; stop with a
    # clear message instead of failing on the first attribute access.
    if sp is None:
        raise RuntimeError("Could not create a Spotify client; check your credentials.")

    durations = []
    spotify_ids = []

    track_artist_pairs = zip(df[track_col], df[artist_col])
    for track_name, artist_name in tqdm(track_artist_pairs, total=len(df), desc="Querying Spotify"):
        query = f"track:{track_name} artist:{artist_name}"
        results = sp.search(q=query, type='track', limit=1)

        items = results['tracks']['items']
        # Treat both "no hits" and a null first hit as "not found".
        track_info = items[0] if items else None

        if track_info:
            durations.append(track_info['duration_ms'])
            spotify_ids.append(track_info['id'])
        else:
            durations.append(None)
            spotify_ids.append(None)

    df['duration'] = durations
    df['spotify_id'] = spotify_ids

    df.to_pickle(os.path.join(pickle_directory, 'final_song_data_df.pkl'))

    return df

In [None]:
# Enrich the unique-song table with Spotify durations and ids (one search request per row).
query_track_info(df=song_data_df)

## Preparation of final dataframe

### Cleaning.

In [None]:
# Loading dataframes from pickle.
# Reload the enriched song table and the raw streaming history from their pickle files.
final_song_data_df, final_streaming_data_df = (
    pd.read_pickle(os.path.join(pickle_directory, pickle_name))
    for pickle_name in ('final_song_data_df.pkl', 'streaming_data_df.pkl')
)

In [None]:
# Merging 'final_streaming_data_df' with 'final_song_data_df' to add 'duration' and 'spotify_id' columns.
# Left join on the track title: every stream keeps its row and picks up the
# 'duration' and 'spotify_id' of the song with the same 'trackName' (NaN when absent).
track_lookup = final_song_data_df[['trackName', 'duration', 'spotify_id']]
final_streaming_data_df = pd.merge(
    final_streaming_data_df,
    track_lookup,
    how='left',
    on='trackName',
)

In [None]:
# Creating 'play_ratio' column as it provides a more nuanced view of listening behavior.
# Share of the track that was actually played: milliseconds played over track duration.
final_streaming_data_df['play_ratio'] = final_streaming_data_df['msPlayed'].div(
    final_streaming_data_df['duration']
)

In [None]:
# Dropping songs with no id. Could be due to renaming or removal.
# Keep only the streams whose track has a Spotify id.
has_spotify_id = final_streaming_data_df['spotify_id'].notna()
final_streaming_data_df = final_streaming_data_df[has_spotify_id]

In [None]:
# Creating 'month' and 'year' columns for easier grouping later.
# Derive calendar month and year from 'endTime' for the later grouping.
# assign() returns a new frame, so no column is written into a row-filtered
# frame in place (which would raise pandas' SettingWithCopyWarning).
final_streaming_data_df = final_streaming_data_df.assign(
    month=final_streaming_data_df['endTime'].dt.month,
    year=final_streaming_data_df['endTime'].dt.year,
)

In [None]:
# Saving dataframe to pickle.
# Persist the prepared streaming table so later steps can reload it.
final_streaming_pickle_path = os.path.join(pickle_directory, 'final_streaming_data_df.pkl')
pd.to_pickle(final_streaming_data_df, final_streaming_pickle_path)

In [None]:
# Sanity check: (rows, columns) of the prepared streaming table.
final_streaming_data_df.shape

### Remove specific playlists from the analysis. If playlists do not include unique songs, the analysis might be inaccurate.

In [None]:
def _get_user_playlist_ids(sp, page_size=50):
    """Map playlist name -> playlist id for all of the current user's playlists."""
    playlists = {}
    offset = 0

    while True:
        result = sp.current_user_playlists(limit=page_size, offset=offset)
        for item in result['items']:
            playlists[item['name']] = item['id']
        # A short page means there is nothing left to fetch.
        if len(result['items']) < page_size:
            break
        offset += page_size

    return playlists


def _get_playlist_track_ids(sp, playlist_id):
    """Collect the ids of every track in a playlist, following all result pages."""
    track_ids = set()

    page = sp.playlist(playlist_id=playlist_id)['tracks']
    while page:
        for item in page['items']:
            # Entries can come without a track object or without an id; skip those.
            track = item.get('track')
            if track and track.get('id'):
                track_ids.add(track['id'])
        # sp.next() returns None once the page has no 'next' link.
        page = sp.next(page)

    return track_ids


def remove_playlists(df):
    """Interactively drop all streams of tracks that belong to chosen playlists.

    The user is asked for a comma-separated list of playlist names. Entering
    "none!" leaves `df` untouched. Otherwise every row whose 'spotify_id' is in
    one of the named playlists is removed from `df` in place, and the result is
    written to 'filtered_final_streaming_data_df.pkl' inside `pickle_directory`.

    Args:
        df: DataFrame with a 'spotify_id' column. Modified in place.

    Returns:
        The same DataFrame object.

    Raises:
        RuntimeError: If playlists have to be looked up but no Spotify client
            could be created.
    """
    sp = get_spotify_instance()
    playlists_input = input('Enter the names of the playlists you would like to remove, separated by commas (enter "none!" to continue without removing any playlist): ')

    # The sentinel "none!" (not an empty answer) skips the removal step.
    if playlists_input.strip() == "none!":
        print("No playlists entered. Exiting the function.")
        return df

    # get_spotify_instance() signals failure by returning None.
    if sp is None:
        raise RuntimeError("Could not create a Spotify client; check your credentials.")

    # Split on commas and ignore blank entries (e.g. from a trailing comma).
    playlists_to_remove = [name.strip() for name in playlists_input.split(',') if name.strip()]

    playlists = _get_user_playlist_ids(sp)

    songs = set()
    for playlist in playlists_to_remove:
        if playlist not in playlists:
            print(f'The playlist name "{playlist}" does not exist in your library. Skipping.')
            continue
        print(f'Removing playlist: {playlist}')
        songs |= _get_playlist_track_ids(sp, playlists[playlist])

    # Vectorised membership test instead of a row-by-row loop.
    idx_to_drop = df.index[df['spotify_id'].isin(songs)]

    df.drop(idx_to_drop, inplace=True)
    df.to_pickle(os.path.join(pickle_directory, 'filtered_final_streaming_data_df.pkl'))

    return df

In [None]:
# Prompt for playlists to exclude and drop their tracks from the streaming table.
remove_playlists(df=final_streaming_data_df)

In [None]:
# The filtered pickle is written into pickle_directory, so that is where its
# existence has to be checked (not the current working directory).
filtered_pickle_path = os.path.join(pickle_directory, 'filtered_final_streaming_data_df.pkl')

if os.path.exists(filtered_pickle_path):
    filtered_final_streaming_data_df = pd.read_pickle(filtered_pickle_path)
    print("File loaded successfully.")
    # A bare expression inside an if-block is not displayed; print it explicitly.
    print(filtered_final_streaming_data_df.shape)
else:
    print("File does not exist. Continuing without loading.")

## Create playlists

In [None]:
def create_monthly_playlists(df, top_n=20):
    """Create one public Spotify playlist per (year, month) with its top tracks.

    Tracks are ranked within each month by the sum of their 'play_ratio' values.
    Each playlist is named "<Month name> <year>".

    Args:
        df: DataFrame with the columns 'year', 'month', 'spotify_id' and
            'play_ratio'.
        top_n: Number of top-ranked tracks to put into each playlist.

    Returns:
        None. Progress and Spotify API errors are printed.

    Raises:
        RuntimeError: If no Spotify client could be created.
    """
    sp = get_spotify_instance()
    # get_spotify_instance() signals failure by returning None.
    if sp is None:
        raise RuntimeError("Could not create a Spotify client; check your credentials.")

    # The user id is the same for every playlist; fetch it once instead of once per month.
    try:
        user_id = sp.current_user()['id']
    except spotipy.SpotifyException as e:
        print(f"Error retrieving the current user: {e}")
        return

    playlist_description = "https://github.com/kyriakos-papadopoulos/Projects/tree/main/API_Projects/Spotify/Monthly_Playlists"

    # Total play_ratio per track within each month.
    grouped_df = df.groupby(['year', 'month', 'spotify_id'], as_index=False).agg({'play_ratio': 'sum'})

    for (year, month), group in grouped_df.groupby(['year', 'month']):
        top_tracks = group.sort_values(by='play_ratio', ascending=False).head(top_n)
        track_ids = top_tracks['spotify_id'].tolist()

        month_name = calendar.month_name[month]
        playlist_name = f"{month_name} {year}"

        try:
            playlist = sp.user_playlist_create(user=user_id, name=playlist_name, public=True, description=playlist_description)

            if track_ids:
                # Add the tracks in batches of at most 100 ids per request.
                for start in range(0, len(track_ids), 100):
                    sp.user_playlist_add_tracks(user=user_id, playlist_id=playlist['id'], tracks=track_ids[start:start + 100])
                print(f"Playlist '{playlist_name}' created successfully!")
            else:
                print(f"No tracks were added to the playlist '{playlist_name}'.")

        except spotipy.SpotifyException as e:
            print(f"Error creating playlist '{playlist_name}': {e}")

In [None]:
# Prefer the playlist-filtered table when it has been written; otherwise use the unfiltered one.
filtered_pickle_path = os.path.join(pickle_directory, 'filtered_final_streaming_data_df.pkl')

if not os.path.exists(filtered_pickle_path):
    create_monthly_playlists(final_streaming_data_df)
else:
    filtered_final_streaming_data_df = pd.read_pickle(filtered_pickle_path)
    create_monthly_playlists(filtered_final_streaming_data_df)

In [None]:
# TODO: cover upload is disabled because it keeps failing with an HTTP error (see "Error info" below). Seek help in the Spotify developer forum or contact Spotify developer support.

"""def set_playlist_covers(sp, playlist_ids_file='playlist_ids.json', covers_dir='/Users/kyriakospapadopoulos/Desktop/University/Big Blue Data Academy/Personal/Projects/API_Projects/Spotify/Monthly_Playlists/covers', delay=2):
    # Load the playlist IDs from the JSON file
    with open(playlist_ids_file, 'r') as f:
        playlist_ids = json.load(f)
    
    for playlist_name, playlist_id in playlist_ids.items():
        # Split the playlist name to get the month and year
        month_name, year = playlist_name.split()
        month_abbr = month_name[:3].upper()

        # Get the zero-padded month number
        month_number = str(list(calendar.month_name).index(month_name.capitalize())).zfill(2)

        # Construct the correct image filename
        image_filename = f"{year}_{month_number}_{month_abbr}.png"
        image_path = os.path.join(covers_dir, image_filename)

        if os.path.exists(image_path):
            try:
                # Open and convert the image to JPEG and resize if necessary
                with Image.open(image_path) as img:
                    img = img.convert('RGB')
                    img = img.resize((640, 640), Image.ANTIALIAS)
                    
                    # Save the image to a temporary file in JPEG format
                    temp_image_path = os.path.join(covers_dir, "temp_image.jpg")
                    img.save(temp_image_path, format="JPEG", quality=85)
                    
                    # Read the image file and encode it to base64
                    with open(temp_image_path, 'rb') as image_file:
                        image_b64 = base64.b64encode(image_file.read()).decode('utf-8')
                        sp.playlist_upload_cover_image(playlist_id=playlist_id, image_b64=image_b64)
                    print(f"Cover image set for playlist '{playlist_name}'")
                    
                    # Clean up the temporary file
                    os.remove(temp_image_path)
                    
            except spotipy.SpotifyException as e:
                print(f"Error setting cover for playlist '{playlist_name}': {e}")
            except Exception as e:
                print(f"Unexpected error for playlist '{playlist_name}': {e}")
        else:
            print(f"No cover image found for playlist '{playlist_name}'")
        
        # Sleep for a short period to avoid hitting rate limits
        time.sleep(delay)"""

#### Error info:

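HTTP Error for PUT to https://api.spotify.com/v1/playlists/3UzKL7EgBYmHFFOGOeSTSY/images with Params: {} returned 413 due to None

HTTP 413 means "Payload Too Large": the request body (the base64-encoded cover image) is bigger than the endpoint accepts. Spotify documents a maximum payload size of 256 KB for playlist cover uploads, so the JPEG most likely has to be made smaller (lower resolution and/or quality) before it is encoded.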
