In [1]:
from fycharts.SpotifyCharts import SpotifyCharts
import numpy as np
import pandas as pd
import sqlalchemy
import matplotlib
from matplotlib import pyplot as plt
from datetime import date, timedelta
import requests
from dotenv import load_dotenv
import os
import json
import urllib.parse

In [2]:
 """
1. create .env file
2. Paste your Spotify OAuth Token:
TOKEN=%{OAuth_Token}

it is included in .gitignore, don't change it 😊
"""
load_dotenv(verbose=True)
spotify_token = os.getenv("TOKEN")

In [3]:
df = pd.read_csv("../2020-2021_daily_top_200.csv")
df.head()

Unnamed: 0,Position,Track Name,Artist,Streams,date,region,spotify_id
0,1.0,The Box,Roddy Ricch,1812103.0,2020-01-01,us,0nbXyq5TXYPCO7pr3N8S4I
1,2.0,ROXANNE,Arizona Zervas,1400495.0,2020-01-01,us,696DnlkuDOXcMAnKlTgXXK
2,3.0,Circles,Post Malone,1186861.0,2020-01-01,us,21jGcNKet2qwijlDFuPiPb
3,4.0,BOP,DaBaby,1130123.0,2020-01-01,us,6Ozh9Ok6h4Oi1wUSLtBseN
4,5.0,Bandit (with YoungBoy Never Broke Again),Juice WRLD,968612.0,2020-01-01,us,6Gg1gjgKi2AK4e0qzsR7sd


In [4]:
def get_headers():
    return {'Authorization' : 'Bearer {}'.format(spotify_token),
            'Content-Type': 'application/json',
            'Accept': 'application/json'}

In [5]:
def get_artist_id_by_track_id(track_id):
    pass

In [6]:
def get_albums_of_tracks(ids):
    endpoint = "https://api.spotify.com/v1/tracks?ids="
    for i, track_id in enumerate(ids):
        endpoint += track_id
        if i != len(ids) - 1:
            endpoint += ","        
    response = requests.get(endpoint, headers=get_headers())
    tracks = json.loads(response.text)['tracks']
    track_album = {}
    for track in tracks:
        album = track['album']
        album_id = album['id']
        track_album[track['id']] = album_id
    return track_album

In [7]:
def get_tracks_from_album(album_id):
    endpoint = 'https://api.spotify.com/v1/albums/{}/tracks'.format(album_id)
    response = requests.get(endpoint, headers=get_headers())
    track_ids = []
    response_dict = json.loads(response.text)
    for item in response_dict['items']:
        track_ids.append(item['id'])
    return track_ids


In [8]:
def get_track_features(track_id):
    endpoint = "https://api.spotify.com/v1/audio-features/{}".format(track_id)
    response = requests.get(endpoint, headers=get_headers())
    return json.loads(response.text)

In [9]:
def extract_relevant_features(audio_features):
    relevant_audio_features = ['danceability', 'energy', 'loudness',
                               'speechiness', 'acousticness', 'instrumentalness',
                               'liveness', 'valence','tempo', 'duration_ms', 'id']
    relevant_features_dict = {}
    for key in relevant_audio_features:
        relevant_features_dict[key] = audio_features[key]
    return relevant_features_dict

In [10]:
def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

In [11]:
get_track_features('06AKEBrKUckW0KREUWRnvT')

{'error': {'status': 401, 'message': 'Invalid access token'}}

In [12]:
get_albums_of_tracks(['06AKEBrKUckW0KREUWRnvT'])

KeyError: 'tracks'

In [None]:
get_tracks_from_album('7q4gKzB9XsZ4MfGj4RYQko')

In [None]:
us_df = df[df['region'] == 'us']
top_tracks_ids = set(us_df['spotify_id'].values)
print(len(top_tracks_ids))

In [None]:
albums = []
for chunk in chunks(list(top_tracks_ids),4):
    ids = get_albums_of_tracks(chunk).values()
    albums.extend(ids)


In [None]:
all_tracks = []
for album in albums:
    tracks = get_tracks_from_album(album)
    all_tracks.extend(tracks)    

In [None]:
rows = []
for track in all_tracks:
    try:
        features = extract_relevant_features(get_track_features(track))
        features['viral'] = track in top_tracks_ids
        rows.append(features)
    except Exception as e:
        print(e)

In [None]:
dev_data = pd.DataFrame(rows)

In [None]:
len(dev_data)

In [None]:
dev_data.head()

In [None]:
len(dev_data[dev_data['viral'] == False])

In [None]:
dev_data.to_csv('us_2020_top200.csv')

In [None]:
dev_data = pd.read_csv('us_2020_top200.csv', index_col=0)

In [None]:
dev_data.head()

In [None]:
targets = dev_data['viral'].values
del dev_data['id']
del dev_data['viral']
X = dev_data

In [None]:
X.head()

### PCA Analysis

In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import scale
X_scaled = scale(X.values)
pca = PCA()

In [None]:
X_transformed = pca.fit_transform(X_scaled)

In [None]:
X_transformed.shape

In [None]:
variance_ratio = pca.explained_variance_ratio_
plt.plot(variance_ratio, 'ro')
plt.show()
for i in range(1, 11):
    print("{} components:".format(i), sum(variance_ratio[0:i]))

In [None]:
def plot_vectors(score,coeff,labels=None):
    xs = score[:,0]
    ys = score[:,1]
    n = coeff.shape[0]
    scalex = 1.0/(xs.max() - xs.min())
    scaley = 1.0/(ys.max() - ys.min())
    plt.figure(figsize=(20,20))
    plt.scatter(xs * scalex,ys * scaley)
    for i in range(n):
        plt.arrow(0, 0, coeff[i,0], coeff[i,1],color = 'r',alpha = 0.5)
        if labels is None:
            plt.text(coeff[i,0]* 1.15, coeff[i,1] * 1.15, "Var"+str(i+1), color = 'b', ha = 'center', va = 'center')
        else:
            plt.text(coeff[i,0]* 1.15, coeff[i,1] * 1.15, labels[i], color = 'b', ha = 'center', va = 'center')
    plt.xlim(-1,1)
    plt.ylim(-1,1)
    plt.xlabel("PC{}".format(1))
    plt.ylabel("PC{}".format(2))
    plt.grid()

In [None]:
attributes = dev_data.columns.values
attributes

In [None]:
plot_vectors(X_transformed[:,0:2],np.transpose(pca.components_[0:2, :]),labels = attributes)
plt.show()

In [None]:
pca.components_[0]

In [None]:
def get_top_last_words(components, labels, index, n=15):  
    pc = components[index]
    zipped_pc = []
    for label, value in zip(labels, pc):
        zipped_pc.append((value, label))
    sorted_pc= sorted(zipped_pc, key=lambda item: item[0], reverse=True)
    top_n = sorted_pc[:n]
    last_n = sorted_pc[-n:]
    return top_n, reversed(last_n)

In [None]:
def print_title(title):
    print("---------{}---------".format(title))

In [None]:
def print_ranking(rank):
    for i, (k,v) in enumerate(rank):
        print(str(i) + ". ", v, ":", k)

In [None]:
top15, last15 = get_top_last_words(pca.components_,attributes,0, n=3)
print_title("PC1")
print("Top 3")
print_ranking(top15)
print("Top 3 - negative")
print_ranking(last15)

In [None]:
top15, last15 = get_top_last_words(pca.components_,attributes,1, n=3)
print_title("PC2")
print("Top 3")
print_ranking(top15)
print("Top 3 - negative")
print_ranking(last15)

In [None]:
plot_2d_scatter(X_transformed[:,:2], targets)

### tSNE

In [None]:
from sklearn.manifold import TSNE

tsne = TSNE(n_iter=500, verbose=2)
X_tsne_embedded = tsne.fit_transform(X_scaled)

In [None]:
def plot_2d_scatter(X, y):
    fig, plot = plt.subplots()
    fig.set_size_inches(16, 16)
    plt.prism()

    for val in [True, False]:
        viral_indices = y == val
        plot.scatter(X[viral_indices, 0], X[viral_indices, 1], label="class: " + str(val))

    plot.set_xticks(())
    plot.set_yticks(())

    plt.tight_layout()
    plt.legend()
    plt.show()

In [None]:
plot_2d_scatter(X_tsne_embedded, targets)