In [None]:
import os
import librosa
import math
import json

import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras
import matplotlib.pyplot as plt

import pyaudio
import wave

import pandas as pd
from collections import Counter

In [None]:
SAMPLE_RATE = 22050
TRACK_DURATION = 30 # measured in seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION

def save_mfcc1(dataset_path, json_path, n_mfcc=13, n_fft=2048, hop_length=512, num_segments=5):
    """Extract MFCCs for every audio file under ``dataset_path`` and save them as JSON.

    Every directory below ``dataset_path`` is treated as one genre, named after
    the directory. Each track is cut into ``num_segments`` equal slices of
    ``SAMPLES_PER_TRACK / num_segments`` samples and one MFCC matrix is
    computed per slice.

    Args:
        dataset_path: Root folder of the dataset (``str`` or path-like).
        json_path: File that receives the resulting dictionary.
        n_mfcc: Number of coefficients, forwarded to ``librosa.feature.mfcc``.
        n_fft: FFT window size, forwarded to ``librosa.feature.mfcc``.
        hop_length: Hop size in samples, forwarded to ``librosa.feature.mfcc``.
        num_segments: Number of slices each track is divided into.

    The JSON document has three keys: ``"mapping"`` (genre names, in the order
    ``os.walk`` visits the folders), ``"labels"`` (index into ``"mapping"`` for
    each stored segment) and ``"mfcc"`` (one ``frames x n_mfcc`` matrix per
    stored segment). Segments whose frame count differs from the expected one
    are dropped so that all stored matrices share the same shape.
    """
    data = {
        "mapping": [], # genre names; the list index is the numeric label, e.g. ['classical', 'blues']
        "labels": [],  # targets: one genre index per stored segment, e.g. [0, 0, 1, ...]
        "mfcc": []     # inputs: one MFCC matrix per stored segment, aligned with "labels"
    }

    num_samples_per_segment = int(SAMPLES_PER_TRACK / num_segments)
    expected_num_mfcc_vectors_per_segment = math.ceil(num_samples_per_segment / hop_length) # round up, e.g. 1.2 -> 2

    # Compare normalised path *values*: an identity check (`is not`) only works
    # when os.walk hands back the very same str object that was passed in, and
    # fails for pathlib.Path arguments.
    root = os.path.normpath(os.fspath(dataset_path))

    # loop through all the genre folders
    for dirpath, _dirnames, filenames in os.walk(dataset_path):

        # skip the root level, it only contains the genre folders
        if os.path.normpath(dirpath) == root:
            continue

        # the folder name is the semantic label (music genre): genre/blues -> 'blues'
        semantic_label = os.path.basename(os.path.normpath(dirpath))
        data["mapping"].append(semantic_label)
        label_index = len(data["mapping"]) - 1 # position of this genre in "mapping"
        print("\nProcessing {}".format(semantic_label))

        # process the files of this specific genre
        for f in filenames:

            # load the audio file
            file_path = os.path.join(dirpath, f)
            signal, sr = librosa.load(file_path, sr=SAMPLE_RATE)

            # process segments, extracting the MFCCs and storing them
            for s in range(num_segments):
                start_sample = num_samples_per_segment * s
                finish_sample = start_sample + num_samples_per_segment
                segment = signal[start_sample:finish_sample]

                # the track ended before this segment: nothing left to analyse
                if segment.size == 0:
                    break

                # y and sr are passed by keyword: they are keyword-only in librosa >= 0.10
                mfcc = librosa.feature.mfcc(y=segment,
                                            sr=sr,
                                            n_mfcc=n_mfcc,
                                            n_fft=n_fft,
                                            hop_length=hop_length)
                mfcc = mfcc.T

                # store the segment only if it has the expected number of frames
                if len(mfcc) == expected_num_mfcc_vectors_per_segment:
                    data["mfcc"].append(mfcc.tolist())
                    data["labels"].append(label_index)
                    print("{}, segment:{}".format(file_path, s))

    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)

In [None]:
# Root folder holding one sub-directory per genre, and the JSON file that
# receives the extracted features.
# NOTE(review): DATASET_PATH is an absolute path on the author's machine and
# has to be adapted before running elsewhere.
DATASET_PATH = "/Users/marcosera/desktop/python_projects/deep_learning/sound/data/genres_original"
JSON_PATH = "data.json"

if __name__ == "__main__":
    # Cut every track into 10 segments instead of the function default of 5.
    save_mfcc1(dataset_path=DATASET_PATH, json_path=JSON_PATH, num_segments=10)

# AI

In [None]:
DATA_PATH = "data.json"

def load_data(data_path):
    """Read the feature file and return its inputs and targets as arrays.

    Args:
        data_path: Path of a JSON file containing the keys ``"mfcc"`` and
            ``"labels"``.

    Returns:
        A tuple ``(X, y)`` where ``X`` is ``np.array`` of the ``"mfcc"`` entry
        (inputs) and ``y`` is ``np.array`` of the ``"labels"`` entry (targets).
    """
    with open(data_path, "r") as json_file:
        dataset = json.load(json_file)

    inputs = np.array(dataset["mfcc"])
    targets = np.array(dataset["labels"])
    return inputs, targets

def prepare_dataset(test_size, validation_size, random_state=None):
    """Load the features from ``DATA_PATH`` and split them into train/validation/test sets.

    Args:
        test_size: Share of the whole dataset held out as the test set
            (forwarded to ``train_test_split``).
        validation_size: Share of the *remaining* training data held out as
            the validation set (forwarded to ``train_test_split``).
        random_state: Optional seed forwarded to both ``train_test_split``
            calls so the splits can be reproduced. The default ``None`` keeps
            the previous behaviour of a different split on every call.

    Returns:
        ``(X_train, X_validation, X_test, y_train, y_validation, y_test)``.
        Each ``X_*`` array has an extra trailing axis of length 1 appended.
    """
    # load the data
    X, y = load_data(DATA_PATH)

    # create the train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)

    # create the train/validation split; the validation set is a share of the training data
    X_train, X_validation, y_train, y_validation = train_test_split(
        X_train, y_train, test_size=validation_size, random_state=random_state)

    # A Conv2D input needs a channel axis per sample. The stored samples are
    # 2D (time frame, mfcc value), so a trailing axis of length 1 is appended:
    # (n_samples, time_frame, mfcc value) -> (n_samples, time_frame, mfcc value, channel)
    X_train = X_train[..., np.newaxis]
    X_test = X_test[..., np.newaxis]
    X_validation = X_validation[..., np.newaxis]

    return X_train, X_validation, X_test, y_train, y_validation, y_test

def plot_history(history):
    """Plot accuracy and loss per epoch for the training and validation data.

    Args:
        history: Object whose ``history`` attribute is a mapping with the keys
            ``"accuracy"``, ``"val_accuracy"``, ``"loss"`` and ``"val_loss"``
            (as returned by ``model.fit`` when compiled with the accuracy
            metric and given validation data).
    """
    fig, axs = plt.subplots(2)

    # accuracy subplot; the "val_*" series come from the validation data, not the test set
    axs[0].plot(history.history["accuracy"], label="train accuracy")
    axs[0].plot(history.history["val_accuracy"], label="validation accuracy")
    axs[0].set_ylabel("Accuracy")
    axs[0].legend(loc="lower right")
    axs[0].set_title("Accuracy eval")

    # error subplot
    axs[1].plot(history.history["loss"], label="train error")
    axs[1].plot(history.history["val_loss"], label="validation error")
    axs[1].set_ylabel("Error")
    axs[1].set_xlabel("Epoch")
    axs[1].legend(loc="upper right")
    axs[1].set_title("Error eval")

    # keep the lower title from overlapping the upper axis
    fig.tight_layout()
    plt.show()

def build_model(input_shape, num_classes=10):
    """Build the (uncompiled) convolutional network used for genre classification.

    The network stacks three Conv2D -> MaxPool2D -> BatchNormalization stages,
    followed by a 64-unit dense layer with dropout and a softmax output layer.

    Args:
        input_shape: Shape of a single sample, passed to the first layer.
        num_classes: Number of units of the softmax output layer, i.e. the
            number of target classes. Defaults to 10.

    Returns:
        The ``keras.Sequential`` model.
    """
    # create model
    model = keras.Sequential()

    # 1st convolutional layer: (n_kernels, grid_size, activation, input_shape)
    model.add(keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    # (grid_size, strides (vertical and horizontal), padding)
    model.add(keras.layers.MaxPool2D((3, 3), strides=(2, 2), padding='same'))
    model.add(keras.layers.BatchNormalization())

    # 2nd convolutional layer; only the first layer needs the input shape,
    # later layers take theirs from the preceding layer
    model.add(keras.layers.Conv2D(32, (3, 3), activation='relu'))
    model.add(keras.layers.MaxPool2D((3, 3), strides=(2, 2), padding='same'))
    model.add(keras.layers.BatchNormalization())

    # 3rd convolutional layer, with a smaller (2, 2) grid for both kernel and pooling
    model.add(keras.layers.Conv2D(32, (2, 2), activation='relu'))
    model.add(keras.layers.MaxPool2D((2, 2), strides=(2, 2), padding='same'))
    model.add(keras.layers.BatchNormalization())

    # flatten the output and feed it into a dense layer
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu')) # (n_neurons, activation)
    model.add(keras.layers.Dropout(0.3))

    # output layer: one softmax unit per target class
    model.add(keras.layers.Dense(num_classes, activation='softmax'))

    return model

def predict(model, X, y):
    """Run ``model`` on a single sample and print the expected and predicted class index.

    Args:
        model: Object exposing ``predict(batch)``.
        X: One sample without a batch axis; a leading axis of length 1 is
            added before it is handed to ``model.predict``.
        y: Expected class index, only used in the printed message.
    """
    # model.predict works on batches, so wrap the sample in a batch of one
    batch = np.expand_dims(X, axis=0)

    # one row of scores per batch entry
    scores = model.predict(batch)

    # index of the highest score in each row (1D array)
    predicted_index = np.argmax(scores, axis=1)

    message = "Expected index: {}\n Predicted index: {}".format(y, predicted_index)
    print(message)

In [None]:
if __name__ == "__main__":

    # 25% of the data is held out for testing; 20% of the rest for validation
    X_train, X_validation, X_test, y_train, y_validation, y_test = prepare_dataset(0.25, 0.2)

    # per-sample shape: drop the leading n_samples axis of
    # (n_samples, time_frame, mfcc value, channel)
    input_shape = X_train.shape[1:]
    model = build_model(input_shape)

    # compile the network; labels are integer class indices, hence the sparse loss
    optimizer = keras.optimizers.Adam(learning_rate=0.0001)
    model.compile(
        optimizer=optimizer,
        loss="sparse_categorical_crossentropy",
        metrics=['accuracy'],
    )

    # train the CNN, monitoring the validation set after every epoch
    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_validation, y_validation),
        batch_size=32,
        epochs=50,
    )

    # plot the accuracy / error curves
    plot_history(history)

    # evaluate the CNN on the held-out test set
    test_error, test_accuracy = model.evaluate(X_test, y_test, verbose=1)
    print("Accuracy on test set is: {}".format(test_accuracy))

    # inference on one test sample (index 100)
    X, y = X_test[100], y_test[100]
    predict(model, X, y)

# APP

In [None]:
# Inspect the stored dataset and build the label-index -> genre-name table
# used to report predictions.
with open("data.json", "r") as fp:
    data_ichi = json.load(fp)

# distinct label indices present in the dataset
my_set = sorted(set(data_ichi['labels']))
print(my_set)

# The feature file records the genre order that produced the labels in its
# "mapping" entry, so the table is derived from it instead of being typed in
# by hand (that order depends on how os.walk visited the folders). The literal
# table is only used when the file carries no mapping.
Song_styles = dict(enumerate(data_ichi.get("mapping") or [])) or {
    0:"pop",
    1:"metal",
    2:"disco",
    3:"blues",
    4:"reggae",
    5:"classical",
    6:"rock",
    7:"hiphop",
    8:"country",
    9:"jazz"}

In [None]:
# Record RECORD_SECONDS of audio from the default input device and save it
# as a WAV file.
CHUNK = 512                # frames read from the stream per call
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 22050               # was 22500; 22050 Hz is the SAMPLE_RATE used for feature extraction
RECORD_SECONDS = 30
WAVE_OUTPUT_FILENAME = "record_from_mic.wav"

p = pyaudio.PyAudio()
try:
    # bytes per sample, needed for the WAV header
    sample_width = p.get_sample_size(FORMAT)

    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    try:
        print("* recording")

        # number of whole CHUNK-sized reads that fit into RECORD_SECONDS
        num_chunks = int(RATE / CHUNK * RECORD_SECONDS)
        frames = [stream.read(CHUNK) for _ in range(num_chunks)]

        print("* done recording")
    finally:
        # release the stream even if reading failed
        stream.stop_stream()
        stream.close()
finally:
    p.terminate()

# the context manager closes the file even if writing fails
with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(sample_width)
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))

In [None]:
WAVE_OUTPUT_FILENAME = "record_from_mic.wav"
SAMPLE_RATE = 22050
TRACK_DURATION = 30 # measured in seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION

# NOTE: this reuses the name `save_mfcc1` and therefore replaces the
# dataset-wide extractor defined earlier in the notebook once this cell runs.
def save_mfcc1(dataset_path, json_path, n_mfcc=13, n_fft=2048, hop_length=512, num_segments=5):
    """Extract per-segment MFCCs from the recorded WAV file and save them as JSON.

    The file ``WAVE_OUTPUT_FILENAME`` inside ``dataset_path`` is loaded, cut
    into ``num_segments`` equal slices of ``SAMPLES_PER_TRACK / num_segments``
    samples, and one MFCC matrix is computed per slice.

    Args:
        dataset_path: Directory that contains ``WAVE_OUTPUT_FILENAME``.
        json_path: File that receives the resulting dictionary.
        n_mfcc: Number of coefficients, forwarded to ``librosa.feature.mfcc``.
        n_fft: FFT window size, forwarded to ``librosa.feature.mfcc``.
        hop_length: Hop size in samples, forwarded to ``librosa.feature.mfcc``.
        num_segments: Number of slices the recording is divided into.

    The JSON document has a single key ``"mfcc"`` holding one
    ``frames x n_mfcc`` matrix per stored segment. Segments whose frame count
    differs from the expected one are dropped.
    """
    data = {"mfcc": []}

    num_samples_per_segment = int(SAMPLES_PER_TRACK / num_segments)
    expected_num_mfcc_vectors_per_segment = math.ceil(num_samples_per_segment / hop_length)

    # load the audio file
    file_path = os.path.join(dataset_path, WAVE_OUTPUT_FILENAME)
    signal, sr = librosa.load(file_path, sr=SAMPLE_RATE)

    # process segments, extracting the MFCCs and storing them
    for s in range(num_segments):
        start_sample = num_samples_per_segment * s
        finish_sample = start_sample + num_samples_per_segment
        segment = signal[start_sample:finish_sample]

        # the recording ended before this segment: nothing left to analyse
        if segment.size == 0:
            break

        # y and sr are passed by keyword: they are keyword-only in librosa >= 0.10
        mfcc = librosa.feature.mfcc(y=segment,
                                    sr=sr,
                                    n_mfcc=n_mfcc,
                                    n_fft=n_fft,
                                    hop_length=hop_length)
        mfcc = mfcc.T

        # store the segment only if it has the expected number of frames
        if len(mfcc) == expected_num_mfcc_vectors_per_segment:
            data["mfcc"].append(mfcc.tolist())

    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)

In [None]:
# Folder in which the recorded WAV file is looked up, and the JSON file that
# receives its features.
# NOTE(review): DATASET_PATH is an absolute path on the author's machine and
# has to be adapted before running elsewhere.
DATASET_PATH = "/Users/marcosera/desktop/python_projects/deep_learning/sound/"
JSON_PATH = "record_from_mic.json"

if __name__ == "__main__":
    # Cut the recording into 10 segments instead of the function default of 5.
    save_mfcc1(dataset_path=DATASET_PATH, json_path=JSON_PATH, num_segments=10)

In [None]:
# Load the features extracted from the recording and shape them like the
# samples the network was trained on.
with open("record_from_mic.json", "r") as fp:
    data_prev = json.load(fp)

X = np.array(data_prev["mfcc"])
print("Inicial X shape",X.shape)

# shape of one test sample, printed for comparison with the recording
print("X_test shape", X_test[100].shape)
print("X shape", X.shape)

# append the trailing channel axis
X = np.expand_dims(X, axis=-1)
print("X new axis", X.shape)

# first segment of the recording
a_correct = X[0]
print("a_correct shape", a_correct.shape)

# The features are in memory now, so the intermediate files are deleted.
# NOTE(review): these are absolute paths on the author's machine.
for leftover in (
    "/Users/marcosera/desktop/python_projects/deep_learning/sound/record_from_mic.json",
    "/Users/marcosera/desktop/python_projects/deep_learning/sound/record_from_mic.wav",
):
    os.remove(leftover)

In [None]:
# Classify every segment of the recording and report the genres that were
# predicted most often.
prediction = model.predict(X)
predicted_index = np.argmax(prediction, axis=1) # one class index per segment
print("Predicted index: {}".format(predicted_index))

# number of segments voting for each class index
count_dict = dict(Counter(predicted_index.tolist()))
print(count_dict)

# most_common(3) returns at most three entries and simply returns fewer when
# fewer classes were predicted, so no separate branch is needed for that case
high = Counter(count_dict).most_common(3)
for n, (style_index, _votes) in enumerate(high, start=1):
    print("{} Most probable: {}".format(n, Song_styles[style_index]))