# Find Producers/Writers for Billboard Hot 100 Songs since 2000
DB Fowler

Note: this script is not an exhaustive search for every track and includes moderate data cleaning..

In [1]:
import re
import json
import time
import pickle
import requests
from requests import get
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

from billboard import ChartData
import numpy as np
import pandas as pd
from tqdm import tqdm

import wptools
import wikipedia
from wikipedia.exceptions import *
from fuzzywuzzy import fuzz
from fuzzywuzzy import process

# Billboard Functions
* Scrape Billboard charts and store songs' metadata (artist, title etc.)
* Store in Pandas DF

In [2]:
def artist_cleaner(art_dirty):
    '''
        Takes an artist string and splits into a list of individual artists.
    '''
    return(re.sub('(\\s+([fF]eat\\.|[fF]eat[uring]{5}|[Cc]o-[sS]tarring|&|[Xx]|\\+|/|[oO][rR]|,|[aA]nd|AND|(?:[dD]uet\\s*)?[wW]ith|WITH|[Vv][Ss])\\s+|,|/|\\()', '|', art_dirty))           

def parse_chart(chart):
    '''
        Takes a Billboard chart object and extracts relevant details.
        Returns list of Billboard songs.
    '''
    details = []
    for entry in chart:
        clean_art = artist_cleaner(entry.artist)
        details+=[[chart.date,  entry.artist, entry.title, entry.rank, \
                  entry.weeks, entry.spotifyID, entry.spotifyLink, 
                  entry.videoLink, clean_art, clean_art.split('|')[0], \
                  int(chart.date[:4])]]
    return(details)

def process_charts(min_year=2016):
    '''
        For every chart until min_year, downloads and parses chart.
        Returns pandas dataframe of all charts formatted.
    '''
    all_charts = []
    chart = ChartData('hot-100')
    while chart.previousDate[:4]!=str(min_year):
        all_charts.extend(parse_chart(chart))
        chart = ChartData('hot-100', chart.previousDate)
    all_charts = pd.DataFrame(all_charts, columns=['date',  'artist', 'title', 'rank', 'weeks', \
                                                   'spotifyID', 'spotifyLink', 'videoLink', \
                                                   'all_art', 'art_1', 'year'])
    return(all_charts)

# Wikipedia Functions
* Search for a song on Wikipedia and download it's infobox.
* Add a flag for if the Infobox contained songwriter/producer information.

In [3]:
print('Downloading Billboard Charts...')
charts = process_charts(min_year=2016)
rolled = charts.groupby(['art_1', 'title'], as_index=False).agg({'year':'min', 'rank': 'min', 'weeks': 'max'})
rolled.art_1 = rolled.art_1.map(lambda x: x.lower())
rolled.title = rolled.title.map(lambda x: x.lower())
print('Found {} Billboard Tracks...'.format(len(rolled)))

Downloading Billboard Charts...
Found 416 Billboard Tracks...


In [16]:
def wiki_pg(row):
    '''
        Given a song's title and artist, search for it's Wiki page.
    '''
    query = str(row['title']) + ' (' + str(row['art_1']) + ' song)'
    try:
        results = wikipedia.page(query)
    except PageError as pg_error:
        try:
            results = wikipedia.page(pg_error.pageid)
        except:
            results = None
    except DisambiguationError as dis_error:
        results = None
        #choice = process.extractOne(query, dis_error.options, scorer=fuzz.token_sort_ratio)
        #try:
        #    results = wikipedia.page(choice[0].replace('"', ''))
        #except:
        #    results = None
    return(results)

def wiki_credits(df):
    '''
        Find the Wiki page for a song and download the InfoBox into the DF.
    '''
    wiki_results = []
    for index, row in df.iterrows():
        info = None
        results = wiki_pg(row)
        if results:
            url_end = results.url.replace('https://en.wikipedia.org/wiki/', '')
            try:
                info = wptools.page(url_end, silent=True).get_parse().infobox
                info = dict((k.lower(), v) for k, v in dict(info).items())
            except:
                info = None
        else:
            info = None
        wiki_results.append(info)
    df['wiki'] = wiki_results
    
    # add wiki creds found flag
    wiki_cred = []
    for i in df.wiki:
        if any(word in str(i).lower() for word in ['producer', 'writer']):
            wiki_cred.append(1)
        elif i is None:
            wiki_cred.append(0)
        else:
            wiki_cred.append(0)
    df['cred_found'] = wiki_cred
    
    return(df)

# DISCOGS Functions
* Search Discogs DB for a track's credits
* Find a track's master, then all associated releases, then the distinct list of credits for those releases.

In [5]:
class discogs(object):
    '''
        Used to monitor request timeout across multiple queries
    '''
    def __init__(self, token=''):
        self.url     = 'https://api.discogs.com/database/search'
        self.headers = {'accept-encoding': 'gzip, deflate', 'user-agent': 'cde456'} 
        self.token   = token
        self.rate_remain = 60

    def search_masters(self, artist, title, verbose=False):
        '''
            Query all masters matching artist and title; 
            Returns list of master urls
        '''
        if verbose==True: print(artist, title)
        rate_remain = self.rate_remain
        params      = {'token': self.token, 'type' : 'master', 'artist': artist, 'release_title': title}
        masters     = []
        if rate_remain==0:
            time.sleep(5)
        else:
            try:
                response = requests.get(self.url, headers=self.headers, params=params, verify=False)
                self.rate_remain = int(response.headers['X-Discogs-Ratelimit-Remaining'])
                response = response.json()['results']
                if len(response)>0:
                    for master in response:
                        masters.append(master['resource_url'])
                time.sleep(1)
            except Exception as e:
                print('Error: {}, Remaining: {}'.format(e, self.rate_remain))
                time.sleep(5)
            if verbose==True: print('Remaining: {}'.format(self.rate_remain))
            return(masters)

    def get_main_releases(self, masters=[], verbose=False):
        '''
            Finds all releases associated with a master.
        '''
        if len(masters) > 0:
            rate_remain      = self.rate_remain
            params           = {'token': self.token}
            main_releases    = []
            for master in masters:
                if rate_remain==0:
                    time.sleep(5)
                else:
                    try:
                        response = requests.get(master, headers=self.headers, params=params, verify=False)
                        self.rate_remain = int(response.headers['X-Discogs-Ratelimit-Remaining'])
                        time.sleep(1)
                        main_releases.append((response.json()['main_release_url']))
                    except Exception as e:
                        print('Error: {}, Remaining: {}'.format(e, self.rate_remain))
                        time.sleep(5)
            return(list(set(main_releases)))
        else:
            return([])

    def get_credits(self, releases, verbose=False):
        '''
            Returns all credits related to a release
        '''
        if len(releases)>0:
            rate_remain      = self.rate_remain
            params           = {'token': self.token}
            credits          = []
            for release in releases:
                if rate_remain==0:
                    time.sleep(5)
                else:
                    try:
                        response = requests.get(release, headers=self.headers, params=params, verify=False)
                        self.rate_remain = int(response.headers['X-Discogs-Ratelimit-Remaining'])
                        time.sleep(1)
                        credits.extend([(extra['id'], extra['role']) for extra in response.json()['extraartists']])
                    except Exception as e:
                        print('Error: {}, Remaining: {}'.format(e, self.rate_remain))
                        time.sleep(5)
            return(pd.Series({'credits': set(credits)}))
        else:
            return(pd.Series({'credits': np.nan}))

    def get_artist_details(self, df):
        '''
            Returns all artist details for all artists in the credits lists for all songs.
            Rather than request every detail as scraped, this way makes sure we don't make
            duplicitive requests.
            Control for poorly defined JSON returned from DISCOGS
        '''
        rate_remain   = self.rate_remain
        params        = {'token': self.token}
        all_ids       = []; artist_lookup = {}
        for i in list(df[df.credits.notnull()].credits):
            all_ids.extend([i[0] for i in list(i)])
        all_ids = list(set(all_ids))
        
        for art_id in all_ids:
            if rate_remain==0:
                time.sleep(5)
            else:
                try:
                    response = requests.get('https://api.discogs.com/artists/{}'.format(art_id), 
                                            headers=self.headers, params=params, verify=False)
                    self.rate_remain = int(response.headers['X-Discogs-Ratelimit-Remaining'])
                    time.sleep(1)
                    response = response.json()
                    try:
                        name = response['name']
                    except:
                        name = []
                    try:
                        realname = response['realname']
                    except:
                        realname = []
                    try:
                        groups = [i['name'] for i in response['groups']]
                    except:
                        groups = []
                    try:
                        namevariations = response['namevariations']
                    except:
                        namevariations = []
                    artist_lookup[art_id] = {'name':       name, 
                                             'realname':   realname, 
                                             'groups':     groups,
                                             'variations': namevariations}
                except Exception as e:
                    print('Error: {}, Remaining: {}'.format(e, self.rate_remain))
                    time.sleep(5)
        if artist_lookup=={}:
            return(np.nan)
        else:
            return(artist_lookup)

def retrieve_credits(session, row, verbose=False):
    '''
        Search for a song's masters >> all releases >> all unique credits
    '''
    masters  = session.search_masters(artist=row['art_1'], title=row['title'], verbose=verbose)
    releases = session.get_main_releases(masters, verbose=verbose)
    credits  = session.get_credits(releases,      verbose=verbose)
    return(credits)

In [19]:
if __name__=='__main__':
    print('Downloading Billboard Charts...')
    charts = process_charts(min_year=1999)
    charts.to_pickle('../02_data/charts.p')
    rolled = charts.groupby(['art_1', 'title'], as_index=False).agg({'year':'min', 'rank': 'min', 'weeks': 'max'})
    rolled.art_1 = rolled.art_1.map(lambda x: x.lower())
    rolled.title = rolled.title.map(lambda x: x.lower())
    print('Found {} Billboard Tracks...'.format(len(rolled)))
    
    print('Searching for Wiki credits...')
    rolled    = wiki_credits(rolled)
    found     = rolled[rolled.cred_found==1].copy()
    not_found = rolled[rolled.cred_found==0].copy()
    print('{} tracks found wiki credits...\nSearching for {} additional credits on Discogs...'.format(len(found), len(not_found)))
    
    print('Searching for Discogs Credits...')
    d = discogs()
    not_found['credits'] = not_found.apply(lambda row: retrieve_credits(d, row, verbose=False), axis=1)
    artist_lookup        = d.get_artist_details(not_found)
    found['credits']     = np.nan
    
    print('Saving Files...')
    final = pd.concat([found, not_found])
    final.to_pickle('../02_data/all_tracks.p')
    pickle.dump(artist_lookup, open('../02_data/artist_lookup.p', 'wb'))

Downloading Billboard Charts...
Found 6718 Billboard Tracks...
Searching for Wiki credits...
6117 tracks found wiki credits...
Searching for 601 additional credits on Discogs...
Searching for Discogs Credits...
Saving Files...
