In [1]:
import os
from glob import glob

import dask
import dask.bag as db
import dask.dataframe as dd
from distributed import Client
from dask_jobqueue import SLURMCluster
import pandas as pd 
import numpy as np

from IPython.display import display
import matplotlib.pyplot as plt

import sqlite3

In [2]:
# Directory holding the dataset's AdditionalFiles (SQLite databases and text
# indexes); its contents are listed in a later cell.
public_path = '/scratch/work/public/MillionSongDataset/AdditionalFiles/'
# Working directory for the CSV feature files this notebook writes and re-reads.
# NOTE(review): absolute, user-specific path -- other users must edit it.
local_path = '/scratch/tj810/final-project-team_unsupervised_learners/feature_files/'

In [3]:
# Set LOCAL to True for single-machine execution while developing
# Set LOCAL to False for cluster execution
LOCAL = False

if LOCAL:
    # This line creates a single-machine dask client
    client = Client()
else:    
    # This line creates a SLURM cluster dask and dask client
    # Logging outputs will be stored in /scratch/{your-netid}
    
    # Both the worker scratch directory and the SLURM log path are built from
    # the SLURM_JOB_USER environment variable, so this branch raises KeyError
    # when that variable is not set (e.g. outside a SLURM job).
    cluster = SLURMCluster(memory='4GB', cores=2, python='/scratch/work/public/dask/bin/python', 
                               local_directory='/tmp/{}/'.format(os.environ['SLURM_JOB_USER']),
                               job_extra=['--output=/scratch/{}/slurm-%j.out'.format(os.environ['SLURM_JOB_USER'])])

    # NOTE(review): overrides the command used to submit jobs; presumably a
    # site-specific wrapper -- confirm against the cluster documentation.
    cluster.submit_command = 'slurm'
    # Request 100 workers. The recorded client summary for this cell shows
    # 0 workers, i.e. none had registered yet when the summary was rendered.
    cluster.scale(100)

    display(cluster)
    client = Client(cluster)

display(client)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42963 instead


VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

0,1
Client  Scheduler: tcp://10.32.35.15:43565  Dashboard: http://10.32.35.15:42963/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [4]:
# Show which databases and text files are available under public_path.
os.listdir(public_path)

['BROKEN_lastfm_tags.db',
 'README',
 'lastfm_tags.db',
 'LICENSE',
 'unique_terms.txt',
 'artist_term.db',
 'artist_location.txt',
 'artist_similarity.db',
 'mxm_779k_matches.txt',
 'sid_mismatches.csv',
 'rdio_ids.txt',
 'sid_mismatches.txt',
 'TRACKIDS.txt',
 'msd_summary_file.h5',
 'unique_tracks.txt',
 'mxm_dataset.db',
 'unique_mbtags.txt',
 'unique_artists.txt',
 'tracks_per_year.txt',
 'track_metadata.db']

In [8]:
# conn = sqlite3.connect(path + '.db')
# cur = conn.cursor()
# cur.execute("SELECT * FROM artist_term LIMIT 1")
# cur.execute("""SELECT sql FROM sqlite_master WHERE type='table'""")
# print(cur.fetchall())

# Save tables to local folder

In [9]:
# Read the whole `songs` table of track_metadata.db into a pandas DataFrame,
# replacing None values (SQL NULLs) with NaN.
conn = sqlite3.connect(public_path + 'track_metadata.db')
try:
    df = pd.read_sql_query("SELECT * from songs", conn).replace([None], np.nan)
finally:
    # Always release the SQLite handle, even when the query fails.
    conn.close()
# df.to_csv('/scratch/tj810/final-project-team_unsupervised_learners/feature_files/track_metadata.csv',index=False)

In [138]:
# Read the `artist_term` table and collapse it to one row per artist, with all
# of that artist's terms gathered into a Python list.
conn = sqlite3.connect(public_path + 'artist_term.db')
try:
    # Parenthesised chain instead of backslash continuations: a comment after a
    # trailing backslash is a SyntaxError, inside parentheses it is legal.
    df = (
        pd.read_sql_query("SELECT * from artist_term", conn)
        .replace([None], np.nan)
        .groupby('artist_id')['term']  # combine all terms associated with each artist
        .apply(list)
        .reset_index()
    )
finally:
    # Always release the SQLite handle, even when the query fails.
    conn.close()
# df.to_csv('/scratch/tj810/final-project-team_unsupervised_learners/feature_files/artist_term.csv',index=False)

# Combine all features from Additional_Files

In [156]:
# features: duration, artist_familiarity, artist_hotttnessss, year
# includes track_id, artist_id
# npartitions is passed by keyword: the first positional parameter of
# repartition() is `divisions`, so a bare integer is ambiguous to readers.
track_metadata = dd.read_csv(local_path + 'track_metadata.csv', header=0).repartition(npartitions=100)

# features: artist location (map using artist_id)
# '<SEP>' is a multi-character separator, which the C parser cannot handle;
# selecting the python engine explicitly avoids the fallback ParserWarning.
# NOTE: the column name 'longtitude' (sic) is kept as-is because downstream
# files and cells already carry that spelling.
artist_location = dd.read_csv(public_path + 'artist_location.txt',
                              sep='<SEP>',
                              engine='python',
                              header=None,
                              names=['artist_id', 'latitude', 'longtitude', 'artist_name', 'location'])

# features: artist term (map using artist_id)
artist_term = dd.read_csv(local_path + 'artist_term.csv', header=0)

  head = reader(BytesIO(b_sample), **kwargs)


In [157]:
# Column names of each input table. Sizes noted by the author:
#   artist_term     -> (43943, 2)
#   artist_location -> 13850 rows x 5 columns
#   track_metadata  -> (1000000, 11)
for frame in (artist_term, artist_location, track_metadata):
    print(frame.columns)

Index(['artist_id', 'term'], dtype='object')
Index(['artist_id', 'latitude', 'longtitude', 'artist_name', 'location'], dtype='object')
Index(['track_id', 'title', 'song_id', 'release', 'artist_id', 'artist_mbid',
       'artist_name', 'duration', 'artist_familiarity', 'artist_hotttnesss',
       'year'],
      dtype='object')


In [158]:
# Artist-level table: outer join keeps artists that appear in only one of the
# two sources; the result is collapsed into a single partition.
artist_features = (
    artist_term
    .merge(artist_location, on='artist_id', how='outer')
    .repartition(npartitions=1)
)

# Track-level table: left join keeps every track and attaches artist features
# where available. `artist_name` exists on both sides, so the result carries
# suffixed `artist_name_x` / `artist_name_y` columns. compute() materialises
# the result as an in-memory pandas DataFrame.
all_features = (
    track_metadata
    .merge(artist_features, on='artist_id', how='left')
    .compute()
)

In [167]:
# all_features.drop(columns='artist_name_y').rename(columns={'artist_name_x':'artist_name'}).reset_index(drop=True).to_csv(local_path + 'all_features.csv')

In [196]:
# Inspect the row labelled 2 of the combined feature table.
# NOTE(review): `features` is not assigned in any earlier cell of this
# notebook, so this cell fails on a fresh kernel; presumably it was loaded
# from all_features.csv in a cell that has since been removed -- confirm.
features.loc[2]

track_id                                             TRMMMRX128F93187D9
title                                                 No One Could Ever
song_id                                              SOGTUKN12AB017F4F1
release                                                          Butter
artist_id                                            ARGEKB01187FB50750
artist_mbid                        3d403d44-36ce-465c-ad43-ae877e65adc4
artist_name                                              Hudson Mohawke
duration                                                      138.97098
artist_familiarity                                             0.643681
artist_hotttnesss                                              0.437504
year                                                               2006
term                  ['broken beat', 'hip hop', 'trip hop', 'glitch...
latitude                                                        55.8578
longtitude                                                     -

In [8]:
# for i in range(features.shape[1]):
#     print(f'Number of unique {features.columns[i]}:')
#     print(features[features.columns[i]].unique().size)

# Get additional features from hdf5 files directly

In [5]:
import PythonSrc.hdf5_getters as hdf5_getters
from tqdm import tqdm

In [6]:
# NOTE: this rebinds `public_path` from the AdditionalFiles directory to the
# dataset root. Cells above that build paths from `public_path` will point at
# the wrong directory if they are re-run after this cell.
public_path = '/scratch/work/public/MillionSongDataset/'
# Same working directory as before, repeated so this section is self-contained.
local_path = '/scratch/tj810/final-project-team_unsupervised_learners/feature_files/'

In [7]:
def count_all_files(basedir, ext='.h5'):
    """Count the entries ending in `ext` anywhere under `basedir`.

    Parameters
    ----------
    basedir : str
        Root directory to walk recursively.
    ext : str, optional
        Filename suffix to match (default '.h5').

    Returns
    -------
    int
        Number of matches over all visited directories; 0 when `basedir`
        does not exist (os.walk yields nothing in that case).
    """
    # Imported here because the notebook only imports the glob() function.
    from glob import escape

    cnt = 0
    for root, _dirs, _files in os.walk(basedir):
        # Escape the directory part so that '[', '?' or '*' in a directory
        # name are matched literally instead of being read as wildcards.
        pattern = os.path.join(escape(root), '*' + ext)
        cnt += len(glob(pattern))
    return cnt
# Count every .h5 file under the dataset root (the walk also descends into
# the AdditionalFiles subdirectory, which contains an .h5 file of its own).
count_all_files(public_path)

1000001

In [9]:
# list all getters
for x in filter(lambda x: x[:3] == 'get',hdf5_getters.__dict__.keys()):
    print(x)

get_num_songs
get_artist_familiarity
get_artist_hotttnesss
get_artist_id
get_artist_mbid
get_artist_playmeid
get_artist_7digitalid
get_artist_latitude
get_artist_longitude
get_artist_location
get_artist_name
get_release
get_release_7digitalid
get_song_id
get_song_hotttnesss
get_title
get_track_7digitalid
get_similar_artists
get_artist_terms
get_artist_terms_freq
get_artist_terms_weight
get_analysis_sample_rate
get_audio_md5
get_danceability
get_duration
get_end_of_fade_in
get_energy
get_key
get_key_confidence
get_loudness
get_mode
get_mode_confidence
get_start_of_fade_out
get_tempo
get_time_signature
get_time_signature_confidence
get_track_id
get_segments_start
get_segments_confidence
get_segments_pitches
get_segments_timbre
get_segments_loudness_max
get_segments_loudness_max_time
get_segments_loudness_start
get_sections_start
get_sections_confidence
get_beats_start
get_beats_confidence
get_bars_start
get_bars_confidence
get_tatums_start
get_tatums_confidence
get_artist_mbtags
get_arti

In [10]:
def get_additional_features(basedir, ext='.h5'):
    """Collect per-track fields from every `ext` file found under `basedir`.

    Each matching file is opened with hdf5_getters, four getters are read
    (track id, loudness, song hotttnesss, tempo) and the file is closed again.

    Parameters
    ----------
    basedir : str
        Root directory to walk recursively.
    ext : str, optional
        Filename suffix of the files to read (default '.h5').

    Returns
    -------
    dict
        Keys 'track_id', 'loudness', 'song_hotttnesss' and 'tempo', each
        mapping to a list with one entry per file, in visiting order. Values
        are whatever the corresponding hdf5_getters function returns.

    Notes
    -----
    time_signature and time_signature_confidence are also exposed by
    hdf5_getters but are currently not collected.
    The progress bar counts directories visited by os.walk, not files.
    """
    track_ids = []
    loudness = []
    song_hotttnesss = []
    tempo = []
    for root, _dirs, _files in tqdm(os.walk(basedir)):
        for h5_path in glob(os.path.join(root, '*' + ext)):
            h5 = hdf5_getters.open_h5_file_read(h5_path)
            try:
                # Read every field before appending anything, so a failure
                # part-way through a file cannot leave the lists unaligned.
                record = (
                    hdf5_getters.get_track_id(h5),
                    hdf5_getters.get_loudness(h5),
                    hdf5_getters.get_song_hotttnesss(h5),
                    hdf5_getters.get_tempo(h5),
                )
            finally:
                # Close the handle even when a getter raises, so a bad file
                # does not leak an open HDF5 handle.
                h5.close()
            track_ids.append(record[0])
            loudness.append(record[1])
            song_hotttnesss.append(record[2])
            tempo.append(record[3])
    df_dict = {'track_id': track_ids,
               'loudness': loudness,
               'song_hotttnesss': song_hotttnesss,
               'tempo': tempo}
    return df_dict

In [None]:
# Walk the whole dataset root and read the extra fields from every .h5 file.
# The progress counter shown while this runs is in directories (one tick per
# os.walk step), so it carries no total and no per-file rate.
df_dict = get_additional_features(public_path)

27it [00:46,  3.00s/it]

In [19]:
# Turn the collected lists into a DataFrame (one row per file read).
additional_features = pd.DataFrame(df_dict)
# Keep only the text between the first pair of single quotes of each track_id
# (i.e. the second piece after splitting on "'").
additional_features['track_id'] = additional_features['track_id'].map(
    lambda raw_id: raw_id.split("'")[1]
)

In [39]:
# Persist the extracted features. to_csv is called without index=False, so the
# row index is written as an unnamed first column; the cell further down that
# re-reads this file drops its first column to discard it.
additional_features.to_csv(local_path+'additional_features_first.csv')
# additional_features.to_csv(local_path + 'additional_features_second.csv')

In [41]:
# Number of rows whose song_hotttnesss value is missing (NaN).
additional_features['song_hotttnesss'].isna().sum()

418035

Use Dask to merge the new features with the features extracted from AdditionalFiles

In [None]:
# Lazily load both CSVs as Dask DataFrames split into 100 partitions each.
# npartitions is passed by keyword: the first positional parameter of
# repartition() is `divisions`, so a bare integer is ambiguous to readers.
features = dd.read_csv(local_path + 'all_features.csv', header=0).repartition(npartitions=100)
additional_features = dd.read_csv(local_path + 'additional_features_first.csv', header=0).repartition(npartitions=100)

In [None]:
# Drop the first column of each frame, then index both frames by track_id so
# they can be joined on the index.
feature_cols = features.columns[1:]
features = features[feature_cols].set_index('track_id')

additional_cols = additional_features.columns[1:]
additional_features = additional_features[additional_cols].set_index('track_id')

In [None]:
# Left join on the index of both frames: every row of `features` is kept and
# the columns of `additional_features` are attached where the index matches.
all_features = features.merge(additional_features,how = 'left', left_index = True, right_index= True)

In [None]:
# Execute the lazy merge and rebind the name to the computed result, so
# `all_features` no longer refers to the lazy Dask object after this cell.
all_features = all_features.compute()

In [None]:
# Move the index back into a regular column and write the merged table to disk.
# to_csv is called without index=False, so the fresh integer index produced by
# reset_index() is also written, as an unnamed first column.
all_features.reset_index().to_csv(local_path+'merged_all_features.csv')