# Using a Neural Network to Predict Record Producer from Featurized Audio Data

## Background

Spotify's Audio Analysis contains a feature called `timbre` which contains information about the qualities of sound that are not found in pitch. From Spotify

>*Timbre is the quality of a musical note or sound that distinguishes different types of musical instruments, or voices. It is a complex notion also referred to as sound color, texture, or tone quality, and is derived from the shape of a segment’s spectro-temporal surface, independently of pitch and loudness. The timbre feature is a vector that includes 12 unbounded values roughly centered around 0. Those values are high level abstractions of the spectral surface, ordered by degree of importance.*

I believe that a producer's ***Signature Sound*** can be found in these timbre vectors.

I will use `TensorFlow.keras` to create a Convolutional Neural Network that will categorically predict record producer from audio snippets.

In [8]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Builds the neural network model
"""

# Standard Imports
from matplotlib.pyplot import imread, imshow
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, models
import keras.backend as K
from tensorflow.keras.constraints import min_max_norm, non_neg
# import kernels
# from artist import CustomImage, ImageBundle
import pickle
import sys
import os

In [9]:
# Load MongoDB

from pymongo import MongoClient
client = MongoClient()
# Access/Initiate Database
db = client['producer_db']
# Access/Initiate Table
tab = db['songs']
collection = db.tab

# Authorize Spotify API

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

client_id = os.environ['SPOTIFY_CLIENT_ID']
client_secret = os.environ['SPOTIFY_CLIENT_SECRET']
client_credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

In [10]:
model = models.Sequential()
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
# model.add(layers.Dense(32, activation='relu'))
# model.add(layers.Dense(12, activation='relu'))
model.add(layers.Dense(7, activation='softmax'))

# model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (3, 3), activation='relu'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (3, 3), activation='relu'))

# model.add(layers.Flatten())
# model.add(layers.Dense(64, activation='relu'))
# model.add(layers.Dense(10, activation='softmax'))

In [14]:
a = np.array([[1,1,1],[2,2,2],[3,3,3]])

b = np.concatenate([a,a,a], axis = 0)
c = np.concatenate([a,a,a], axis = 1)
d = np.stack([a,a,a], axis = 2)

a.shape, b.shape, c.shape, d.shape

((3, 3), (9, 3), (3, 9), (3, 3, 3))

In [16]:
gm = collection.find({'producer':'George Martin'})
rr = collection.find({'producer':'Rick Rubin'})

In [18]:
collection.distinct('producer')

['George Martin',
 'Dr. Dre',
 'Rick Rubin',
 'Brian Eno',
 'Stock Aitken Waterman',
 'Paul Epworth',
 'Pete Rock']

In [19]:
from bson.son import SON
pipeline = [
    {"$unwind": "$producer"},
    {"$group": {"_id": "$producer", "count": {"$sum": 1}}},
    {"$sort": SON([("count", -1), ("_id", -1)])}
]
import pprint
pprint.pprint(list(collection.aggregate(pipeline)))

[{'_id': 'Rick Rubin', 'count': 2039},
 {'_id': 'Dr. Dre', 'count': 1498},
 {'_id': 'George Martin', 'count': 1420},
 {'_id': 'Pete Rock', 'count': 1034},
 {'_id': 'Brian Eno', 'count': 924},
 {'_id': 'Paul Epworth', 'count': 478},
 {'_id': 'Stock Aitken Waterman', 'count': 436}]


### Create timbre vectors and a target vector with 200 songs from each producer. Create test vectors with 100 songs from each producer.

In [22]:
def make_timbre_train_test(collection, train_size, test_size):
    
    producers = collection.distinct('producer')
    timbre_train = []
    timbre_test = []
    target_train = []
    target_test = []
    for producer in producers:
        train_count = 0
        test_count = 0
        for song in collection.find({'producer':producer}):
            try:
                # Add data to training set
                if train_count < train_size:
                    song_timbre_segments = []
                    #songs must have at least 80 segments to be in the analysis
                    if len((song['audio_analysis']['segments'])) >= 80:
                        for i in range(80):
                            song_timbre_segments.append(song['audio_analysis']['segments'][i]['timbre'])
                        song_timbre_vector = np.concatenate(song_timbre_segments, axis=0)
                    timbre_train.append(song_timbre_vector)
                    target_train.append(song['producer'])
                    train_count += 1

                # Add data to test set
                elif test_count < test_size:
                    song_timbre_segments = []
                    #songs must have at least 80 segments to be in the analysis
                    if len((song['audio_analysis']['segments'])) >= 80:
                        for i in range(80):
                            song_timbre_segments.append(song['audio_analysis']['segments'][i]['timbre'])
                        song_timbre_vector = np.concatenate(song_timbre_segments, axis=0)
                    timbre_train.append(song_timbre_vector)
                    target_train.append(song['producer'])
                    test_count += 1

                else:
                    continue
            
            except:
                pass
                
            
    return (timbre_train, timbre_test, target_train, target_test)
            

In [23]:
timbre_train, timbre_test, target_train, target_test = make_timbre_train_test(collection, 10, 5)

In [25]:
np.array(timbre_train).shape

(8020, 960)

In [24]:
timbre_train.shape(), timbre_test.shape(), target_train.shape(), target_test.shape()

AttributeError: 'list' object has no attribute 'shape'

In [None]:
# Define a simple triangular kernel and kernel constraints
kernel_tri = tf.constant_initializer(kernels.triangle_5())
kernel_const = min_max_norm(0.001, None, rate=1, axis=0)
kernel_nonneg = non_neg()

# TensorFlow expects 4D tensors of shape (samples, rows, cols, channels)
# Note that the first index (the sample index out of the batch) is stripped
model = keras.Sequential([
        # Maxpool the image
        keras.layers.MaxPool2D(
            input_shape=(512, 512, 1),
            pool_size=2,
            padding='same',
            data_format='channels_last'),

        # Convolve the pooled image by the shape kernel(s)
        # ??? Use LocallyConnected2D instead?
        keras.layers.Conv2D(
            filters=5,
            kernel_size=(8, 8),
            strides=(8, 8),
            padding='same',
            data_format='channels_last',
            activation='sigmoid',
            use_bias=True),
            # ??? kernel_initializer=kernel_tri,
            # kernel_constraint=kernel_nonneg),
        keras.layers.Conv2D(
            filters=5,
            kernel_size=(8, 8),
            strides=(8, 8),
            padding='same',
            data_format='channels_last',
            activation='sigmoid',
            use_bias=True),
        # Flatten
        keras.layers.Flatten(),

        # Basic Dense layer
        keras.layers.Dense(
            units=25,
            activation=None,
            # kernel_constraint=kernel_nonneg,
            use_bias=True),

        # Activation layer
        keras.layers.PReLU(),

        # Reshape & output
        keras.layers.Reshape((5, 5))
        ])

# Define optimizer
optimizer = keras.optimizers.Adadelta()

# Compile the model
model.compile(
    optimizer=optimizer,
    loss='mean_squared_error',
    metrics=['mean_squared_error'])


if (__name__ == '__main__'):
    assert len(sys.argv) == 3, 'Pass me both the training and save filepaths!'
    # XXX Testing constants - Remove
    try:
        TRAINING_SET = sys.argv[1]
        SAVE_PATH = sys.argv[2]
    except IndexError:
        print('Pass me both the training set and save filepaths!')
        TRAINING_SET = '../data/train_set_01.pkl'
        SAVE_PATH = '../models/saved_model_01.h5'
#        sys.exit()

    # Load the training set from the pickled ImageBundle
    train_bundle = pickle.load(open(TRAINING_SET, 'rb'))
    train_X = train_bundle.images
    train_y = train_bundle.tri_list

    # IN: (samples, rows, cols, channels)
    IN_SHAPE = train_X.shape
    # OUT: (samples, shape_idx, shape_attrs, channels)
    OUT_SHAPE = train_y.shape
    # Initialize the training set

    # Fit the model to the training ImageBundle
    model.fit(
        train_X,
        train_y[:, :, :, 0],
        epochs=50,
        verbose=1,
        batch_size=5)

    # Write model config to YAML
    model_yaml = model.to_yaml()
    with open('../models/model_config.yaml', 'w') as yaml_file:
        yaml_file.write(model_yaml)

    # Save model
    model.save(SAVE_PATH, overwrite=True, include_optimizer=True)
    print('\nModel saved at: %s' % SAVE_PATH)