In [None]:
#Importing the libraries
import os
import numpy as np
import tarfile
import tempfile
import librosa
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import audiomentations as am
from collections import Counter

# Shared settings read by the preprocessing, augmentation and training code below
config = dict(
    sample_rate=16000,    # rate (Hz) requested from librosa when loading audio
    duration=3,           # clip length in seconds; clips are cut/padded to sample_rate * duration samples
    hop_length=512,       # hop size passed to the mel-spectrogram
    n_mels=128,           # number of mel bands in the spectrogram
    n_fft=1024,           # FFT window size passed to the mel-spectrogram
    batch_size=32,        # batch size of the tf.data pipelines
    epochs=50,            # upper bound on training epochs
    patience=8,           # EarlyStopping patience
    learning_rate=0.0005, # Adam learning rate
    augment_prob=0.7,     # Probability of applying augmentation
)

# Data augmentation setup: one composed audiomentations pipeline that
# apply_augmentation() calls on raw waveforms. Each transform is given its own
# `p` (presumably its independent chance of firing -- confirm against the
# audiomentations docs), on top of the global config['augment_prob'] gate.
augmenter = am.Compose([
    # Additive Gaussian noise with amplitude in [0.001, 0.015]
    am.AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.3),
    # Pitch shift of up to +/- 4 semitones
    am.PitchShift(min_semitones=-4, max_semitones=4, p=0.3),
    # Time stretch with rate in [0.8, 1.2]
    am.TimeStretch(min_rate=0.8, max_rate=1.2, p=0.3),
    # Gain change of up to +/- 6 dB
    am.Gain(min_gain_db=-6, max_gain_db=6, p=0.2),
])

def extract_dataset(compressed_path):
    """Unpack an audio archive into a fresh temp directory and label its files.

    Every ``.flac`` / ``.wav`` file (extension matched case-insensitively) is
    collected. A file is labelled spoof (1) when its path *inside the archive*
    contains ``spoof`` or ``fake`` (case-insensitive), otherwise bonafide (0).

    Args:
        compressed_path: Path to a tar archive (gzip or any other compression
            that ``tarfile`` can auto-detect).

    Returns:
        A tuple ``(audio_files, labels, extracted_dir)``: the sorted list of
        absolute audio file paths, the parallel list of integer labels, and
        the temporary directory the archive was extracted into. The caller
        owns ``extracted_dir`` and is responsible for deleting it.
    """
    extracted_dir = tempfile.mkdtemp()

    # 'r:*' auto-detects the compression, so .tar.gz archives keep working.
    with tarfile.open(compressed_path, 'r:*') as tar:
        if hasattr(tarfile, 'data_filter'):
            # Refuse members that would escape extracted_dir (absolute paths,
            # '..' components, unsafe links) on Python versions that support it.
            tar.extractall(path=extracted_dir, filter='data')
        else:
            tar.extractall(path=extracted_dir)

    # Discover all audio files
    audio_files = []
    for root, _, files in os.walk(extracted_dir):
        for file in files:
            if file.lower().endswith(('.flac', '.wav')):
                audio_files.append(os.path.join(root, file))
    # os.walk order depends on the filesystem; sort so a seeded train/test
    # split is reproducible across runs and machines.
    audio_files.sort()

    # Create labels from the path relative to the extraction root. Matching on
    # the absolute path would also match the temp/home directory name (e.g. a
    # temp dir containing "fake") and mislabel every file as spoof.
    labels = []
    for path in audio_files:
        relative = os.path.relpath(path, extracted_dir).lower()
        is_spoof = 'spoof' in relative or 'fake' in relative
        labels.append(1 if is_spoof else 0)

    return audio_files, labels, extracted_dir

def apply_augmentation(audio, sr):
    """Randomly run a waveform through the module-level ``augmenter``.

    One uniform draw from ``np.random.rand()`` is compared against
    ``config['augment_prob']``; when the draw is below that threshold the
    augmented signal is returned, otherwise the input comes back untouched.

    Args:
        audio: Waveform samples handed to the augmenter as ``samples``.
        sr: Sample rate handed to the augmenter as ``sample_rate``.

    Returns:
        The augmenter's output, or ``audio`` itself when augmentation is skipped.
    """
    should_augment = np.random.rand() < config['augment_prob']
    if not should_augment:
        return audio
    return augmenter(samples=audio, sample_rate=sr)

def load_and_preprocess(file_path, label, augment=False):
    """Turn one audio file into a normalised log-mel feature map.

    The clip is loaded at ``config['sample_rate']``, cut or zero-padded at the
    end to ``sample_rate * duration`` samples, optionally augmented, converted
    to a dB-scaled mel-spectrogram and min-max scaled before a trailing channel
    axis is appended.

    Args:
        file_path: Audio file to read with librosa.
        label: Label passed through unchanged alongside the features.
        augment: When true, the waveform goes through ``apply_augmentation``.

    Returns:
        ``(features, label)`` on success. Any exception is reported on stdout
        and turned into ``(None, None)`` so callers can skip the file.
    """
    try:
        waveform, sr = librosa.load(file_path, sr=config['sample_rate'])

        # Force a fixed number of samples: truncate long clips, right-pad short ones
        target_len = config['sample_rate'] * config['duration']
        n_samples = len(waveform)
        if n_samples > target_len:
            waveform = waveform[:target_len]
        else:
            shortfall = max(0, target_len - n_samples)
            waveform = np.pad(waveform, (0, shortfall))

        # Augmentation is opt-in; callers enable it for training data only
        if augment:
            waveform = apply_augmentation(waveform, sr)

        # Mel power spectrogram -> dB relative to its own peak
        mel_kwargs = dict(
            sr=config['sample_rate'],
            n_fft=config['n_fft'],
            hop_length=config['hop_length'],
            n_mels=config['n_mels'],
        )
        power_spec = librosa.feature.melspectrogram(y=waveform, **mel_kwargs)
        db_spec = librosa.power_to_db(power_spec, ref=np.max)

        # Min-max scale; the epsilon guards against a constant spectrogram
        lowest = db_spec.min()
        value_range = db_spec.max() - lowest + 1e-8
        scaled = (db_spec - lowest) / value_range

        return np.expand_dims(scaled, axis=-1), label

    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")
        return None, None

def create_dataset(file_paths, labels, augment=False):
    """Build a shuffled, batched ``tf.data.Dataset`` from audio files.

    Each file is run through ``load_and_preprocess``; files that fail there
    (it returns ``None`` features) are skipped. The surviving features are
    stacked into one in-memory array and the labels are one-hot encoded into
    two classes.

    Note that preprocessing happens once, here, while the arrays are built, so
    when ``augment`` is true every epoch sees the same augmented samples
    rather than a fresh augmentation per epoch.

    Args:
        file_paths: Iterable of audio file paths.
        labels: Iterable of integer class labels (0 or 1) parallel to
            ``file_paths``.
        augment: Forwarded to ``load_and_preprocess``.

    Returns:
        A dataset of ``(features, one_hot_labels)`` batches of size
        ``config['batch_size']``, shuffled over the full set and prefetched.

    Raises:
        ValueError: If no file could be processed, so there is nothing to
            build a dataset from.
    """
    features = []
    processed_labels = []

    for path, label in zip(file_paths, labels):
        feat, lbl = load_and_preprocess(path, label, augment=augment)
        if feat is not None:
            features.append(feat)
            processed_labels.append(lbl)

    # Fail early with a clear message instead of handing TensorFlow an empty
    # array and a zero-sized shuffle buffer.
    if not features:
        raise ValueError(
            "No audio files could be processed; cannot build a dataset."
        )

    features = np.array(features)
    one_hot_labels = to_categorical(processed_labels, num_classes=2)

    return tf.data.Dataset.from_tensor_slices((features, one_hot_labels)) \
                         .shuffle(len(features)) \
                         .batch(config['batch_size']) \
                         .prefetch(tf.data.AUTOTUNE)

def build_model(input_shape):
    """Build the CRNN-with-attention classifier (uncompiled).

    Architecture: two Conv2D -> BatchNorm -> MaxPool -> Dropout stages, two
    bidirectional GRU layers, a 4-head self-attention block with a residual
    connection and layer normalisation, then global average pooling over the
    sequence and a dense head ending in a 2-way softmax.

    Args:
        input_shape: Shape of one feature map without the batch axis. The
            features produced by ``load_and_preprocess`` are a mel-spectrogram
            with a trailing channel axis, i.e. ``(n_mels, frames, 1)``; the
            sequence handling below assumes that axis order.

    Returns:
        A ``tf.keras`` ``Model`` mapping feature maps to class probabilities
        ordered ``[bonafide, spoof]``.
    """
    inputs = layers.Input(shape=input_shape)

    # Enhanced CNN Block
    x = layers.Conv2D(64, (3,3), activation='relu', padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2,2))(x)
    x = layers.Dropout(0.2)(x)

    x = layers.Conv2D(128, (3,3), activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2,2))(x)
    x = layers.Dropout(0.3)(x)

    # RNN Block with Attention
    # The CNN output is (freq, time, channels). Move time to the front so the
    # recurrent and attention layers step through time frames; reshaping
    # without this permute would make the mel-frequency axis the sequence
    # axis and fold time into the feature vector.
    x = layers.Permute((2, 1, 3))(x)
    # Flatten (freq, channels) into one feature vector per time step
    x = layers.Reshape((-1, x.shape[2] * x.shape[3]))(x)
    x = layers.Bidirectional(layers.GRU(128, return_sequences=True))(x)
    x = layers.Bidirectional(layers.GRU(64, return_sequences=True))(x)

    # Multi-head self-attention with a residual connection
    attn_output = layers.MultiHeadAttention(num_heads=4, key_dim=64)(x, x)
    x = layers.Add()([x, attn_output])
    x = layers.LayerNormalization()(x)

    # Classifier
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.4)(x)
    outputs = layers.Dense(2, activation='softmax')(x)

    return models.Model(inputs, outputs)

def evaluate_model(model, dataset):
    """Print a classification report and plot a confusion matrix.

    Args:
        model: Trained Keras model producing 2-class probabilities.
        dataset: Iterable of ``(features, one_hot_labels)`` batches, as built
            by ``create_dataset``.

    Returns:
        None. Results are printed and shown in a matplotlib figure.
    """
    class_ids = [0, 1]
    class_names = ['bonafide', 'spoof']

    true_batches = []
    pred_batches = []
    for x, y in dataset:
        true_batches.append(y.numpy())
        # predict_on_batch avoids the per-call overhead (and progress bar) of
        # model.predict() when it is invoked once per batch in a Python loop.
        pred_batches.append(model.predict_on_batch(x))

    if not true_batches:
        print("\nNo samples to evaluate.")
        return

    y_true = np.argmax(np.concatenate(true_batches), axis=1)
    y_pred = np.argmax(np.concatenate(pred_batches), axis=1)

    # Passing labels= pins both classes. Without it, data in which only one
    # class occurs makes classification_report reject the two target_names
    # and shrinks the confusion matrix to 1x1.
    print("\nClassification Report:")
    print(classification_report(
        y_true,
        y_pred,
        labels=class_ids,
        target_names=class_names,
        zero_division=0
    ))

    # Confusion Matrix
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    plt.figure(figsize=(6,6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.show()

def main(dataset_path=r"C:\Users\KIIT\AppData\Local\Temp\train-clean-100.tar.gz"):
    """Run the full pipeline: extract, preprocess, train, plot and evaluate.

    The held-out 20% split is used both as the validation set that drives
    early stopping / checkpointing and for the final report, so the reported
    metrics are optimistic compared with a truly unseen test set.

    Args:
        dataset_path: Path to the tar archive containing the audio corpus.
            Defaults to the path originally hard-coded in this script.

    Raises:
        ValueError: If the archive yields files of only one class; a
            bonafide-vs-spoof classifier cannot be trained or evaluated on it.
    """
    import shutil  # local import: only needed to clean up the extraction dir

    # Extract and get file paths with labels
    print("Extracting and processing dataset...")
    audio_files, labels, extracted_dir = extract_dataset(dataset_path)

    try:
        # Check class distribution
        class_counts = Counter(labels)
        print("\nClass distribution:")
        print(f" - Bonafide (0): {class_counts[0]}")
        print(f" - Spoof (1): {class_counts[1]}")

        # Stop before the expensive preprocessing if one class is absent.
        if class_counts[0] == 0 or class_counts[1] == 0:
            raise ValueError(
                "Dataset contains only one class "
                f"(bonafide={class_counts[0]}, spoof={class_counts[1]}). "
                "Labels are derived from file paths containing 'spoof' or "
                "'fake'; check that the archive includes spoofed audio."
            )

        # Split data
        train_paths, test_paths, train_labels, test_labels = train_test_split(
            audio_files, labels, test_size=0.2, random_state=42, stratify=labels
        )

        # Create datasets (features are fully loaded into memory here)
        print("\nCreating datasets...")
        train_ds = create_dataset(train_paths, train_labels, augment=True)
        val_ds = create_dataset(test_paths, test_labels, augment=False)
    finally:
        # The extracted copy is no longer needed once features are in memory
        # (or if anything above failed); do not leave it in the temp folder.
        shutil.rmtree(extracted_dir, ignore_errors=True)

    # Read the per-sample feature shape from the dataset (dropping the batch
    # axis) instead of re-loading a file, which could fail and return None.
    input_shape = tuple(train_ds.element_spec[0].shape.as_list()[1:])
    model = build_model(input_shape)
    model.compile(
        optimizer=Adam(config['learning_rate']),
        loss='categorical_crossentropy',
        metrics=['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    )

    # Callbacks
    callbacks = [
        EarlyStopping(patience=config['patience'], restore_best_weights=True),
        ModelCheckpoint('best_anti_spoof_model.h5', save_best_only=True)
    ]

    # Training
    print("\nTraining model...")
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=config['epochs'],
        callbacks=callbacks,
        verbose=1
    )

    # Plot training history
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.legend()
    plt.show()

    # Evaluate
    print("\nEvaluating model...")
    evaluate_model(model, val_ds)

if __name__ == "__main__":
    # Best-effort dependency bootstrap before running the pipeline.
    # NOTE(review): audiomentations is also imported unconditionally at the top
    # of this file, so when it is missing that import fails before this guard
    # is reached -- confirm whether the bootstrap should run earlier.
    try:
        import audiomentations
    except ImportError:
        print("Installing required packages...")
        import subprocess
        import sys
        # Use the running interpreter's pip so the package lands in the same
        # environment, and fail loudly if the installation does not succeed.
        subprocess.run(
            [sys.executable, '-m', 'pip', 'install', 'audiomentations'],
            check=True,
        )

    main()


Extracting and processing dataset...

Class distribution:
 - Bonafide (0): 28539
 - Spoof (1): 0

Creating datasets...
