In [196]:
import csv
import pandas as pd
import pickle
import soundfile as sf
import librosa
import numpy as np

THRESHOLD = 5
SAMPLE_LENGTH = 1000

STRIDE_SIZE = 10.
FRAME_SIZE = 25.
N_MFCC = 16

from pathlib import Path
home = str(Path.home())
SONG_DIR = home + "/Downloads/songdata_90/songdata/"

def get_mfcc_features(song_data, sample_rate, stride_size = STRIDE_SIZE, frame_size = FRAME_SIZE):
    mfccs = librosa.feature.mfcc(song_data, sample_rate, 
                                 n_mfcc=N_MFCC,
                                 hop_length=int(STRIDE_SIZE / 1000. * sample_rate), 
                                 n_fft=int(FRAME_SIZE / 1000. * sample_rate))
    return mfccs

def load_song(song_file):
    data, samplerate = sf.read(song_file)
    data = data[:, 0]
    mfccs = get_mfcc_features(data, samplerate)
    mfccs = np.asarray(mfccs).T
    return mfccs

def peaks_to_windows_flat(song_name, good_peaks, bad_peaks):
    mfccs = load_song(song_name)
    good_samples = []
    bad_samples = []
    for p in good_peaks:
        n = int((p[0] - (FRAME_SIZE/1000.)) / (STRIDE_SIZE/1000.))
        features = mfccs[n-(SAMPLE_LENGTH//2):n+(SAMPLE_LENGTH//2), :]
        good_samples.append(features)
    for p in bad_peaks:
        n = int((p[0] - (FRAME_SIZE/1000.)) / (STRIDE_SIZE/1000.))
        features = mfccs[n-(SAMPLE_LENGTH//2):n+(SAMPLE_LENGTH//2), :]
        bad_samples.append(features)
    good_samples = np.concatenate(good_samples)
    bad_samples = np.concatenate(bad_samples)
    return good_samples, bad_samples

def peaks_to_windows_mat(song_name, good_peaks, bad_peaks):
    mfccs = load_song(song_name)
    good_samples = []
    bad_samples = []
    for p in good_peaks:
        n = int((p[0] - (FRAME_SIZE/1000.)) / (STRIDE_SIZE/1000.))
        features = mfccs[n-(SAMPLE_LENGTH//2):n+(SAMPLE_LENGTH//2), :]
        good_samples.append(features)
    for p in bad_peaks:
        n = int((p[0] - (FRAME_SIZE/1000.)) / (STRIDE_SIZE/1000.))
        features = mfccs[n-(SAMPLE_LENGTH//2):n+(SAMPLE_LENGTH//2), :]
        bad_samples.append(features)
    good_samples = np.asarray(good_samples)
    bad_samples = np.asarray(bad_samples)
    return good_samples, bad_samples

def get_performance_measures(Y_pred, Y_test):
    diffs = [x == y for x, y in zip(list(Y_pred), list(Y_test))]
    print("Accuracy:", diffs.count(True) / len(diffs))

    recall = [x==1 and y==1 for x, y in zip(Y_pred, Y_test)].count(True) / list(Y_test).count(1)
    print("Recall:", recall)

    precision = [x==1 and y==1 for x, y in zip(Y_pred, Y_test)].count(True) / list(Y_pred).count(1)
    print("Precision:", precision)
    print("F1:", 2 * recall * precision / (recall + precision))
    
    

In [None]:
results_file = open('resultsFile', 'rb')      
results = pickle.load(results_file) 
list.sort(results, key = lambda x : x[0])

df = pd.read_csv("songs_fixed.csv")

detection_count = 0
drop_count = 0
bad_samples_overall = []
good_samples_overall = []

for i in range(len(results)):
    drops = str(df.iloc[[i]]["Drops"][i]).split(", ")
    peaks = results[i][1]
    good_peaks = set()
    temp_detection_count = 0
    songname = df.iloc[[i]]["Song Name"][i]
    drops_found = []
    for d in drops:
        for p in peaks:
            dval = float(d)
            pval = float(p[0])
            if pval >= dval - 7.5 and pval <= dval + 1.5:
                temp_detection_count += 1
                drops_found.append(d)
                good_peaks.add(p)
                break
    if temp_detection_count < len(drops):
        print(songname)
        print(drops_found)
    bad_peaks = set(peaks) - good_peaks
    drop_count += len(drops)
    detection_count += temp_detection_count
    
    good_samples, bad_samples = peaks_to_windows_mat(SONG_DIR + "/" + results[i][0], good_peaks, bad_peaks)
    for b in bad_samples:
        bad_samples_overall.append(b)
        
    for g in good_samples:
        good_samples_overall.append(g)

print(detection_count / drop_count)

Ariana_Grande___Side_To_Side_(TRU_Concept_Remix_ft._Romany)
['170.1']


In [None]:
# from hmmlearn import hmm

# TEST_SIZE = 40

# model = hmm.GaussianHMM(n_components=5)
# lengths = [SAMPLE_LENGTH] * (int((np.shape(good_samples_overall)[0]) / SAMPLE_LENGTH) - TEST_SIZE)
# train_sample_count = int(np.shape(good_samples_overall)[0] - TEST_SIZE * SAMPLE_LENGTH)
# print(np.shape(lengths), train_sample_count)
# model = model.fit(np.asarray(good_samples_overall)[: train_sample_count], lengths)

In [None]:
# score = model.score(np.asarray(good_samples_overall)[: train_sample_count], lengths)
# print(score)

In [None]:
from sklearn.neural_network import MLPClassifier
import sklearn

TEST_SIZE = 250

def unison_shuffled_copies(a, b):
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

good_flat = []
for g in good_samples_overall:
    good_flat.append(g.flatten())
bad_flat = []
for b in bad_samples_overall:
    bad_flat.append(b.flatten())

X = np.concatenate([good_flat, bad_flat])
Y = np.concatenate([[1] * np.shape(good_samples_overall)[0], [0] * np.shape(bad_samples_overall)[0]])

X, Y = unison_shuffled_copies(X, Y)
print(np.shape(X), np.shape(Y))

X_test = X[:TEST_SIZE]
Y_test = Y[:TEST_SIZE]

X_train = X[TEST_SIZE:]
Y_train = Y[TEST_SIZE:]

clf = MLPClassifier(solver='lbfgs', alpha=1e-2, hidden_layer_sizes=(100, 10), random_state=30, max_iter = 200)
clf.fit(X_train, Y_train)

Y_pred_NN = clf.predict(X_test)
Y_probs_NN = clf.predict_proba(X_test)
get_performance_measures(Y_pred_NN, Y_test)
print("Done!")



In [None]:
from sklearn.ensemble import RandomForestClassifier

clf = RandomForestClassifier(n_estimators=1000, max_depth=10, random_state=25)
clf.fit(X_train, Y_train)
Y_pred_RFC = clf.predict(X_test)
Y_probs_RFC = clf.predict_proba(X_test)
get_performance_measures(Y_pred_RFC, Y_test)

In [None]:
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression(random_state=20, solver='lbfgs',multi_class='multinomial', max_iter = 1000)
clf.fit(X_train, Y_train)
Y_pred_LR = clf.predict(X_test)
Y_probs_LR = clf.predict_proba(X_test)
get_performance_measures(Y_pred_LR, Y_test)

In [None]:
Y_pred_votes = Y_pred_NN + Y_pred_RFC + Y_pred_LR
Y_pred_votes = np.asarray([1 if x > 1 else 0 for x in Y_pred_votes])
get_performance_measures(Y_pred_votes, Y_test)

In [None]:
Y_probs = Y_probs_NN * 1.2 + Y_probs_RFC + Y_probs_LR
Y_probs_pred = [1 if x[0] < x[1] else 0 for x in Y_probs]
get_performance_measures(Y_probs_pred, Y_test)

In [None]:
raise BaseException

import math


Y_prob = clf.predict_proba(X_test)


get_performance_measures(Y_pred_NN, Y_test)

correct_probs = []
incorrect_probsFP = []
incorrect_probsFN = []
for i in range(len(Y_pred)):
    if Y_pred[i] != Y_test[i] and Y_pred[i] == 1:
        incorrect_probsFP.append(abs(Y_prob[i][0] - Y_prob[i][1]))
    elif Y_pred[i] != Y_test[i] and Y_pred[i] == 0:
        incorrect_probsFN.append(abs(Y_prob[i][0] - Y_prob[i][1]))
    else:
        correct_probs.append(abs(Y_prob[i][0] - Y_prob[i][1]))

# print(incorrect_probsFP)
# print(incorrect_probsFN)
# print(correct_probs)



In [None]:
from sklearn.svm import SVC

clf = SVC(degree=5)
clf.fit(X_train, Y_train)

In [None]:
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout,Flatten, BatchNormalization, Activation
from keras.layers.convolutional import Conv2D, MaxPooling2D

BATCH_SIZE = 50

X = np.concatenate([good_samples_overall, bad_samples_overall])
Y = np.concatenate([[1] * np.shape(good_samples_overall)[0], [0] * np.shape(bad_samples_overall)[0]])

X, Y = unison_shuffled_copies(X, Y)

X_test = X[:TEST_SIZE]
Y_test = Y[:TEST_SIZE]

X_train = X[TEST_SIZE:]
Y_train = Y[TEST_SIZE:]
input_shape = np.shape(X_train)
input_shape = (input_shape[1], input_shape[2], 1)
num_classes = 2
Y_train = keras.utils.to_categorical(Y_train, num_classes=num_classes, dtype='float32')
Y_test = keras.utils.to_categorical(Y_test, num_classes=num_classes, dtype='float32')

model = Sequential()
model.add(Conv2D(32, kernel_size=(2, 2), strides=(1, 1),
                 input_shape=input_shape))
model.add(BatchNormalization(trainable=True))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Dropout(0.5, name='dropout2', trainable=False))
model.add(Conv2D(64, (2, 2)))
model.add(BatchNormalization(trainable=True))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.5, name='dropout3', trainable=False))
model.add(Flatten())
model.add(Dense(10, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adam(lr=0.01),
              metrics=['accuracy'])

train_shape = np.shape(X_train)
X_train = X_train.reshape([train_shape[0], train_shape[1], train_shape[2], 1])
test_shape = np.shape(X_test)
X_test = X_test.reshape([test_shape[0], test_shape[1], test_shape[2], 1])
print(train_shape, np.shape(X_train))
model.fit(X_train, Y_train,
          epochs=5,
          verbose=1,
          validation_data=(X_test, Y_test))

In [None]:
model.predict(X_test)