# Spectrogram Classifier

## Imports

In [2]:
import tensorflow.python.keras
import numpy as np
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense
from tensorflow.python.keras.models import Sequential
import h5py
import data_parser
import sys
import os
from pathlib import Path
import subprocess
import queue
import threading
import time
import _thread
import librosa.core
import matplotlib
import matplotlib.pyplot as plt
import itertools
import imageio
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical
from IPython.display import Image, display
from tensorflow.keras.callbacks import EarlyStopping

## Constants and Utilities

In [3]:
genres = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']

IMAGE_HIGHT = data_parser.IMAGE_HIGHT
IMAGE_WIDTH = data_parser.IMAGE_WIDTH
IMAGE_CHANNELS = data_parser.IMAGE_CHANNELS
IMAGE_SLICES = data_parser.IMAGE_SLICES

def nowStr(): return time.strftime("%Y-%m-%d_%H-%M-%S")

In [4]:
# Loss Curves
def plot_loss_curve(history):
    plt.figure(figsize=[8,6])
    plt.plot(history.history['loss'],'r',linewidth=3.0)
    plt.plot(history.history['val_loss'],'b',linewidth=3.0)
    plt.legend(['Training loss', 'Validation Loss'],fontsize=18)
    plt.xlabel('Epochs ',fontsize=16)
    plt.ylabel('Loss',fontsize=16)
    plt.title('Loss Curves',fontsize=16)
    plt.show()

# Accuracy Curves
def plot_acc_curve(history):
    plt.figure(figsize=[8,6])
    plt.plot(history.history['acc'],'r',linewidth=3.0)
    plt.plot(history.history['val_acc'],'b',linewidth=3.0)
    plt.legend(['Training Accuracy', 'Validation Accuracy'],fontsize=18)
    plt.xlabel('Epochs ',fontsize=16)
    plt.ylabel('Accuracy',fontsize=16)
    plt.title('Accuracy Curves',fontsize=16)
    plt.show()

## Collect the Data

## Binary Model per Genre

In [None]:
training_data, training_data_labels = data_parser.load_png_training_data()
training_data_labels_onehot = to_categorical(training_data_labels)

for i in range(0,9000,450):
    print(genres[training_data_labels[i,0]])
    display(array_to_img(training_data[i]))

### Build Model

In [None]:
genre_models = []
for genre in genres:
    model = Sequential()
    model.add(Conv2D(filters=32, 
                     kernel_size=(2,6),
                     strides=(1,3),
                     input_shape=(IMAGE_HIGHT,
                                  IMAGE_WIDTH,
                                  IMAGE_CHANNELS),
                     data_format="channels_last"))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2,2)))

    model.add(Conv2D(32,4,1))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(1,2)))

    model.add(Conv2D(64,1,1))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2,2)))

    model.add(Conv2D(128,3,3))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2,1)))

    model.add(Flatten())
    model.add(Dense(64))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))

    #TODO: Change `loss='binary_crossentropy'` to something better for 10 class problems.
    model.compile(loss='binary_crossentropy',
                  optimizer='rmsprop',
                  metrics=['accuracy'])
    genre_models.append(model)

### Train Model

In [None]:
for i, history in enumerate(genre_histories):
    print(genres[i])
    plot_loss_curve(history)
    plot_acc_curve(history)

In [None]:
for i, model in enumerate(genre_models):
    model.save_weights(nowStr() + '_Spectrogram_Classifier_Weights_genre{}.h5'.format(i))

### Test Model

In [None]:
# new
genre_histories = []
for i, model in enumerate(genre_models):
    print("Training "+genres[i]+" model.")
    history = model.fit(x=training_data,
                        y=training_data_labels_onehot[:,i],
                        epochs=10,
                        batch_size=100,
                        verbose=1,
                        validation_split=0.50,
                        shuffle=True,
                        class_weight = {0: 1., 1: 9.},
                        callbacks=[EarlyStopping(monitor='val_loss',
                                   min_delta=0,
                                   patience=2,
                                   verbose=0, mode='auto')])
    genre_histories.append(history)
    plot_loss_curve(history)
    plot_acc_curve(history)
    print(model.evaluate(training_data, training_data_labels_onehot[:,i]))

In [None]:
def classify(instance, by_ordering=False):
    best = (0, 0)
    if by_ordering:
        temp = [(i, model.predict(instance)[0,0]) for i, model in enumerate(genre_models)]
        list.sort(temp, key=lambda x: x[1], reverse=True)
        return temp
    for i, model in enumerate(genre_models):
        temp = model.predict(instance)[0,0]
        if best[1] < temp:
            best = (i, temp)
    return best[0]

def ranked_choice_election(instances):
    ballots = [classify(instances[i:i+1], by_ordering=True) for i in range(10)]
    winner = None
    counts = {genre: 0 for genre in range(10)}
    numCounting = 1
    while winner is None:
        counts = {k: 0 for k in counts.keys()}
        for ballot in ballots:
#             print('ballot')
            numCounted = 0
            for genre, conf in ballot:
#                 print("{} with {}".format(genre, conf))
                if genre in counts.keys():
                    counts[genre] += 1
                    numCounted += 1
                    if numCounted >= numCounting:
                        break
        for k, v in list(counts.items()):
            if v == 0:
                del counts[k]
        if max(counts.values()) >= 8:
            winner = max(counts.items(), key=lambda x: x[1])
        else:
            loser = min(counts.items(), key=lambda x: x[1])
#             print("Dropping {}".format(loser))
            del counts[loser[0]]
    return winner
            
        

In [None]:
correct = 0
incorrect = 0

for i in range(0,9000,10):
    winner = ranked_choice_election(training_data[i:i+10])
#     print("{} -> {} with {} votes".format(training_data_labels[i,0], winner[0], winner[1]))
    if winner[0] == training_data_labels[i,0]:
        correct += 1
    else:
        incorrect += 1
print("Accuracy: {}".format(correct/(correct+incorrect)))

## Model Based on Trevor's Model

### Build Model

In [None]:
model = Sequential()
model.add(Flatten(input_shape=(IMAGE_HIGHT,
                               IMAGE_WIDTH,
                               IMAGE_CHANNELS),
                  data_format="channels_last"))
model.add(Dense(128, activation=tensorflow.nn.relu))
model.add(Dropout(0.30))
model.add(Dense(128, activation='relu'))
model.add(Dense(10, activation='softmax'))
model.compile(optimizer="nadam", loss="categorical_crossentropy", metrics=['accuracy'])

### Train Model

In [None]:
history = model.fit(x=training_data,
                    y=training_data_labels_onehot,
                    epochs=30,
                    batch_size=100,
                    verbose=1,
                    validation_split=0.15,
                    shuffle=True)
plot_loss_curve(history)
plot_acc_curve(history)

### Test Model

In [None]:
model.evaluate(training_data, training_data_labels_onehot)

## Big Model

### Build Model

In [None]:
model = Sequential()
model.add(Conv2D(filters=8, 
                 kernel_size=2,
                 strides=1,
                 input_shape=(IMAGE_HIGHT,
                              IMAGE_WIDTH,
                              IMAGE_CHANNELS),
                 data_format="channels_last"))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=2))

model.add(Conv2D(16,2,1))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=2))

model.add(Conv2D(32,2,1))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=2))

model.add(Conv2D(64,2,1))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=2))

model.add(Flatten())
model.add(Dense(1024))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

#TODO: Change `loss='binary_crossentropy'` to something better for 10 class problems.
model.compile(loss='categorical_crossentropy',
              optimizer='nadam',
              metrics=['accuracy'])
genre_models.append(model)

### Train Model

In [None]:
history = model.fit(x=training_data,
                    y=training_data_labels_onehot,
                    epochs=30,
                    batch_size=100,
                    verbose=1,
                    validation_split=0.15,
                    shuffle=True)
plot_loss_curve(history)
plot_acc_curve(history)

### Test Model

In [None]:
model.evaluate(training_data, training_data_labels_onehot)