In [1]:
import pandas as pd
import numpy as np
import sqlite3
import requests
import json
import copy
import random
from time import sleep

For this prediction task, we require two datasets. First, we need each user's list of top artists, which will be used to train our recommender system. Second, we need the nationality of each artist within our recommender system, which will be used to debias our model.

We'll start with the top artists list, which we'll obtain via the Last.fm API. The following code authenticates with the API so that we can pull the relevant data via Python. For information on how to set up a Last.fm API account, see the [Last.fm API documentation](https://www.last.fm/api).

In [3]:
# Read in key and secret from file (first line: API key, second line: shared secret)
with open('../lastfm.key', 'r') as keys:
    key = keys.readline().strip()
    # Strip the secret too: a trailing newline would otherwise be hashed into
    # the API signature and make every signed request invalid.
    secret = keys.readline().strip()

In [4]:
# Ask Last.fm for a fresh, not-yet-approved request token
token_url = f'http://ws.audioscrobbler.com/2.0/?method=auth.gettoken&api_key={key}&format=json'
token_request = requests.get(token_url)
token_payload = json.loads(token_request.text)
token = token_payload['token']

In [6]:
# Approval link: open it and approve manually each time a new token is generated.
# NOTE: the displayed URL embeds the API key and token, so clear this cell's
# output before sharing the notebook.
'http://www.last.fm/api/auth/?api_key={}&token={}'.format(key, token)

'http://www.last.fm/api/auth/?api_key=6353b7f632a9fd6487aa7093cf57084f&token=BQMJwPmSUd8b88EAawZToZh5IjOu9ujG'

In [14]:
# Combine our key, secret, and access token to generate an API signature
# This is needed for our requests to be approved
import hashlib

def generate_api_signature(api_key, method, token, shared_secret):
    """Return the MD5 hex digest used to sign a Last.fm API call.

    The three call parameters (``api_key``, ``method``, ``token``) are ordered
    alphabetically by name, each name is concatenated with its value, the
    shared secret is appended, and the UTF-8 encoding of the resulting string
    is hashed with MD5.
    """
    fields = {'api_key': api_key, 'method': method, 'token': token}

    # name+value pairs, ordered alphabetically by parameter name
    pieces = []
    for name in sorted(fields):
        pieces.append(str(name) + str(fields[name]))

    # The shared secret always goes last, after every parameter
    payload = ''.join(pieces) + shared_secret

    digest = hashlib.md5(payload.encode('utf-8'))
    return digest.hexdigest()

# Sign the auth.getSession call with the key, token and secret loaded earlier
api_key, method, shared_secret = key, 'auth.getSession', secret

api_signature = generate_api_signature(api_key, method, token, shared_secret)
print("API Signature:", api_signature)

API Signature: c240e5fa7e2c736b5bf56e398b219589


In [15]:
# Use our API signature to acquire our session key
sesh = requests.get(f'http://ws.audioscrobbler.com/2.0/?method=auth.getSession&api_key={key}&token={token}&api_sig={api_signature}&format=json').text
# `sesh` is the raw JSON text; it has to be parsed before it can be indexed
# (indexing the string directly raises TypeError).
sk = json.loads(sesh)['session']['key']

Now we can pull our data. There is no publicly available Last.fm username dataset, and the only function the API has to get a list of users is user.getFriends, which returns the friends of a specified user. We can use this to generate a dataset of usernames by pulling a user's friends, and then their friends' friends, and so on and so forth. This requires us to specify a starting user.

In [23]:
starting_user = '' # Replace this with the username of the user you want to start scraping from

# `users` maps a username to the list of that user's friends. Reuse it when an
# earlier run left it in the kernel; otherwise start from an empty mapping so
# the cell also works on a fresh kernel.
if 'users' not in globals():
    users = {}

seen_users = set(users.keys())
next_user_queue = [starting_user]


def _pop_unseen_user(queue, seen):
    """Pop names off the end of `queue` until one not in `seen` turns up.

    Returns that name, or None when the queue runs out first.
    """
    while queue:
        candidate = queue.pop()
        if candidate not in seen:
            return candidate
    return None


current_user = _pop_unseen_user(next_user_queue, seen_users)

# Crawl at most 5000 users, following friend links outward from the start user
for _ in range(5000):
    if current_user is None:
        print('No unseen users left in the queue.')
        break
    print(current_user)
    current_page = 1
    while True:
        sleep(0.2) # Follow last.fm regulations on rate limits
        user_friends = requests.get(
            'http://ws.audioscrobbler.com/2.0/',
            # Passing params lets requests URL-encode usernames safely
            params={
                'method': 'user.getfriends',
                'user': current_user,
                'api_key': key,
                'format': 'json',
                'page': current_page,
            },
        ).text
        try:
            friends_dict = json.loads(user_friends)['friends']['user']
            friends_list = [friend['name'] for friend in friends_dict]
        except KeyError:
            # A response without the expected keys marks the end of this user's pages
            break
        if not friends_list:
            # An empty page also means there is nothing more to fetch
            break
        users.setdefault(current_user, []).extend(friends_list)
        random.shuffle(friends_list)
        next_user_queue += friends_list
        current_page += 1

    seen_users.add(current_user)
    current_user = _pop_unseen_user(next_user_queue, seen_users)

In [24]:
# Every username that appears in at least one crawled friend list
all_users = set()
for friend_names in users.values():
    all_users.update(friend_names)
len(all_users)

By now, we've pulled all our usernames, but we don't have any information on the artists the users like. We will now use the API's user.getTopArtists method to pull that data.

In [None]:
# Users already known to have no usable top-artist data. Reuse the set when an
# earlier run left it in the kernel; otherwise start empty so the cell also
# works on a fresh kernel.
if 'no_artist_users' not in globals():
    no_artist_users = set()

user_df = pd.DataFrame(columns=['user', 'artist_name', 'play_count', 'artist_url'])
df_user_set = set(user_df['user'])
print('df read.')
rows_to_add = []


def _flush_rows(frame, pending_rows):
    """Append `pending_rows` to `frame`, rewrite user-df.csv, and return the combined frame."""
    print(f'++++++++++++Logging... ({frame.shape[0] + len(pending_rows)} rows)')
    combined = pd.concat([frame, pd.DataFrame(pending_rows)])
    # Let pandas open the file itself so line endings are not translated twice
    combined.to_csv('user-df.csv', encoding='utf-8')
    return combined


def _save_no_artist_users(names):
    """Write the repr of the set of users without top artists to no_artist_users.txt."""
    with open('no_artist_users.txt', 'w') as file:
        file.write(repr(names))


for current_user in all_users:
    if current_user in df_user_set or current_user in no_artist_users:
        continue
    print(current_user)
    sleep(0.2) # Follow last.fm regulations on rate limits
    response_payload = json.loads(requests.get(
        'http://ws.audioscrobbler.com/2.0/',
        # Passing params lets requests URL-encode usernames safely
        params={
            'method': 'user.gettopartists',
            'user': current_user,
            'api_key': key,
            'format': 'json',
            'page': 1,
        },
    ).text)
    try:
        user_tops = response_payload['topartists']['artist']
        user_rows = [{'user': current_user, 'artist_name': row['name'], 'play_count': row['playcount'], 'artist_url': row['url']} for row in user_tops]
    except KeyError:
        # The response did not have the expected structure; skip this user from now on
        no_artist_users.add(current_user)
        print('------------------', response_payload)
        continue

    if not user_tops:
        # Don't add users with no top artists
        print('User had no top artists.')
        no_artist_users.add(current_user)
        if len(no_artist_users) % 10 == 0:
            _save_no_artist_users(no_artist_users)
        continue

    rows_to_add += user_rows
    df_user_set.add(current_user)
    print('Logged user!')

    if len(rows_to_add) > 10000:
        user_df = _flush_rows(user_df, rows_to_add)
        rows_to_add = []

# Flush the final partial batch; without this the last (up to 10000) rows would
# never reach user_df or the CSV.
if rows_to_add:
    user_df = _flush_rows(user_df, rows_to_add)
    rows_to_add = []
_save_no_artist_users(no_artist_users)

In [17]:
# Reload the user/top-artist table saved by the previous step, keeping only the four data columns
user_df_file_location = '' # Replace with file location of df of user info saved from previous step
user_df_full = pd.read_csv(user_df_file_location)
user_df = user_df_full.loc[:, ['user', 'artist_name', 'play_count', 'artist_url']]

Now we have to pull the dataset of artist nationalities. Last.fm's API does not provide this information. However, it does provide each artist's ID on the site MusicBrainz, which in turn has the information we need. To use the MusicBrainz API, we have to specify a user agent consisting of an app name and a contact email. For example, with the app name Service and the contact email service@mail.com, our agent would be: Service ( service@mail.com )

In [None]:
import os
from datetime import datetime
mb_user_agent = '' # Add your user agent here
mb_headers = {
    'User-Agent': mb_user_agent
}
# Pause before each MusicBrainz request; their API guidelines ask clients to
# average no more than about one request per second.
MB_REQUEST_DELAY = 1.0
BACKUP_DIR = 'artists_info_backups'


def _load_json_or_default(path, default):
    """Return the parsed JSON stored at `path`, or `default` when the file does not exist."""
    try:
        with open(path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError:
        return default


def _write_verified_json(path, data):
    """Dump `data` to `path`, read it back, and return the reloaded copy.

    Raises Exception('JSON WRITE ERROR') when the reloaded copy is more than
    two entries shorter than `data`.
    """
    with open(path, 'w', encoding='utf-8') as file:
        json.dump(data, file)
    with open(path, 'r', encoding='utf-8') as file:
        reloaded = json.load(file)
    if len(reloaded) < len(data) - 2:
        raise Exception('JSON WRITE ERROR')
    return reloaded


def _record_strange_error(errors, name, error):
    """Store `error` for `name` in `errors`, checkpoint every 5th entry, return the mapping."""
    print(f'+++++++++++Strange new error: {error}')
    errors[name] = str(error)
    if len(errors) % 5 == 0:
        errors = _write_verified_json('strange_error_test.json', errors)
    return errors


def _is_unprocessed(name):
    """True when `name` is a string that is in none of the three result collections."""
    return (
        type(name) is str
        and name not in artists_info
        and name not in no_info_artists
        and name not in strange_error_artists
    )


artists = user_df['artist_name'].unique()
# Resume from earlier backups when they exist; otherwise start from scratch so
# the cell also runs on a machine that has no backups yet.
artists_info = _load_json_or_default(f'{BACKUP_DIR}/artists_info_2024-02-02 10_23_02.292744.json', {})
no_info_artists = set(_load_json_or_default(f'{BACKUP_DIR}/no_info_2024-02-02 10_23_12.465961.json', []))
strange_error_artists = _load_json_or_default(f'{BACKUP_DIR}/strange_error_2024-02-02 10_23_12.558135.json', {})

for artist in artists:
    if not _is_unprocessed(artist):
        continue
    # Results are keyed by the name with '&' replaced, so check again under that key
    artist = artist.replace('&', 'and')
    if not _is_unprocessed(artist):
        continue
    sleep(0.2)
    print(artist)

    # Step 1: ask Last.fm for the artist record, which may carry a MusicBrainz id
    try:
        artist_req = requests.get(
            'http://ws.audioscrobbler.com/2.0/',
            # Passing params lets requests URL-encode the artist name safely
            params={
                'method': 'artist.getinfo',
                'artist': artist,
                'api_key': key,
                'format': 'json',
                'page': 1,
            },
        ).json()
    except ValueError as e:
        strange_error_artists = _record_strange_error(strange_error_artists, artist, e)
        # Without a response for this artist there is nothing to look up;
        # carrying on would reuse the previous artist's response.
        continue

    try:
        artist_mbid = artist_req['artist']['mbid']
    except KeyError as k:
        if str(k) == "'mbid'":
            print('No mbid found!')
            no_info_artists.add(artist)
            if len(no_info_artists) % 30 == 0:
                no_info_artists = set(_write_verified_json('no_info_test.json', list(no_info_artists)))
        else:
            strange_error_artists = _record_strange_error(strange_error_artists, artist, k)
        continue

    # Step 2: fetch the artist's MusicBrainz record
    sleep(MB_REQUEST_DELAY)
    try:
        artist_request = requests.get(f'https://musicbrainz.org/ws/2/artist/{artist_mbid}?fmt=json', headers=mb_headers)
        artist_info = artist_request.json()
    except ValueError as e:
        # Response body was not JSON; log it and move on
        strange_error_artists = _record_strange_error(strange_error_artists, artist, e)
        continue
    artists_info[artist] = artist_info

    if len(artists_info) % 100 == 0:
        artists_info = _write_verified_json('artists_info_test.json', artists_info)

    # Save backups
    if len(artists_info) % 5000 == 0:
        print(len(artists_info))
        os.makedirs(BACKUP_DIR, exist_ok=True)
        # One timestamp for all three files so a backup set is easy to match up
        stamp = str(datetime.now()).replace(':', '_')
        with open(f'{BACKUP_DIR}/artists_info_{stamp}.json', 'w', encoding='utf-8') as file:
            json.dump(artists_info, file)
        with open(f'{BACKUP_DIR}/no_info_{stamp}.json', 'w', encoding='utf-8') as file:
            json.dump(list(no_info_artists), file)
        with open(f'{BACKUP_DIR}/strange_error_{stamp}.json', 'w', encoding='utf-8') as file:
            json.dump(strange_error_artists, file)