In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, GRU, Dense, Dropout, BatchNormalization, LayerNormalization, Reshape, Permute, Bidirectional, Add, Attention, Flatten, TimeDistributed
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, Callback
from tensorflow.keras.layers import Layer, Concatenate
from tensorflow.keras import backend as K
from sklearn.metrics import f1_score
import librosa
import soundfile as sf
import noisereduce as nr
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfilt

# Add this to create a directory for saving figures
FIGURES_DIR = 'training_figures_1'
os.makedirs(FIGURES_DIR, exist_ok=True)

2025-04-03 22:19:10.891814: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-03 22:19:10.902696: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1743698950.915726  119052 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1743698950.919527  119052 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1743698950.930018  119052 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

In [2]:
# Force GPU usage
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    tf.config.set_visible_devices(physical_devices[0], 'GPU')
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
else:
    print("No GPU devices found")

In [3]:
# Data augmentation and preprocessing
def load_and_preprocess_audio(file_path, sr=16000, duration=4):
    try:
        audio, sr = librosa.load(file_path, sr=sr, duration=duration)

        # Data Augmentation (increased probability and variety)
        if np.random.random() < 0.5:  # 50% chance of applying augmentation
            augmentation_type = np.random.choice(['noise', 'pitch', 'speed'])
            if augmentation_type == 'noise':
                noise = np.random.randn(len(audio)) * 0.005
                audio = audio + noise
            elif augmentation_type == 'pitch':
                audio = librosa.effects.pitch_shift(audio, sr=sr, n_steps=np.random.uniform(-2, 2))
            else:  # speed
                audio = librosa.effects.time_stretch(audio, rate=np.random.uniform(0.8, 1.2))

        # Normalize audio
        audio = audio - np.mean(audio)
        audio = audio / np.max(np.abs(audio))

        # Pad if necessary
        if len(audio) < sr * duration:
            audio = np.pad(audio, (0, sr * duration - len(audio)))
        return audio
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None

In [4]:
def extract_features(audio, sr=16000, n_mels=80, n_fft=2048, hop_length=512):
    if audio is None:
        return None

    # Extract mel spectrogram
    mel_spec = librosa.feature.melspectrogram(
        y=audio,
        sr=sr,
        n_mels=n_mels,
        n_fft=n_fft,
        hop_length=hop_length
    )
    log_mel_spec = librosa.power_to_db(mel_spec, ref=np.max)

    # Normalize features
    log_mel_spec = (log_mel_spec - np.mean(log_mel_spec)) / np.std(log_mel_spec)
    return log_mel_spec

In [5]:
# Analyze class distribution for debugging class imbalance
def analyze_class_distribution(data_path):
    real_count = len([f for f in os.listdir(os.path.join(data_path, 'real')) if f.endswith('.wav')])
    fake_count = len([f for f in os.listdir(os.path.join(data_path, 'fake')) if f.endswith('.wav')])
    total = real_count + fake_count
    print(f"\nClass Distribution for {data_path}:")
    print(f"Real: {real_count} ({real_count/total*100:.2f}%)")
    print(f"Fake: {fake_count} ({fake_count/total*100:.2f}%)")
    return {'real': real_count, 'fake': fake_count}

In [6]:
# Data generator with sample weights and debugging utilities
def data_generator(data_path, batch_size=128, shuffle=True):
    real_files = [os.path.join(data_path, 'real', f) for f in os.listdir(os.path.join(data_path, 'real')) if f.endswith('.wav')]
    fake_files = [os.path.join(data_path, 'fake', f) for f in os.listdir(os.path.join(data_path, 'fake')) if f.endswith('.wav')]
    
    all_files = real_files + fake_files
    labels = [1] * len(real_files) + [0] * len(fake_files)
    
    total_samples = len(all_files)
    class_weights = {
        1: total_samples / (2 * len(real_files)),
        0: total_samples / (2 * len(fake_files))
    }
    
    while True:
        if shuffle:
            temp = list(zip(all_files, labels))
            np.random.shuffle(temp)
            all_files, labels = zip(*temp)
        
        for i in range(0, len(all_files), batch_size):
            batch_files = all_files[i:i+batch_size]
            batch_labels = labels[i:i+batch_size]
            
            batch_x = []
            batch_y = []
            batch_weights = []
            max_length = 0
            
            for file_path, label in zip(batch_files, batch_labels):
                audio = load_and_preprocess_audio(file_path)
                features = extract_features(audio)
                
                if features is not None:
                    batch_x.append(features.T)
                    batch_y.append(label)
                    weight = class_weights[label]
                    batch_weights.append(weight)
                    max_length = max(max_length, features.T.shape[0])
            
            # Pad sequences to max_length
            padded_batch_x = []
            for x in batch_x:
                padded_x = np.pad(x, ((0, max_length - x.shape[0]), (0, 0)), mode='constant')
                padded_batch_x.append(padded_x)
            
            if padded_batch_x:
                yield np.array(padded_batch_x), np.array(batch_y), np.array(batch_weights)

In [7]:
class MFM(Layer):
    def __init__(self, **kwargs):
        super(MFM, self).__init__(**kwargs)

    def call(self, inputs):
        shape = tf.shape(inputs)
        # Ensure the last dimension is even for splitting
        features = shape[-1]
        features_div_2 = features // 2
        # Use tf.split for robustness
        split1, split2 = tf.split(inputs, num_or_size_splits=2, axis=-1)
        max_features = tf.math.maximum(split1, split2)
        # Reshape might not be necessary if subsequent layers handle variable channel size,
        # but let's keep it consistent with the original intent if MFM is meant to halve channels.
        # The original reshape could fail if dimensions aren't fully defined.
        # Let's rely on tf.math.maximum which preserves shape except last dim halved.
        # output_shape = (shape[0], shape[1], features_div_2) # Target shape
        # return tf.reshape(max_features, output_shape) # This can be problematic
        return max_features # Simpler, relies on max operation result shape

def create_enhanced_lc_grnn_model(input_shape):
    inputs = Input(shape=input_shape)

    x = BatchNormalization()(inputs)

    # Light Convolutional layers with REDUCED regularization
    # Reduced L2 and Dropout
    l2_reg = 0.001 # Significantly reduced L2
    conv_dropout_rate = 0.3 # Slightly reduced Dropout

    x = Conv1D(32 * 2, 5, padding='same', activation='relu', kernel_regularizer=tf.keras.regularizers.l2(l2_reg))(x) # Double filters for MFM
    x = MFM()(x)
    x = MaxPooling1D(pool_size=2)(x)
    x = Dropout(conv_dropout_rate)(x) # Use reduced rate

    x = Conv1D(64 * 2, 3, padding='same', activation='relu', kernel_regularizer=tf.keras.regularizers.l2(l2_reg))(x) # Double filters for MFM
    x = MFM()(x)
    x = MaxPooling1D(pool_size=2)(x)
    x = Dropout(conv_dropout_rate)(x) # Use reduced rate

    # Bidirectional GRU layers with residual connections and REDUCED regularization
    gru_dropout_rate = 0.4 # Slightly reduced Dropout
    for units in [64, 32]:
        # Keras Bidirectional merges outputs, so GRU units should be units//2 to match total 'units' effectively
        gru = Bidirectional(GRU(units // 2, return_sequences=True, kernel_regularizer=tf.keras.regularizers.l2(l2_reg)))
        gru_output = gru(x)

        # Projection Dense layer for residual connection - ensure dimensions match
        if K.int_shape(x)[-1] != units: # Check if projection is needed
             x_proj = Dense(units)(x) # Project x to match GRU output dim if necessary
        else:
             x_proj = x

        # Residual connection - ensure GRU output dim matches x_proj dim (should be 'units' total)
        if K.int_shape(gru_output)[-1] != units:
            gru_output_proj = Dense(units)(gru_output) # Project GRU output if needed (less common)
        else:
            gru_output_proj = gru_output

        x = Add()([x_proj, gru_output_proj])
        x = LayerNormalization()(x)
        x = Dropout(gru_dropout_rate)(x) # Use reduced rate

    # Attention mechanism (ensure input dimensions are handled correctly by Add)
    # Self-attention applied to the output of the GRU stack
    attention_output = Attention()([x, x]) # Computes attention weights and applies them
    x = Add()([x, attention_output]) # Add attention context back to original sequence

    # Final GRU layer - Reduce units to avoid bottleneck if needed
    final_gru_units = 16
    x = Bidirectional(GRU(final_gru_units, kernel_regularizer=tf.keras.regularizers.l2(l2_reg)))(x) # Output shape (batch, final_gru_units*2)
    x = LayerNormalization()(x)
    x = Dropout(gru_dropout_rate)(x) # Use reduced rate

    # Dense layers with REDUCED regularization
    x = Dense(32, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(l2_reg))(x)
    x = Dropout(gru_dropout_rate)(x) # Use reduced rate

    outputs = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)

    return model

In [8]:
# Paths to data
train_data_path = 'datasetNEW/train'
dev_data_path = 'datasetNEW/dev'
eval_data_path = 'datasetNEW/eval'

# Analyze class distribution
print("Training set class distribution:")
analyze_class_distribution(train_data_path)
analyze_class_distribution(dev_data_path)
analyze_class_distribution(eval_data_path)

Training set class distribution:

Class Distribution for datasetNEW/train:
Real: 2580 (10.17%)
Fake: 22800 (89.83%)

Class Distribution for datasetNEW/dev:
Real: 2548 (10.26%)
Fake: 22296 (89.74%)

Class Distribution for datasetNEW/eval:
Real: 7355 (10.32%)
Fake: 63882 (89.68%)


{'real': 7355, 'fake': 63882}

In [9]:
def count_files(path):
    real_files = [f for f in os.listdir(os.path.join(path, 'real')) if f.endswith('.wav')]
    fake_files = [f for f in os.listdir(os.path.join(path, 'fake')) if f.endswith('.wav')]
    return len(real_files) + len(fake_files)

# Create generators
batch_size = 128  # Increased batch size
train_gen = data_generator(train_data_path, batch_size=batch_size)
dev_gen = data_generator(dev_data_path, batch_size=batch_size)
eval_gen = data_generator(eval_data_path, batch_size=batch_size)

# Calculate steps per epoch
train_samples_count = count_files(train_data_path)
dev_samples_count = count_files(dev_data_path)
eval_samples_count = count_files(eval_data_path)

steps_per_epoch = train_samples_count // batch_size
validation_steps = dev_samples_count // batch_size
eval_steps = eval_samples_count // batch_size

In [10]:
# Cell 10: Callbacks Definition

import matplotlib.pyplot as plt
import os
import tensorflow as tf # Added for clarity, though Keras might import it implicitly
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau, EarlyStopping, ModelCheckpoint

# --- Ensure Figure Directory Exists ---
# This should match the directory defined in Cell 1
FIGURES_DIR = 'training_figures_1'
os.makedirs(FIGURES_DIR, exist_ok=True)

# --- Custom Callback for Plotting History ---
class PlotTrainingHistory(Callback):
    """Callback to plot training/validation metrics after each epoch."""
    def __init__(self, model_name='model'):
        super().__init__()
        self.model_name = model_name
        self.epochs_run = 0 # Track epochs completed

    def on_epoch_end(self, epoch, logs=None):
        self.epochs_run = epoch + 1 # Update epochs completed
        logs = logs or {} # Ensure logs is a dict

        # Check if history object and necessary keys are available
        if not hasattr(self.model, 'history') or not hasattr(self.model.history, 'history') or not self.model.history.history:
            print(f"Warning: model.history not available or empty at epoch {epoch+1}. Skipping plot.")
            return

        history_dict = self.model.history.history

        # Identify available metric keys dynamically
        acc_key = None
        val_acc_key = None
        loss_key = 'loss'
        val_loss_key = 'val_loss'
        auc_key = None
        val_auc_key = None

        # Find primary accuracy, loss, and AUC keys
        for key in history_dict.keys():
            # Prioritize weighted metrics if available, otherwise standard
            if 'accuracy' in key and 'weighted' in key and 'val_' not in key:
                acc_key = key
            elif 'accuracy' in key and 'val_' not in key and acc_key is None: # Fallback to standard acc
                acc_key = key

            if 'val_accuracy' in key and 'weighted' in key:
                val_acc_key = key
            elif 'val_accuracy' in key and val_acc_key is None: # Fallback to standard val_acc
                val_acc_key = key

            if 'auc' in key and 'weighted' in key and 'val_' not in key:
                 auc_key = key
            elif 'auc' in key and 'val_' not in key and auc_key is None : # Fallback to standard auc
                 auc_key = key

            if 'val_auc' in key and 'weighted' in key:
                 val_auc_key = key
            elif 'val_auc' in key and val_auc_key is None : # Fallback to standard val_auc
                 val_auc_key = key


        # Check if essential keys are found (loss keys are mandatory)
        essential_keys_found = loss_key in history_dict and val_loss_key in history_dict
        optional_metrics_found = acc_key and val_acc_key and auc_key and val_auc_key

        if not essential_keys_found:
             print(f"Warning: Essential loss keys ('{loss_key}', '{val_loss_key}') not found in history ({history_dict.keys()}). Skipping plot for epoch {epoch+1}.")
             return
        if not optional_metrics_found:
             print(f"Warning: Could not find all optional metric keys (accuracy/AUC variants) in history ({history_dict.keys()}) for epoch {epoch+1}.")
             # Proceed to plot loss anyway

        # --- Plotting Start ---
        try:
            # Create epoch range based on history length
            epochs_range = range(1, len(history_dict[loss_key]) + 1)

            num_plots = 1
            if acc_key and val_acc_key: num_plots += 1
            if auc_key and val_auc_key: num_plots += 1

            plt.figure(figsize=(5 * num_plots, 5))
            plot_index = 1

            # Plot Loss
            plt.subplot(1, num_plots, plot_index)
            plot_index += 1
            plt.plot(epochs_range, history_dict[loss_key], label='Training Loss')
            plt.plot(epochs_range, history_dict[val_loss_key], label='Validation Loss')
            plt.title(f'Model Loss (Epoch {self.epochs_run})')
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.legend()
            plt.grid(True)

            # Plot Accuracy if available
            if acc_key and val_acc_key:
                plt.subplot(1, num_plots, plot_index)
                plot_index += 1
                plt.plot(epochs_range, history_dict[acc_key], label=f'Training Acc ({acc_key.split("_")[-1]})')
                plt.plot(epochs_range, history_dict[val_acc_key], label=f'Validation Acc ({val_acc_key.split("_")[-1]})')
                plt.title(f'Model Accuracy (Epoch {self.epochs_run})')
                plt.xlabel('Epoch')
                plt.ylabel('Accuracy')
                plt.legend()
                plt.grid(True)

            # Plot AUC if available
            if auc_key and val_auc_key:
                 plt.subplot(1, num_plots, plot_index)
                 plot_index += 1
                 plt.plot(epochs_range, history_dict[auc_key], label=f'Training AUC ({auc_key.split("_")[-1]})')
                 plt.plot(epochs_range, history_dict[val_auc_key], label=f'Validation AUC ({val_auc_key.split("_")[-1]})')
                 plt.title(f'Model AUC (Epoch {self.epochs_run})')
                 plt.xlabel('Epoch')
                 plt.ylabel('AUC')
                 plt.legend()
                 plt.grid(True)


            plt.tight_layout()
            filepath = os.path.join(FIGURES_DIR, f'{self.model_name}_epoch_{self.epochs_run}.png')
            plt.savefig(filepath)
            plt.close()
            # print(f'Training history plot saved to {filepath}') # Keep this quiet during training

        except KeyError as e:
            print(f"Error plotting history in PlotTrainingHistory: KeyError accessing {e}. Available keys: {history_dict.keys()}")
        except Exception as e:
            print(f"An unexpected error occurred during plotting in PlotTrainingHistory: {e}")
        # --- Plotting End ---


# --- Standard Keras Callbacks ---

# Reduce Learning Rate on Plateau
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', # Monitor validation loss
    factor=0.2,         # Reduce LR by factor of 0.2 (80% reduction)
    patience=5,         # Reduce LR after 5 epochs of no improvement in val_loss
    min_lr=1e-7,        # Do not reduce LR below this value
    verbose=1           # Print message when LR is reduced
)

# Early Stopping
early_stopping = EarlyStopping(
    monitor='val_loss',        # Monitor validation loss
    patience=10,               # Stop after 10 epochs of no improvement in val_loss
    restore_best_weights=True, # Restore model weights from the epoch with the best val_loss
    verbose=1                  # Print message when stopping early
)

# Model Checkpoint (Save Best Only)
checkpoint_dir = './training_checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)
# Define file path for the best model weights
checkpoint_filepath = os.path.join(checkpoint_dir, "best_model.weights.h5")

checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath, # Path to save the best model file
    save_weights_only=True,      # Only save the model's weights
    monitor='val_loss',          # Monitor validation loss
    mode='min',                  # The monitored quantity should be minimized (val_loss)
    save_best_only=True,         # Save only when val_loss improves
    verbose=1                    # Print message when saving the best model
)

# --- Instantiate the Custom Callback ---
plot_training_callback = PlotTrainingHistory(model_name='audio_model')

print("Callbacks defined and ready.") # Confirmation message

Callbacks defined and ready.


In [11]:
# Create and compile the model
input_shape = (None, 80) # Time steps, Features (Mel bins)
model = create_enhanced_lc_grnn_model(input_shape)

# Compile with a potentially slightly lower initial LR and weighted metrics
initial_lr = 0.0005 # Try a slightly lower starting LR
optimizer = Adam(learning_rate=initial_lr)

model.compile(optimizer=optimizer,
              loss='binary_crossentropy',
              metrics=['accuracy', tf.keras.metrics.AUC(name='auc')], # Keep standard metrics
              weighted_metrics=['accuracy', tf.keras.metrics.AUC(name='auc_weighted')]) # Add weighted versions

model.summary() # Print model summary to check layers and parameters

I0000 00:00:1743698953.490794  119052 gpu_device.cc:2019] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 2143 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3050 Ti Laptop GPU, pci bus id: 0000:01:00.0, compute capability: 8.6


In [12]:
# Train the model
epochs = 75 # Increased epochs slightly, early stopping will manage duration

print("Starting model training...")
history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    validation_data=dev_gen,
    validation_steps=validation_steps,
    callbacks=[reduce_lr, early_stopping, checkpoint_callback, plot_training_callback],
    verbose=1 # Set verbose to 1 for progress bar and metrics per epoch
)
print("Model training finished.")

# Save the final model (optional, as checkpoint saves the best weights)
# model.save('final_audio_model.keras')
# print("Final model saved.")

# Plot final history after training completes
if history and history.history:
    print("\nPlotting final training history...")
    final_plotter = PlotTrainingHistory(model_name='audio_model_final')
    # Manually call on_epoch_end with the last epoch number and logs if needed
    # Or replot using the full history dict
    try:
        history_dict = history.history
        epochs_range = range(1, len(history_dict['loss']) + 1)
        acc_key = [k for k in history_dict if 'accuracy' in k and 'val_' not in k][0]
        val_acc_key = [k for k in history_dict if 'val_accuracy' in k][0]

        plt.figure(figsize=(15, 5))

        plt.subplot(1, 2, 1)
        plt.plot(epochs_range, history_dict[acc_key], label=f'Training {acc_key}')
        plt.plot(epochs_range, history_dict[val_acc_key], label=f'Validation {val_acc_key}')
        plt.title(f'Final Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1, 2, 2)
        plt.plot(epochs_range, history_dict['loss'], label='Training Loss')
        plt.plot(epochs_range, history_dict['val_loss'], label='Validation Loss')
        plt.title(f'Final Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        final_filepath = os.path.join(FIGURES_DIR, f'{final_plotter.model_name}_summary.png')
        plt.savefig(final_filepath)
        plt.show()
        plt.close()
        print(f"Final training history plot saved to {final_filepath}")
    except Exception as e:
        print(f"Could not plot final history: {e}")
else:
    print("No history object found to plot final graphs.")

Starting model training...
Epoch 1/75


I0000 00:00:1743698962.322723  119161 cuda_dnn.cc:529] Loaded cuDNN version 90300


[1m140/198[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m1:02[0m 1s/step - accuracy: 0.5480 - auc: 0.5577 - auc_weighted: 0.5577 - loss: 1.2856 - weighted_accuracy: 0.5336

KeyboardInterrupt: 

In [None]:
from sklearn.metrics import confusion_matrix, f1_score, roc_curve
import seaborn as sns
import matplotlib.pyplot as plt

# EER and t-DCF related imports
from scipy.interpolate import interp1d

# Define t-DCF parameters (these should be set according to your task)
p_target = 0.05  # Prior probability of target speaker
c_miss = 1       # Cost of a miss (false negative)
c_false_alarm = 1 # Cost of a false alarm (false positive)

# Reset the generator to its initial state
eval_gen = data_generator(eval_data_path, batch_size=batch_size)

# Generate predictions and collect true labels
y_pred = []
y_true = []
for _ in range(eval_steps):
    batch_x, batch_y, _ = next(eval_gen)
    batch_pred = model.predict(batch_x, verbose=0)
    y_pred.extend(batch_pred.flatten())
    y_true.extend(batch_y)

# Convert to numpy arrays and ensure same length
y_pred = np.array(y_pred)
y_true = np.array(y_true)
min_len = min(len(y_pred), len(y_true))
y_pred = y_pred[:min_len]
y_true = y_true[:min_len]

# Convert predictions to binary (0 or 1)
y_pred_binary = (y_pred > 0.5).astype(int)


# Calculate F1 Score
f1 = f1_score(y_true, y_pred_binary)
print(f"F1 Score: {f1:.4f}")


# Create confusion matrix
cm = confusion_matrix(y_true, y_pred_binary)

# Convert confusion matrix to percentages
cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100

# Visualize confusion matrix as percentages
plt.figure(figsize=(10, 8))
sns.heatmap(cm_percentage, annot=True, fmt='.2f', cmap='Blues', cbar_kws={'format': '%.0f%%'})
plt.title('Confusion Matrix (Percentage)')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.savefig(os.path.join(FIGURES_DIR, 'confusion_matrix.png'))  # Save confusion matrix
plt.show()


#---------------------------------------------------------------
# EER Calculation
#---------------------------------------------------------------
fpr, tpr, thresholds = roc_curve(y_true, y_pred, pos_label=1)
# Find the EER threshold
eer_threshold = thresholds[np.argmin(np.abs(fpr - (1-tpr)))]

# Calculate EER
eer = fpr[np.argmin(np.abs(fpr - (1-tpr)))]

print(f"EER: {eer:.4f}")

#---------------------------------------------------------------
# t-DCF Calculation
#---------------------------------------------------------------
def calculate_t_dcf(y_true, y_pred, p_target, c_miss, c_false_alarm, threshold):
    """
    Calculates the tuned Detection Cost Function (t-DCF).
    """
    # Apply threshold to get binary predictions
    y_pred_binary = (y_pred >= threshold).astype(int)

    # Calculate confusion matrix elements
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred_binary).ravel()

    # Calculate False Alarm Rate (FAR) and Miss Rate (FR)
    far = fp / (tn + fp)
    fr = fn / (tp + fn)

    # Calculate t-DCF
    t_dcf = c_miss * p_target * fr + c_false_alarm * (1 - p_target) * far

    return t_dcf

# Calculate t-DCF using the EER threshold
t_dcf = calculate_t_dcf(y_true, y_pred, p_target, c_miss, c_false_alarm, eer_threshold)
print(f"t-DCF: {t_dcf:.4f}")

In [None]:
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import noisereduce as nr
from scipy.signal import butter, sosfilt
import soundfile as sf

def denoise_and_amplify(audio, sr):
    try:
        # Noise Reduction
        reduced_noise = nr.reduce_noise(y=audio, sr=sr, stationary=False)
        return reduced_noise
    except Exception as e:
        print(f"Error in denoise_and_amplify: {e}")
        return audio  # Return original audio if an error occurs

def preprocess_and_visualize(file_path, sr=16000, duration=4):
    # Load the audio file
    audio, sr = librosa.load(file_path, sr=sr, duration=duration)
    
    # Save original audio
    sf.write('original_audio.wav', audio, sr)
    
    # Original Mel Spectrogram
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 3, 1)
    mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=80, n_fft=2048, hop_length=512)
    librosa.display.specshow(librosa.power_to_db(mel_spec, ref=np.max), sr=sr, x_axis='time', y_axis='mel')
    plt.title('Original Mel Spectrogram')
    
    # Noise Reduction using denoise_and_amplify function
    reduced_noise = denoise_and_amplify(audio, sr)
    sf.write('noise_reduced_audio.wav', reduced_noise, sr)
    
    # Final Mel Spectrogram
    plt.subplot(1, 3, 3)
    final_mel_spec = librosa.feature.melspectrogram(y=reduced_noise, sr=sr, n_mels=80, n_fft=2048, hop_length=512)
    librosa.display.specshow(librosa.power_to_db(final_mel_spec, ref=np.max), sr=sr, x_axis='time', y_axis='mel')
    plt.title('Final Mel Spectrogram')
    
    plt.tight_layout()
    plt.savefig(os.path.join(FIGURES_DIR, 'mel_spectrograms.png')) # saves graphs
    plt.close()

# Use the function
file_path = 'A_2582_0_A.wav'
preprocess_and_visualize(file_path)