In [None]:
# Import necessary libraries
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
import librosa
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import zipfile
import glob

# Set random seeds for reproducibility (NumPy and TensorFlow global generators).
np.random.seed(42)
tf.random.set_seed(42)

# Configuration shared by feature extraction, the data generator and training.
SAMPLE_RATE = 16000  # sampling rate passed to librosa.load (sr=...)
DURATION = 3  # seconds of audio kept per clip; clips are padded/trimmed to this
N_MFCC = 40  # n_mfcc passed to librosa.feature.mfcc; also the model's first input axis
HOP_LENGTH = 512  # hop_length passed to librosa.feature.mfcc
N_FFT = 1024  # n_fft passed to librosa.feature.mfcc
BATCH_SIZE = 32  # batch size requested from data_generator
EPOCHS = 50  # number of epochs passed to model.fit

# Print TensorFlow version
print(f"TensorFlow version: {tf.__version__}")

# Find the dataset path
# Glob patterns are tried in order; the first one that matches any .flac file
# decides DATASET_PATH.  Each entry is (pattern, recursive flag for glob).
_dataset_globs = (
    ('/kaggle/input/*/ASVspoof2019_LA_train/flac/*.flac', False),
    ('/kaggle/input/**/LA/ASVspoof2019_LA_train/flac/*.flac', True),
)

for _pattern, _recursive in _dataset_globs:
    dataset_paths = glob.glob(_pattern, recursive=_recursive)
    if not dataset_paths:
        continue
    # Climb three directory levels up from the first matching audio file.
    DATASET_PATH = dataset_paths[0]
    for _level in range(3):
        DATASET_PATH = os.path.dirname(DATASET_PATH)
    print(f"Found dataset at: {DATASET_PATH}")
    break
else:
    # Neither pattern matched anything: fall back to a fixed location.
    DATASET_PATH = '/kaggle/input/asvpoof-2019'
    print(f"Using default dataset path: {DATASET_PATH}")

# List directories to help debug
def walk_max_depth(top, max_depth):
    """Yield ``os.walk`` triples for *top*, descending at most *max_depth* levels.

    ``os.walk`` has no ``maxdepth`` parameter (passing one raises ``TypeError``),
    so the depth limit is enforced by emptying the ``dirs`` list in place, which
    ``os.walk`` honours when ``topdown=True``.

    Args:
        top: Directory to start from.  It has depth 0.
        max_depth: Deepest level (relative to *top*) whose directories are
            still yielded.  ``0`` yields only *top* itself.

    Yields:
        ``(root, dirs, files)`` tuples exactly as produced by ``os.walk``.
    """
    for root, dirs, files in os.walk(top, topdown=True):
        yield root, dirs, files
        relative = os.path.relpath(root, top)
        depth = 0 if relative == os.curdir else relative.count(os.sep) + 1
        if depth >= max_depth:
            # Pruned after the yield so the caller still sees the real
            # sub-directory names; os.walk simply will not enter them.
            dirs[:] = []


print("Available directories:")
for root, dirs, files in walk_max_depth('/kaggle/input', max_depth=3):
    print(root)
    if files:
        print(f"  - {len(files)} files")
    if 'protocol.txt' in files:
        print("  - Found protocol.txt")

# Find protocol files
# File-name endings that identify a protocol (label listing) file.
# The official ASVspoof2019 CM training protocol is named
# "ASVspoof2019.LA.cm.train.trn.txt", which a bare "train.trn" suffix does not
# match, so the ".txt" form is listed explicitly.  "train_protocol.txt" needs
# no entry of its own because it already ends with "protocol.txt".
PROTOCOL_SUFFIXES = ('protocol.txt', 'train.trn', 'train.trn.txt')


def is_protocol_file(file_name):
    """Return True if *file_name* ends with one of ``PROTOCOL_SUFFIXES``."""
    return file_name.endswith(PROTOCOL_SUFFIXES)


# Sorted so the candidate order (and therefore which file is tried first) does
# not depend on the directory listing order returned by os.walk.
protocol_files = sorted(
    os.path.join(root, file)
    for root, dirs, files in os.walk('/kaggle/input')
    for file in files
    if is_protocol_file(file)
)

print(f"Found {len(protocol_files)} protocol files: {protocol_files}")

# Function to parse ASVspoof protocol file
def parse_protocol_file(file_path):
    """Parse a space-separated protocol file into a labelled DataFrame.

    Two layouts are recognised:

    * five columns - the ASVspoof2019 CM protocol layout
      ``SPEAKER_ID AUDIO_FILE_NAME - SYSTEM_ID KEY``.  The audio file name is
      the SECOND field; the third field is a ``-`` placeholder in the LA
      protocols and is kept under the historical column name ``utterance_id``.
    * two columns - ``file_name label``.

    Non-numeric labels are mapped to integers: ``'bonafide'``
    (case-insensitive) becomes 1, anything else becomes 0.  Numeric label
    columns are returned unchanged.

    Args:
        file_path: Path of the protocol file to read.

    Returns:
        A DataFrame with at least ``file_name`` and ``label`` columns, or
        ``None`` when the file cannot be read or has an unsupported number of
        columns.  This function reports problems by printing and never raises.
    """
    try:
        df = pd.read_csv(file_path, sep=' ', header=None)
        n_columns = df.shape[1]
        if n_columns == 5:  # Standard ASVspoof2019 CM format
            df.columns = ['speaker_id', 'file_name', 'utterance_id', 'system_id', 'label']
        elif n_columns == 2:  # Simplified format
            df.columns = ['file_name', 'label']
        else:
            print(f"Unknown protocol file format with {n_columns} columns")
            return None

        # Map labels: 'bonafide' -> 1, 'spoof' -> 0
        # A numeric-dtype test (rather than `dtype == object`) also covers
        # pandas versions that load text columns with a dedicated string dtype;
        # str() keeps a missing value from crashing .lower().
        if not pd.api.types.is_numeric_dtype(df['label']):
            df['label'] = df['label'].map(
                lambda x: 1 if str(x).strip().lower() == 'bonafide' else 0
            )

        return df
    except Exception as e:
        # Best effort: the caller tries several candidate files in turn.
        print(f"Error parsing protocol file {file_path}: {e}")
        return None

# Try to load protocol file
# Candidates are tried in order; the first one that parses is kept in `df`.
# `df` stays None when there are no candidates or none of them parses.
df = None
for protocol_file in protocol_files:
    df = parse_protocol_file(protocol_file)
    if df is None:
        continue
    print(f"Successfully loaded protocol file: {protocol_file}")
    print(f"DataFrame shape: {df.shape}")
    print(df.head())
    break

# If no protocol file found, try to find audio files directly
if df is None:
    print("No valid protocol file found. Searching for audio files directly...")

    # Find all audio files: every .wav first, then every .flac
    audio_files = [
        found
        for ext in ('*.wav', '*.flac')
        for found in glob.glob(f'/kaggle/input/**/{ext}', recursive=True)
    ]

    print(f"Found {len(audio_files)} audio files")

    def _guess_label(path):
        """Guess a label from *path*: 1 = bonafide, 0 = spoof, None = no hint.

        Keywords anywhere in the lower-cased path are checked first, then
        case-sensitive markers in the base file name.
        """
        lowered = path.lower()
        if 'bonafide' in lowered or 'genuine' in lowered:
            return 1  # bonafide
        if 'spoof' in lowered or 'fake' in lowered:
            return 0  # spoof

        # Try to determine from filename patterns in ASVspoof
        base_name = os.path.basename(path)
        if '_A' in base_name:  # Attack (spoof)
            return 0
        if any(marker in base_name for marker in ('_human', '_real', '_genuine')):
            return 1
        return None

    # Keep only the files whose label could be guessed
    files = []
    labels = []
    for candidate in audio_files:
        guessed = _guess_label(candidate)
        if guessed is not None:
            files.append(candidate)
            labels.append(guessed)

    if not files:
        raise ValueError("Could not find or create dataset. Please check the dataset structure.")

    df = pd.DataFrame({'file_path': files, 'label': labels})
    print(f"Created DataFrame from file paths. Shape: {df.shape}")
    print(df.head())

# Split data
# Two stratified splits with the same settings: 20% of all rows are held out
# for testing, then 20% of what remains is held out for validation.
_split_options = dict(test_size=0.2, random_state=42)
train_val_df, test_df = train_test_split(df, stratify=df['label'], **_split_options)
train_df, val_df = train_test_split(
    train_val_df, stratify=train_val_df['label'], **_split_options
)

print(f"Training samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Testing samples: {len(test_df)}")

# Feature extraction function
# Lazily built map {file stem -> full path} of every audio file under
# /kaggle/input.  Built on first use and reused, so resolving a file name no
# longer walks the whole input tree once per sample.
_AUDIO_INDEX = None


def _get_audio_index():
    """Return the cached stem -> path index, building it on first call."""
    global _AUDIO_INDEX
    if _AUDIO_INDEX is None:
        index = {}
        for root, _dirs, names in os.walk('/kaggle/input'):
            for name in names:
                if name.endswith(('.wav', '.flac')):
                    # setdefault keeps the first file seen for a given stem
                    index.setdefault(os.path.splitext(name)[0], os.path.join(root, name))
        _AUDIO_INDEX = index
    return _AUDIO_INDEX


def _find_audio_file(file_name):
    """Return the path of the audio file for *file_name*, or None if absent.

    An exact stem match is preferred; otherwise the first indexed file whose
    base name starts with *file_name* is used (the original matching rule).
    """
    index = _get_audio_index()
    exact = index.get(file_name)
    if exact is not None:
        return exact
    for candidate in index.values():
        if os.path.basename(candidate).startswith(file_name):
            return candidate
    return None


def extract_features(file_path, label=None):
    """Load one audio clip and return its normalised MFCC matrix.

    The clip is loaded at ``SAMPLE_RATE``, zero-padded or trimmed to exactly
    ``SAMPLE_RATE * DURATION`` samples, converted to ``N_MFCC`` MFCCs and
    standardised with the mean and standard deviation of the whole matrix.

    Args:
        file_path: A full audio path when the module-level ``df`` has a
            ``file_path`` column; otherwise a protocol file name that is
            resolved to an audio file under ``/kaggle/input``.
        label: Optional label returned alongside the features.

    Returns:
        The MFCC array, or ``(mfccs, label)`` when *label* is not ``None``.
        ``None`` when the file cannot be found or processed; errors are
        printed rather than raised.
    """
    try:
        # Handle different file path formats
        # NOTE: the decision is made from the module-level `df`, not from the
        # argument, so it is the same for every call in a given run.
        if 'file_path' in df.columns:
            # Direct file path
            audio_path = file_path
        else:
            # Need to find the file based on file_name
            audio_path = _find_audio_file(file_path)
            if audio_path is None:
                print(f"Could not find audio file for {file_path}")
                return None

        target_length = SAMPLE_RATE * DURATION

        # Load audio file
        y, sr = librosa.load(audio_path, sr=SAMPLE_RATE, duration=DURATION)

        # Pad or trim audio to fixed length
        if len(y) < target_length:
            y = np.pad(y, (0, target_length - len(y)))
        else:
            y = y[:target_length]

        # Extract MFCCs
        mfccs = librosa.feature.mfcc(
            y=y,
            sr=sr,
            n_mfcc=N_MFCC,
            hop_length=HOP_LENGTH,
            n_fft=N_FFT,
        )

        # Normalize features; the epsilon avoids division by zero on silence
        mfccs = (mfccs - np.mean(mfccs)) / (np.std(mfccs) + 1e-8)

        if label is not None:
            return mfccs, label
        return mfccs

    except Exception as e:
        # Best effort: a single unreadable file must not stop a training run.
        print(f"Error extracting features from {file_path}: {e}")
        return None

# Test feature extraction on the first sample
print("Testing feature extraction...")
# Use whichever identifier column this DataFrame carries.
_id_column = 'file_path' if 'file_path' in df.columns else 'file_name'
test_file = df[_id_column].iloc[0]

features = extract_features(test_file)
if features is None:
    print("Feature extraction failed. Check the dataset structure.")
else:
    print(f"Feature shape: {features.shape}")

# Data generator
def data_generator(dataframe, batch_size=32):
    """Yield ``(features, labels)`` batches from *dataframe* indefinitely.

    The rows are reshuffled at the start of every pass.  Rows whose features
    cannot be extracted are skipped, so a batch may hold fewer than
    *batch_size* samples; a batch with no usable row is not yielded at all.

    Args:
        dataframe: Rows with a ``label`` column and either a ``file_path`` or
            a ``file_name`` column.
        batch_size: Maximum number of rows per batch.

    Yields:
        ``(batch_features, batch_labels)`` NumPy arrays; a trailing channel
        axis is appended to the stacked features.

    Raises:
        ValueError: If *dataframe* is empty (raised on the first ``next()``).
        RuntimeError: If a complete pass produces no batch, i.e. feature
            extraction failed for every row.  Without these two checks the
            generator would loop forever without yielding.
    """
    num_samples = len(dataframe)
    if num_samples == 0:
        raise ValueError("data_generator received an empty dataframe")

    # The column layout is fixed for the whole frame, so resolve it once.
    path_column = 'file_path' if 'file_path' in dataframe.columns else 'file_name'

    while True:
        dataframe = dataframe.sample(frac=1).reset_index(drop=True)
        produced_batch = False
        for start in range(0, num_samples, batch_size):
            batch_df = dataframe.iloc[start:start + batch_size]
            batch_features = []
            batch_labels = []

            for file_path, label in zip(batch_df[path_column], batch_df['label']):
                features = extract_features(file_path)
                if features is not None:
                    batch_features.append(features)
                    batch_labels.append(label)

            if batch_features:
                batch_features = np.array(batch_features)
                batch_labels = np.array(batch_labels)
                # Reshape for CNN input: (batch, height, width, channels)
                batch_features = np.expand_dims(batch_features, axis=-1)
                produced_batch = True
                yield batch_features, batch_labels

        if not produced_batch:
            raise RuntimeError(
                "data_generator could not extract features from any sample"
            )

# Build the model
def build_model():
    """Build and compile the binary-classification CNN.

    The network takes inputs of shape ``(N_MFCC, None, 1)``, applies three
    Conv2D -> MaxPooling2D -> BatchNormalization -> Dropout blocks, then global
    average pooling, a 256-unit dense layer with dropout, and a single sigmoid
    output.  It is compiled with Adam (learning rate 0.001), binary
    cross-entropy loss and an accuracy metric.

    Returns:
        The compiled ``Sequential`` model.
    """
    # (filters, dropout rate) for each convolutional block, in order
    conv_blocks = ((32, 0.2), (64, 0.3), (128, 0.4))

    # Input layer
    stack = [layers.Input(shape=(N_MFCC, None, 1))]

    # Convolutional blocks
    for n_filters, drop_rate in conv_blocks:
        stack.extend([
            layers.Conv2D(n_filters, (3, 3), activation='relu', padding='same'),
            layers.MaxPooling2D((2, 2)),
            layers.BatchNormalization(),
            layers.Dropout(drop_rate),
        ])

    # Global pooling and dense layers
    stack.extend([
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid'),
    ])

    model = models.Sequential(stack)

    # Compile the model
    adam = tf.keras.optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Create and train the model
model = build_model()
model.summary()

# Calculate steps per epoch
# Floor division yields 0 when a split holds fewer rows than one batch, which
# would ask Keras to run no batches at all; clamp both counts to at least 1.
steps_per_epoch = max(1, len(train_df) // BATCH_SIZE)
validation_steps = max(1, len(val_df) // BATCH_SIZE)

# Train the model
# Both generators are infinite, so the explicit step counts are what define
# the length of a training epoch and of a validation run.
history = model.fit(
    data_generator(train_df, BATCH_SIZE),
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    validation_data=data_generator(val_df, BATCH_SIZE),
    validation_steps=validation_steps,
    verbose=1
)

# Plot training history
# One row per panel: (train key, validation key, title, y-axis label, legend position)
_panels = (
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy', 'lower right'),
    ('loss', 'val_loss', 'Model Loss', 'Loss', 'upper right'),
)

plt.figure(figsize=(12, 4))
for _position, (_train_key, _val_key, _title, _y_label, _legend_loc) in enumerate(_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.plot(history.history[_train_key])
    plt.plot(history.history[_val_key])
    plt.title(_title)
    plt.ylabel(_y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc=_legend_loc)

plt.tight_layout()
plt.show()

# Evaluate on test set
test_generator = data_generator(test_df, BATCH_SIZE)
# Ceiling division: one pass of the generator yields ceil(rows / batch) batches
# (as long as every batch has at least one usable sample), so this scores the
# whole test set instead of dropping the final partial batch.  The max() keeps
# the step count from ever being 0.
test_steps = max(1, -(-len(test_df) // BATCH_SIZE))
test_loss, test_accuracy = model.evaluate(test_generator, steps=test_steps)
print(f"Test accuracy: {test_accuracy:.4f}")

# Save the model in TensorFlow.js format
# NOTE: the lines starting with "!" below are IPython/Jupyter shell escapes, so
# this section only runs inside a notebook, not as a plain Python script.
# First save as Keras H5
model.save('deepfake_audio_model.h5')

# Install tensorflowjs if not already installed
!pip install tensorflowjs

# Convert to TensorFlow.js format
# The import has to stay below the pip install above: the package may not be
# present until that line has run.
import tensorflowjs as tfjs
tfjs.converters.save_keras_model(model, 'tfjs_model')

# Zip the model for download (archives the tfjs_model/ directory written above)
!zip -r tfjs_model.zip tfjs_model/

print("Model saved and converted to TensorFlow.js format")
print("Download the tfjs_model.zip file to use in your web application")