<p align="center" style="background-color:rgb(41, 65, 171); ">
  <img src="https://github.com/thecodemancer/BIMarathon2021/raw/main/images/spotify_cover.jpg?raw=true?raw=true?raw=true" style="width:100%">
</p>

# Build an ETL using your own Spotify data

The objective of this project is:

1. Extract our own data about the songs we listen to on Spotify
2. Do some cleaning and transformation to the data
3. Store the resulting dataset in sqllite
4. Build a dashboard and find insights

Let's import libraries

In [70]:
import sqlalchemy
import pandas as pd 
from sqlalchemy.orm import sessionmaker
import requests
import json
from datetime import datetime
import datetime
import sqlite3
import pprint as pp


Let's define sombre CONSTANTS (a type of variable that holds values, which cannot be changed. Constants are written in all capital letters and underscores separating the words.)

In [71]:
DATABASE_LOCATION = "sqlite:///my_played_tracks.sqlite"

# your Spotify username & token
USER_ID = "gatoreloaded"

# Generate your token here:  https://developer.spotify.com/console/get-recently-played/
TOKEN="BQAyaNqbUgruxByVc_x2i5WYdatF_2JeLt2G7q4h_FeUG2IL8v89j37fpjE7QsnYAEycCkbKJJGpg9a65ffy_FKzKGz41BCnL1fLqov1b6B9WdUdOqTbn77TBZOf7FYw-tXrdedswKC_fPTvJY4" # your Spotify API token

Let's declare a class Spotify that will hold the variables and methods necessary to interact with the Spotify API

In [72]:
class Spotify:
    def __init__(self, USER_ID, TOKEN):
        self.set_user_id(USER_ID)       
        self.set_token(TOKEN) 
        self.df = None
        self.request_headers = None
        self.time_in_the_past = None
        self.time_in_the_past_unix_timestamp = None
        self.URL_SPOTIFY_RECENTLY_PLAYED_ENDPOINT = "https://api.spotify.com/v1/me/player/recently-played"
        
    def set_user_id(self, USER_ID):
        self.USER_ID = USER_ID
    
    def get_user_id(self):
        return self.USER_ID
    
    def set_token(self, TOKEN):
        self.TOKEN = TOKEN
    
    def get_token(self):
        return self.TOKEN
    
    def set_df(self, df):
        self.df = df
    
    def get_df(self):
        return self.df    
    
    def set_request_headers(self, headers):
        self.request_headers = headers

    def get_request_headers(self):
        return self.request_headers

    def send_request(self):
        '''
        Download all songs you've listened to after a certain amount of time
        In order to achieve this, we'll use the following methods:
        get_time_in_the_past_unix_timestamp()
        get_request_headers()
        '''
        self.request = requests.get(self.URL_SPOTIFY_RECENTLY_PLAYED_ENDPOINT+"?after={time}".format(time=s.get_time_in_the_past_unix_timestamp() ), headers = s.get_request_headers() )

    def get_request(self):
        return self.request
    
    def set_time_in_the_past(self, today, days):
        '''
        Set the time in the past
        '''
        self.time_in_the_past = today - datetime.timedelta(days=days)

    def get_time_in_the_past(self):
        return self.time_in_the_past

    def set_time_in_the_past_unix_timestamp(self):
        '''
        Convert time to Unix timestamp in miliseconds      
        '''
        self.time_in_the_past_unix_timestamp = int(self.get_time_in_the_past().timestamp()) * 1000

    def get_time_in_the_past_unix_timestamp(self):
        return self.time_in_the_past_unix_timestamp
    
    def is_this_dataframe_empty(self):
        '''
        Check if dataframe is empty
        '''
        output = True
        if self.df.empty:
            print("No songs downloaded. Finishing execution")
            output = False 
        return output
    
    def the_played_at_is_unique(self):
        '''
        Primary Key Check
        '''
        if pd.Series(self.df['played_at']).is_unique:
            pass
        else:
            raise Exception("Primary Key check is violated")
        
    def are_there_any_null_values(self):
        '''
        Check for nulls
        '''
        if self.df.isnull().values.any():
            raise Exception("Null values found")

    def check_if_timestamp_is_correct(self):
        '''
        Check that all timestamps are of yesterday's date
        '''
        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
        yesterday = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)

        timestamps = self.df["timestamp"].tolist()
        for timestamp in timestamps:
            if datetime.datetime.strptime(timestamp, '%Y-%m-%d') != yesterday:
                #raise Exception("At least one of the returned songs does not have a yesterday's timestamp")
                pass
            
    def check_if_valid_data(self, df: pd.DataFrame) -> bool:
        '''
        Input
        -----

        df: A pandas dataframe

        Description
        -----------

        This function will receive a dataframe as input and then it will perform the following validations:
        1. Is this dataframe... Empty?
        2. Since you cannot listen to more than 1 song at a specific time let's check if the field 'played at' is unique
        3. Are there any null values? We don't work with null values here :)

        Output
        ------
        True or False
        '''
        self.set_df(df)
        self.is_this_dataframe_empty()
        self.the_played_at_is_unique()
        self.are_there_any_null_values()
        self.check_if_timestamp_is_correct()

        return True



Every Python module has it's __name__ defined and if this is '__main__', it implies that the module is being run standalone by the user and we can do corresponding appropriate actions. If you import this script as a module in another script, the __name__ is set to the name of the script/module

In [85]:
if __name__ == "__main__":

    # Let's instanciate a Spotify object  
    s= Spotify(USER_ID, TOKEN)
    
    # The request that we are going to send to the API needs headers. Hey, look! The token is being used here!
    headers = {
        "Accept" : "application/json",
        "Content-Type" : "application/json",
        "Authorization" : "Bearer {token}".format(token=s.get_token())
    }
    
    # Let know the Spotify object about the headers that we are going to use
    s.set_request_headers(headers)
    
    s.set_time_in_the_past(datetime.datetime.now(), 1 )
    #print(s.get_time_in_the_past( ) )
    #print(str(int(s.get_time_in_the_past().timestamp() ))) 
    s.set_time_in_the_past_unix_timestamp()
    #print(s.get_time_in_the_past_unix_timestamp() )

    # Download all songs you've listened to after a certain amount of time  
    # Let's send the request! Sin miedo al éxito!
    s.send_request()

    data = s.get_request().json()

    song_names = []
    artist_names = []
    played_at_list = []
    timestamps = []

    try:
        
        # Extracting only the relevant bits of data from the json object      
        for song in data["items"]:
            song_names.append(song["track"]["name"])
            artist_names.append(song["track"]["album"]["artists"][0]["name"])
            played_at_list.append(song["played_at"])
            timestamps.append(song["played_at"][0:10])

        # Prepare a dictionary in order to turn it into a pandas dataframe below       
        song_dict = {
            "song_name" : song_names,
            "artist_name": artist_names,
            "played_at" : played_at_list,
            "timestamp" : timestamps
        }

        song_df = pd.DataFrame(song_dict, columns = ["song_name", "artist_name", "played_at", "timestamp"])
                
        # Validate
        if s.check_if_valid_data(song_df):
            pp.pprint("Data valid, proceed to Load stage")
        else:
            pp.pprint("Warning. The dataframe has issues")

        # Load

        engine = sqlalchemy.create_engine(DATABASE_LOCATION)
        conn = sqlite3.connect('my_played_tracks.sqlite')
        cursor = conn.cursor()

        sql_query = """
        CREATE TABLE IF NOT EXISTS my_played_tracks(
            song_name VARCHAR(200),
            artist_name VARCHAR(200),
            played_at VARCHAR(200),
            timestamp VARCHAR(200),
            CONSTRAINT primary_key_constraint PRIMARY KEY (played_at)
        )
        """

        cursor.execute(sql_query)
        pp.pprint("Opened database successfully")

        try:
            song_df.to_sql("my_played_tracks", engine, index=False, if_exists='append')
        except:
            pp.pprint("Data already exists in the database")

        conn.close()
        pp.pprint("Close database successfully")



        # Job scheduling 

        # For the scheduling in Airflow, refer to files in the dag folder 
    except KeyError as err:
        pp.pprint("Key {} was not found in json data:".format(str(err)) )
        pp.pprint(data, width=200)
    except ConnectionError as err:
        pp.print('Failed to open database ')
    except OSError as err:
        pp.print('Failed reading the OS ')
    except Exception as err:
        print("Error " + str(err))  
        

"Key 'items' was not found in json data:"
{'error': {'message': 'The access token expired', 'status': 401}}
