1. Get Audio Information from Spotify
2. Insert into DB


In [33]:
import requests
from urllib.parse import urljoin

In [1]:
from pymongo import MongoClient
from pprint import pprint
from typing import List

# DB CONFIG
client = MongoClient('mongo', 27017)
db = client.music_db
COLLECTION = 'rap-song'
songs_collection = db[COLLECTION]

In [2]:
# Pull all songs from DB
SONGS = list(songs_collection.find())
len(SONGS)

5914

In [3]:
def has_lyrics(song):
    if not song.get('lyrics'):
        return False
    
    if song.get('lyrics') == 'null':
        return False

    if song.get('lyrics').get('error'):
        return False
        
    return True

songs = [song for song in SONGS]

In [4]:
class Song:
    def __init__(self, song: dict):
        self.data = song
        self.spotify = {}    # Should be assined with the spotify api
    
    @property
    def title(self):
        return self.data['title']
    
    @property
    def artist(self):
        return self.data['artist']
        
    @property
    def lyrics(self):
        return self.data['lyrics']['result']['track']['text']
    
    @property
    def search_phrase(self):
        return self.data['title'] + " by " + self.data['artist']
    
    def __str__(self):
        return str(self.data)
    
    def __repr__(self):
        return self.__str__()
    
song_list = list(map(Song, songs))

In [5]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

CLIENT_SECRET_KEY = 'c784fff30881479abc02c128d341efe2'
CLIENT_ID = '299217ad61af41beb3f025a8b9bf0d99'


client_credentials_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET_KEY)
spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

In [6]:
from typing import Iterable

def retrieve_track_id(name):
    results = spotify.search(q='track:' + name, type='track')
    return results

def chunk(it: Iterable, size: int):
    i = 0
    for j in range(size, len(it), size):
        yield it[i:j]
        i = j

In [9]:
from typing import List
from IPython.display import clear_output

    
def retrieve_and_assign_features(songs: List):
    song_ids = [song.spotify['id'] for song in songs]
    features = spotify.audio_features(tracks=song_ids)
    for song, feature in zip(songs, features):
        song.spotify['audio_features'] = feature
    return songs

def retrieve_and_assign_ids(song_list, start_index=0):
    i = 0
    for song in song_list[start_index:]:
        i += 1
        clear_output(wait=True)
        print(f"Retreiving ID: {i} for {song.search_phrase}\r")
        try:
            response = retrieve_track_id(song.search_phrase)
            song_id = response['tracks']['items'][0]['id']
            song.spotify['id'] = song_id
            yield song
        except Exception as e:
            print("Couldn't retrieve", song.search_phrase, e)
            continue
    
song_list = list(retrieve_and_assign_ids(song_list, 0))

Retreiving ID: 5914 for Take You There by Pete Rock & C.L. Smooth
Couldn't retrieve Take You There by Pete Rock & C.L. Smooth list index out of range


In [10]:
filtered_song_list = [song for song in song_list if song.spotify.get('id')]

In [12]:
import time
i=0
for lst in chunk(filtered_song_list, 50):
    retrieve_and_assign_features(lst)
    print(i)
    clear_output(wait=True)
    time.sleep(.5)
    i+=1

21


In [13]:
COLLECTION

'rap-song'

In [14]:
def update_db(document_id, spotify_data: dict):
    document = songs_collection.find_one_and_update({"_id": document_id}, 
                                         {"$set": {"spotify": spotify_data}})
    return document

def update_db_from_songs(songs):
    for song in songs:
        update_db(song.data['_id'], song.spotify)

update_db_from_songs(song_list)

In [15]:
# Get album IDs
SONGS = list(songs_collection.find())

In [16]:
songs = [song for song in SONGS if song.get('spotify', {}).get('id')]
len(songs)

1109