Cele projektu:
Rozpoznanie kierunku dźwięku w nagraniach binauralnych pojedynczych próbek białego szumu. Rozpoznawanie kierunku poziomego lub pionowego. Porównanie metryk sukcesu klasyfikatora dla próbek pochodzących z nagrań binauralnych i sygnałów wygenerowanych przy pomocy HRTF-ów.

In [None]:
# libraries
import functools
import glob
import math
import os
import pdb
import sys

import librosa
import numpy as np
import optuna
import pandas as pd
import scipy
import scipy.io.wavfile
import scipy.stats
import sklearn
import sofa
import soundfile as sf
from IPython.display import Audio
from scipy import signal
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, make_scorer, confusion_matrix
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from spafe.features.gfcc import gfcc

PREPROCESSING:

In [None]:
def Efficient_ccf(x1,x2):
    """Cross-correlate two equal-length signals through the frequency domain.

    Both inputs are tapered with a Hanning window, zero-padded to
    ``2*len-1`` points, multiplied (one spectrum conjugated) and transformed
    back, which is cheaper than the direct time-domain sum.  The output is
    reordered so that lags run from ``-(len-1)`` to ``len-1``; zero lag is
    therefore at index ``len-1``.

    Raises:
        Exception: if the inputs differ in length along axis 0.
    """
    n_samples = x1.shape[0]
    if n_samples != x2.shape[0]:
        raise Exception('length mismatch')

    # Taper both signals before the transform.
    taper = np.hanning(n_samples)
    n_fft = 2*n_samples-1  # transform length; equivalent to zero-padding

    spec_a = np.fft.fft(x1*taper, n_fft)
    spec_b = np.fft.fft(x2*taper, n_fft)
    circular = np.real(np.fft.ifft(np.multiply(spec_a, np.conjugate(spec_b))))

    # The inverse FFT stores non-negative lags first; move the negative lags
    # (the tail) to the front.
    negative_lags = circular[n_samples:]
    non_negative_lags = circular[:n_samples]
    return np.concatenate((negative_lags, non_negative_lags), axis=0)

In [None]:
def get_ITD(x,fs,max_delay=None,inter_method='parabolic'):
    """Estimate the interaural time difference (ITD) of a two-channel signal.

    The ITD is the lag at which the interaural cross-correlation function
    ``corr(i) = sum(x0[t]*x1[t-i])`` is largest, searched within
    ``+/- max_delay`` samples of zero lag and optionally refined to
    sub-sample precision by interpolating around the peak.

    Sign convention (itd = chann0_delay - chann1_delay):
        ITD > 0: channel 1 leads
        ITD < 0: channel 0 leads

    Args:
        x: array indexed as ``x[:, 0]`` / ``x[:, 1]`` for the two channels,
            i.e. shape (n_samples, 2).
        fs: sampling rate in Hz.
        max_delay: largest lag searched, in samples.  Defaults to
            ``int(1e-3*fs)`` (1 ms).  It is clipped to ``n_samples-1``
            because no larger lag exists in the correlation.
        inter_method: ``'parabolic'`` (default) or ``'exponential'`` peak
            interpolation; any other value (e.g. ``None``) disables it.

    Returns:
        ``[ITD, ccf_std]`` where ``ITD`` is in milliseconds and ``ccf_std``
        is the cross-correlation over the searched lags divided by the
        square root of the product of the two channel energies.
    """
    wav_len = x.shape[0]

    if max_delay is None:
        max_delay = int(1e-3*fs)
    # A search range wider than the signal would give a negative slice start
    # below and silently move zero lag away from the centre of `ccf`.
    max_delay = max(0, min(int(max_delay), wav_len-1))

    # Efficient_ccf places zero lag at index wav_len-1.
    ccf_full = Efficient_ccf(x[:,0],x[:,1])
    zero_lag = wav_len-1
    ccf = ccf_full[zero_lag-max_delay:zero_lag+max_delay+1]

    ccf_std = ccf/(np.sqrt(np.sum(x[:,0]**2)*np.sum(x[:,1]**2)))
    max_pos = int(np.argmax(ccf))

    # Sub-sample refinement; needs a neighbour on both sides of the peak.
    delta = 0
    if 0 < max_pos < len(ccf)-1:
        left, peak, right = ccf[max_pos-1], ccf[max_pos], ccf[max_pos+1]
        if inter_method == 'exponential':
            # The logarithm is only defined when all three values are positive.
            if min(left, peak, right) > 0:
                log_left, log_peak, log_right = np.log(left), np.log(peak), np.log(right)
                denom = 4*log_peak-2*log_left-2*log_right
                if denom != 0:
                    delta = (log_right-log_left)/denom
        elif inter_method == 'parabolic':
            denom = 2*(right-2*peak+left)
            if denom != 0:  # flat top: keep the integer lag
                delta = (left-right)/denom

    # Zero lag is at index max_delay of the cropped correlation, so the lag
    # in samples is max_pos-max_delay (no extra -1).
    ITD = float(max_pos-max_delay+delta)/fs*1e3

    return [ITD,ccf_std]

In [None]:
def parameters(array, sr):
    """Build the feature vector of one two-channel (binaural) signal.

    The returned list has 50 entries, in this order:
        * 12 features of channel 0: mean, std, median, 25th and 75th
          percentile, 10-90 percentile range, kurtosis, skewness, min, max,
          mean spectral centroid, mean spectral roll-off;
        * the same 12 features of channel 1;
        * for each of the 12 features, the mean and the std of the
          (channel 0, channel 1) pair;
        * the level difference of channel 1 relative to channel 0 in dB;
        * the ITD in milliseconds from ``get_ITD``.

    Args:
        array: 2-D signal, either (n_samples, 2) or (2, n_samples).  A
            (2, n_samples) input is transposed so that every feature below
            sees the same per-channel data.
        sr: sampling rate in Hz.

    Raises:
        ValueError: if ``array`` is not 2-D.
    """
    array = np.asarray(array)
    if array.ndim != 2:
        raise ValueError('parameters() expects a 2-D two-channel array')
    # Callers pass both layouts; normalise to (n_samples, 2) so the statistics
    # and the ILD/ITD all index the channels the same way.
    if array.shape[0] == 2 and array.shape[1] != 2:
        array = array.T

    per_channel = []
    for index in range(2):
        channel = np.ascontiguousarray(array[:, index])
        per_channel.append([
            # amplitude statistics
            np.mean(channel),
            np.std(channel),
            np.median(channel),
            np.percentile(channel, 25),
            np.percentile(channel, 75),
            scipy.stats.iqr(channel, rng=(10, 90)),
            scipy.stats.kurtosis(channel),
            scipy.stats.skew(channel),
            np.min(channel),
            np.max(channel),
            # spectral features (audio is passed by keyword, as current
            # librosa releases require)
            np.mean(librosa.feature.spectral_centroid(y=channel, sr=sr)),
            np.mean(librosa.feature.spectral_rolloff(y=channel, sr=sr)),
        ])
    # MFCC/GFCC, zero-crossing-rate and RMS features were tried here and are
    # currently disabled.

    first, second = per_channel
    params = first + second
    for a, b in zip(first, second):
        params.append(np.mean([a, b]))
        params.append(np.std([a, b]))

    # interaural level difference (dB), then interaural time difference (ms)
    params.append(10*np.log10(np.sum(array[:,1]**2)/np.sum(array[:,0]**2)+1e-10))
    params.append(get_ITD(array, sr)[0])
    return params

In [None]:
# Load the recorded noise samples: one feature row per labelled WAV file.
recorded_dir = 'dane_wav/sound1_tiltLOW/Recorded'
# Sorted so the row order (and with it the seeded split below) does not
# depend on the order in which the file system lists the directory.
filenames = sorted(os.listdir(recorded_dir))


def _horizontal_sector(angle):
    """Map a horizontal angle in degrees to one of 6 sectors (1-6), or None."""
    if -30 < angle < 30:
        return 1
    if 30 <= angle <= 90:
        return 2
    if 90 < angle < 150:
        return 3
    if angle >= 150 or angle <= -150:
        return 4
    if -150 < angle < -90:
        return 5
    if -90 <= angle <= -30:
        return 6
    return None  # only reached for NaN


def _vertical_band(angle):
    """Map a vertical angle in degrees to one of 3 bands (1-3), or None.

    Angles outside the three open intervals belong to no band.
    """
    if -60 < angle < -21:
        return 1
    if -20 < angle < 20:
        return 2
    if 21 < angle < 60:
        return 3
    return None


def _recorded_features(name):
    """Read one recording once and return its feature row."""
    rate, samples = scipy.io.wavfile.read(os.path.join(recorded_dir, name))
    # Cast before shifting so that adding 32768 cannot overflow an int16
    # array; the expression maps the 16-bit range onto [-1, 1].
    samples = 2*((samples.astype(np.float64)+32768)/65535)-1
    return parameters(samples, rate)


# Ground truth: one line per recording -> name, horizontal angle, vertical angle.
ground_truth = {}
with open("dane_wav/sound1_tiltLOW/motors_ground_truth") as file:
    for line in file:
        fields = line.split()
        if len(fields) < 3:
            continue  # blank or incomplete line
        ground_truth[f'{fields[0]}.wav'] = (float(fields[1]), float(fields[2]))

# Features and both label lists are built in the same loop so that row k of
# the data always belongs to entry k of each label list.
# NOTE: the variable names are historical - LabelVertical holds the
# horizontal-direction sectors (6 classes) and LabelHorizontal holds the
# vertical-direction bands (3 classes); the cells below rely on these names.
feature_rows = []
LabelVertical = []
LabelHorizontal = []
for name in filenames:
    if name not in ground_truth:
        continue
    sector = _horizontal_sector(ground_truth[name][0])
    band = _vertical_band(ground_truth[name][1])
    if sector is None or band is None:
        continue  # recording falls outside the labelled directions
    feature_rows.append(_recorded_features(name))
    LabelVertical.append(sector)
    LabelHorizontal.append(band)
if not feature_rows:
    raise ValueError('no recording in ' + recorded_dir + ' has a ground-truth label')
data_sound_1 = np.vstack(feature_rows)

# Train/test split for both variants (horizontal and vertical direction)
data_sound_1[np.isnan(data_sound_1)] = 0.0
X_train_Ver, X_test_Ver, y_train_Ver, y_test_Ver = train_test_split(data_sound_1, LabelVertical, random_state=42)
X_train_Hor, X_test_Hor, y_train_Hor, y_test_Hor = train_test_split(data_sound_1, LabelHorizontal, random_state=42)
# Standardise the features; each scaler is fitted on its training part only
scaler = StandardScaler().fit(X_train_Ver)
X_train_Ver = scaler.transform(X_train_Ver)
X_test_Ver = scaler.transform(X_test_Ver)
scaler = StandardScaler().fit(X_train_Hor)
X_train_Hor = scaler.transform(X_train_Hor)
X_test_Hor = scaler.transform(X_test_Hor)
print(len(LabelHorizontal))
print(len(LabelVertical))

KLASYFIKATOR:
klasyfikator 1->SVM
klasyfikator 2->randomForest

In [None]:
# Classifier 1 (SVM) on the horizontal-direction data (the *_Ver split)
SVM = SVC(C=1.0, random_state=42).fit(X_train_Ver, y_train_Ver)
SVM_test_preds = SVM.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=SVM_test_preds))

In [None]:
# Classifier 1 (SVM) on the vertical-direction data (the *_Hor split)
SVM = SVC(C=1.0, random_state=42).fit(X_train_Hor, y_train_Hor)
SVM_test_preds = SVM.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Hor, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Hor, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Hor, y_pred=SVM_test_preds))

In [None]:
# Classifier 2 (random forest) on the horizontal-direction data (the *_Ver split)
RandomForestClf = RandomForestClassifier(random_state=42).fit(X_train_Ver, y_train_Ver)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))

In [None]:
# Classifier 2 (random forest) on the vertical-direction data (the *_Hor split)
RandomForestClf = RandomForestClassifier(random_state=42).fit(X_train_Hor, y_train_Hor)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Hor, y_pred=RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Hor, y_pred=RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Hor, y_pred=RandomForestClf_test_preds))

OPTYMALIZACJA HIPERPARAMETRÓW:

In [None]:
# Optuna search set-up for classifier 1 (SVM)
scoring = {'f1_macro': make_scorer(f1_score, average='macro')}
model = SVC

def get_space(trial):
    """Sample one SVC hyper-parameter set (C, kernel, degree) from `trial`."""
    # SVC requires C to be strictly positive, so the range starts just above
    # zero; suggest_float replaces the deprecated suggest_uniform.
    space = {"C": trial.suggest_float("C", 1e-3, 1.0),
             "kernel": trial.suggest_categorical("kernel", ['linear', 'poly', 'rbf', 'sigmoid']),
             'degree': trial.suggest_int('degree', 1, 3)}
    return space

trials = 100  # number of Optuna trials

def objective(trial, model, get_space, X, y):
    """Return the mean 5-fold cross-validated macro-F1 of one trial.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.
        model: estimator class, instantiated with the sampled parameters.
        get_space: callable mapping a trial to a dict of constructor kwargs.
        X, y: training features and labels.
    """
    model_space = get_space(trial)

    mdl = model(**model_space)
    # Only the test-fold scores are used, so train scores are not computed.
    scores = cross_validate(mdl, X, y, scoring=scoring, cv=StratifiedKFold(n_splits=5))

    return np.mean(scores['test_f1_macro'])

In [None]:
# Tune classifier 1 on the horizontal-direction training data (the *_Ver
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Ver, y_train_Ver),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 1 with the tuned hyper-parameters,
# horizontal-direction data (the *_Ver split)
params = study.best_params
SVM = SVC(random_state=42, **params).fit(X_train_Ver, y_train_Ver)
SVM_test_preds = SVM.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=SVM_test_preds))

In [None]:
# Tune classifier 1 on the vertical-direction training data (the *_Hor
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Hor, y_train_Hor),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 1 with the tuned hyper-parameters,
# vertical-direction data (the *_Hor split)
params = study.best_params
SVM = SVC(random_state=42, **params).fit(X_train_Hor, y_train_Hor)
SVM_test_preds = SVM.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Hor, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Hor, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Hor, y_pred=SVM_test_preds))

In [None]:
# Optuna search set-up for classifier 2 (random forest)
scoring = {'f1_macro': make_scorer(f1_score, average='macro')}
model = RandomForestClassifier

def get_space(trial):
    """Sample one RandomForestClassifier hyper-parameter set from `trial`.

    `n_jobs` is "suggested" from the single value -1 so that it is recorded
    in ``study.best_params`` and reused by the final model.  `random_state`
    is a plain constant (not a suggestion, so it does not appear in
    ``best_params``): it makes the cross-validation score of a given
    parameter set repeatable.
    """
    space = {"n_estimators": trial.suggest_int("n_estimators", 10, 200),
             "max_depth": trial.suggest_int("max_depth", 1, 20),
             "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
             "n_jobs": trial.suggest_int("n_jobs", -1, -1),
             "random_state": 42}
    return space

trials = 100  # number of Optuna trials

def objective(trial, model,get_space, X, y):
    """Return the mean 5-fold cross-validated macro-F1 of one trial.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.
        model: estimator class, instantiated with the sampled parameters.
        get_space: callable mapping a trial to a dict of constructor kwargs.
        X, y: training features and labels.
    """
    model_space = get_space(trial)

    mdl = model(**model_space)
    # Only the test-fold scores are used, so train scores are not computed.
    scores = cross_validate(mdl, X, y, scoring=scoring, cv=StratifiedKFold(n_splits=5))

    return np.mean(scores['test_f1_macro'])

In [None]:
# Tune classifier 2 on the horizontal-direction training data (the *_Ver
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Ver, y_train_Ver),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 2 with the tuned hyper-parameters,
# horizontal-direction data (the *_Ver split)
params = study.best_params
RandomForestClf = RandomForestClassifier(random_state=42, **params).fit(X_train_Ver, y_train_Ver)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))


In [None]:
# Tune classifier 2 on the vertical-direction training data (the *_Hor
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Hor, y_train_Hor),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 2 (random forest) with the tuned
# hyper-parameters, vertical-direction data (the *_Hor split)
params = study.best_params
# Build the forest from the tuned parameters and predict with the model that
# is fitted here on the *_Hor data (not with a forest left over from an
# earlier cell that was trained on the *_Ver data).
RandomForestClf = RandomForestClassifier(random_state=42, **params)
RandomForestClf.fit(X_train_Hor, y_train_Hor)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_test_Hor, RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_test_Hor, RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_test_Hor, RandomForestClf_test_preds))

PORÓWNANIE WYNIKÓW Z WYGENEROWANYMI DANYMI:

In [None]:
# Generate the white-noise excitation (seeded so the synthetic data set is
# the same on every run)
size = 44100
WhiteNoise = np.random.default_rng(42).normal(0, 1, size=size)

# Load the HRTF sets; sorted so the same files are picked on every machine
sofaDir = 'dane_hrtf/sofa/*.sofa'
_SOFA = sorted(glob.glob(sofaDir))
n_hrtf_sets = 5
if len(_SOFA) < n_hrtf_sets:
    raise FileNotFoundError(
        f'expected at least {n_hrtf_sets} SOFA files matching {sofaDir}, found {len(_SOFA)}')
HRTFs = [sofa.Database.open(path) for path in _SOFA[:n_hrtf_sets]]
fs_H = HRTFs[0].Data.SamplingRate.get_values()[0]
# NOTE(review): sampling rate and source positions are read from the first
# set only - assumes all sets share them; confirm for the files used.
positions = HRTFs[0].Source.Position.get_values(system='spherical')

# Filter the noise through every HRTF (one synthetic "recording" per set and
# position) and store its 50-element feature vector
data = np.empty([n_hrtf_sets, len(positions), 50], dtype=float)
angles = np.empty([n_hrtf_sets, len(positions), 2])
for j in range(len(positions)):
    direction = [positions[j,0].round(), positions[j,1].round()]
    for i in range(n_hrtf_sets):
        angles[i,j] = direction
        H_L = HRTFs[i].Data.IR.get_values(indices={"M":j,"R":0, "E":0})
        H_R = HRTFs[i].Data.IR.get_values(indices={"M":j,"R":1, "E":0})
        L = signal.fftconvolve(WhiteNoise, H_L)
        R = signal.fftconvolve(WhiteNoise, H_R)
        # mix has shape (2, n_samples): one row per receiver
        mix = np.vstack([L,R])
        data[i,j] = parameters(mix, int(fs_H))


In [None]:
def combine_dims(a, i=0, n=1):
  """
  Merge consecutive axes of numpy array `a` into a single axis.

  Axes `i` through `i+n` (inclusive, i.e. `n+1` axes) are replaced by one
  axis whose length is the product of their lengths; all other axes are
  left as they are.
  """
  dims = list(a.shape)
  stop = i + n + 1
  head, merged, tail = dims[:i], dims[i:stop], dims[stop:]
  merged_len = functools.reduce(lambda acc, d: acc * d, merged)
  return np.reshape(a, [*head, merged_len, *tail])

In [None]:
# Flatten the (HRTF set, position) axes into one sample axis
data1 = combine_dims(data,0,1)
angles1 = combine_dims(angles, 0, 1)
print(data1.shape)
print(angles1.shape)
# Work on a plain float copy and zero any NaN feature, as is done for the
# recorded data
data1 = np.array(data1, dtype=float)
data1[np.isnan(data1)] = 0.0

# NOTE: the variable names are historical - LabelVertical holds the
# horizontal-direction sectors (6 classes, from the first angle) and
# LabelHorizontal holds the vertical-direction bands (3 classes, from the
# second angle); the cells below rely on these names.
LabelVertical = []
LabelHorizontal = []
for first_angle, second_angle in angles1:
    # SOFA files commonly store the azimuth as 0..360 degrees; wrap it into
    # [-180, 180) so that the sector limits below, which are written for
    # negative angles too, can all be reached.  Angles that are already in
    # that range keep their sector.
    horizontal = (float(first_angle) + 180.0) % 360.0 - 180.0
    if -30 < horizontal < 30:
        LabelVertical.append(1)
    elif 30 <= horizontal <= 90:
        LabelVertical.append(2)
    elif 90 < horizontal < 150:
        LabelVertical.append(3)
    elif horizontal >= 150 or horizontal <= -150:
        LabelVertical.append(4)
    elif -150 < horizontal < -90:
        LabelVertical.append(5)
    elif -90 <= horizontal <= -30:
        LabelVertical.append(6)
    else:
        raise ValueError(f'cannot label horizontal angle {first_angle!r}')

    vertical = float(second_angle)
    if vertical < -20:
        LabelHorizontal.append(1)
    elif -20 <= vertical <= 20:
        LabelHorizontal.append(2)
    elif vertical > 20:
        LabelHorizontal.append(3)
    else:
        raise ValueError(f'cannot label vertical angle {second_angle!r}')

# Train/test split for both variants (horizontal and vertical direction)
X_train_Ver, X_test_Ver, y_train_Ver, y_test_Ver = train_test_split(data1, LabelVertical, random_state=42)
X_train_Hor, X_test_Hor, y_train_Hor, y_test_Hor = train_test_split(data1, LabelHorizontal, random_state=42)
# Standardise the features; each scaler is fitted on its training part only
scaler = StandardScaler().fit(X_train_Ver)
X_train_Ver = scaler.transform(X_train_Ver)
X_test_Ver = scaler.transform(X_test_Ver)
scaler = StandardScaler().fit(X_train_Hor)
X_train_Hor = scaler.transform(X_train_Hor)
X_test_Hor = scaler.transform(X_test_Hor)

KLASYFIKATOR:

In [None]:
# Classifier 1 (SVM) on the horizontal-direction data (the *_Ver split)
SVM = SVC(C=1.0, random_state=42).fit(X_train_Ver, y_train_Ver)
SVM_test_preds = SVM.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=SVM_test_preds))

In [None]:
# Classifier 1 (SVM) on the vertical-direction data (the *_Hor split)
SVM = SVC(C=1.0, random_state=42).fit(X_train_Hor, y_train_Hor)
SVM_test_preds = SVM.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Hor, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Hor, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Hor, y_pred=SVM_test_preds))

In [None]:
# Classifier 2 (random forest) on the horizontal-direction data (the *_Ver split)
RandomForestClf = RandomForestClassifier(random_state=42).fit(X_train_Ver, y_train_Ver)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))

In [None]:
# Classifier 2 (random forest) on the vertical-direction data (the *_Hor split)
RandomForestClf = RandomForestClassifier(random_state=42).fit(X_train_Hor, y_train_Hor)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Hor, y_pred=RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Hor, y_pred=RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Hor, y_pred=RandomForestClf_test_preds))

OPTYMALIZACJA HIPERPARAMETRÓW:

In [None]:
# Optuna search set-up for classifier 1 (SVM)
scoring = {'f1_macro': make_scorer(f1_score, average='macro')}
model = SVC

def get_space(trial):
    """Sample one SVC hyper-parameter set (C, kernel, degree) from `trial`."""
    # SVC requires C to be strictly positive, so the range starts just above
    # zero; suggest_float replaces the deprecated suggest_uniform.
    space = {"C": trial.suggest_float("C", 1e-3, 1.0),
             "kernel": trial.suggest_categorical("kernel", ['linear', 'poly', 'rbf', 'sigmoid']),
             'degree': trial.suggest_int('degree', 1, 3)}
    return space

trials = 100  # number of Optuna trials

def objective(trial, model, get_space, X, y):
    """Return the mean 5-fold cross-validated macro-F1 of one trial.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.
        model: estimator class, instantiated with the sampled parameters.
        get_space: callable mapping a trial to a dict of constructor kwargs.
        X, y: training features and labels.
    """
    model_space = get_space(trial)

    mdl = model(**model_space)
    # Only the test-fold scores are used, so train scores are not computed.
    scores = cross_validate(mdl, X, y, scoring=scoring, cv=StratifiedKFold(n_splits=5))

    return np.mean(scores['test_f1_macro'])

In [None]:
# Tune classifier 1 on the horizontal-direction training data (the *_Ver
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Ver, y_train_Ver),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 1 with the tuned hyper-parameters,
# horizontal-direction data (the *_Ver split)
params = study.best_params
SVM = SVC(random_state=42, **params).fit(X_train_Ver, y_train_Ver)
SVM_test_preds = SVM.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=SVM_test_preds))

In [None]:
# Tune classifier 1 on the vertical-direction training data (the *_Hor
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Hor, y_train_Hor),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 1 with the tuned hyper-parameters,
# vertical-direction data (the *_Hor split)
params = study.best_params
SVM = SVC(random_state=42, **params).fit(X_train_Hor, y_train_Hor)
SVM_test_preds = SVM.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Hor, y_pred=SVM_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Hor, y_pred=SVM_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Hor, y_pred=SVM_test_preds))

In [None]:
# Optuna search set-up for classifier 2 (random forest)
scoring = {'f1_macro': make_scorer(f1_score, average='macro')}
model = RandomForestClassifier

def get_space(trial):
    """Sample one RandomForestClassifier hyper-parameter set from `trial`.

    `n_jobs` is "suggested" from the single value -1 so that it is recorded
    in ``study.best_params`` and reused by the final model.  `random_state`
    is a plain constant (not a suggestion, so it does not appear in
    ``best_params``): it makes the cross-validation score of a given
    parameter set repeatable.
    """
    space = {"n_estimators": trial.suggest_int("n_estimators", 10, 200),
             "max_depth": trial.suggest_int("max_depth", 1, 20),
             "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
             "n_jobs": trial.suggest_int("n_jobs", -1, -1),
             "random_state": 42}
    return space

trials = 100  # number of Optuna trials

def objective(trial, model,get_space, X, y):
    """Return the mean 5-fold cross-validated macro-F1 of one trial.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.
        model: estimator class, instantiated with the sampled parameters.
        get_space: callable mapping a trial to a dict of constructor kwargs.
        X, y: training features and labels.
    """
    model_space = get_space(trial)

    mdl = model(**model_space)
    # Only the test-fold scores are used, so train scores are not computed.
    scores = cross_validate(mdl, X, y, scoring=scoring, cv=StratifiedKFold(n_splits=5))

    return np.mean(scores['test_f1_macro'])

In [None]:
# Tune classifier 2 on the horizontal-direction training data (the *_Ver
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Ver, y_train_Ver),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 2 with the tuned hyper-parameters,
# horizontal-direction data (the *_Ver split)
params = study.best_params
RandomForestClf = RandomForestClassifier(random_state=42, **params).fit(X_train_Ver, y_train_Ver)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Ver)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_true=y_test_Ver, y_pred=RandomForestClf_test_preds))


In [None]:
# Tune classifier 2 on the vertical-direction training data (the *_Hor
# split): maximise the value returned by objective() over `trials` trials.
study = optuna.create_study(direction='maximize')
study.optimize(
    lambda trial: objective(trial, model, get_space, X_train_Hor, y_train_Hor),
    n_trials=trials,
)

In [None]:
# Success metrics of classifier 2 (random forest) with the tuned
# hyper-parameters, vertical-direction data (the *_Hor split)
params = study.best_params
# Build the forest from the tuned parameters and predict with the model that
# is fitted here on the *_Hor data (not with a forest left over from an
# earlier cell that was trained on the *_Ver data).
RandomForestClf = RandomForestClassifier(random_state=42, **params)
RandomForestClf.fit(X_train_Hor, y_train_Hor)
RandomForestClf_test_preds = RandomForestClf.predict(X_test_Hor)
# Success metrics on the held-out test part
print('test accuracy = ', accuracy_score(y_test_Hor, RandomForestClf_test_preds))
print('test F1 = ', f1_score(y_test_Hor, RandomForestClf_test_preds, average='weighted'))
print(confusion_matrix(y_test_Hor, RandomForestClf_test_preds))