# Spotipy development notebook 

This notebook does the following: 
1. Connect to and call the Spotify API
    * Authentication to the Spotify API
    * Extracting the refresh token from the Spotify server
    * Call the Spotify API to get recently played tracks / a specific playlist
2. Format to df
3. Save to local sqlite3 or parquet

In [2]:
import json
import os
import re
import sqlite3
from datetime import datetime

import pandas as pd
import spotipy
from dotenv import load_dotenv
from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOAuth

In [3]:
# Load variables from a local .env file into the process environment
load_dotenv()

# Spotify app credentials; os.getenv yields None for any variable that is not set
client_id, client_secret = (os.getenv(key) for key in ("CLIENT_ID", "CLIENT_SECRET"))

## Connect and Call spotify API

### a) Authentication to the Spotify API

In [4]:
# Permissions requested from the user: listening history and saved library
scopes = ["user-read-recently-played", "user-library-read"]

# The scopes are handed over as one space-separated string
scope_string = " ".join(scopes)

# OAuth manager built from the app credentials, the redirect URI and the scopes
sp_oauth = SpotifyOAuth(
    scope=scope_string,
    redirect_uri="http://localhost:8888/callback",
    client_secret=client_secret,
    client_id=client_id,
)

# To authorize by hand, generate the consent URL and open it in a browser:
# auth_url = sp_oauth.get_authorize_url()
# print(f"Click the following link to authorize your application: {auth_url}")

In [5]:
# Build the API client on top of the user-authorized OAuth manager.
# `auth_manager` is the parameter spotipy documents for passing an auth manager;
# `client_credentials_manager` is the legacy name intended for
# SpotifyClientCredentials objects, so it is no longer used here.
#
# App-only alternative (no user scopes):
# sp = spotipy.Spotify(
#     auth_manager=SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
# )
sp = spotipy.Spotify(auth_manager=sp_oauth)

### b) Extracting the refresh token from the Spotify server
### ---- CAUTION: Only run this when the refresh token needs to be refreshed ----

In [5]:
# Ask the OAuth manager for the token payload in its dictionary form
token_info = sp_oauth.get_access_token(as_dict=True)

# Keep the refresh token carried in that payload
refresh_token = token_info["refresh_token"]

  token_info = sp_oauth.get_access_token(as_dict=True)


### c) Call spotify API to get recently played / specific playlist

In [12]:
# GET the recently played tracks
# Called with the client's default arguments. The whole response is kept because
# later cells read its "items" and "next" entries.
recently_played = sp.current_user_recently_played()

#### Timestamp `before` / `after` test (not working)

In [None]:
# Midnight on 24 Oct 2023. The datetime is naive, so .timestamp() interprets it
# in the local timezone of the machine running the notebook.
specific_datetime = datetime(year=2023, month=10, day=24)

# Epoch seconds -> whole milliseconds
epoch_seconds = specific_datetime.timestamp()
specific_timestamp_ms = int(epoch_seconds * 1000)
print(specific_timestamp_ms)

1698120000000


In [14]:
# The URL containing the timestamp.
# NOTE: "next" may be None when the API reports no further page.
url = recently_played['next']

# Regular expression to find the 'before' parameter followed by the timestamp
timestamp_pattern = r'before=(\d+)'

# Only search when a URL is present: re.search raises TypeError on None
match = re.search(timestamp_pattern, url) if url else None

# Always (re)bind `timestamp`, so a value left over from an earlier run of this
# cell cannot be mistaken for the current cursor
timestamp = match.group(1) if match else None

if timestamp is None:
    print("Timestamp not found in the URL.")
else:
    print(timestamp)

1697746977650


In [25]:
# Second request: pass the millisecond timestamp computed earlier as the `after` argument
recently_played_next = sp.current_user_recently_played(after=specific_timestamp_ms)

In [31]:
# Number of entries under "items" in the `after`-filtered response
len(recently_played_next['items'])

25

#### Playlist extraction

In [None]:
# Extract playlist data

# Playlists listed under the user id 'spotify'
playlists = sp.user_playlists('spotify')

# The playlist id is the last path segment of the share link, minus any query string
playlist_link = "https://open.spotify.com/playlist/37i9dQZEVXcDYGt49X0ozW"
playlist_URI = playlist_link.split("/")[-1].split("?")[0]

# playlist_items replaces the deprecated playlist_tracks; restricting
# additional_types to tracks keeps the request equivalent to the old call
spotify_data = sp.playlist_items(playlist_URI, additional_types=("track",))

## Format to df

In [32]:
## Flatten the recently-played JSON into a DataFrame

played_items = recently_played["items"]

# One list per output column, in the order the API returned the plays
song_names = [entry["track"]["name"] for entry in played_items]
# NOTE(review): this reads the first artist of the track's *album*, not of the track itself
artist_names = [entry["track"]["album"]["artists"][0]["name"] for entry in played_items]
played_at_list = [entry["played_at"] for entry in played_items]
# First 10 characters of played_at, e.g. "2023-11-01" out of "2023-11-01T19:31:04.041Z"
timestamps = [played_at[0:10] for played_at in played_at_list]

# Column name -> values; insertion order doubles as the column order below
song_dict = dict(
    song_name=song_names,
    artist_name=artist_names,
    played_at=played_at_list,
    timestamp=timestamps,
)

song_df = pd.DataFrame(song_dict, columns=list(song_dict))

In [33]:
# Show the assembled frame as the cell output
song_df

Unnamed: 0,song_name,artist_name,played_at,timestamp
0,You & I,Dizaro,2023-11-01T19:31:04.041Z,2023-11-01
1,Let Me Go,Resiino,2023-11-01T19:28:22.055Z,2023-11-01
2,Need You,Matt Skies,2023-11-01T19:23:47.051Z,2023-11-01
3,Ilomilo,Michel Dj,2023-11-01T19:21:01.944Z,2023-11-01
4,By My Side,Y.V.E. 48,2023-10-29T14:44:50.870Z,2023-10-29
5,Careless Whisper,Silience,2023-10-29T14:41:49.147Z,2023-10-29
6,King,TWOPILOTS,2023-10-29T14:38:39.725Z,2023-10-29
7,My Favourite One,Shiek,2023-10-29T14:35:09.228Z,2023-10-29
8,Falling,Good Vibe Delivery,2023-10-29T14:32:22.883Z,2023-10-29
9,Refuge,RAZZ,2023-10-29T14:28:53.628Z,2023-10-29


## Save to local sqlite3 or parquet

In [16]:
# Name for the table that receives the DataFrame
table_name = 'recently_played'

conn = sqlite3.connect('spotify.db')
try:
    # To see where the database file lives on disk:
    # print("Database path:", conn.execute("PRAGMA database_list;").fetchone()[2])

    # Write the DataFrame to the table, replacing any existing table of that name
    song_df.to_sql(table_name, conn, if_exists='replace', index=False)
finally:
    # Close the connection even when the write fails, so the handle is not leaked
    conn.close()

Database path: /Users/yusali/dev/ETL_Spotify/notebook/spotify.db


In [18]:
# Read the table back from SQLite.
# table_name is a constant assigned in this notebook, so formatting it into the
# query text does not involve outside input.
conn = sqlite3.connect('spotify.db')
try:
    df = pd.read_sql('select * from {}'.format(table_name), conn)
finally:
    # Release the database handle even when the query fails
    conn.close()

In [25]:
# Write the frame read back from SQLite to parquet, partitioned by the
# 'timestamp' column and then by 'artist_name'.
# NOTE(review): "recently_payed" in the path looks like a typo for "recently_played";
# confirm nothing else reads this path before renaming it.
df.to_parquet('../data/recently_payed.parquet', partition_cols=['timestamp', 'artist_name'])