In [96]:
import pandas as pd
import requests
from requests import post, get
import datetime
import base64
from dotenv import load_dotenv
import os
import json
from urllib.parse import urlencode
from bs4 import BeautifulSoup
from flask import Flask, request
import webbrowser

In [97]:
# Read the Spotify app credentials from the environment. load_dotenv() runs
# first so that values from a .env file, if one is present, are available to
# os.getenv below.
load_dotenv()
client_id, client_secret, redirect_uri = (
    os.getenv(name) for name in ('client_id', 'client_secret', 'redirect_uri')
)

In [98]:
def get_authorization_url(client_id, redirect_uri, scope='user-read-recently-played'):
    """Build the Spotify authorization URL for the authorization-code flow.

    Args:
        client_id: Client id of the Spotify application.
        redirect_uri: URI the user is redirected to after the authorization
            page; it is URL-encoded into the query string.
        scope: Scope string to request. Defaults to
            'user-read-recently-played', the value that used to be hard-coded.

    Returns:
        The full ``https://accounts.spotify.com/authorize?...`` URL as a string.
    """
    query = urlencode({
        'client_id': client_id,
        'response_type': 'code',
        'redirect_uri': redirect_uri,
        'scope': scope,
    })
    return 'https://accounts.spotify.com/authorize?' + query

In [61]:
def get_token_with_authorization_code(authorization_code, client_id, client_secret, redirect_uri, timeout=10):
    """Exchange an authorization code for a Spotify access token.

    Sends a form-encoded POST to https://accounts.spotify.com/api/token,
    authenticating with HTTP Basic credentials built from
    ``client_id:client_secret``.

    Args:
        authorization_code: Code obtained from the authorization redirect.
        client_id: Client id of the Spotify application.
        client_secret: Client secret of the Spotify application.
        redirect_uri: Redirect URI sent along with the code.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The value of ``access_token`` from the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status
            (raised by ``raise_for_status``).
        KeyError: If a successful response carries no ``access_token``.
    """
    # Plain concatenation is kept on purpose: it fails loudly (TypeError) when a
    # credential is None instead of silently sending the text "None".
    credentials = (client_id + ':' + client_secret).encode('utf-8')
    encoded_credentials = base64.b64encode(credentials).decode('utf-8')

    url = 'https://accounts.spotify.com/api/token'
    headers = {
        'Authorization': 'Basic ' + encoded_credentials,
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = {
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'redirect_uri': redirect_uri
    }

    result = post(url, headers=headers, data=data, timeout=timeout)
    # Surface a rejected exchange as an HTTP error instead of an opaque
    # KeyError('access_token') further down.
    result.raise_for_status()
    return result.json()['access_token']

In [None]:
# Build the authorization URL for this app and open it in a web browser.
authorization_url = get_authorization_url(client_id=client_id, redirect_uri=redirect_uri)
webbrowser.open(authorization_url)

In [None]:
# The authorization code is obtained after opening the authorization URL;
# replace the placeholder below with it before running this cell.
authorization_code = 'your_authorization_code'
try:
    access_token = get_token_with_authorization_code(
        authorization_code, client_id, client_secret, redirect_uri
    )
except Exception as error:
    # Catch Exception rather than using a bare `except:` so KeyboardInterrupt and
    # SystemExit still propagate, and report the real failure instead of
    # assuming a single cause.
    print(f'Could not get an access token: {error!r}')
    print('The authorization code may already have been used; request a new one if needed.')
else:
    print('Access token received!')

In [101]:
def create_dataframe(access_token, days=5, limit=50, timeout=10):
    """Fetch recently played tracks from Spotify and return them as a DataFrame.

    Calls ``GET https://api.spotify.com/v1/me/player/recently-played`` with an
    ``after`` cursor set to ``days`` days before now (in milliseconds).

    Args:
        access_token: Bearer token sent in the Authorization header.
        days: How many days back the ``after`` cursor is placed.
        limit: Value of the ``limit`` query parameter.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        A DataFrame with the columns ``song_name``, ``artist_name``,
        ``time_played_at`` and ``date_played_at`` (one row per item). The
        columns are present even when no items are returned.

    Raises:
        requests.HTTPError: If the API answers with an error status (raised by
            ``raise_for_status``), e.g. for an expired token.
    """
    endpoint = 'https://api.spotify.com/v1/me/player/recently-played'
    since = datetime.datetime.now() - datetime.timedelta(days=days)
    # Whole seconds converted to milliseconds.
    after_ms = int(since.timestamp()) * 1000
    url = endpoint + '?' + urlencode({'limit': limit, 'after': after_ms})
    headers = {
        'Authorization': 'Bearer ' + access_token,
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    response = get(url, headers=headers, timeout=timeout)
    # Fail with the HTTP error rather than a KeyError on the missing 'items'.
    response.raise_for_status()
    payload = response.json()

    columns = ['song_name', 'artist_name', 'time_played_at', 'date_played_at']
    rows = []
    for item in payload['items']:
        track = item['track']
        # played_at looks like '2024-01-02T03:04:05.678Z'; split on the 'T' so
        # the time part no longer carries the separator (the old [10:] slice
        # kept it).
        played_date, _, played_time = item['played_at'].partition('T')
        rows.append({
            'song_name': track['name'],
            # Use the track's own first artist; the album's first artist can
            # differ from the performer of the song (e.g. compilations).
            'artist_name': track['artists'][0]['name'],
            'time_played_at': played_time,
            'date_played_at': played_date,
        })
    # Passing columns explicitly keeps the schema stable for an empty result.
    return pd.DataFrame(rows, columns=columns)

In [None]:
# Fetch the recently played tracks; the bare name on the last line makes the
# notebook render the resulting DataFrame as the cell output.
data_df = create_dataframe(access_token=access_token)
data_df

In [103]:
def check(df):
    """Run basic sanity checks on a DataFrame.

    Args:
        df: The DataFrame to validate.

    Returns:
        False if the DataFrame is empty, True if it is non-empty and contains
        no null values. (Success used to fall through and return None, which
        is falsy just like the empty case.)

    Raises:
        ValueError: If any null value is present. ValueError is a subclass of
            Exception, so handlers written for the previous generic Exception
            still match.
    """
    if df.empty:
        print('DataFrame is empty.')
        return False
    if df.isnull().values.any():
        raise ValueError('Null values found.')
    print('Data looks good.')
    return True

In [None]:
# Validate the fetched data before it is uploaded.
check(df=data_df)

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Database connection settings come from the environment / .env file.
# NOTE(review): on Windows `username` may resolve to the OS-provided USERNAME
# variable instead of the .env value, because load_dotenv() does not override
# existing variables by default -- confirm, or rename the key.
load_dotenv()
username = os.getenv('username')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')
database = os.getenv('database')

# Build the URL from its components instead of interpolating into a string:
# a password containing characters such as '@', '/', ':' or '%' would otherwise
# produce a malformed or mis-parsed connection URL.
connection_url = URL.create(
    'postgresql',
    username=username,
    password=password,
    host=host,
    port=int(port) if port else None,
    database=database,
)
engine = create_engine(connection_url)
try:
    # if_exists='replace' drops and recreates the table on every run.
    data_df.to_sql('spotify_data', engine, if_exists='replace', index=False)
    print("Data uploaded successfully!")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Release the engine's connections whether or not the upload succeeded.
    engine.dispose()