In [None]:
import platform
import pathlib
# Off Windows, alias WindowsPath to PosixPath so that objects referring to
# `pathlib.WindowsPath` can still be instantiated on this platform.
if platform.system() != 'Windows':
    pathlib.WindowsPath = pathlib.PosixPath

In [None]:
from pathlib import Path
import gc
import numpy as np
import pandas as pd
from utils import *
from scipy import signal

In [None]:
import tensorflow as tf
# Set the global Keras dtype policy to 'mixed_float16'. Done here, ahead of every
# model construction further down in the notebook.
tf.keras.mixed_precision.set_global_policy('mixed_float16')
from keras.utils import set_random_seed
from keras.callbacks import EarlyStopping
from keras.optimizers import Adadelta
from keras.losses import mean_squared_logarithmic_error

In [None]:
from sklearn.metrics import classification_report

In [None]:
# Single seed for the whole notebook: passed to Keras' `set_random_seed`, used for the
# NumPy generator below, and reused later for dataset shuffling and sample balancing.
random_seed = 13
set_random_seed(random_seed)
rng = np.random.default_rng(random_seed)

In [None]:
# Locate the artefacts of the chosen dataset version.
dataset_version = 1
root_path = Path('results') / f'dataset_V{dataset_version}'

# NOTE(security): `allow_pickle=True` unpickles arbitrary Python objects; only load
# .npy files that were produced by this project.
params_path = root_path / 'params.npy'
params = np.load(params_path, allow_pickle=True).item()
# Expose every saved entry as a notebook-level variable. `globals()` is used instead of
# `locals()`: writing into the mapping returned by `locals()` is only dependable at
# module scope, where it is the same namespace as `globals()`, so behaviour is unchanged.
# Names used later without a visible definition (e.g. `sr`, `frame_length`, `hop_length`,
# `test_hop_length`) are presumably injected here or below — TODO confirm.
globals().update(params)

datasets_path = root_path / 'datasets.npy'
datasets = np.load(datasets_path, allow_pickle=True).item()
globals().update(datasets)
# `train_files` is not defined in the visible code; presumably it comes from `datasets`.
all_train_files = train_files.copy()

# Per-split annotation tables stored next to params.npy.
train_all_annotations = pd.read_csv(params_path.with_name('train_annotations.csv'))
val_annotations = pd.read_csv(params_path.with_name('val_annotations.csv'))
test_annotations = pd.read_csv(params_path.with_name('test_annotations.csv'))

# Placeholders for the in-memory arrays; the experiment cells fill them lazily and
# check for `None` to decide whether they have to be (re)built.
x_val, y_val = None, None
x_test, y_test = None, None
x_train, y_train = None, None

In [None]:
# Every experiment gets its own sub-folder below this directory; the shared
# evaluation table is stored directly inside it.
save_path_root = root_path.joinpath('End2End_models')
save_path_root.mkdir(exist_ok=True, parents=True)
result_path = save_path_root.joinpath('results.csv')

In [None]:
# Frame and hop sizes divided by `sr` (presumably samples / sample-rate = seconds;
# `frame_length`, `hop_length` and `sr` are not defined in the visible code — TODO confirm units).
frame_length_seconds = frame_length / sr
hop_length_seconds = hop_length / sr

# Hyper-parameters shared by every training cell below.
batch_size = 100
n_epochs = 100

In [None]:
# Reload the evaluation table written by a previous run, if there is one.
if not result_path.exists():
    results = None
else:
    # Header cells that pandas labelled 'Unnamed…' are replaced by empty strings
    # on both levels of the two-row column header.
    blank_unnamed = lambda label: '' if label.startswith('Unnamed') else label
    results = (
        pd.read_csv(result_path, index_col=0, header=[0, 1])
        .rename(blank_unnamed, axis=1, level=0)
        .rename(blank_unnamed, axis=1, level=1)
    )

In [None]:
# End2End Simple

model_save_path = save_path_root / 'End2End_Simple/model.h5'
# Train only when no saved model exists, so finished experiments are skipped on re-run.
if not model_save_path.exists():
    # Build the train/validation arrays only if no earlier cell left them in memory.
    if x_val is None or x_train is None:
        x_train, y_train, _ = create_dataset_from_annotations(annotations=train_all_annotations, sr=sr, frame_length=frame_length, hop_length=hop_length)
        x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
        y_train = y_train.astype(float)

        x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length)
        x_val = x_val[..., None].astype(float)
        y_val = y_val.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    model = create_classification_model(frame_length)
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-2),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)
    gc.collect()
    tf.keras.backend.clear_session()
    

In [None]:
# End2End + Gammatone initialized first layer

model_save_path = save_path_root / 'End2End_Gammaton/model.h5'
# Train only when no saved model exists, so finished experiments are skipped on re-run.
if not model_save_path.exists():
    # Build the train/validation arrays only if no earlier cell left them in memory.
    if x_val is None or x_train is None:
        x_train, y_train, _ = create_dataset_from_annotations(annotations=train_all_annotations, sr=sr, frame_length=frame_length, hop_length=hop_length)
        x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
        y_train = y_train.astype(float)

        x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length)
        x_val = x_val[..., None].astype(float)
        y_val = y_val.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    model = create_classification_model_gammatone(frame_length)
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-2),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)
    gc.collect()
    tf.keras.backend.clear_session()

In [None]:
# End2End + frozen Gammatone initialized first layer

model_save_path = save_path_root / 'End2End_Gammaton_Frozen/model.h5'
# Train only when no saved model exists; otherwise load the saved one (see `else`).
if not model_save_path.exists():
    # Build the train/validation arrays only if no earlier cell left them in memory.
    if x_val is None or x_train is None:
        x_train, y_train, _ = create_dataset_from_annotations(annotations=train_all_annotations, sr=sr, frame_length=frame_length, hop_length=hop_length)
        x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
        y_train = y_train.astype(float)

        x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length)
        x_val = x_val[..., None].astype(float)
        y_val = y_val.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    model = create_classification_model_gammatone(frame_length)
    # Freeze the layer at index 1 (per the cell title, the Gammatone-initialised first
    # layer) before compiling, so it is excluded from training.
    model.layers[1].trainable = False
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-2),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)
    gc.collect()
    tf.keras.backend.clear_session()
else:
    # Leave the frozen model in `model` even when training was skipped.
    model = tf.keras.models.load_model(model_save_path, compile=False)

In [None]:
# End2End + frozen Gammatone initialized first layer + Unfrozen and Fine-tuned

model_save_path = save_path_root / 'End2End_Gammaton_FineTuned/model.h5'
# Train only when no saved model exists, so finished experiments are skipped on re-run.
if not model_save_path.exists():
    # Build the train/validation arrays only if no earlier cell left them in memory.
    if x_val is None or x_train is None:
        x_train, y_train, _ = create_dataset_from_annotations(annotations=train_all_annotations, sr=sr, frame_length=frame_length, hop_length=hop_length)
        x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
        y_train = y_train.astype(float)

        x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length)
        x_val = x_val[..., None].astype(float)
        y_val = y_val.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    # Start explicitly from the saved frozen-layer checkpoint instead of whatever `model`
    # happens to be bound to in the kernel. This is the same file the previous cell saves
    # or loads, so an in-order run is unaffected, while out-of-order execution can no
    # longer fine-tune the wrong network.
    frozen_model_path = save_path_root / 'End2End_Gammaton_Frozen/model.h5'
    model = tf.keras.models.load_model(frozen_model_path, compile=False)

    # Unfreeze the layer at index 1 and continue training with a 10x smaller learning rate.
    model.layers[1].trainable = True
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-3),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)
    gc.collect()
    tf.keras.backend.clear_session()

In [None]:
# End2End Simple + 3 kHz low-pass (experiment folder is named '3khzBP')
# NOTE(review): this cell was originally titled "150Hz-3kHz Bandpass", but the filter
# designed here is a 10th-order Butterworth LOW-pass at 3 kHz with no 150 Hz lower edge.
# The filter is left untouched so existing checkpoints stay comparable — confirm intent.
sos = signal.butter(10, 3e3, 'low', fs=sr, output='sos')
# Bind `sos` at definition time: the evaluation cell reuses `preprocessing_fn` later, and
# a plain closure would silently pick up any later rebinding of the global `sos`.
preprocessing_fn = lambda x, sos=sos: signal.sosfiltfilt(sos, x)

model_save_path = save_path_root / 'End2End_Simple_3khzBP/model.h5'
# Train only when no saved model exists, so finished experiments are skipped on re-run.
if not model_save_path.exists():

    # This experiment always rebuilds both splits, because they have to be filtered.
    x_train, y_train, _ = create_dataset_from_annotations(annotations=train_all_annotations, sr=sr, frame_length=frame_length, hop_length=hop_length, preprocessing_fn=preprocessing_fn)
    x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
    y_train = y_train.astype(float)

    x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length, preprocessing_fn=preprocessing_fn)
    x_val = x_val[..., None].astype(float)
    y_val = y_val.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    model = create_classification_model(frame_length)
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-2),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)

    # Drop the filtered inputs so the following cells rebuild unfiltered data
    # (they test these names against `None`).
    x_train, x_val = None, None
    gc.collect()
    tf.keras.backend.clear_session()
    

In [None]:
# End2End + frozen Gammatone initialized first layer + NoisyBee excluded

# Work on a copy of the training annotations: deselect every row whose `noisybee` value
# exceeds 1e-6, then re-balance the remaining selection with the notebook seed.
train_annotations_NoisyBee_excluded = train_all_annotations.copy()
train_annotations_NoisyBee_excluded.loc[train_annotations_NoisyBee_excluded.noisybee > 1e-6, 'is_selected'] = False
train_annotations_NoisyBee_excluded = balance_samples_by_annotation(train_annotations_NoisyBee_excluded, random_seed=random_seed)

model_save_path = save_path_root / 'End2End_Gammaton_Frozen_NoisyBee_excluded/model.h5'
# Train only when no saved model exists, so finished experiments are skipped on re-run.
if not model_save_path.exists():
    # The validation split is shared with the other experiments; rebuild it only if missing.
    if x_val is None:
        x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length)
        x_val = x_val[..., None].astype(float)
        y_val = y_val.astype(float)

    # The training split is specific to this experiment, so it is always rebuilt.
    x_train, y_train, _ = create_dataset_from_annotations(annotations=train_annotations_NoisyBee_excluded, sr=sr, frame_length=frame_length, hop_length=hop_length)
    x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
    y_train = y_train.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    model = create_classification_model_gammatone(frame_length)
    # Freeze the layer at index 1 before compiling, so it is excluded from training.
    model.layers[1].trainable = False
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-2),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)
    gc.collect()
    tf.keras.backend.clear_session()

In [None]:
# End2End + frozen Gammatone initialized first layer + NoisyBee excluded + Colored Noise Augmentation

# Created once per execution of this cell rather than once per call. If
# `create_dataset_from_annotations` invokes the preprocessing function several times
# (e.g. once per recording — not visible here), re-seeding inside the function would
# replay the identical noise-type/SNR sequence on every call. A single call still draws
# exactly the same sequence as before.
augmentation_rng = np.random.default_rng(random_seed)
# 8th-order Butterworth high-pass at 150 Hz, designed once and reused for every sample.
# Deliberately not named `sos`, which another cell's `preprocessing_fn` may depend on.
augmentation_highpass_sos = signal.butter(8, 150, btype='high', fs=sr, output='sos')

def add_noise_to_samples(x):
    """Add randomly coloured noise to every sample in `x`, then high-pass filter it.

    For each sample a noise colour is drawn uniformly from white/pink/brown/blue/
    violet/grey and a target SNR uniformly from [10, 100) dB. The noise is mixed in
    through `add_noise_to_signal`, after which the 150 Hz high-pass is applied with
    zero-phase filtering (`sosfiltfilt`).

    Parameters
    ----------
    x : iterable of 1-D signals
        Iterated along its first axis; presumably a (n_frames, frame_length) array
        — TODO confirm against `create_dataset_from_annotations`.

    Returns
    -------
    numpy.ndarray
        The augmented samples stacked along the first axis.
    """
    def add_noise_to_sample(sample):
        # Draw order (noise colour first, then SNR) is kept so results stay reproducible.
        noise_type = augmentation_rng.choice(['white', 'pink', 'brown', 'blue', 'violet', 'grey'])
        desired_snr_db = augmentation_rng.uniform(10, 100)
        noisy = add_noise_to_signal(sample, noise_type, desired_snr_db, fs=sr)
        return signal.sosfiltfilt(augmentation_highpass_sos, noisy)

    # Stack the filtered arrays directly; no detour through Python lists of floats.
    return np.array([add_noise_to_sample(sample) for sample in x])

model_save_path = save_path_root / 'End2End_Gammaton_Frozen_NoisyBee_excluded_Augmented/model.h5'
# Train only when no saved model exists, so finished experiments are skipped on re-run.
if not model_save_path.exists():
    # The validation split is built without augmentation; rebuild it only if missing.
    if x_val is None:
        x_val, y_val, _ = create_dataset_from_annotations(annotations=val_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length)
        x_val = x_val[..., None].astype(float)
        y_val = y_val.astype(float)

    # The training split is always rebuilt, with the noise augmentation applied as preprocessing.
    x_train, y_train, _ = create_dataset_from_annotations(annotations=train_annotations_NoisyBee_excluded, sr=sr, frame_length=frame_length, hop_length=hop_length, preprocessing_fn=add_noise_to_samples)
    x_train = x_train[..., None].astype(float)  # add a trailing singleton axis
    y_train = y_train.astype(float)

    def idx_to_sample(idx):
        """Look up the training inputs and targets for a batch of row indices."""
        idx = np.array(idx)
        return x_train[idx], y_train[idx]

    # The tf.data pipeline shuffles and batches row indices only; the samples themselves
    # are gathered from the NumPy arrays per batch through `tf.py_function`.
    train_data = tf.data.Dataset.from_tensor_slices(range(len(x_train)))
    train_data = train_data.shuffle(buffer_size=len(x_train), seed=random_seed)
    train_data = train_data.batch(batch_size, drop_remainder=True)
    train_data = train_data.map(lambda idx: tf.py_function(func=idx_to_sample, inp=[idx], Tout=(tf.float16, tf.float16)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    train_data = train_data.prefetch(1)

    model = create_classification_model_gammatone(frame_length)
    # Freeze the layer at index 1 before compiling, so it is excluded from training.
    model.layers[1].trainable = False
    model.compile(
        loss=mean_squared_logarithmic_error,
        optimizer=Adadelta(learning_rate=1e-2),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )
    # `train_data` already yields shuffled batches, so `batch_size`/`shuffle` (which Keras
    # documents as not applicable to tf.data inputs) are not passed. `validation_batch_size`
    # keeps the array-based validation pass at the same batch size as before.
    history = model.fit(
        train_data,
        epochs=n_epochs,
        validation_data=(x_val, y_val),
        validation_batch_size=batch_size,
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)]
    )

    # Persist the model and its training curves next to each other.
    model_save_path.parent.mkdir(exist_ok=True, parents=True)
    model.save(model_save_path)
    np.save(model_save_path.with_name('history.npy'), history.history, allow_pickle=True)
    gc.collect()
    tf.keras.backend.clear_session()

In [None]:
# Evaluate every saved model that has no row in the results table yet.
to_label = np.vectorize({0: 'NoBee', 1: 'Bee'}.get)
# Remembers which preprocessing variant the in-memory x_test/y_test were built with, so
# the test set is rebuilt only when the variant changes instead of once per model.
# Only one variant is held in memory at a time.
loaded_test_variant = None

for model_path in save_path_root.glob('**/*.h5'):
    model_name = model_path.parent.name
    # Skip models that were already scored in a previous run.
    if results is not None and model_name in results.model_name.values: continue

    # The 3 kHz-filtered model must be tested on data filtered the same way as in
    # training; every other model is tested on the unprocessed frames.
    test_variant = 'filtered' if model_name == 'End2End_Simple_3khzBP' else 'raw'
    if test_variant != loaded_test_variant:
        extra_kwargs = {'preprocessing_fn': preprocessing_fn} if test_variant == 'filtered' else {}
        x_test, y_test, _ = create_dataset_from_annotations(annotations=test_annotations, sr=sr, frame_length=frame_length, hop_length=test_hop_length, **extra_kwargs)
        x_test = x_test[..., None].astype(float)  # add a trailing singleton axis
        y_test = to_label(y_test.astype(int))
        loaded_test_variant = test_variant

    model = tf.keras.models.load_model(model_path, compile=False)
    y_pred = model.predict(x_test, batch_size=256)
    gc.collect()
    tf.keras.backend.clear_session()
    # Threshold the model outputs at 0.5 and map them to the same string labels as y_test.
    y_pred = to_label((y_pred > 0.5).astype(int))

    classification_report_results = classification_report(y_test, y_pred, output_dict=True)
    classification_report_results = convert_classification_report_to_df(classification_report_results)
    classification_report_results['model_name'] = model_name
    results = pd.concat([results, classification_report_results], axis=0, ignore_index=True) if results is not None else classification_report_results
    # Write after every model so an interrupted run keeps the scores computed so far.
    results.round(3).to_csv(result_path)
    results.to_json(result_path.with_suffix('.json'))