In [113]:
# Execution environment selector
proyecto_en = "PC"  # accepted values: "kaggle" or "PC"

In [114]:
def get_folder_out(project):
    """Return the output directory for ``project``, always ending in a path separator.

    Callers build file paths by appending a file name directly to the returned
    string, so both environments must return a separator-terminated path.

    Args:
        project: ``"kaggle"`` for the Kaggle runtime or ``"PC"`` for a local
            checkout (output lives in ``kaggle/output/working`` next to the
            parent of the current working directory).

    Returns:
        The output directory as a string ending in a path separator.

    Raises:
        ValueError: if ``project`` is not one of the supported values.
    """
    # Local import: this helper is defined and called before the notebook's import cell.
    import os

    if project == "kaggle":
        return "/kaggle/output/working/"
    if project == "PC":
        parent = os.sep.join(os.getcwd().split(os.sep)[:-1])
        # Trailing separator added so that `folder + "file.ext"` is a valid path.
        return parent + os.sep + 'kaggle/output/working' + os.sep
    raise ValueError("Unknown project {!r}; expected 'kaggle' or 'PC'".format(project))

def get_folder_int(project):
    """Return the dataset input directory for ``project``.

    ``"kaggle"`` maps to the Kaggle input mount; ``"PC"`` maps to
    ``kaggle/input/musicnet-dataset`` placed next to the parent of the current
    working directory. Any other value yields ``None``.
    """
    if project == "kaggle":
        carpeta = "/kaggle/input"
    elif project == "PC":
        # Drop the last component of the working directory to get its parent.
        partes = os.getcwd().split(os.sep)
        carpeta = os.sep.join(partes[:-1]) + os.sep + 'kaggle/input/musicnet-dataset'
    else:
        carpeta = None
    return carpeta

In [116]:
# Base output directory, used as the prefix of every output path in this notebook.
FOLDER_OUT = get_folder_out(proyecto_en)  # "kaggle" or "PC"

In [117]:
# CONFIG
import os  # imported here because this cell runs before the notebook's import cell

# Passed to PCA as its first argument when the preprocessing pipeline is built.
MINIMA_VARIANA_EXPLICADA = 0.93  # TODO: the choice of this value still has to be justified
TIME_SPLIT = 2  # TODO: the choice of a split of 2 still has to be justified

# Output files. os.path.join is used instead of string concatenation so the paths are
# valid whether or not FOLDER_OUT ends with a path separator.
FOLDER_SAVE_NORMALIZADOR_PCA = os.path.join(FOLDER_OUT, 'normalizador_pca.pkl')
FOLDER_MODEL = os.path.join(FOLDER_OUT, "randomforest.pkl")

FOLDER_TRAIN_DATA = os.path.join(FOLDER_OUT, "train_data.csv")
FOLDER_TRAIN_LABEL = os.path.join(FOLDER_OUT, "train_label.csv")
FOLDER_TEST_DATA = os.path.join(FOLDER_OUT, "test_data.csv")
FOLDER_TEST_LABEL = os.path.join(FOLDER_OUT, "test_label.csv")

# Model
SPLIT_DATA_TRAIN = 0.2



In [118]:
import os
from distutils.version import StrictVersion

# preprocesando los datos
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

# para guardar el preprocesador de datos
import pickle
from scipy.io.wavfile import write

# Modelado
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Evaluacion
import sklearn.metrics as metrics

# Silence every warning for the rest of the session (this also hides deprecation notices)
import warnings

warnings.filterwarnings('ignore')

In [119]:
# Custom helper classes

In [120]:
import scipy.io.wavfile as wav
import pandas as pd


class Corte_audio():
    """Splits a WAV recording into fixed-length windows and collects, for each
    window, the instruments whose annotated start time falls inside it."""

    # Window length in seconds; overridden per instance by __init__.
    SEGUNDOS_CORTE = 5

    def __init__(self, config_time:int=5):
        """Store the window length (in seconds) used by split_data."""
        self.SEGUNDOS_CORTE = config_time

    def __hallar_instrumentos(self, corte_start, corte_end, instrument, start_time):
        """Return the distinct instruments whose start time lies in
        [corte_start, corte_end], in order of first appearance.

        Scanning stops at the first start time beyond ``corte_end``, so the
        result is only complete when ``start_time`` is in ascending order.
        """
        encontrados = []
        for idx, inicio in enumerate(start_time):
            dentro = corte_start <= start_time[idx] <= corte_end
            if not dentro:
                if inicio > corte_end:
                    # Past the window: stop scanning.
                    break
                continue
            candidato = instrument[idx]
            if candidato not in encontrados:
                encontrados.append(candidato)
        return encontrados

    def split_data(self, path_wav:str= "", path_csv:str= ""):
        """Cut the audio at ``path_wav`` into windows of SEGUNDOS_CORTE seconds.

        The CSV at ``path_csv`` must provide 'instrument' and 'start_time'
        columns; start times are compared against sample indices.

        Returns:
            (muestras, instrumentos, rate): the audio windows, the list of
            instruments found for each window, and the sample rate.
        """
        rate, sig = wav.read(path_wav)
        labels = pd.read_csv(path_csv)
        instrument, start_time = labels['instrument'], labels['start_time']

        corte = rate * self.SEGUNDOS_CORTE  # window length in samples
        total = len(sig)

        muestras, instrumentos = [], []
        n_ventanas = round(total / corte)
        for k in range(n_ventanas):
            ini = k * corte
            fin = ini + corte
            muestras.append(sig[ini:fin])
            instrumentos.append(self.__hallar_instrumentos(ini, fin, instrument, start_time))

        # Whatever is left after the counted windows becomes one final, shorter window.
        pivote = n_ventanas * corte
        if pivote < total:
            muestras.append(sig[pivote:])
            instrumentos.append(self.__hallar_instrumentos(pivote, total, instrument, start_time))

        return muestras, instrumentos, rate

In [121]:
import librosa
# Minimum librosa release this notebook accepts.
vers_required = "0.7.2"
if StrictVersion(librosa.__version__) < StrictVersion(vers_required):
    print("Error: minimum librosa vers: {}, current vers {}".format(vers_required, librosa.__version__))
    # IPython shell escape: force-reinstalls the pinned librosa release.
    !pip install librosa===0.7.2 --force-reinstall

# Reports the version of the module imported above (no reload happens in this cell).
print("current vers", librosa.__version__)

current vers 0.9.2


In [122]:
import matplotlib.pyplot as plt
import librosa.display
import numpy as np

class ProcessAudio(object):
    """Computes librosa features for one audio segment at a time.

    Load a segment with set_data, then call the individual ``get_*`` methods
    or get_all. Each ``get_*`` method stores its result on the instance and
    returns it, or returns None when no segment has been loaded.
    """

    # Audio samples currently loaded; None until set_data is called.
    data = None

    def __init__(self, sr:int = 44100):
        """Create an extractor for audio sampled at ``sr`` Hz with no cached features."""
        self.mfcc = None
        self.zcr = None
        self.rolloff = None
        self.spec_bw = None
        self.spec_cent = None
        self.rmse = None
        self.chroma_stft = None
        self.sr = sr

    def set_data(self, data):
        """Load the audio samples that the feature methods operate on."""
        self.data = data

    def display_waveform(self):
        """Plot the loaded waveform on a new 14x5 figure; no-op when no data is loaded."""
        if self.data is None:
            return None
        plt.figure(figsize=(14, 5))
        # `waveplot` no longer exists in librosa 0.10+; `waveshow` replaces it.
        # Fall back to `waveplot` on older releases that lack `waveshow`.
        dibujar = getattr(librosa.display, "waveshow", None)
        if dibujar is None:
            dibujar = librosa.display.waveplot
        dibujar(self.data, sr=self.sr)

    def get_croma(self):
        """Compute and cache the chroma STFT of the loaded audio."""
        if self.data is None:
            return None
        self.chroma_stft = librosa.feature.chroma_stft(y=self.data, sr=self.sr)
        return self.chroma_stft

    def get_rmse(self):
        """Compute and cache the RMS energy of the loaded audio."""
        if self.data is None:
            return None
        self.rmse = librosa.feature.rms(y=self.data)
        return self.rmse

    def get_centroide_espectral(self):
        """Compute and cache the spectral centroid of the loaded audio."""
        if self.data is None:
            return None
        self.spec_cent = librosa.feature.spectral_centroid(y=self.data, sr=self.sr)
        return self.spec_cent

    def get_ancho_banda_espectral(self):
        """Compute and cache the spectral bandwidth of the loaded audio."""
        if self.data is None:
            return None
        self.spec_bw = librosa.feature.spectral_bandwidth(y=self.data, sr=self.sr)
        return self.spec_bw

    def get_rolloff(self):
        """Compute and cache the spectral roll-off of the loaded audio."""
        if self.data is None:
            return None
        self.rolloff = librosa.feature.spectral_rolloff(y=self.data, sr=self.sr)
        return self.rolloff

    def get_cruce_por_cero(self):
        """Compute and cache the zero-crossing rate of the loaded audio."""
        if self.data is None:
            return None
        self.zcr = librosa.feature.zero_crossing_rate(self.data)
        return self.zcr

    def get_mfcc(self):
        """Compute and cache the MFCCs of the loaded audio."""
        if self.data is None:
            return None
        self.mfcc = librosa.feature.mfcc(y=self.data, sr=self.sr)
        return self.mfcc

    def get_all(self, i:int) -> list:
        """Compute every feature and summarise each one by its mean.

        Args:
            i: identifier of the segment; it is embedded in the first token.

        Returns:
            A list of strings: ``'train<i>'`` followed by the formatted means of
            chroma, RMS, spectral centroid, spectral bandwidth, roll-off and
            zero-crossing rate, then one mean per MFCC row. An empty list is
            returned when no audio is loaded.
        """
        if self.data is None:
            return []

        self.get_croma()
        self.get_rmse()
        self.get_centroide_espectral()
        self.get_ancho_banda_espectral()
        self.get_rolloff()
        self.get_cruce_por_cero()
        self.get_mfcc()

        medias = [
            np.mean(self.chroma_stft),
            np.mean(self.rmse),
            np.mean(self.spec_cent),
            np.mean(self.spec_bw),
            np.mean(self.rolloff),
            np.mean(self.zcr),
        ]
        medias.extend(np.mean(coeficiente) for coeficiente in self.mfcc)

        # Values are returned as text, matching what callers already parse.
        return [f'train{i}'] + [f'{valor}' for valor in medias]

In [123]:
from functools import wraps
from time import time


def count_elapsed_time(f):
    """Decorator that prints how many seconds each call to ``f`` took.

    The wrapped function's return value is passed through unchanged.
    """
    @wraps(f)
    def envoltura(*args, **kwargs):
        inicio = time()  # timestamp right before the call
        resultado = f(*args, **kwargs)
        duracion = time() - inicio  # elapsed time once the call has returned
        print('Tiempo transcurrido (en segundos): {}'.format(duracion))
        return resultado

    return envoltura

In [124]:
# Shared instances used by the cells below

# Audio splitter built with its default window length (5 seconds).
# NOTE(review): TIME_SPLIT from the config cell is not passed here — confirm whether that is intended.
cortador = Corte_audio()
# Standardisation followed by PCA; MINIMA_VARIANA_EXPLICADA is passed as PCA's first argument.
pca_pipe = make_pipeline(StandardScaler(), PCA(MINIMA_VARIANA_EXPLICADA))

In [125]:
# Index of the dataset files, split into train and test.
# Each dict maps a file stem to a dict holding its 'data' and 'label' file paths.

TRAIN = {}
TEST = {}

@count_elapsed_time
def crear_diccionarios_train_test():
    """Fill the global TRAIN and TEST dicts with the dataset's file paths.

    Walks the input folder and, for every ``.wav`` / ``.csv`` file, stores its
    path under ``<dict>[<file stem>]['data']`` (wav) or ``['label']`` (csv).
    Files found in a directory whose path contains "train" go to TRAIN, every
    other one goes to TEST. Files with any other extension are ignored.
    """
    for dirname, _, filenames in os.walk(get_folder_int(proyecto_en)):
        # Substring test instead of `find(...) > 0`, which missed a match at index 0.
        destino = TRAIN if "train" in dirname else TEST
        for filename in filenames:
            # splitext copes with names that have no dot or several dots,
            # where unpacking `filename.split(".")` raised ValueError.
            name_file, extension = os.path.splitext(filename)
            extension = extension.lower()
            if extension == ".csv":
                clave = 'label'
            elif extension == ".wav":
                clave = 'data'
            else:
                continue
            destino.setdefault(name_file, {})[clave] = os.path.join(dirname, filename)


crear_diccionarios_train_test()

Tiempo transcurrido (en segundos): 0.014384746551513672


In [126]:
# Label vocabulary; starts as an empty dict and is replaced by a sorted tuple inside files_to_data
TODOS_LABEL = {}

In [127]:
def save_data(data_save, file):
    """Write ``data_save`` to ``file`` as CSV through a pandas DataFrame.

    The DataFrame's index and default column labels are written too.
    """
    pd.DataFrame(data=data_save).to_csv(file)

In [128]:
# Abriendo cada audio, a cada uno se le aplica el split y a cada audio resultante se le extraen caracteristicas


@count_elapsed_time
def files_to_data(diccionario_datos, buscar_todos_label: bool = False, save_new_data: bool = False):
    """Turn a dict of audio/label file paths into feature and label arrays.

    Every recording is split into windows by ``cortador``; the features of each
    window come from ``ProcessAudio.get_all`` (the leading identifier token is
    dropped) and its labels are expanded into a 0/1 vector over TODOS_LABEL.

    Args:
        diccionario_datos: maps a name to a dict with the 'data' (wav path) and
            'label' (csv path) entries.
        buscar_todos_label: when True, the global TODOS_LABEL is rebuilt as the
            sorted tuple of its current entries plus every label seen here.
        save_new_data: when True, each window is written to
            ``<FOLDER_OUT>/new_data/data_<n>.wav`` and its label vector to the
            matching ``data_<n>.csv``.

    Returns:
        (DATA, LABEL): two float numpy arrays with one row per audio window.
    """
    global TODOS_LABEL

    DATA = []
    LABEL = []

    carpeta_nuevos = os.path.join(FOLDER_OUT, "new_data")
    if save_new_data:
        # The target folder is not guaranteed to exist yet.
        os.makedirs(carpeta_nuevos, exist_ok=True)

    conteo = 0
    for name_file, value in diccionario_datos.items():
        # Progress output: ten names per line.
        conteo += 1
        print(name_file, end=" - ")
        if conteo >= 10:
            print()
            conteo = 0

        muestras_wav, instrumentos, rate = cortador.split_data(value['data'], value['label'])

        processAudio = ProcessAudio(rate)
        for id_audio, dat in enumerate(muestras_wav):
            processAudio.set_data(dat)
            caracteristicas = processAudio.get_all(id_audio)  # feature tokens, identifier first
            if save_new_data:
                # Write the audio window itself, with the rate of its own recording.
                # Done here so the windows need not be kept in memory.
                write(os.path.join(carpeta_nuevos, "data_" + str(len(DATA)) + ".wav"), rate, dat)
            DATA.append(caracteristicas[1:])
            LABEL.append(instrumentos[id_audio])

    # Collect the label vocabulary. A set built from the current TODOS_LABEL works
    # whether it is still the initial dict or already a tuple from an earlier call.
    if buscar_todos_label:
        etiquetas = set(TODOS_LABEL)
        for lab in LABEL:
            etiquetas.update(lab)
        TODOS_LABEL = tuple(sorted(etiquetas))

    # Expand each label list into its 0/1 vector; labels outside the vocabulary are ignored.
    posicion = {etiqueta: i for i, etiqueta in enumerate(TODOS_LABEL)}
    for j, lab in enumerate(LABEL):
        new_label = [0] * len(posicion)
        for la in lab:
            if la in posicion:
                new_label[posicion[la]] = 1
        LABEL[j] = new_label

    if save_new_data:
        for idx, etiqueta in enumerate(LABEL):
            save_data([etiqueta], os.path.join(carpeta_nuevos, "data_" + str(idx) + ".csv"))

    # Convert to numpy arrays (feature tokens are parsed back into floats here).
    DATA = np.array(DATA, dtype=float)
    LABEL = np.array(LABEL, dtype=float)

    return DATA, LABEL

In [129]:
# Extract features and label vectors from the training files and build the label vocabulary
DATA, LABEL = files_to_data(TRAIN, buscar_todos_label=True)

print(len(DATA), len(LABEL))

1727
Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/media/wisrovi/J/TFM/2022/dataset/archive/musicnet/tf/libraries/venv/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3398, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_28759/154304909.py", line 1, in <cell line: 1>
    DATA, LABEL = files_to_data(TRAIN, buscar_todos_label=True)
  File "/tmp/ipykernel_28759/4200871736.py", line 9, in cronometro
    salida = f(*args, **kwargs)
  File "/tmp/ipykernel_28759/2858362108.py", line 18, in files_to_data
    caracteristicas = processAudio.get_all(id_audio)  # Extrayendo caracteristicas audios, salen 26 caracteristicas
  File "/tmp/ipykernel_28759/9644389.py", line 79, in get_all
    self.get_ancho_banda_espectral()
  File "/tmp/ipykernel_28759/9644389.py", line 50, in get_ancho_banda_espectral
    self.spec_bw = librosa.feature.spectral_bandwidth(y=self.data, sr=self.sr)
  File "/media/wisrovi/J/TFM/2022/dataset/archive/musicnet/tf/librarie

In [None]:
# Fit the scaler + PCA pipeline on the training features and persist it.
pca_pipe.fit(DATA)
# The context manager closes the file once the pipeline has been written.
with open(FOLDER_SAVE_NORMALIZADOR_PCA, 'wb') as archivo_pca:
    pickle.dump(pca_pipe, archivo_pca)

In [None]:
def preparar_datos_para_modelo(datos):
    """Transform ``datos`` with the pipeline stored at FOLDER_SAVE_NORMALIZADOR_PCA.

    Args:
        datos: feature matrix accepted by the stored pipeline's ``transform``.

    Returns:
        The transformed feature matrix.
    """
    # NOTE: unpickling can execute arbitrary code; only load files written by this notebook.
    with open(FOLDER_SAVE_NORMALIZADOR_PCA, 'rb') as archivo_pca:
        normalizador_pca = pickle.load(archivo_pca)
    return normalizador_pca.transform(X=datos)

In [None]:
# Project the training features with the saved scaler + PCA pipeline.
# NOTE: this overwrites DATA, which no longer holds the raw features afterwards.
DATA = preparar_datos_para_modelo(DATA)

In [None]:
# Compare the feature count before and after the PCA projection.
# DATA already holds the projected features and `x_for_model` only exists inside
# preparar_datos_para_modelo, so both sizes are read from the fitted pipeline.
n_original = pca_pipe.named_steps['standardscaler'].mean_.shape[0]
n_reducido = pca_pipe.named_steps['pca'].n_components_
print(f"Original vector size: {n_original} -> New vector size {n_reducido} ({int(MINIMA_VARIANA_EXPLICADA*100)}% información mantenida)")

In [None]:
# Persist the training features and their label vectors as CSV
save_data(DATA, FOLDER_TRAIN_DATA)
save_data(LABEL, FOLDER_TRAIN_LABEL)

In [None]:
def mean(lst):
    """Arithmetic mean of ``lst``, truncated (not rounded) to two decimals."""
    return int((sum(lst) / len(lst)) * 100) / 100


def calcular_porcentajes_aciertos(y_f, y_t):
    """Per-column hit and miss percentages between predictions and targets.

    Args:
        y_f: 2-D array (needs ``.shape``) indexed as ``y_f[row][column]``.
        y_t: values to compare against, indexed the same way.

    Returns:
        (verdaderos, falsos, resumen): two dicts mapping each column index to
        the truncated integer percentage of rows that matched / did not match,
        and the mean of the match percentages formatted as ``"<value>%"``.
    """
    total_filas, total_columnas = y_f.shape[0], y_f.shape[1]

    aciertos = {col: 0 for col in range(total_columnas)}
    fallos = {col: 0 for col in range(total_columnas)}

    for fila in range(total_filas):
        for col in range(total_columnas):
            contador = aciertos if y_f[fila][col] == y_t[fila][col] else fallos
            contador[col] += 1

    # Rule of three: total_filas rows -> 100 %, count -> x %
    verdaderos = {col: int(n * 100 / total_filas) for col, n in aciertos.items()}
    falsos = {col: int(n * 100 / total_filas) for col, n in fallos.items()}

    resumen = str(mean(list(verdaderos.values()))) + "%"
    return verdaderos, falsos, resumen

In [None]:
# Seed shared by the cross-validation splitter and the classifier for reproducible runs.
seed = 1
# An empty param_grid means a single candidate: the classifier below, scored with 10-fold CV.
grid = GridSearchCV(
    estimator=RandomForestClassifier(random_state=seed),
    param_grid={},
    cv=KFold(n_splits=10, shuffle=True, random_state=seed),
)

In [None]:
# Hold out 10 % of the samples for validation; seeded so the split is reproducible.
# NOTE(review): SPLIT_DATA_TRAIN (0.2) from the config cell is not used here — confirm which value is intended.
X_train, X_valid, y_train, y_valid = train_test_split(DATA, LABEL, test_size=0.1, random_state=seed)

In [None]:
# Train, keep the best estimator and persist it.
grid.fit(X_train, y_train)
model = grid.best_estimator_
# The context manager closes the file once the model has been written.
with open(FOLDER_MODEL, 'wb') as archivo_modelo:
    pickle.dump(model, archivo_modelo)

In [None]:
# Evaluate on the held-out validation split.
# NOTE(review): with multi-label targets accuracy_score is presumably the exact-match ratio — confirm that is the intended metric.
y_final = model.predict(X_valid)
print("ACC", metrics.accuracy_score(y_valid, y_final))
print("PREC", metrics.precision_score(y_valid, y_final, average='micro'))

In [None]:
# TEST THE MODEL
# Reload the persisted model. NOTE: unpickling can execute arbitrary code; only load
# files written by this notebook.
with open(FOLDER_MODEL, 'rb') as archivo_modelo:
    model = pickle.load(archivo_modelo)

# From here on DATA / LABEL hold the test set instead of the training set.
DATA, LABEL = files_to_data(TEST, save_new_data=True)
DATA = preparar_datos_para_modelo(DATA)

In [None]:
# Persist the test features and their label vectors as CSV
save_data(DATA, FOLDER_TEST_DATA)
save_data(LABEL, FOLDER_TEST_LABEL)

In [None]:
# Evaluate the model on the test set (DATA / LABEL were replaced by the test data above)
y_final = model.predict(DATA)
print("ACC", metrics.accuracy_score(LABEL, y_final))
print("PREC", metrics.precision_score(LABEL, y_final, average='micro'))