In [3]:
# Root directory of the pickled playlist features. save_track_lists() lists the
# entries of this directory and treats each one as a genre folder.
DIR_PLAYLIST_FEATURES = './playlist_features/'

In [4]:
import os
import numpy as np
import pickle
import sys
import matplotlib.pyplot as plt

In [None]:
def save_pickle(file_path_without_extension, data, feedback=True):
    """
    Serialize an object with pickle into ``<file_path_without_extension>.pkl``.

    :param file_path_without_extension: destination path; the '.pkl' suffix is appended here
    :param data: object to serialize
    :param feedback: if True, print a confirmation once the file has been written
    """
    target_path = file_path_without_extension + '.pkl'

    with open(target_path, 'wb') as pickle_file:
        pickle.dump(data, pickle_file)

    if not feedback:
        return

    print('Done pickling %s' % file_path_without_extension)

In [None]:
def load_pickle(file_path, feedback=False):
    """
    Load data from pickle file
    :param file_path: path to .pkl file (opened as given, so it must include the extension)
    :param feedback: if True print messages indicating when loading starts and finishes
    :return: data from pickle file
    """
    if feedback:
        # Interpolate the path into the message. Passing it as a second
        # argument to print() would emit the literal '%s' placeholder.
        print('Loading %s' % file_path)

    # NOTE: pickle.load can execute arbitrary code while deserializing;
    # only use this on files produced by trusted code.
    with open(file_path, 'rb') as f:
        data = pickle.load(f)

    if feedback:
        print('Done loading pickle')

    return data


In [None]:
# Print iterations progress
def print_progress(iteration, total, prefix='', suffix='', decimals=1, barLength=100):
    """
    source: http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console

    Redraw a one-line text progress bar on stdout; meant to be called from inside a loop.

    The line starts with a carriage return so each call overwrites the previous
    one. A newline is written only when ``iteration == total``.

    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : text printed before the bar (Str)
        suffix      - Optional  : text printed after the percentage (Str)
        decimals    - Optional  : number of decimals shown in the percentage (Int)
        barLength   - Optional  : width of the bar in characters (Int)
    """
    total_as_float = float(total)

    percent_template = "{0:." + str(decimals) + "f}"
    percent_text = percent_template.format(100 * (iteration / total_as_float))

    # Number of 'X' cells; the remainder of the bar is padded with '-'.
    filled_cells = int(round(barLength * iteration / total_as_float))
    empty_cells = barLength - filled_cells
    bar_text = ('X' * filled_cells) + ('-' * empty_cells)

    sys.stdout.write('\r%s |%s| %s%s %s' % (prefix, bar_text, percent_text, '%', suffix))
    if iteration == total:
        sys.stdout.write('\n')
    sys.stdout.flush()

In [None]:
def plot_training_history(training_history):
    """
    Plot every entry of a training history as one line in a single figure.

    :param training_history: mapping from metric name (used as the legend label)
        to the sequence of values to plot; presumably one value per epoch,
        as the x axis is labelled 'Epoch'
    :type training_history: dict
    :return: None; the figure is shown with ``plt.show()``
    """
    plt.figure(figsize=(14, 14))

    # Iterate name/value pairs directly rather than indexing two parallel lists.
    for metric_name, metric_values in training_history.items():
        plt.plot(metric_values, label=metric_name)

    plt.ylabel('Metric value')
    plt.xlabel('Epoch')
    plt.legend()
    plt.show()
    # The original ended with a bare `print`, a Python 2 idiom for emitting a
    # blank line. In Python 3 that is an expression with no effect, so call it.
    print()

In [None]:
def save_track_lists():
    """
    Write a human-readable ``track_list.txt`` into every genre directory.

    Every sub-directory of DIR_PLAYLIST_FEATURES is treated as a genre. Each file
    in it (other than the track list itself) is loaded with load_pickle and is
    expected to hold an iterable of track dicts with 'artists' and 'name' keys.
    """
    track_list_name = 'track_list.txt'
    pickle_suffix = '.pkl'

    for genre_name in os.listdir(DIR_PLAYLIST_FEATURES):
        genre_dir = os.path.join(DIR_PLAYLIST_FEATURES, genre_name)
        if not os.path.isdir(genre_dir):
            # Stray files next to the genre folders (e.g. .DS_Store) are not genres.
            continue

        # Artist and track names may contain non-ASCII characters, so fix the
        # encoding instead of relying on the platform default.
        with open(os.path.join(genre_dir, track_list_name), 'w', encoding='utf-8') as f:
            f.write('Genre: %s\n' % genre_name)
            for playlist_name in os.listdir(genre_dir):
                if playlist_name == track_list_name:
                    continue

                # str.strip('.pkl') removes any of the characters '.', 'p', 'k', 'l'
                # from BOTH ends (e.g. 'pop.pkl' -> 'o'); cut the suffix instead.
                if playlist_name.endswith(pickle_suffix):
                    display_name = playlist_name[:-len(pickle_suffix)]
                else:
                    display_name = playlist_name
                f.write('\n\nPlaylist name: %s\n\n' % display_name)

                tracks = load_pickle(os.path.join(genre_dir, playlist_name))
                for track_obj in tracks:
                    for artist in track_obj['artists']:
                        f.write('%s, ' % artist)
                    f.write("- %s\n" % track_obj['name'])


In [None]:
def load_songs_from_playlists(playlist_file_paths):
    '''
    Load several pickled playlists and concatenate their songs into one list.

    :param playlist_file_paths: List of playlist pickle paths. Each path is handed
        to load_pickle unchanged, so it must be the full file path.

    :type playlist_file_paths: List
    :return: List with the songs of all provided playlists, in the order given
    '''
    result = []
    number_of_playlists = len(playlist_file_paths)

    print("Unpickling playlists")
    for count, playlist_path in enumerate(playlist_file_paths, start=1):
        result.extend(load_pickle(playlist_path))
        print_progress(count, number_of_playlists)

    return result

In [None]:
def label_songs(X, label):
    """
    Build a label list with one copy of ``label`` per sample in ``X``.

    :param X: Input features samples (only iterated; the contents are not inspected)
    :param label: 1 for likeable songs and 0 for annoying songs
    :return: list containing ``label`` once for every element of ``X``
    """
    labels = []
    for _sample in X:
        labels.append(label)
    return labels

In [None]:
def split_dataset(data, labels, validation_proportion, test_proportion):
    """
    Shuffle the samples with a fixed seed and split them into train/validation/test.

    ``data`` and ``labels`` are indexed with an integer index array and ``data.shape``
    is read, so both are expected to be NumPy arrays with the same first dimension.

    :param data: samples, first axis is the sample axis
    :param labels: labels aligned with ``data`` along the first axis
    :param validation_proportion: Proportion of data to use for validation. Value range: [0, 1]. 1 means 100%
    :param test_proportion: Proportion of data to use for test. Value range: [0, 1]. 1 means 100%
    :return: x_train, y_train, x_val, y_val, x_test, y_test
    :raises ValueError: if a proportion is negative or the two splits together
        need more samples than are available
    """
    nb_samples = data.shape[0]
    nb_validation_samples = int(validation_proportion * nb_samples)
    nb_test_samples = int(test_proportion * nb_samples)

    if validation_proportion < 0 or test_proportion < 0:
        raise ValueError('validation_proportion and test_proportion must be >= 0')
    if nb_validation_samples + nb_test_samples > nb_samples:
        raise ValueError('validation_proportion + test_proportion must not exceed 1')

    # Fixed seed so the split is reproducible. This reseeds NumPy's global RNG,
    # which is kept for backward compatibility with the original behaviour.
    np.random.seed(4)
    # shuffle and split the data into a training set, validation set and test set
    indices = np.arange(nb_samples)
    np.random.shuffle(indices)
    data = data[indices]
    labels = labels[indices]

    # Slice with explicit non-negative boundaries. Negative offsets break when a
    # split is empty: data[-0:] is the whole array and data[:-0] is empty.
    train_end = nb_samples - nb_validation_samples - nb_test_samples
    val_end = train_end + nb_validation_samples

    x_train = data[:train_end]
    y_train = labels[:train_end]

    x_val = data[train_end:val_end]
    y_val = labels[train_end:val_end]

    x_test = data[val_end:]
    y_test = labels[val_end:]

    return x_train, y_train, x_val, y_val, x_test, y_test

In [None]:
def remove_duplicates(tracks):
    """
    Drop repeated tracks, keeping the first occurrence of each.

    Two tracks count as the same when their 'artists' values compare equal and
    their 'name' values are equal ignoring case.

    :param tracks: List of tracks (dicts with 'artists' and 'name' keys)
    :return: List of tracks without duplicates, in original order
    """
    unique_tracks = []
    for candidate in tracks:
        already_kept = any(
            candidate['artists'] == kept['artists']
            and candidate['name'].lower() == kept['name'].lower()
            for kept in unique_tracks
        )
        if already_kept:
            continue
        unique_tracks.append(candidate)

    return unique_tracks


In [None]:
def remove_training_set_tracks(training_set_list, world_songs):
    """
    Return the world songs that do not also appear in the training set.

    A world song is dropped when some training track has equal 'artists' and an
    equal 'name'. The name comparison is case-sensitive.
    NOTE(review): remove_duplicates compares names case-insensitively — confirm
    the difference is intended.

    The printed overlap figure counts matching (world song, training track)
    pairs, so a training track listed twice is counted twice.

    :param training_set_list: tracks used for training (dicts with 'artists' and 'name')
    :param world_songs: candidate tracks (dicts with 'artists' and 'name')
    :return: list of the world songs without any training-set match, in original order
    """
    remaining_tracks = []
    overlap_count = 0

    for candidate in world_songs:
        matches = sum(
            1
            for known in training_set_list
            if candidate['artists'] == known['artists'] and candidate['name'] == known['name']
        )
        overlap_count += matches
        if matches == 0:
            remaining_tracks.append(candidate)

    print("Number of training set tracks in world songs: %s" % overlap_count)

    return remaining_tracks

In [None]:
def pre_process_cluster_data(audio_features):
    """
    Turn track dicts into a feature matrix plus parallel name/artist lists.

    For every track, the values of its 'audio_features' dict are kept when the
    value is not a string and the key is among the features returned by
    ``features_config.get_features()``. The config module is reloaded on every
    call so edits to it are picked up without restarting the kernel.

    NOTE(review): values are emitted in the iteration order of each track's
    'audio_features' dict, not in the order of the configured feature list —
    confirm this matches what downstream plotting expects.

    :param audio_features: All audio_features gathered. List of dictionaries
        with 'audio_features', 'name' and 'artists' keys
    :return: Three lists. The first one is the features, the two next ones are the corresponding songname and artist.
    """
    import importlib
    import features_config
    importlib.reload(features_config)

    selected_features = features_config.get_features()

    feature_rows = []
    song_names = []
    song_artists = []

    for track in audio_features:
        track_features = track['audio_features']

        row = []
        for feature_name in track_features:
            value = track_features[feature_name]
            if not isinstance(value, str) and feature_name in selected_features:
                row.append(value)

        feature_rows.append(row)
        song_names.append(track['name'])
        song_artists.append(track['artists'])

    return feature_rows, song_names, song_artists



In [6]:
def filter_features(tracks):
    """
    Extract the configured, non-string audio features of every track.

    The set of wanted features comes from ``features_config.get_features()``. The
    config module is reloaded on every call so edits to it take effect immediately.

    NOTE(review): values follow the iteration order of each track's
    'audio_features' dict, not the order of the configured feature list.

    :param tracks: List of track dictionaries containing audio features
    :return: List X containing one feature vector (list) per track; no labels are returned
    """
    import importlib
    import features_config
    importlib.reload(features_config)

    wanted = features_config.get_features()

    feature_vectors = []
    for track in tracks:
        track_features = track['audio_features']
        vector = []
        for feature_name in track_features:
            value = track_features[feature_name]
            if not isinstance(value, str) and feature_name in wanted:
                vector.append(value)
        feature_vectors.append(vector)

    return feature_vectors


In [None]:
def plot(data, tot_features, avg):
    """
    Draw a bar chart with one bar per feature and the value written above each bar.

    :param data: values to plot, one per entry of ``tot_features``
    :param tot_features: feature names, used as the x tick labels
    :param avg: truthy to title the chart as averages, falsy for standard deviations
    :return: None; the figure is shown with ``plt.show()``
    """
    # One x position per feature: 0, 1, 2, ...
    bar_positions = np.arange(len(tot_features))

    plt.figure(figsize=(20, 20))
    plt.bar(bar_positions, data)
    plt.xticks(bar_positions, tot_features)

    title = ("Average of the chosen features for all your songs!" if avg
             else "Standard Deviation of the chosen features for all your songs!")
    plt.title(title)

    # Write each value slightly left of and above the top of its bar.
    for position, value in enumerate(data):
        plt.text(position - 0.2, value + 0.3, "%.2f" % value, color='black', fontweight='bold')

    plt.show()

In [None]:
def calculate_average(X_data):
    """
    Column-wise mean of the samples.

    :param X_data: 2-D data (rows are samples) accepted by ``np.matrix``
    :return: ``np.matrix`` with a single row holding the mean of every column
    """
    feature_matrix = np.matrix(X_data)
    return feature_matrix.mean(axis=0)


def calculate_standard_deviation(X_data):
    """
    Column-wise standard deviation of the samples (NumPy's default, ddof=0).

    :param X_data: 2-D data (rows are samples) accepted by ``np.matrix``
    :return: ``np.matrix`` with a single row holding the standard deviation of every column
    """
    feature_matrix = np.matrix(X_data)
    return feature_matrix.std(axis=0)

In [None]:
def print_kmeans(Z, xx, yy, reduced_data, X_songname, X_artist, X_labels,  kmeans, x_min, x_max, y_min, y_max):
    """
    Visualize the kmeans clustering algorithm.
    You do not need to do anything in here.

    Draws ``Z`` as a background image spanning the extent of ``xx``/``yy``,
    scatters the first two columns of ``reduced_data`` coloured by ``X_labels``,
    marks ``kmeans.cluster_centers_`` with white crosses and shows an
    "artist: song" tooltip while the mouse hovers over a point.

    :param Z: image data for the background (presumably the predicted cluster
        for each mesh-grid point — confirm against the caller)
    :param xx: array whose min/max give the horizontal image extent
    :param yy: array whose min/max give the vertical image extent
    :param reduced_data: 2-D array; columns 0 and 1 are the plotted coordinates
    :param X_songname: song names, indexed like the rows of ``reduced_data``
    :param X_artist: artists per song (a string or an iterable of strings),
        indexed like the rows of ``reduced_data``
    :param X_labels: per-point values used to colour the scatter plot
    :param kmeans: object exposing ``cluster_centers_`` with at least two columns
    :param x_min: lower x axis limit
    :param x_max: upper x axis limit
    :param y_min: lower y axis limit
    :param y_max: upper y axis limit
    """

    fig = plt.figure(figsize=(7, 7))

    # Create the axes first and draw everything on it. Calling plt.imshow()
    # before fig.add_subplot(111) puts the image on an implicitly created axes;
    # current Matplotlib then adds a second axes on top that hides the image.
    ax = fig.add_subplot(111)

    ax.imshow(Z, interpolation='nearest',
              extent=(xx.min(), xx.max(), yy.min(), yy.max()),
              cmap=plt.cm.Paired,
              aspect='auto', origin='lower')

    sc = ax.scatter(reduced_data[:, 0], reduced_data[:, 1], s=8, c=X_labels)

    # Plot the centroids as a white X
    centroids = kmeans.cluster_centers_
    ax.scatter(centroids[:, 0], centroids[:, 1],
               marker='x', s=169, linewidths=3,
               color='w', zorder=10)

    ax.set_title('K-means clustering on your spotify playlist (PCA-reduced data)\n'
                 'Centroids are marked with white cross')

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_xticks(())
    ax.set_yticks(())

    # Tooltip; hidden until the mouse is over a data point.
    annot = ax.annotate("", xy=(0, 0), xytext=(20, 20), textcoords="offset points",
                        bbox=dict(boxstyle="round", fc="w"),
                        arrowprops=dict(arrowstyle="->"))

    annot.set_visible(False)

    def update_annot(ind):
        # Several points may be under the cursor; describe the first one.
        point_index = ind["ind"][0]
        annot.xy = sc.get_offsets()[point_index]

        # Separate multiple artists with ", " instead of running the names together.
        artists = X_artist[point_index]
        artist = artists if isinstance(artists, str) else ", ".join(artists)

        songname = X_songname[point_index]

        annot.set_text(artist + ": " + songname)
        annot.get_bbox_patch().set_facecolor((1, 1, 1))
        annot.get_bbox_patch().set_alpha(1)

    def hover(event):
        vis = annot.get_visible()
        if event.inaxes == ax:
            cont, ind = sc.contains(event)
            if cont:
                update_annot(ind)
                annot.set_visible(True)
                fig.canvas.draw_idle()
            else:
                if vis:
                    annot.set_visible(False)
                    fig.canvas.draw_idle()

    fig.canvas.mpl_connect("motion_notify_event", hover)
    plt.show()


In [None]:
def print_dbscan(labels, reduced_data, n_clusters_, core_samples_mask, X_songname, X_artist):
    """
    Visualize the dbscan clustering algorithm.
    You do not need to do anything in here.

    Plots the first two columns of ``reduced_data``, one colour per distinct
    label (label -1 is drawn in black as noise). Core samples get larger markers
    than non-core samples. Hovering over a point shows an "artist: song" tooltip.

    :param labels: cluster label per sample; compared element-wise with ``==``,
        so it must be array-like in the NumPy sense
    :param reduced_data: 2-D array; columns 0 and 1 are the plotted coordinates
    :param n_clusters_: number of clusters, shown in the title
    :param core_samples_mask: boolean mask selecting core samples; combined with
        the per-label mask via ``&`` and ``~``
    :param X_songname: song names, indexed like the rows of ``reduced_data``
    :param X_artist: artists per song (a string or an iterable of strings),
        indexed like the rows of ``reduced_data``
    """

    fig = plt.figure(figsize=(7, 7))
    ax = fig.add_subplot(111)

    # This scatter is what the hover handler hit-tests against.
    sc = ax.scatter(reduced_data[:, 0], reduced_data[:, 1], s=5)

    # Black removed and is used for noise instead.
    unique_labels = set(labels)

    colors = [plt.cm.Spectral(each) for each in np.linspace(0, 1, len(unique_labels))]

    for k, col in zip(unique_labels, colors):
        if k == -1:
            # Black used for noise.
            col = [0, 0, 0, 1]

        class_member_mask = (labels == k)

        # Core samples of this label: larger markers.
        xy = reduced_data[class_member_mask & core_samples_mask]
        plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
                 markeredgecolor='k', markersize=5)

        # Non-core samples of this label: smaller markers.
        xy = reduced_data[class_member_mask & ~core_samples_mask]
        plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
                 markeredgecolor='k', markersize=2)

    plt.title('DBSCAN: Estimated number of clusters: %d' % n_clusters_)

    # Tooltip; hidden until the mouse is over a data point.
    annot = plt.annotate("", xy=(0, 0), xytext=(20, 20), textcoords="offset points",
                         bbox=dict(boxstyle="round", fc="w"),
                         arrowprops=dict(arrowstyle="->"))

    annot.set_visible(False)

    def update_annot(ind):
        # Several points may be under the cursor; describe the first one.
        point_index = ind["ind"][0]
        annot.xy = sc.get_offsets()[point_index]

        # Separate multiple artists with ", " instead of running the names together.
        artists = X_artist[point_index]
        artist = artists if isinstance(artists, str) else ", ".join(artists)

        songname = X_songname[point_index]

        annot.set_text(artist + ": " + songname)
        annot.get_bbox_patch().set_facecolor((1, 1, 1))
        annot.get_bbox_patch().set_alpha(1)

    def hover(event):
        vis = annot.get_visible()
        if event.inaxes == ax:
            cont, ind = sc.contains(event)
            if cont:
                update_annot(ind)
                annot.set_visible(True)
                fig.canvas.draw_idle()
            else:
                if vis:
                    annot.set_visible(False)
                    fig.canvas.draw_idle()

    fig.canvas.mpl_connect("motion_notify_event", hover)

    plt.show()



In [None]:
def print_agglomerative(reduced_data, X_labels, X_songname, X_artist):
    """
    Visualize the hierarchical agglomerative clustering algorithm.
    You do not need to do anything in here.

    Scatters the first two columns of ``reduced_data`` coloured by ``X_labels``,
    adds a colorbar and shows an "artist: song" tooltip while hovering over a point.

    :param reduced_data: 2-D array; columns 0 and 1 are the plotted coordinates
    :param X_labels: per-point values used to colour the scatter plot
    :param X_songname: song names, indexed like the rows of ``reduced_data``
    :param X_artist: artists per song (a string or an iterable of strings),
        indexed like the rows of ``reduced_data``
    """

    fig = plt.figure(figsize=(7, 7))
    ax = fig.add_subplot(111)

    sc = ax.scatter(reduced_data[:, 0], reduced_data[:, 1],
                    c=X_labels, s=8)

    ax.set_title('Agglomerative Clustering')

    # Tooltip; hidden until the mouse is over a data point.
    annot = plt.annotate("", xy=(0, 0), xytext=(20, 20), textcoords="offset points",
                         bbox=dict(boxstyle="round", fc="w"),
                         arrowprops=dict(arrowstyle="->"))

    annot.set_visible(False)

    def update_annot(ind):
        # Several points may be under the cursor; describe the first one.
        point_index = ind["ind"][0]
        annot.xy = sc.get_offsets()[point_index]

        # Separate multiple artists with ", " instead of running the names together.
        artists = X_artist[point_index]
        artist = artists if isinstance(artists, str) else ", ".join(artists)

        songname = X_songname[point_index]

        annot.set_text(artist + ": " + songname)
        annot.get_bbox_patch().set_facecolor((1, 1, 1))
        annot.get_bbox_patch().set_alpha(1)

    def hover(event):
        vis = annot.get_visible()
        if event.inaxes == ax:
            cont, ind = sc.contains(event)
            if cont:
                update_annot(ind)
                annot.set_visible(True)
                fig.canvas.draw_idle()
            else:
                if vis:
                    annot.set_visible(False)
                    fig.canvas.draw_idle()

    fig.canvas.mpl_connect("motion_notify_event", hover)
    plt.colorbar(sc)
    plt.show()