In [11]:
import os
import json
import librosa
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from collections import Counter

# Make sure the output folders for tables and figures exist before anything is saved.
os.makedirs('results/tables', exist_ok=True)
os.makedirs('results/plots', exist_ok=True)

# Input locations: annotation exports and the cleaned recordings.
json_folder = 'JSON'
audio_folder = './output/cleaned_wav_files'

# Index each folder by file stem (file name without its extension), then keep
# only the stems that have both an annotation file and a recording.
json_files = {
    os.path.splitext(entry)[0]: os.path.join(json_folder, entry)
    for entry in os.listdir(json_folder)
    if entry.endswith('.json')
}
audio_files = {
    os.path.splitext(entry)[0]: os.path.join(audio_folder, entry)
    for entry in os.listdir(audio_folder)
    if entry.endswith(('.wav', '.m4a', '.mp3'))
}
matched_files = {
    stem: (json_path, audio_files[stem])
    for stem, json_path in json_files.items()
    if stem in audio_files
}

# Function to load annotations from a JSON file
def load_annotations(json_file):
    """Read the labelled time regions from one annotation export.

    The file must contain a JSON list of tasks. Only the first annotation of
    the first task is read. Every entry of its ``result`` list whose ``type``
    is ``'labels'`` yields one tuple; entries of other types are ignored.

    Args:
        json_file: Path to the JSON annotation file.

    Returns:
        A list of ``(start, end, label)`` tuples in file order, where
        ``label`` is the first item of the region's ``labels`` list. The
        list is empty when the file holds no tasks or the first task has no
        annotations.

    Raises:
        KeyError: If an expected key (``annotations``, ``result``, ``type``,
            ``value`` ...) is missing.
    """
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # An un-annotated export is a normal case for the caller, not an error.
    if not data or not data[0]['annotations']:
        return []

    return [
        (item['value']['start'], item['value']['end'], item['value']['labels'][0])
        for item in data[0]['annotations'][0]['result']
        if item['type'] == 'labels'
    ]

# Function to extract audio segments based on annotations
def extract_audio_segments(audio_file, annotations, sr=16000):
    """Cut one waveform slice per annotation out of an audio file.

    Args:
        audio_file: Path handed to ``librosa.load``.
        annotations: Iterable of ``(start, end, label)`` tuples; ``start`` and
            ``end`` are multiplied by ``sr`` and truncated to sample indices.
        sr: Sample rate passed to ``librosa.load`` and used for the index
            conversion.

    Returns:
        A list of ``(segment, label)`` pairs in annotation order. A segment is
        empty when its index range falls outside the loaded waveform.
    """
    waveform, _ = librosa.load(audio_file, sr=sr)
    return [
        (waveform[int(t_start * sr):int(t_end * sr)], tag)
        for t_start, t_end, tag in annotations
    ]

# Function to extract features (MFCC) from audio segments with improved handling of short segments
def extract_features(segments, n_mfcc=40, max_length=300, min_segment_length=512, sr=16000):
    """Turn labelled audio segments into fixed-size MFCC matrices.

    Segments shorter than ``min_segment_length`` samples are skipped. For the
    rest, MFCCs are computed with a 512/128 (n_fft/hop) window when the
    segment has fewer than 2048 samples and 2048/512 otherwise. The frame
    axis is zero-padded or truncated to ``max_length`` and the matrix is
    transposed so that frames come first.

    Args:
        segments: Iterable of ``(segment, label)`` pairs.
        n_mfcc: Number of MFCC coefficients per frame.
        max_length: Number of frames every feature matrix is padded/cut to.
        min_segment_length: Minimum number of samples a segment needs.
        sr: Sample rate passed to ``librosa.feature.mfcc``; should match the
            rate the segments were loaded with.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n_kept, max_length, n_mfcc)`` and
        ``y`` holds ``0`` for the label ``'Field pause'`` and ``1`` for any
        other label. When nothing is kept, ``X`` has shape
        ``(0, max_length, n_mfcc)`` and ``y`` is an empty integer array.
    """
    features, labels = [], []
    skipped_segments = 0

    for i, (segment, label) in enumerate(segments):
        # Best effort per segment: a failure is reported and counted as
        # skipped instead of aborting the whole extraction.
        try:
            if len(segment) < min_segment_length:
                print(f"Skipping segment {i} due to insufficient length: {len(segment)} samples")
                skipped_segments += 1
                continue

            # Short segments get a smaller analysis window.
            if len(segment) < 2048:
                n_fft, hop_length = 512, 128
            else:
                n_fft, hop_length = 2048, 512

            mfcc = librosa.feature.mfcc(
                y=segment,
                sr=sr,
                n_mfcc=n_mfcc,
                n_fft=n_fft,
                hop_length=hop_length,
            )

            # Force a common number of frames: zero-pad short, cut long.
            n_frames = mfcc.shape[1]
            if n_frames < max_length:
                padded_mfcc = np.pad(mfcc, ((0, 0), (0, max_length - n_frames)), mode='constant')
            else:
                padded_mfcc = mfcc[:, :max_length]

            features.append(padded_mfcc.T)
            labels.append(0 if label == 'Field pause' else 1)

        except Exception as e:
            print(f"Error processing segment {i}: {e}")
            skipped_segments += 1

    print(f"Total segments skipped: {skipped_segments} out of {len(segments)}")

    if not features:
        # Keep the documented rank and an integer label dtype even when
        # every segment was skipped, so downstream reshapes still work.
        return np.empty((0, max_length, n_mfcc)), np.empty((0,), dtype=int)
    return np.array(features), np.array(labels)

# Load the dataset: collect every labelled segment from every matched
# (annotation, recording) pair into one flat list of (segment, label) pairs.
dataset = []
files_loaded = []  # annotation paths that contributed at least one region

for name, (json_path, audio_path) in matched_files.items():
    annotations = load_annotations(json_path)
    # Files without any 'labels' regions are skipped entirely.
    if len(annotations) != 0:
        files_loaded.append(json_path)
        audio_segments = extract_audio_segments(audio_path, annotations)
        dataset.extend(audio_segments)

# Extract MFCC features. The minimum length is lowered from the function's
# default of 512 to 256 samples.
# NOTE(review): segments of 256-511 samples are shorter than the 512-sample
# FFT window extract_features selects for them - confirm this is intended.
X, y = extract_features(dataset, min_segment_length=256)  # Lowered minimum segment length

# Display class distribution (0 = 'Field pause', 1 = any other label)
print("Class distribution:")
unique, counts = np.unique(y, return_counts=True)
for label, count in zip(unique, counts):
    print(f"Class {label} ({'Field pause' if label == 0 else 'Filled pause'}): {count} samples")

# Plot class distribution and save it; the figure is closed so it is not
# kept open after saving.
plt.figure(figsize=(8, 6))
sns.countplot(x=y)
plt.title('Class Distribution in Dataset')
plt.xlabel('Class (0: Field pause, 1: Filled pause)')
plt.ylabel('Count')
plt.savefig('results/plots/class_distribution.png')
plt.close()

# Split data into training and testing sets: 80/20, stratified on the label,
# with a fixed seed so the split is repeatable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Create few-shot learning datasets
def create_few_shot_dataset(X, y, n_shots=5, random_state=42):
    """
    Draw up to ``n_shots`` random examples of every class.

    The global NumPy random generator is re-seeded with ``random_state`` on
    every call, so repeated calls with the same arguments pick the same
    samples. Classes are visited in sorted label order and a warning is
    printed for any class that has fewer than ``n_shots`` samples (all of
    its samples are used in that case).

    Returns the selected rows of ``X`` and ``y``, grouped by class.
    """
    np.random.seed(random_state)

    picked = []
    for cls in np.unique(y):
        candidates = np.nonzero(y == cls)[0]
        take = min(n_shots, len(candidates))

        if take < n_shots:
            print(f"Warning: Only {take} samples available for class {cls}")

        # Sampling without replacement so no example is used twice.
        picked += list(np.random.choice(candidates, take, replace=False))

    return X[picked], y[picked]

# Prepare few-shot datasets with different numbers of shots.
# few_shot_datasets maps the shot count to its (X_few, y_few) pair; every
# call uses create_few_shot_dataset's default random_state.
few_shot_configs = [1, 3, 5, 10]
few_shot_datasets = {}

for n_shots in few_shot_configs:
    X_few, y_few = create_few_shot_dataset(X_train, y_train, n_shots=n_shots)
    few_shot_datasets[n_shots] = (X_few, y_few)
    
    # Display class distribution for each few-shot dataset
    print(f"\n{n_shots}-shot dataset class distribution:")
    unique, counts = np.unique(y_few, return_counts=True)
    for label, count in zip(unique, counts):
        print(f"Class {label} ({'Field pause' if label == 0 else 'Filled pause'}): {count} samples")

Skipping segment 166 due to insufficient length: 0 samples
Skipping segment 167 due to insufficient length: 0 samples
Skipping segment 304 due to insufficient length: 0 samples
Skipping segment 401 due to insufficient length: 0 samples
Skipping segment 466 due to insufficient length: 0 samples
Skipping segment 467 due to insufficient length: 0 samples
Skipping segment 468 due to insufficient length: 0 samples
Skipping segment 469 due to insufficient length: 0 samples
Skipping segment 470 due to insufficient length: 0 samples
Skipping segment 471 due to insufficient length: 0 samples
Skipping segment 472 due to insufficient length: 0 samples
Skipping segment 473 due to insufficient length: 0 samples
Skipping segment 474 due to insufficient length: 0 samples
Skipping segment 475 due to insufficient length: 0 samples
Skipping segment 476 due to insufficient length: 0 samples
Skipping segment 477 due to insufficient length: 0 samples
Skipping segment 478 due to insufficient length: 0 samples

# 2. Balanced Sampling Approach

In [5]:
# Modified balanced training setup for extreme imbalance
def create_balanced_datasets(X_train, y_train, X_test, y_test, random_state=42):
    """Build reduced training sets that soften the class imbalance.

    Two kinds of dataset are produced from the training data:

    * ``'undersampled'`` - every sample of the least frequent class plus a
      random subset of all other samples, at most twice as many as the
      minority class (so the result is at most 2:1, not strictly balanced).
    * ``'<n>-shot'`` - ``n`` samples per class via
      ``create_few_shot_dataset`` for n in 1, 3, 5, 10 and
      ``min(minority_count, 20)``, keeping only sizes the minority class can
      supply.

    Args:
        X_train: Training features, indexable by an integer index array.
        y_train: Training labels (NumPy array).
        X_test: Unused; kept so existing callers do not break.
        y_test: Unused; kept so existing callers do not break.
        random_state: Seed for the undersampling draw and the few-shot
            selection, making the result independent of whatever the global
            random state happens to be.

    Returns:
        Dict mapping dataset name to an ``(X, y)`` tuple.

    Raises:
        ValueError: If ``y_train`` is empty.
    """
    class_counts = Counter(y_train)
    if not class_counts:
        raise ValueError("y_train is empty; cannot build balanced datasets")
    minority_class = min(class_counts, key=class_counts.get)
    minority_count = class_counts[minority_class]

    print(f"\nCreating balanced datasets from {len(y_train)} samples")
    print(f"Original class distribution: {dict(class_counts)}")

    balanced_datasets = {}

    # 1. Random undersampling - keep all minority samples, subsample the rest.
    majority_indices = np.where(y_train != minority_class)[0]
    minority_indices = np.where(y_train == minority_class)[0]

    undersampling_factor = 2  # majority samples kept per minority sample
    # Local generator: reproducible without touching the global random state.
    rng = np.random.RandomState(random_state)
    selected_majority = rng.choice(
        majority_indices,
        size=min(minority_count * undersampling_factor, len(majority_indices)),
        replace=False,
    )

    balanced_indices = np.concatenate([minority_indices, selected_majority])
    X_balanced = X_train[balanced_indices]
    y_balanced = y_train[balanced_indices]
    balanced_datasets['undersampled'] = (X_balanced, y_balanced)

    print(f"Undersampled dataset: {len(X_balanced)} samples, distribution: {dict(Counter(y_balanced))}")

    # 2. Few-shot datasets. The capped minority size can coincide with one of
    # the fixed sizes, so duplicates are dropped (order preserved) to avoid
    # building and reporting the same dataset twice.
    minority_shots = min(minority_count, 20)  # Cap at 20 samples
    shot_configs = list(dict.fromkeys([1, 3, 5, 10, minority_shots]))

    for n_shots in shot_configs:
        if n_shots <= minority_count:
            X_few, y_few = create_few_shot_dataset(X_train, y_train, n_shots, random_state)
            balanced_datasets[f'{n_shots}-shot'] = (X_few, y_few)
            print(f"{n_shots}-shot dataset: {len(X_few)} samples, distribution: {dict(Counter(y_few))}")

    return balanced_datasets

# Function to balance predictions based on class probabilities
def balanced_predict(model, X, threshold=0.5):
    """
    Predict labels, applying a custom decision threshold when possible.

    Models exposing ``predict_proba`` are thresholded on their column-1
    probability (strictly greater than ``threshold`` means class 1). For any
    other model ``threshold`` is ignored and ``model.predict`` is returned.
    """
    if not hasattr(model, 'predict_proba'):
        # No probabilities to threshold - fall back to the model's own labels.
        return model.predict(X)

    positive_scores = model.predict_proba(X)[:, 1]
    return (positive_scores > threshold).astype(int)

# 3. Specialized Models for Imbalanced Data

In [6]:
from imblearn.ensemble import BalancedRandomForestClassifier, RUSBoostClassifier
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline

def train_balanced_models(X_train, y_train, X_test, y_test, shot_config="full"):
    """Fit four imbalance-aware classifiers and evaluate each on the test data.

    Every sample is flattened to one feature vector first. The models, in
    training order, are a balanced random forest, RUSBoost, an RBF SVM with
    inverse-frequency class weights (on standardised features) and an XGBoost
    classifier with ``scale_pos_weight``. After fitting, each model is
    evaluated, its result dict is appended to the shared ``all_results``
    list, and its confusion matrix is plotted.

    NOTE: ``evaluate_model``, ``plot_confusion_matrix``, ``all_results``,
    ``SVC`` and ``xgb`` are not defined in this cell; they must already exist
    in the notebook namespace when this function runs.

    Returns:
        Dict of fitted models keyed by name; the SVM entry is a dict holding
        both the model and the scaler its inputs must pass through.
    """

    def _score_and_log(estimator, X_eval, tag):
        # Evaluate, store the result globally, and save the confusion matrix.
        outcome = evaluate_model(estimator, X_eval, y_test, tag, shot_config)
        all_results.append(outcome)
        plot_confusion_matrix(outcome['confusion_matrix'], tag, shot_config)

    # Collapse all non-sample axes into a single feature axis.
    flat_train = X_train.reshape(X_train.shape[0], -1)
    flat_test = X_test.reshape(X_test.shape[0], -1)

    # --- 1. Balanced Random Forest ---
    print(f"\nTraining Balanced Random Forest ({shot_config})...")
    brf = BalancedRandomForestClassifier(
        n_estimators=100,
        replacement=True,
        sampling_strategy='auto',
        random_state=42,
    )
    brf.fit(flat_train, y_train)
    _score_and_log(brf, flat_test, 'BalancedRF')

    # --- 2. RUSBoost ---
    print(f"\nTraining RUSBoost ({shot_config})...")
    rusboost = RUSBoostClassifier(
        n_estimators=100,
        learning_rate=0.1,
        sampling_strategy='auto',
        random_state=42,
    )
    rusboost.fit(flat_train, y_train)
    _score_and_log(rusboost, flat_test, 'RUSBoost')

    # --- 3. Cost-sensitive SVM ---
    print(f"\nTraining Cost-sensitive SVM ({shot_config})...")

    # Weight each class by n_samples / (n_classes * class_count), so rarer
    # classes cost more to misclassify.
    label_counts = Counter(y_train)
    total = len(y_train)
    n_classes = len(label_counts)
    class_weights = {
        cls: total / (n_classes * freq)
        for cls, freq in label_counts.items()
    }

    # The scaler is fitted on the training data only and reused for the test data.
    scaler = StandardScaler()
    scaled_train = scaler.fit_transform(flat_train)
    scaled_test = scaler.transform(flat_test)

    svm_weighted = SVC(
        kernel='rbf',
        class_weight=class_weights,
        probability=True,
        random_state=42,
    )
    svm_weighted.fit(scaled_train, y_train)
    _score_and_log(svm_weighted, scaled_test, 'WeightedSVM')

    # --- 4. XGBoost with scale_pos_weight ---
    print(f"\nTraining Weighted XGBoost ({shot_config})...")

    # Ratio of class-0 to class-1 counts; neutral weight if class 1 is absent.
    if 1 in label_counts:
        scale_pos_weight = label_counts[0] / label_counts[1]
    else:
        scale_pos_weight = 1.0

    xgb_weighted = xgb.XGBClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        scale_pos_weight=scale_pos_weight,
        random_state=42,
        use_label_encoder=False,
        eval_metric='logloss',
    )
    xgb_weighted.fit(flat_train, y_train)
    _score_and_log(xgb_weighted, flat_test, 'WeightedXGB')

    return {
        'BalancedRF': brf,
        'RUSBoost': rusboost,
        'WeightedSVM': {'model': svm_weighted, 'scaler': scaler},
        'WeightedXGB': xgb_weighted,
    }

# 4. Modified Evaluation Metrics for Imbalanced Data

In [7]:
from sklearn.metrics import precision_recall_curve, average_precision_score, f1_score

def evaluate_model_imbalanced(model, X_test, y_test, model_name, shot_config="full"):
    """
    Evaluate a binary classifier with metrics suited to imbalanced data.

    If the model exposes ``predict_proba``, its column-1 probabilities are
    used to build a precision-recall curve, the threshold with the highest
    F1 score on ``y_test`` is selected, and predictions are made with that
    threshold. Otherwise ``model.predict`` is used unchanged.

    NOTE: the threshold is chosen on the same data the metrics are reported
    on, so the reported scores are optimistic.

    Args:
        model: Fitted estimator with ``predict`` and optionally
            ``predict_proba``.
        X_test: Test features; anything with more than two axes is flattened
            to ``(n_samples, -1)``.
        y_test: True binary labels.
        model_name: Name used in printed output, result key and plot file.
        shot_config: Tag of the training configuration.

    Returns:
        Dict with keys ``model_name``, ``accuracy``, ``precision``,
        ``recall``, ``f1_score``, ``average_precision``, ``best_threshold``,
        ``confusion_matrix``, ``execution_time`` and ``y_pred``. The two
        threshold-related entries are ``None`` for models without
        ``predict_proba``.

    Side effects:
        Prints the metrics and, when probabilities are available, saves the
        precision-recall curve to
        ``results/plots/pr_curve_{model_name}_{shot_config}.png``.
    """
    # Function-scope imports: these names are not imported alongside the
    # other metrics in this cell, and importing here is harmless if they
    # already exist at module level.
    import time
    from sklearn.metrics import (
        accuracy_score,
        confusion_matrix,
        precision_score,
        recall_score,
    )

    start_time = time.time()

    # Reshape data if needed
    X_test_reshaped = X_test.reshape(X_test.shape[0], -1) if len(X_test.shape) > 2 else X_test

    # Curve arrays get their own names so the scalar metrics computed below
    # cannot overwrite them before the curve is plotted.
    pr_precision = pr_recall = None
    average_precision = None
    best_threshold = None

    if hasattr(model, "predict_proba"):
        y_prob = model.predict_proba(X_test_reshaped)[:, 1]

        pr_precision, pr_recall, thresholds = precision_recall_curve(y_test, y_prob)

        # Element i of the curve arrays describes the predictions
        # (y_prob >= thresholds[i]); the arrays end with one extra point that
        # has no threshold, so only that point is dropped. F1 is taken as 0
        # where precision + recall is 0.
        curve_p = pr_precision[:-1]
        curve_r = pr_recall[:-1]
        pr_sum = curve_p + curve_r
        f1_per_threshold = np.zeros_like(pr_sum, dtype=float)
        np.divide(2 * curve_p * curve_r, pr_sum, out=f1_per_threshold, where=pr_sum > 0)

        best_threshold = thresholds[np.argmax(f1_per_threshold)]
        y_pred = (y_prob >= best_threshold).astype(int)

        # Area under the precision-recall curve
        average_precision = average_precision_score(y_test, y_prob)
    else:
        # No probabilities available: use the model's own hard predictions.
        y_pred = model.predict(X_test_reshaped)

    # Standard metrics at the chosen operating point
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    cm = confusion_matrix(y_test, y_pred)

    execution_time = time.time() - start_time

    print(f"\n{model_name} ({shot_config}) Results:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    # Compare against None: 0.0 is a legitimate value for both quantities.
    if average_precision is not None:
        print(f"Average Precision (AP): {average_precision:.4f}")
    if best_threshold is not None:
        print(f"Best threshold: {best_threshold:.4f}")
    print(f"Execution Time: {execution_time:.2f} seconds")

    # Plot the full precision-recall curve if available
    if pr_precision is not None:
        plt.figure(figsize=(8, 6))
        plt.step(pr_recall, pr_precision, where='post')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.ylim([0.0, 1.05])
        plt.xlim([0.0, 1.0])
        plt.title(f'Precision-Recall curve: AP={average_precision:.3f} ({model_name})')
        plt.savefig(f'results/plots/pr_curve_{model_name}_{shot_config}.png')
        plt.close()

    results = {
        'model_name': f"{model_name}_{shot_config}",
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'average_precision': average_precision,
        'best_threshold': best_threshold,
        'confusion_matrix': cm,
        'execution_time': execution_time,
        'y_pred': y_pred
    }

    return results

# 5. Multiple Test Sets Analysis

In [8]:
from sklearn.model_selection import StratifiedKFold

def create_multiple_test_sets(X, y, n_splits=5):
    """Return one ``(X_train, y_train, X_test, y_test)`` tuple per stratified fold.

    Folds come from a shuffled ``StratifiedKFold`` with a fixed seed, so the
    same inputs always produce the same splits.
    """
    splitter = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    return [
        (X[fit_rows], y[fit_rows], X[held_out_rows], y[held_out_rows])
        for fit_rows, held_out_rows in splitter.split(X, y)
    ]

def cross_validation_evaluation(model_trainer, X, y, model_name, n_splits=5):
    """Train and score a model on every stratified fold, then average the scores.

    ``model_trainer`` is called as ``model_trainer(X_train, y_train)`` and
    must return a fitted model. Each fold is scored with
    ``evaluate_model_imbalanced`` on flattened test features.

    Returns:
        ``(avg_metrics, all_fold_results)``: the mean accuracy, precision,
        recall and F1 across folds, and the list of per-fold result dicts.
    """
    folds = create_multiple_test_sets(X, y, n_splits)
    all_fold_results = []

    for fold, (X_fit, y_fit, X_held, y_held) in enumerate(folds):
        print(f"\nTraining {model_name} - Fold {fold+1}/{n_splits}")
        fitted = model_trainer(X_fit, y_fit)

        # Flatten anything with more than two axes before evaluation.
        if len(X_held.shape) > 2:
            X_held_flat = X_held.reshape(X_held.shape[0], -1)
        else:
            X_held_flat = X_held

        all_fold_results.append(
            evaluate_model_imbalanced(fitted, X_held_flat, y_held, f"{model_name}_fold{fold+1}")
        )

    # Mean of each headline metric over the folds.
    avg_metrics = {
        metric: np.mean([fold_result[metric] for fold_result in all_fold_results])
        for metric in ('accuracy', 'precision', 'recall', 'f1_score')
    }

    print(f"\n{model_name} - Average Cross-Validation Results:")
    print(f"Accuracy: {avg_metrics['accuracy']:.4f}")
    print(f"Precision: {avg_metrics['precision']:.4f}")
    print(f"Recall: {avg_metrics['recall']:.4f}")
    print(f"F1 Score: {avg_metrics['f1_score']:.4f}")

    return avg_metrics, all_fold_results

# 6. Main Script

In [10]:
def main():
    """Train and evaluate all models for filled pause detection.

    Reads the notebook-level globals ``X_train``, ``X_test``, ``y_train``,
    ``y_test`` and ``y``. In order, it: prints the class distribution, builds
    the undersampled and few-shot training sets, trains the imbalance-aware
    models on the full training data, trains the remaining model families on
    the undersampled set, trains a subset of models on each few-shot set with
    at least 3 shots, reports the best model by F1 score, runs a threshold
    sweep for the first three rows of the results table, and pickles
    ``all_results``.

    NOTE: the ``train_*`` functions, ``create_results_table``,
    ``apply_xai_to_model``, ``best_models``, ``all_results``, ``pd``,
    ``pickle`` and the ``precision_score`` / ``recall_score`` metrics are not
    defined in this cell and must already exist in the notebook namespace.
    """
    print("Starting filled pause detection model evaluation with imbalanced data handling...")
    
    # Analyze data imbalance over the whole labelled dataset (train + test)
    print("\nData distribution analysis:")
    class_counts = Counter(y)
    total_samples = len(y)
    for label, count in class_counts.items():
        print(f"Class {label} ({'Field pause' if label == 0 else 'Filled pause'}): {count} samples ({count/total_samples*100:.2f}%)")
    
    # Create balanced datasets (keys: 'undersampled' and '<n>-shot')
    balanced_datasets = create_balanced_datasets(X_train, y_train, X_test, y_test)
    
    # Train specialized models for imbalanced data with full dataset
    print("\n\n==== Training Specialized Models for Imbalanced Data ====")
    imbalanced_models = train_balanced_models(X_train, y_train, X_test, y_test, "full")
    
    # Train regular models with the undersampled balanced dataset
    print("\n\n==== Training with Undersampled Balanced Dataset ====")
    X_balanced, y_balanced = balanced_datasets['undersampled']
    
    # Statistical Methods
    # (each train_* call returns a pair; only the first element is kept, and
    # none of the returned models is used again inside this function)
    gmm_models, _ = train_gmm(X_balanced, y_balanced, X_test, y_test, "balanced")
    hmm_models, _ = train_hmm(X_balanced, y_balanced, X_test, y_test, "balanced")
    
    # Machine Learning Models
    knn_model, _ = train_knn(X_balanced, y_balanced, X_test, y_test, "balanced")
    rf_model, _ = train_rf_rfe(X_balanced, y_balanced, X_test, y_test, "balanced")
    svm_model, _ = train_svm(X_balanced, y_balanced, X_test, y_test, "balanced")
    
    # Deep Learning Models
    mlp_model, _ = train_mlp(X_balanced, y_balanced, X_test, y_test, "balanced")
    cnn_model, _ = train_cnn(X_balanced, y_balanced, X_test, y_test, "balanced")
    cnn_xgb_model, _ = train_cnn_xgboost(X_balanced, y_balanced, X_test, y_test, "balanced")
    lstm_model, _ = train_lstm(X_balanced, y_balanced, X_test, y_test, "balanced")
    
    # Train models with few-shot learning
    for shot_config, (X_few, y_few) in balanced_datasets.items():
        # Keys look like '5-shot'; the leading number is the per-class count.
        if shot_config == 'undersampled' or int(shot_config.split('-')[0]) < 3:
            continue  # Skip undersampled and very small datasets
            
        print(f"\n\n==== Training with {shot_config} Learning ====")
        
        # Machine Learning Models - these can work with small datasets
        train_knn(X_few, y_few, X_test, y_test, shot_config)
        train_rf_rfe(X_few, y_few, X_test, y_test, shot_config)
        train_svm(X_few, y_few, X_test, y_test, shot_config)
        
        # Only train these with more data (5 or more shots per class)
        if int(shot_config.split('-')[0]) >= 5:
            train_gmm(X_few, y_few, X_test, y_test, shot_config)
            train_balanced_models(X_few, y_few, X_test, y_test, shot_config)
            train_mlp(X_few, y_few, X_test, y_test, shot_config)
            train_cnn(X_few, y_few, X_test, y_test, shot_config)
            train_lstm(X_few, y_few, X_test, y_test, shot_config)
    
    # Create results table
    results_df = create_results_table()
    
    # Identify best model based on F1 score
    best_result = results_df.loc[results_df['f1_score'].idxmax()]
    best_model_name = best_result['model_name']
    print(f"\nBest model: {best_model_name} with F1 score: {best_result['f1_score']:.4f}")
    
    # Apply explainable AI to the best model.
    # The name is split at the first underscore into model type and config.
    # NOTE(review): a model type that itself contains an underscore would be
    # split in the wrong place - confirm the naming used by the trainers.
    model_type, shot_config = best_model_name.split('_', 1)
    
    # Get the model object; silently skipped if the name is not registered
    model_key = best_model_name
    if model_key in best_models:
        model_info = best_models[model_key]
        apply_xai_to_model(model_info, X_test, y_test, model_type, shot_config)
    
    # Additional analysis for top models
    # NOTE(review): head(3) takes the first three rows as returned by
    # create_results_table; whether those are the three best models depends
    # on how that function orders its rows - TODO confirm.
    print("\n\n==== Detailed Analysis of Top Models ====")
    top_models = results_df.head(3)['model_name'].values
    
    for model_name in top_models:
        model_type, shot_config = model_name.split('_', 1)
        
        # Get the corresponding model
        if model_name in best_models:
            model = best_models[model_name]
            
            # Perform threshold analysis if the model supports probabilities.
            # Only entries stored as a dict with a 'model' key are analysed;
            # anything stored in another form is skipped.
            if isinstance(model, dict) and 'model' in model and hasattr(model['model'], 'predict_proba'):
                print(f"\nThreshold analysis for {model_name}:")
                X_test_reshaped = X_test.reshape(X_test.shape[0], -1) if len(X_test.shape) > 2 else X_test
                
                # Apply the stored scaler first when the entry carries one
                if 'scaler' in model:
                    X_test_scaled = model['scaler'].transform(X_test_reshaped)
                    y_prob = model['model'].predict_proba(X_test_scaled)[:, 1]
                else:
                    y_prob = model['model'].predict_proba(X_test_reshaped)[:, 1]
                
                # Test different thresholds: 0.1, 0.2, ..., 0.9
                thresholds = np.arange(0.1, 1.0, 0.1)
                threshold_results = []
                
                for threshold in thresholds:
                    y_pred = (y_prob >= threshold).astype(int)
                    precision = precision_score(y_test, y_pred)
                    recall = recall_score(y_test, y_pred)
                    f1 = f1_score(y_test, y_pred)
                    
                    threshold_results.append({
                        'threshold': threshold,
                        'precision': precision,
                        'recall': recall,
                        'f1_score': f1
                    })
                
                # Convert to DataFrame and plot
                threshold_df = pd.DataFrame(threshold_results)
                plt.figure(figsize=(10, 6))
                plt.plot(threshold_df['threshold'], threshold_df['precision'], 'b-', label='Precision')
                plt.plot(threshold_df['threshold'], threshold_df['recall'], 'g-', label='Recall')
                plt.plot(threshold_df['threshold'], threshold_df['f1_score'], 'r-', label='F1 Score')
                plt.xlabel('Threshold')
                plt.ylabel('Score')
                plt.title(f'Threshold vs. Metrics for {model_name}')
                plt.legend()
                plt.grid(True)
                plt.savefig(f'results/plots/threshold_analysis_{model_name}.png')
                plt.close()
    
    # Save all results (only the results list is pickled here, not the models)
    with open('results/tables/all_results.pkl', 'wb') as f:
        pickle.dump(all_results, f)
    
    print("\nFilled pause detection model evaluation completed!")
    print(f"Results saved to 'results/tables/' and 'results/plots/' directories")
# Run the main script only when this code is executed directly (which
# includes running it as a notebook cell), not when it is imported.
if __name__ == "__main__":
    main()

Starting filled pause detection model evaluation with imbalanced data handling...

Data distribution analysis:
Class 0 (Field pause): 21 samples (1.60%)
Class 1 (Filled pause): 1293 samples (98.40%)

Creating balanced datasets from 1051 samples
Original class distribution: {1: 1034, 0: 17}
Undersampled dataset: 51 samples, distribution: {0: 17, 1: 34}
1-shot dataset: 2 samples, distribution: {0: 1, 1: 1}
3-shot dataset: 6 samples, distribution: {0: 3, 1: 3}
5-shot dataset: 10 samples, distribution: {0: 5, 1: 5}
10-shot dataset: 20 samples, distribution: {0: 10, 1: 10}
17-shot dataset: 34 samples, distribution: {0: 17, 1: 17}


==== Training Specialized Models for Imbalanced Data ====

Training Balanced Random Forest (full)...


  warn(


NameError: name 'evaluate_model' is not defined