In [None]:
'''Tokenを使って認証を行う場合'''
'''環境変数にTokenを設定'''
'''
import os  
os.environ['SPOTIFY_TOKEN'] = '{YOUR SPOTIFY TOKEN}'  
token = os.environ['SPOTIFY_TOKEN']
'''

In [1]:
import os
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pprint

from collections import OrderedDict
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import roc_auc_score
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import keras
from keras.models import Model, Sequential, load_model, model_from_json
from keras.layers import Dense, Activation, Dropout
from keras.utils import np_utils, plot_model
from keras import backend as K
from keras.callbacks import Callback, CSVLogger

Using TensorFlow backend.


In [8]:
class SpotifyAPI:
    def __init__(self):
        self.client_id = <YOUR CLIENT ID>
        self.client_secret = <YOUR CLIENT SECRET>
        self.client_credentials_manager = spotipy.oauth2.SpotifyClientCredentials(self.client_id, self.client_secret)
        self.sp = spotipy.Spotify(client_credentials_manager=self.client_credentials_manager)
        self.category_ids =   ['family', 'party', 'indie_alt', 'metal', 'rock', 'rnb', 'reggae', 'karaoke', 'word',\
                               'country', 'malaypop', 'afro', 'soul', 'travel', 'romance', 'mood', 'pride', 'classical',\
                               'j_tracks', 'summer', 'comedy', 'toplists', 'popindo', 'inspirational', 'sleep', 'opm',\
                               'kids', 'roots', 'funk', 'jazz', 'desi', 'edm_dance', 'chill', 'punk', 'decades', 'gaming',\
                               'dinner', 'test_latin', 'popculture', 'hiphop', 'blues', 'kpop', 'brazilian', 'mandopop',\
                               'cantopop', 'pop', 'focus', 'arab', 'workout']
        self.country = 'JP'
        self.locale = 'ja_JP'

        
    def search_track(self, search_track='ride', search_artist='twenty one pilots'):
        search_kw = 'track:{0}% artist:{1}'.format(search_track, search_artist)
        result = self.sp.search(q=search_kw, type='track',  limit=1)
        item = result['tracks']['items'][0]
        spotify_track_url =item['external_urls']['spotify']  # spotify url of the track 
        artist_name = item['artists'][0]['name']  # artist name
        track_id = item['id']  # track id
        track_name = item['name']  # track name
        preview_url = item['preview_url']  # preview url
        album_image_url = item['album']['images'][0]  # album image url
        album_name = item['album']['name']  # album name
        
        print('track id', track_id)
        print('track name', track_name)
        print('track artist', artist_name)
        print()
        return  track_id, track_name, artist_name, preview_url, album_name, album_image_url, spotify_track_url
    
    
    def search_track_features(self, track_ids):
        # print('function: search_track_features')
        # print('len of track_ids', len(track_ids))
        
        if len(track_ids) > 100:
            print('number of track ids exceeds 100')
            return
        if len(track_ids) <= 50:
            results =  self.sp.audio_features(track_ids)
        else:
            results =  self.sp.audio_features(track_ids[:50])
            results += self.sp.audio_features(track_ids[50:])
        
        track_features = []
        for result in results:
            # pprint.pprint(result)
            # return
            features_dict = OrderedDict()
            # result = result[0]
                
            if result == None:
                features_dict['acousticness'] = 0
                features_dict['danceability'] = 0
                features_dict['duration_ms'] = 0
                features_dict['energy'] = 0
                features_dict['instrumentalness'] = 0
                features_dict['liveness'] = 0
                features_dict['loudness'] = 0
                features_dict['speechiness'] = 0
                features_dict['tempo'] = 0
                features_dict['valence'] = 0
                continue
           
            features_dict['acousticness'] = result['acousticness']
            features_dict['danceability'] =  result['danceability']
            features_dict['duration_ms'] = result['duration_ms']
            features_dict['energy'] = result['energy']
            features_dict['instrumentalness'] = result['instrumentalness']
            features_dict['liveness'] =  result['liveness']
            features_dict['loudness'] =  result['loudness']
            features_dict['speechiness'] = result['speechiness'] 
            features_dict['tempo'] =  result['tempo']
            features_dict['valence'] =  result['valence']
            track_features.append(features_dict)
            
        '''
        acousticness = result['acousticness']  # acousticness of the track, 0.0 - 1.0
        # analysis_url = result['analysis_url']  # url of the audio_analysis
        danceability = result['danceability']  # danceability of the track, 0.0 - 1.0
        duration_ms = result['duration_ms']  # duration_ms of the track
        energy = result['energy']  # energy of the track, 0.0 - 1.0
        instrumentalness = result['instrumentalness']  # instrumentalness of the track, 0.0 - 1.0
        # key = result['key']  # key of the track, 0 = C, 1 = C♯/D♭, 2 = D,...
        liveness = result['liveness']  # liveness of the track, 0.0 - 1.0
        loudness = result['loudness']  # loudness of the track, the average of the decibel values of the whole track
        # mode = result['mode']  # scale of the track, 0: major, 1: minor
        speechiness = result['speechiness']  # speechiness of the track, 0.0 - 1.0
        tempo = result['tempo']  # tempo of the track, the average of the BPM of the whole track
        # time_signature = result['time_signature']  # time_signature of the track
        valence  = result['valence']  # valence of the track, 0.0 - 1.0
        '''

        return track_features
    
    
    def get_categories(self):
        result = self.sp.categories(limit=50)
        categories = [item['id'] for item in result['categories']['items']]
        return categories
    
    
    def load_featured_playlists(self):
        result = self.sp.featured_playlists(country=self.country, limit=50)
        for playlist in result['playlists']['items']:
            playlist_url = playlist['external_urls']['spotify']
            playlist_api_url = playlist['href']
            playlist_id = playlist['id']
            playlist_name = playlist['name']
            print('playlist_url', playlist_url)
            print('playlist_api_url', playlist_api_url)
            print('playlist_id', playlist_id)
            print('playlist_name', playlist_name)
            print()
        return
    
    
    def get_categorys_playlists(self):
        playlist_ids = []
        for category_id in self.category_ids:
            try:
                result = self.sp.category_playlists(category_id=category_id, country=self.country, limit=50)
            except:
                print(category_id, "Specified id doesn't exist")
                continue
            
            print(category_id, len(result['playlists']['items']))
            for playlist in result['playlists']['items']:
                # playlist_url = playlist['external_urls']['spotify']
                # playlist_api_url = playlist['href']
                playlist_id = playlist['id']
                # playlist_name = playlist['name']
                playlist_ids.append(playlist_id)

        print('LENGTH OF PLAYLISTS IDS', len(playlist_ids))
        return playlist_ids
    
    
    def get_playlist_tracks(self, playlist_ids):
        playlists = [None] * len(playlist_ids)
        
        for i, playlist_id in enumerate(playlist_ids):
            result = self.sp.user_playlist(user='spotify', playlist_id=playlist_id)
            # print('Number of tracks in the playlist', len(result['tracks']['items']))
            playlist = [None] * len(result['tracks']['items'])
            
            for j, track_info in enumerate(result['tracks']['items']):
                if track_info['track'] == None:
                    continue
                track_name = track_info['track']['name']
                track_id = track_info['track']['id']
                artist_name = track_info['track']['artists'][0]['name']
                playlist[j] = track_id
                # print('track id', track_id)
                # print('track name', track_name)
                # print('artist name', artist_name)
                # print()

            playlists[i] = playlist
       
        print(len(playlists))
        print(len(playlists[0]))
        return playlists
    
    
    

In [101]:
class NN:
    def __init__(self, playlists):
        self.playlists = playlists
        self.market = 'JP'
        
        track_variation = []
        for playlist in self.playlists:
            track_variation = list(set(track_variation + playlist))
        
        track_variation = [track for track in track_variation if track != None]
        track_variation.sort()
        
        self.track_variation = len(track_variation)
        
        track2label = {}
        label2track = {}
        for i, track in enumerate(track_variation):
            track2label[track] = i
            label2track[i] = track
        
        self.track2label = track2label
        self.label2track = label2track
        self.log_dir = 'log'
        return
    
    def preprocess(self):

        y_dim = np.empty((self.track_variation))
        X_dim = np.empty((10))
        print(X_dim.shape)
        print(y_dim.shape)
        X = []
        y = []
        
        spotifyapi = SpotifyAPI()
        for playlist in self.playlists:
            playlist = [p for p in playlist if p != None]
            
            track_features = spotifyapi.search_track_features(playlist)
            
            for input_track, track_feature in zip(playlist, track_features):
                target_tracks = list(set(playlist) - set([input_track]))
                
                # print(input_track)
                # print(type(input_track))
                # acousticness, danceability, duration_ms, energy, instrumentalness, liveness, loudness, speechiness, tempo, valence = spotifyapi.search_track_features(input_track)
                if sum(track_feature.values()) == 0:
                    continue
                
                # input_features = np.array((acousticness, danceability, duration_ms, energy, instrumentalness, liveness, loudness, speechiness, tempo, valence))
                # input_features = np.array((track_feature.values()))
                input_features = np.array(list(track_feature.values()))
                output_label = np.zeros((len(y_dim)), dtype='int8')
                for target_track in target_tracks:
                    output_label[self.track2label[target_track]] = 1
                X.append(input_features)
                y.append(output_label)
        
        X = np.array(X)
        y = np.array(y)
        # print(X.shape)
        # print(y.shape)
        return X, y
    
    
    def binary_loss(y_true, y_pred):
        bce = K.binary_crossentropy(y_true, y_pred)
        return K.sum(bce, axis=-1)
    
    
    def total_acc(y_true, y_pred):
        pred = K.cast(K.greater_equal(y_pred, 0.5), "float")
        flag = K.cast(K.equal(y_true, pred), "float")
        return K.prod(flag, axis=-1)
    
    
    def binary_acc(y_true, y_pred):
        pred = K.cast(K.greater_equal(y_pred, 0.5), "float")
        flag = K.cast(K.equal(y_true, pred), "float")
        return K.mean(flag, axis=-1)
    
    
    def define_nn(self, input_dim, output_dim):
        model = Sequential()
        model.add(Dense(32, input_dim = input_dim, activation='relu', name = 'fully_connected-1'))
        model.add(Dropout(0.5))
        model.add(Dense(24, activation='relu', name = 'fully_connected-2'))
        model.add(Dense(output_dim, activation='sigmoid', name = 'output_layer'))
        print(model.summary())
        
        model.compile(
            loss=NN.binary_loss, 
            optimizer=keras.optimizers.Adam(lr=0.0001), 
            metrics=[NN.total_acc, NN.binary_acc]
                                 )
        return model
    

    def train_nn(self, X, y):
        '''preprocess'''
        transformer = MinMaxScaler()
        X[:, 2] = transformer.fit_transform(X[:, 2].reshape(-1, 1)).reshape(len(X[:, 2])) # duration_ms
        transformer = MinMaxScaler()
        X[:, 6] = transformer.fit_transform(X[:, 6].reshape(-1, 1)).reshape(len(X[:, 6])) # loudness
        transformer = MinMaxScaler()
        X[:, 8] = transformer.fit_transform(X[:, 8].reshape(-1, 1)).reshape(len(X[:, 8])) # tempo
        
        np.random.seed(2019)
        p = np.random.permutation(len(X))
        X = X[p]
        y = y[p]
        print(X.shape)
        print(y.shape)
        
        model = NN.define_nn(self, input_dim=X.shape[-1], output_dim=y.shape[-1])
        os.makedirs(self.log_dir, exist_ok = True)
        plot_model(model, to_file='{0}/model.png'.format(self.log_dir))
        path_save_csv_history = '{0}/csv_history/'.format(self.log_dir)
        path_save_png_history = '{0}/png_history/'.format(self.log_dir)
        os.makedirs(path_save_csv_history, exist_ok = True)
        os.makedirs(path_save_png_history, exist_ok = True)
        callbacks = []
        callbacks.append(CSVLogger('{0}/history.csv'.format(self.log_dir)))
        callbacks.append(keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=5, verbose=0, mode='auto'))
        
        fit = model.fit(
            X, y, 
            batch_size=32, 
            epochs=1, 
            verbose=2, 
            validation_split=0.2, 
            callbacks=callbacks
        )
        
        # 最終的な重みを保存
        model.save_weights('{0}/trained_weights.h5'.format(self.log_dir))
        
        # Plot train-loss/val-loss and train-accuracy/val-accuracy history
        fig, (axL, axR) = plt.subplots(ncols = 2, figsize = (10, 4))
        axL.plot(fit.history['loss'],label="loss for training")
        axL.plot(fit.history['val_loss'],label="loss for validation")
        axL.set_title('model loss')
        axL.set_xlabel('epoch')
        axL.set_ylabel('loss')
        axL.legend(loc='upper right')
        
        axR.plot(fit.history['binary_acc'],label="binary accuracy for training")
        axR.plot(fit.history['val_binary_acc'],label="binary accuracy for validation")
        axR.set_title('model binary accuracy')
        axR.set_xlabel('epoch')
        axR.set_ylabel('binary accuracy')
        axR.legend(loc='upper right')
        
        fig.savefig('{0}/hitsoty.png'.format(self.log_dir))
        plt.close()
        
        K.clear_session()
        return
    
    def predict(self, search_track, search_artist):
        
        spotifyapi = SpotifyAPI()
        track_id, track_name, artist_name, preview_url, album_name, album_image_url, spotify_track_url = \
            spotifyapi.search_track(search_track=search_track, search_artist=search_artist)
        
        track_features = spotifyapi.search_track_features([track_id])
        track_feature = np.array(list(track_features[0].values()))
        
        model = load_model('{0}/trained_weights.h5'.format(self.log_dir))
        intermediate_layer_model = Model(inputs = model.input,
                                 outputs = model.get_layer('output_layer').output)
        
        intermediate_output = intermediate_layer_model.predict(track_feature)
        ranking = np.argsort(intermediate_output)[::-1][:10]
        ranking = [self.label2track[r] for r in ranking]
        
        for track_id in ranking:
            result = spotifyapi.tracks(tracks=track_id, market=self.market)

In [102]:
'''
spotifyapi = SpotifyAPI()
playlist_ids = spotifyapi.get_categorys_playlists()
playlists = spotifyapi.get_playlist_tracks(playlist_ids)
'''
nn = NN(playlists)
X, y = nn.preprocess()
nn.train_nn(X, y)

(10,)
(51392,)
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...2secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
retrying ...1secs
(78422, 10)
(78422, 51392)
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
fully_connected-1 (Dense)    (None, 32)                352       
_________________________________________________________________
dropout_7 (Dropout)          (None, 32)                0         
_________________________________________________________________
fully_connected-2 (Dense)    (None, 24)                792       
_________________________________________________________________
output_layer (Dense)         (None, 51392)          

In [39]:
def predict(search_track, search_artist):
    spotifyapi = SpotifyAPI()
    track_id, track_name, artist_name, preview_url, album_name, album_image_url, spotify_track_url = \
        spotifyapi.search_track(search_track=search_track, search_artist=search_artist)

    track_features = spotifyapi.search_track_features([track_id])
    track_feature = np.array(np.array([list(track_features[0].values())]))
    
    model = model_from_json(open('{0}/model.json'.format('log')).read())
    model.load_weights('{0}/trained_weights.h5'.format('log'))
    intermediate_layer_model = Model(inputs = model.input,
                             outputs = model.get_layer('output_layer').output)

    print(track_feature.shape)
    intermediate_output = intermediate_layer_model.predict(track_feature)
    # intermediate_output = model.predict(track_feature)
    print(intermediate_output.shape)
    print(sum(intermediate_output[0]))
    print(max(intermediate_output[0]))
    print(min(intermediate_output[0]))
    print(list(intermediate_output[0]).count(1.0))
    print(list(intermediate_output[0]).index(1.0))
    ranking = np.argsort(intermediate_output)[::-1][:10]
    ranking = [self.label2track[r] for r in ranking]

    for track_id in ranking:
        result = spotifyapi.tracks(tracks=track_id, market=self.market)
        pprint.pprint(result)
        return

In [40]:
predict('mercy', 'muse')

track id 2qkmPUG7ARsRwhVICQVwQS
track name Mercy
track artist Muse



FileNotFoundError: [Errno 2] No such file or directory: 'log/model.json'

In [54]:
client_id = '036090cb7286459cbf5e410609ba5a98'
client_secret = 'ac269346e2b2477390237f200ff90e38'
client_credentials_manager = spotipy.oauth2.SpotifyClientCredentials(client_id, client_secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
results = sp.tracks(['2qkmPUG7ARsRwhVICQVwQS', '5aSCCMFdPVgq9u0d2SD8T9'])
# result = sp.tracks(['5aSCCMFdPVgq9u0d2SD8T9'])
for result in results['tracks']:
    # pprint.pprint(result)
    track_name = result['name']
    artist_name = result['artists'][0]['name']
    track_url = result['external_urls']['spotify']
    image_url = result['album']['images'][0]['url']
    print(track_name, artist_name, track_url, image_url)
    print()

Mercy Muse https://open.spotify.com/track/2qkmPUG7ARsRwhVICQVwQS https://i.scdn.co/image/849eecf3c9df835181c2970c435ac2d008346ea3

I Still See Your Face San Holo https://open.spotify.com/track/5aSCCMFdPVgq9u0d2SD8T9 https://i.scdn.co/image/d16e37fb255f0cc00a27d8d2b1ae965f006724d7



In [9]:
spotifyapi = SpotifyAPI()

In [10]:
spotifyapi.search_track()

'https://open.spotify.com/track/2Z8WuEywRWYTKe1NybPQEW'


In [1]:
len(['family', 'party', 'indie_alt', 'metal', 'rock', 'rnb', 'reggae', 'karaoke', 'word',\
                               'country', 'malaypop', 'afro', 'soul', 'travel', 'romance', 'mood', 'pride', 'classical',\
                               'j_tracks', 'summer', 'comedy', 'toplists', 'popindo', 'inspirational', 'sleep', 'opm',\
                               'kids', 'roots', 'funk', 'jazz', 'desi', 'edm_dance', 'chill', 'punk', 'decades', 'gaming',\
                               'dinner', 'test_latin', 'popculture', 'hiphop', 'blues', 'kpop', 'brazilian', 'mandopop',\
                               'cantopop', 'pop', 'focus', 'arab', 'workout'])

49