In [5]:
import os

import matplotlib.pyplot as plt
import numpy as np

"Machine learning tools"
import pickle

from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import StratifiedKFold, train_test_split


from classification.datasets import Dataset
from classification.utils.audio_student import AudioUtil, Feature_vector_DS

from classification.utils.plots import (
    plot_decision_boundaries,
    plot_specgram,
    show_confusion_matrix,
)
from classification.utils.utils import accuracy

In [6]:
np.random.seed(0)

In [7]:
### TO RUN
dataset = Dataset()
classnames = dataset.list_classes()

print("\n".join(classnames))

chainsaw
fire
fireworks
gunshot


In [8]:
### TO RUN
fm_dir = "data/feature_matrices/"  # where to save the features matrices
model_dir = "data/models/xgb"  # where to save the models
os.makedirs(fm_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

In [9]:
### TO RUN

"Creation of the dataset"
myds = Feature_vector_DS(dataset, Nft=512, nmel=20, duration=950, shift_pct=0.0)

"Some attributes..."
myds.nmel
myds.duration
myds.shift_pct
myds.sr
myds.data_aug
myds.ncol

idx = 0

NONORM = False
zScore = False
MinMax = True

# XGBOOST PARAMETERS
if NONORM:
    n_estimators = 130
    max_depth = 2
    learning_rate = 0.2286
    subsample = 0.5984
    colsample_bytree = 0.6445

if zScore:  
    n_estimators = 311
    max_depth = 3
    learning_rate = 0.199
    subsample = 0.9796
    colsample_bytree = 0.6357
    
if MinMax:
    n_estimators = 130
    max_depth = 2
    learning_rate = 0.2286
    subsample = 0.5984
    colsample_bytree = 0.6445

In [10]:

import numpy as np
from sklearn.calibration import LabelEncoder
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler

data_aug_factor = 1
featveclen = len(myds["fire", 0, "", ""])  # Same for all classes
classnames = ["chainsaw", "fire", "fireworks", "gunshot"]  # Or wherever you store class names
nclass = len(classnames)

# Determine number of samples per class
naudio_per_class = {cls: len(dataset.files.get(cls, [])) for cls in classnames}
print(naudio_per_class)

# Allocate feature matrix
total_samples_basic = sum(naudio_per_class[c] for c in classnames)

X_train = []
X_test = []
y_train = []
y_test = []

for class_idx, classname in enumerate(classnames):
    for i in range(naudio_per_class[classname]):
        featvec = myds[classname, i, "", ""]
        if i < naudio_per_class[classname] * 0.8:
            X_train.append(featvec)
            y_train.append(classname)
        else:
            X_test.append(featvec)
            y_test.append(classname)

# Split the dataset into training and testing sets
X_train = np.array(X_train)
X_test = np.array(X_test)
y_train = np.array(y_train)
y_test = np.array(y_test)

# Normalization of the data
scaler = StandardScaler()
X_train_norm = scaler.fit_transform(X_train)
X_test_norm = scaler.transform(X_test)
with open(os.path.join(model_dir, f"scaler.pkl"), "wb") as f:
    pickle.dump(scaler, f)

label_encoder           = LabelEncoder()
y_train     = label_encoder.fit_transform(y_train)
y_test      = label_encoder.transform(y_test)
with open(os.path.join(model_dir, f"label_encoder.pkl"), "wb") as f:
    pickle.dump(label_encoder, f)

# Save the feature matrix and labels
np.save(os.path.join(fm_dir, "X_train.npy"), X_train)
np.save(os.path.join(fm_dir, "X_test.npy"), X_test)
np.save(os.path.join(fm_dir, "y_train.npy"), y_train)
np.save(os.path.join(fm_dir, "y_test.npy"), y_test)
np.save(os.path.join(fm_dir, "X_train_norm.npy"), X_train_norm)
np.save(os.path.join(fm_dir, "X_test_norm.npy"), X_test_norm)

print(f"Shape of the training matrix : {X_train.shape}")
print(f"Shape of the test matrix : {X_test.shape}")

{'chainsaw': 315, 'fire': 303, 'fireworks': 315, 'gunshot': 40}
Shape of the training matrix : (779, 400)
Shape of the test matrix : (194, 400)


We can now create a new augmented dataset and observe if the classification results improve. 

In [11]:
### AUGMENTED DATASET
TEST_WITH_AUGMENTED_FV = False

list_augmentation = ["original", "shifting"]
myds.mod_data_aug(list_augmentation)
print("Number of transformations : ", myds.data_aug_factor)

n_bands = 20
frames = 20
# Préparer les splits
X_train_list, y_train_list = [], []
X_test_list,  y_test_list  = [], []

for classname in classnames:
    n = naudio_per_class[classname]

    # Création des indices de base pour les sons originaux
    limit = round(n*0.8)
    train_idx = list(range(limit))
    test_idx  = list(range(limit+1, n))
    
    for i in train_idx:
        for aug in list_augmentation:
            featvec = myds[classname, i, aug, ""]
            X_train_list.append(featvec)
            y_train_list.append(classname)

    for i in test_idx:
        if TEST_WITH_AUGMENTED_FV:
            for aug in list_augmentation:
                featvec = myds[classname, i, aug, ""]
                X_test_list.append(featvec)
                y_test_list.append(classname)
        else:
            featvec = myds[classname, i, "", ""]
            X_test_list.append(featvec)
            y_test_list.append(classname)

# Conversion en tableaux numpy
X_train_aug = np.array(X_train_list)
y_train_aug = np.array(y_train_list, dtype=object)

X_test_aug = np.array(X_test_list)
y_test_aug = np.array(y_test_list, dtype=object)

# --- 1) Z-SCORE GLOBAL -------------------------------
scaler_global = StandardScaler().fit(X_train_aug)
X_train_aug_norm_zscore_global = scaler_global.transform(X_train_aug)
X_test_aug_norm_zscore_global  = scaler_global.transform(X_test_aug)

with open(os.path.join(model_dir, "scaler_aug_zscore_global.pkl"), "wb") as f:
    pickle.dump(scaler_global, f)
np.save(os.path.join(fm_dir, "X_train_aug_norm_zscore_global.npy"), X_train_aug_norm_zscore_global)
np.save(os.path.join(fm_dir, "X_test_aug_norm_zscore_global.npy"),  X_test_aug_norm_zscore_global)

# --- 2) Z-SCORE PAR BANDE MEL ------------------------------------
X_train_resh = X_train_aug.reshape(-1, n_bands, frames)
X_test_resh  = X_test_aug.reshape(-1, n_bands, frames)

mean_band = X_train_resh.mean(axis=(0, 2))          # (20,)
std_band  = X_train_resh.std(axis=(0, 2)) + 1e-8    # (20,)

X_train_aug_norm_zscore_band = ((X_train_resh - mean_band[None,:,None])
                           / std_band[None,:,None]).reshape(-1, n_bands*frames)
X_test_aug_norm_zscore_band  = ((X_test_resh  - mean_band[None,:,None])
                           / std_band[None,:,None]).reshape(-1, n_bands*frames)

np.save(os.path.join(model_dir, "zscore_band_mean.npy"), mean_band)
np.save(os.path.join(model_dir, "zscore_band_std.npy"),  std_band)
np.save(os.path.join(fm_dir, "X_train_aug_norm_zscore_band.npy"), X_train_aug_norm_zscore_band)
np.save(os.path.join(fm_dir, "X_test_aug_norm_zscore_band.npy"),  X_test_aug_norm_zscore_band)

# --- 3) MIN-MAX PAR BANDE MEL ------------------------------------
min_band = X_train_resh.min(axis=(0, 2))
max_band = X_train_resh.max(axis=(0, 2)) + 1e-8

X_train_aug_norm_minmax = ((X_train_resh - min_band[None,:,None])
                           / (max_band - min_band)[None,:,None]).reshape(-1, n_bands*frames)
X_test_aug_norm_minmax  = ((X_test_resh  - min_band[None,:,None])
                           / (max_band - min_band)[None,:,None]).reshape(-1, n_bands*frames)

np.save(os.path.join(model_dir, "min_band.npy"), min_band)
np.save(os.path.join(model_dir, "max_band.npy"), max_band)
np.save(os.path.join(fm_dir, "X_train_aug_norm_minmax.npy"), X_train_aug_norm_minmax)
np.save(os.path.join(fm_dir, "X_test_aug_norm_minmax.npy"),  X_test_aug_norm_minmax)

# --- 4) GLOBAL MAX NORMALIZATION -------------------------------
# Chaque spectrogramme est divisé par son max propre (par ligne)
X_train_aug_norm_max = X_train_aug.copy().astype(np.float32)
X_test_aug_norm_max  = X_test_aug.copy().astype(np.float32)

X_train_max = X_train_aug_norm_max.max(axis=1, keepdims=True) + 1e-8 # éviter la division par 0
X_test_max  = X_test_aug_norm_max.max(axis=1, keepdims=True) + 1e-8

X_train_aug_norm_max /= X_train_max
X_test_aug_norm_max  /= X_test_max

np.save(os.path.join(fm_dir, "X_train_aug_norm_max.npy"), X_train_aug_norm_max)
np.save(os.path.join(fm_dir, "X_test_aug_norm_max.npy"),  X_test_aug_norm_max)

# --- 5) L2 NORMALIZATION -------------------------------
X_train_aug_norm_l2 = X_train_aug.copy().astype(np.float32)
X_test_aug_norm_l2  = X_test_aug.copy().astype(np.float32)

X_train_l2 = np.sqrt(np.sum(X_train_aug_norm_l2**2, axis=1, keepdims=True)) + 1e-8
X_test_l2  = np.sqrt(np.sum(X_test_aug_norm_l2**2, axis=1, keepdims=True)) + 1e-8

X_train_aug_norm_l2 /= X_train_l2
X_test_aug_norm_l2  /= X_test_l2

np.save(os.path.join(fm_dir, "X_train_aug_norm_l2.npy"), X_train_aug_norm_l2)
np.save(os.path.join(fm_dir, "X_test_aug_norm_l2.npy"),  X_test_aug_norm_l2)


# Label encoding
label_encoder_aug = LabelEncoder()
y_train_aug     = label_encoder_aug.fit_transform(y_train_aug)
y_test_aug      = label_encoder_aug.transform(y_test_aug)
with open(os.path.join(model_dir, f"label_encoder_aug.pkl"), "wb") as f:
    pickle.dump(label_encoder_aug, f)

np.save(os.path.join(fm_dir, "X_train_aug.npy"), X_train_aug)
np.save(os.path.join(fm_dir, "X_test_aug.npy"), X_test_aug)
np.save(os.path.join(fm_dir, "y_train_aug.npy"), y_train_aug)
np.save(os.path.join(fm_dir, "y_test_aug.npy"), y_test_aug)

print(f"Shape of the training matrix : {X_train_aug.shape}")
print(f"Shape of the test matrix : {X_test_aug.shape}")
print(f"------------------------------------------------------------")
print(f"Transformations: {list_augmentation}.")


Number of transformations :  2
Shape of the training matrix : (1556, 400)
Shape of the test matrix : (191, 400)
------------------------------------------------------------
Transformations: ['original', 'shifting'].


In [12]:
AUG = False

if AUG:
    import os
    import numpy as np
    import matplotlib.pyplot as plt
    from classification.utils.plots import plot_specgram_textlabel

    # Charger les données
    X = np.load(os.path.join(fm_dir, "X_basic_aug.npy"), allow_pickle=True)
    y = np.load(os.path.join(fm_dir, "y_basic_aug.npy"), allow_pickle=True)

    # Dossier où sauvegarder les images
    save_dir = os.path.join("src/classification/soundfiles_melspec_augmentation")
    os.makedirs(save_dir, exist_ok=True)

    # Nombre d'exemples de base (avant augmentation)
    length_X_basic = int(len(X) / len(list_augmentation))

    # Boucle de sauvegarde
    for i in range(length_X_basic):
        for j, aug_name in enumerate(list_augmentation):
            idx = i + j * length_X_basic
            melspec = X[idx]
            class_of_spec = y[idx]

            fig, ax = plt.subplots()
            plot_specgram_textlabel(
                melspec.reshape((20, 20)),
                ax=ax,
                is_mel=True,
                title=f"MEL Spectrogram #{i} - {aug_name}",
                xlabel="Mel vector",
                textlabel=f"{class_of_spec} (aug: {aug_name})",
            )
            plt.tight_layout()
            save_path = os.path.join(save_dir, f"melspec_{i}_{aug_name}.png")
            fig.savefig(save_path)
            plt.close(fig)


FINAL MODEL SAVE

In [None]:
import os
import pickle
from pathlib import Path

import numpy as np
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from sklearn.decomposition import PCA
from sklearn.metrics import (confusion_matrix, precision_score, recall_score)
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import normalize

# -----------------------------------------------------------------------------
# CONFIG
# -----------------------------------------------------------------------------
TEST_SET = True

# Activer / désactiver chaque scénario
SCENARIO_FLAGS = {
    "A": True,
    "B": True,
    "C": True,
    "D": True,
    "E": True,
    "F": True,
    "G": True,
    "H": True,
}

# -----------------------------------------------------------------------------
# DATASETS déjà chargés en mémoire
# -----------------------------------------------------------------------------
X_train_aug = np.load(os.path.join(fm_dir, "X_train_aug.npy"))
X_test_aug = np.load(os.path.join(fm_dir, "X_test_aug.npy"))
y_train_aug = np.load(os.path.join(fm_dir, "y_train_aug.npy"))
y_test_aug = np.load(os.path.join(fm_dir, "y_test_aug.npy"))

X_train_aug_norm = np.load(os.path.join(fm_dir, "X_train_aug_norm_max.npy"))
X_test_aug_norm = np.load(os.path.join(fm_dir, "X_test_aug_norm_max.npy"))


X_train = np.load(os.path.join(fm_dir, "X_train.npy"))
X_test = np.load(os.path.join(fm_dir, "X_test.npy"))
y_train = np.load(os.path.join(fm_dir, "y_train.npy"))
y_test = np.load(os.path.join(fm_dir, "y_test.npy"))

X_train_norm = np.load(os.path.join(fm_dir, "X_train_norm.npy"))
X_test_norm = np.load(os.path.join(fm_dir, "X_test_norm.npy"))

# -----------------------------------------------------------------------------
# HELPERS
# -----------------------------------------------------------------------------

def train_xgb(Xtr, ytr):
    model = XGBClassifier(
        n_estimators=100,
        max_depth=5,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        eval_metric="mlogloss",
        random_state=42,
    )
    model.fit(Xtr, ytr)
    return model


def evaluate(model, Xte, yte, tag: str):
    pred = model.predict(Xte)
    acc = (pred == yte).mean()

    classes = np.unique(yte)
    prec = precision_score(yte, pred, average=None, labels=classes)
    rec  = recall_score(yte, pred, average=None, labels=classes)
    cm   = confusion_matrix(yte, pred, labels=classes)
    cv   = cross_val_score(model, Xte, yte, cv=5, scoring="accuracy").mean()

    print(f"\n=== {tag} ===")
    print(f"Overall accuracy : {acc:.4f} | CV accuracy : {cv:.4f}")
    for i, c in enumerate(classes):
        cls_acc = cm[i, i] / cm[i, :].sum()
        print(f"Class {c}: P={prec[i]:.3f} R={rec[i]:.3f} Acc={cls_acc:.3f}")


# -----------------------------------------------------------------------------
# RUN SCENARIOS
# -----------------------------------------------------------------------------
# dictionnaire pour stocker les objets utiles (PCA + modèle)
models = {}

def save(obj, name):
    with open(os.path.join(model_dir, f"{name}.pkl"), "wb") as f:
        pickle.dump(obj, f)

# A : PCA sur no‑aug, no‑norm
if SCENARIO_FLAGS["A"]:
    pca_A = PCA(n_components=0.98).fit(X_train)
    Xtr_A = pca_A.transform(X_train)
    Xte_A = pca_A.transform(X_test)
    model_A = train_xgb(Xtr_A, y_train)
    models["A"] = (model_A, Xte_A, y_test)
    save(pca_A, "pca_A")
    save(model_A, "xgb_A")

# B : no PCA, no‑aug, no‑norm
if SCENARIO_FLAGS["B"]:
    model_B = train_xgb(X_train, y_train)
    models["B"] = (model_B, X_test, y_test)
    save(model_B, "xgb_B")

# C : PCA sur aug, no‑norm
if SCENARIO_FLAGS["C"]:
    pca_C = PCA(n_components=0.98).fit(X_train_aug)
    Xtr_C = pca_C.transform(X_train_aug)
    Xte_C = pca_C.transform(X_test_aug)
    model_C = train_xgb(Xtr_C, y_train_aug)
    models["C"] = (model_C, Xte_C, y_test_aug)
    save(pca_C, "pca_C")
    save(model_C, "xgb_C")

# D : no PCA, aug, no‑norm
if SCENARIO_FLAGS["D"]:
    model_D = train_xgb(X_train_aug, y_train_aug)
    models["D"] = (model_D, X_test_aug, y_test_aug)
    save(model_D, "xgb_D")

# E : alias de B (gardé pour compatibilité)
if SCENARIO_FLAGS["E"]:
    models["E"] = models.get("B")

# F : PCA sur no‑aug, L2‑norm
if SCENARIO_FLAGS["F"]:
    pca_F = PCA(n_components=0.98).fit(X_train_norm)
    Xtr_F = pca_F.transform(X_train_norm)
    Xte_F = pca_F.transform(X_test_norm)
    model_F = train_xgb(Xtr_F, y_train)
    models["F"] = (model_F, Xte_F, y_test)
    save(pca_F, "pca_F")
    save(model_F, "xgb_F")

# G : PCA sur aug, L2‑norm
if SCENARIO_FLAGS["G"]:
    pca_G = PCA(n_components=0.98).fit(X_train_aug_norm)
    Xtr_G = pca_G.transform(X_train_aug_norm)
    Xte_G = pca_G.transform(X_test_aug_norm)
    model_G = train_xgb(Xtr_G, y_train_aug)
    models["G"] = (model_G, Xte_G, y_test_aug)
    save(pca_G, "pca_G")
    save(model_G, "xgb_G")

# H : no PCA, aug, L2‑norm
if SCENARIO_FLAGS["H"]:
    model_H = train_xgb(X_train_aug_norm, y_train_aug)
    models["H"] = (model_H, X_test_aug_norm, y_test_aug)
    save(model_H, "xgb_H")

# -----------------------------------------------------------------------------
# EVALUATION
# -----------------------------------------------------------------------------
if TEST_SET:
    for tag in models:
        mdl, X_eval, y_eval = models[tag]
        desc = {
            "A": "PCA NOAUG NONORM",
            "B": "NOPCA NOAUG NONORM",
            "C": "PCA AUG NONORM",
            "D": "NOPCA AUG NONORM",
            "E": "NOPCA NOAUG NONORM (alias)",
            "F": "PCA NOAUG NORM",
            "G": "PCA AUG NORM",
            "H": "NOPCA AUG NORM",
        }[tag]
        evaluate(mdl, X_eval, y_eval, f"Scenario {tag}: {desc}")

TypeError: an integer is required (got type str)

HYPERPARAMETER TUNING

In [None]:
# Modified Bayesian Optimization script for XGBoost with consistent normalization

import os
import numpy as np
import pickle
import matplotlib.pyplot as plt

from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score
from bayes_opt import BayesianOptimization

from classification.utils.utils import accuracy

# --- CONFIG FLAGS ---
TRANSFORMATION = True
NORMALIZATION = True
# --- STEP 1: Load/Select Data ---
if TRANSFORMATION:
    try:
        X = X_basic_aug
        y = y_basic_aug
    except NameError:
        raise ValueError("X_basic_aug and y_basic_aug must be defined before running this script.")
else:
    try:
        X = X_basic
        y = y_basic
    except NameError:
        raise ValueError("X_basic and y_basic must be defined before running this script.")

# --- STEP 2: Normalize if needed ---
if NORMALIZATION and NORMALIZATION_METHOD:
    X = apply_normalization(X, method=NORMALIZATION_METHOD)

# --- STEP 3: Define the Objective Function for Bayesian Optimization ---
def xgb_cv(
    n_estimators,
    max_depth,
    learning_rate,
    subsample,
    colsample_bytree
):
    n_estimators = int(n_estimators)
    max_depth = int(max_depth)

    model = XGBClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        subsample=subsample,
        colsample_bytree=colsample_bytree,
        eval_metric='mlogloss',
        random_state=42
    )
    cv_scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
    return cv_scores.mean()

# --- STEP 4: Set Up the Bayesian Optimizer ---
pbounds = {
    'n_estimators': (50, 400),
    'max_depth': (2, 15),
    'learning_rate': (0.01, 0.3),
    'subsample': (0.5, 1),
    'colsample_bytree': (0.5, 1)
}

optimizer = BayesianOptimization(
    f=xgb_cv,
    pbounds=pbounds,
    random_state=42
)

print("Starting Bayesian Optimization...")
optimizer.maximize(init_points=3, n_iter=20)

# --- STEP 5: Final Training with Best Params ---
best_params = optimizer.max['params']

final_model = XGBClassifier(
    n_estimators=int(best_params['n_estimators']),
    max_depth=int(best_params['max_depth']),
    learning_rate=best_params['learning_rate'],
    subsample=best_params['subsample'],
    colsample_bytree=best_params['colsample_bytree'],
    eval_metric='mlogloss',
    random_state=999
)

# === Train/Test Split + re-normalisation ===
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, stratify=y, random_state=999)
if NORMALIZATION and NORMALIZATION_METHOD:
    X_train, X_test = apply_normalization(X_train, X_test, method=NORMALIZATION_METHOD)

final_model.fit(X_train, y_train)

y_pred = final_model.predict(X_test)
test_acc = accuracy(y_pred, y_test)

print("\n=== FINAL EVALUATION ON HOLDOUT TEST SET ===")
print(f"Test Accuracy: {test_acc:.4f}")


Starting Bayesian Optimization...
|   iter    |  target   | colsam... | learni... | max_depth | n_esti... | subsample |
-------------------------------------------------------------------------------------


KeyboardInterrupt: 