# 📊 Audio Data Preprocessing (Saving Features for Inspection)

In [None]:

import os
import librosa
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import joblib

# Constants
SAMPLE_RATE = 22050  # sampling rate (Hz) passed to librosa when loading/processing audio
DURATION = 3  # seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * DURATION  # fixed clip length in samples that clips are padded/cut to

# Load noise files from the "noise" directory
def load_noise_files(noise_dir):
    """Load every ``.wav``/``.mp3`` file in ``noise_dir`` as a fixed-length clip.

    Each file is read with librosa at ``SAMPLE_RATE`` for at most ``DURATION``
    seconds, then zero-padded or truncated to exactly ``SAMPLES_PER_TRACK``
    samples so it can be mixed sample-for-sample with a voice recording.

    Args:
        noise_dir: Directory that contains the noise recordings.

    Returns:
        A list of 1-D numpy arrays, one per audio file, in sorted filename
        order. Empty when the directory holds no matching files.

    Raises:
        FileNotFoundError: If ``noise_dir`` does not exist (from ``os.listdir``).
    """
    noise_files = []
    # os.listdir order is arbitrary; sorting keeps the clip order (and hence
    # the order of augmented rows built from it) reproducible between runs.
    for file in sorted(os.listdir(noise_dir)):
        if not file.endswith(('.wav', '.mp3')):
            continue
        audio, _ = librosa.load(os.path.join(noise_dir, file), sr=SAMPLE_RATE, duration=DURATION)
        if len(audio) < SAMPLES_PER_TRACK:
            padding = SAMPLES_PER_TRACK - len(audio)
            audio = np.pad(audio, (0, padding), mode='constant')
        else:
            # Same length policy as load_audio_file: never hand back a clip
            # longer than SAMPLES_PER_TRACK, otherwise it cannot be mixed.
            audio = audio[:SAMPLES_PER_TRACK]
        noise_files.append(audio)
    return noise_files

# Data processing functions
def load_audio_file(file_path, duration=DURATION, sample_rate=SAMPLE_RATE):
    """Load an audio file and force it to a fixed number of samples.

    Args:
        file_path: Path of the audio file to read.
        duration: Maximum number of seconds to read. ``None`` reads the whole
            file and falls back to ``SAMPLES_PER_TRACK`` as the target length.
        sample_rate: Rate to resample to; forwarded to ``librosa.load``.

    Returns:
        A 1-D numpy array, zero-padded at the end or truncated so that it
        holds ``duration`` seconds at the rate librosa actually loaded with.
    """
    audio, sr = librosa.load(file_path, sr=sample_rate, duration=duration)

    # Derive the target length from the arguments actually used instead of the
    # module constant, so non-default duration/sample_rate values are honoured.
    # With the defaults this is exactly SAMPLES_PER_TRACK.
    if duration is None:
        target_len = SAMPLES_PER_TRACK
    else:
        target_len = int(round(sr * duration))

    if len(audio) < target_len:
        padding = target_len - len(audio)
        audio = np.pad(audio, (0, padding), mode='constant')
    else:
        audio = audio[:target_len]
    return audio

def reduce_noise(audio, sr=SAMPLE_RATE):
    """Return ``audio`` after librosa's pre-emphasis filter.

    Despite the name, the only processing done here is
    ``librosa.effects.preemphasis`` with its default settings. ``sr`` is
    accepted for signature compatibility but is not used.
    """
    emphasized = librosa.effects.preemphasis(audio)
    return emphasized

def augment_audio(audio, noise_files):
    """Build augmented variants of ``audio``.

    Args:
        audio: 1-D waveform to augment.
        noise_files: Iterable of noise waveforms; only those whose length
            equals ``len(audio)`` are mixed in, the rest are ignored.

    Returns:
        A list: white-noise mix, pitch-shifted copy (+4 steps), time-stretched
        copy (rate 0.8), followed by one mix per usable noise clip.
    """
    # Synthetic Gaussian noise at a low amplitude.
    white_noise = np.random.randn(len(audio))
    noisy = audio + 0.005 * white_noise

    # Real-world noise: mix in each clip that lines up sample-for-sample.
    real_noise_mixes = [
        audio + 0.02 * clip
        for clip in noise_files
        if len(clip) == len(audio)
    ]

    shifted = librosa.effects.pitch_shift(audio, sr=SAMPLE_RATE, n_steps=4)
    stretched = librosa.effects.time_stretch(audio, rate=0.8)

    variants = [noisy, shifted, stretched]
    variants.extend(real_noise_mixes)
    return variants

def extract_features(audio, sr=SAMPLE_RATE):
    """Return the per-coefficient mean of 40 MFCCs computed from ``audio``.

    The MFCC matrix from librosa is transposed and averaged along its first
    axis, giving one value per coefficient.
    """
    coefficients = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40)
    per_frame = coefficients.T
    return np.mean(per_frame, axis=0)

# Process audio files
def _parse_recording_name(filename):
    """Split ``<person>-<method...>.<ext>`` into ``(person, method)``.

    The method is the integer formed by the first two characters after the
    last ``-`` in the filename stem. Returns ``None`` when the name has no
    ``-`` or the method part is not numeric.
    """
    stem, _ = os.path.splitext(filename)
    name, sep, method_part = stem.rpartition('-')
    if not sep:
        return None
    try:
        method = int(method_part[:2])
    except ValueError:
        return None
    return name, method


def process_audio_files(directory, noise_dir='noise'):
    """Extract MFCC features for every recording and save them per method.

    For each ``.wav``/``.mp3`` file in ``directory`` (files whose name starts
    with ``noise`` are skipped) the recording is loaded, pre-emphasised,
    augmented, and turned into feature vectors. Rows are grouped by the method
    number parsed from the filename (1, 2 or 3; other numbers are ignored).

    Side effects:
        Writes ``features_method_<n>.csv`` for n in 1..3 and
        ``label_encoder.pkl`` into the current working directory.

    Args:
        directory: Folder containing the recordings.
        noise_dir: Name of the sub-folder of ``directory`` with noise clips.
    """
    dataframes = {1: [], 2: [], 3: []}
    labels = {1: [], 2: [], 3: []}
    noise_files = load_noise_files(os.path.join(directory, noise_dir))

    # Sorted for a reproducible row order in the CSV files.
    for filename in sorted(os.listdir(directory)):
        # The original test `A or B and not C` only excluded noise-prefixed
        # .mp3 files because `and` binds tighter than `or`; apply the
        # exclusion to both extensions.
        if not filename.endswith(('.wav', '.mp3')) or filename.startswith('noise'):
            continue

        parsed = _parse_recording_name(filename)
        if parsed is None:
            # Malformed names used to abort the whole run with a ValueError.
            print(f"Skipping file with unexpected name: {filename}")
            continue
        name, method = parsed

        if method not in dataframes:
            continue

        file_path = os.path.join(directory, filename)
        audio = reduce_noise(load_audio_file(file_path))

        # Original recording first, then its augmented variants (synthetic
        # and real noise, pitch shift, time stretch), all with the same label.
        for clip in [audio] + augment_audio(audio, noise_files):
            dataframes[method].append(extract_features(clip))
            labels[method].append(name)

    # Fit ONE encoder on the names seen across all methods. Re-fitting per
    # method (as before) left only the last method's mapping in the pickle,
    # so the saved encoder could decode the other methods' labels wrongly.
    le = LabelEncoder()
    le.fit([name for method_labels in labels.values() for name in method_labels])

    for method, features in dataframes.items():
        method_labels = labels[method]

        df = pd.DataFrame(features)
        df['label'] = le.transform(method_labels)
        df['person'] = method_labels

        # Save feature vectors for manual inspection
        df.to_csv(f'features_method_{method}.csv', index=False)

    joblib.dump(le, 'label_encoder.pkl')

# Specify the directory containing audio files
# NOTE: placeholder path. process_audio_files also reads noise clips from the
# "noise" sub-folder of this directory and writes its CSV/pickle outputs to
# the current working directory.
directory = "path_to_your_audio_files"
process_audio_files(directory)


# 🤖 Model Training with Diagnostics

In [None]:

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import joblib
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Training models for each method
# Each iteration reads features_method_<n>.csv, trains a random forest,
# prints diagnostics, and saves the fitted model and scaler with joblib.
for method in [1, 2, 3]:
    print(f"--- Training Model for Method {method} ---")

    df = pd.read_csv(f'features_method_{method}.csv')
    X = df.drop(['label', 'person'], axis=1).values
    y = df['label'].values

    # Class ids in sorted order together with their person names. The
    # confusion matrix is ordered by label id, whereas df['person'].unique()
    # is in order of appearance, so using it for ticks can mislabel the axes.
    class_table = df[['label', 'person']].drop_duplicates('label').sort_values('label')
    class_ids = class_table['label'].tolist()
    class_names = class_table['person'].astype(str).tolist()

    # NOTE: rows derived from the same recording (original + augmentations)
    # can land on both sides of this split because the CSV carries no
    # recording id, so the reported accuracy is likely optimistic.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # Fit the scaler on the training portion only so that test-set statistics
    # do not leak into preprocessing.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Fixed seed so repeated runs give the same model and metrics.
    classifier = RandomForestClassifier(n_estimators=100, random_state=42)
    classifier.fit(X_train, y_train)

    # Predictions
    y_pred = classifier.predict(X_test)

    # Accuracy Score
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy * 100:.2f}%")

    # Classification Report
    print("Classification Report:")
    print(classification_report(y_test, y_pred, labels=class_ids, target_names=class_names))

    # Confusion Matrix (rows/columns pinned to class_ids so ticks line up)
    cm = confusion_matrix(y_test, y_pred, labels=class_ids)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(f'Confusion Matrix for Method {method}')
    plt.show()

    # Save models and scalers
    joblib.dump(classifier, f'model_method_{method}.pkl')
    joblib.dump(scaler, f'scaler_method_{method}.pkl')

    # Diagnostic: Check data distribution
    print("Data Distribution:")
    print(df['person'].value_counts())

    # Verify Mario's Data Presence
    if 'Mario' in df['person'].unique():
        mario_data = df[df['person'] == 'Mario']
        # Counted over the full dataset (train + test), so say so.
        print(f"Mario's Data in Dataset: {len(mario_data)} samples")
        print(mario_data.describe())  # Summarize Mario's feature data


# 🎤 Real-Time Audio Recording & Prediction

In [None]:

import sounddevice as sd
import librosa
import numpy as np
import joblib

# Constants
SAMPLE_RATE = 22050  # sampling rate (Hz) used for recording and MFCC extraction
DURATION = 3  # seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * DURATION  # number of samples recorded per prediction

# Load models and scalers
# One classifier and one scaler per method (1-3), read from the .pkl files in
# the current working directory. A single label encoder is shared by all
# three methods when decoding predictions.
# NOTE: joblib.load unpickles arbitrary objects; only load files you trust.
models = {method: joblib.load(f'model_method_{method}.pkl') for method in [1, 2, 3]}
scalers = {method: joblib.load(f'scaler_method_{method}.pkl') for method in [1, 2, 3]}
label_encoder = joblib.load('label_encoder.pkl')

# Functions for real-time recording and prediction
def record_audio():
    """Record one mono clip of ``SAMPLES_PER_TRACK`` samples and return it flat.

    Prints a message before and after recording and blocks until the
    recording has finished.
    """
    print("Recording...")
    frame_count = int(SAMPLES_PER_TRACK)
    recording = sd.rec(frame_count, samplerate=SAMPLE_RATE, channels=1)
    sd.wait()  # block until the capture is complete
    print("Recording complete.")
    mono = recording.flatten()  # collapse the (frames, 1) buffer to 1-D
    return mono

def extract_features(audio):
    """Return the per-coefficient mean of 40 MFCCs computed at ``SAMPLE_RATE``."""
    coefficients = librosa.feature.mfcc(y=audio, sr=SAMPLE_RATE, n_mfcc=40)
    per_frame = coefficients.T
    return np.mean(per_frame, axis=0)

def predict(audio):
    """Predict the speaker of ``audio`` with each per-method model.

    Args:
        audio: 1-D waveform sampled at ``SAMPLE_RATE``.

    Returns:
        Dict mapping each method number to the decoded label predicted by
        that method's model.
    """
    # The training features were computed from pre-emphasised audio
    # (reduce_noise -> librosa.effects.preemphasis). Apply the same filter
    # here, otherwise the model sees features from a different distribution.
    audio = librosa.effects.preemphasis(np.asarray(audio))

    features = extract_features(audio).reshape(1, -1)
    predictions = {}

    for method, model in models.items():
        scaled_features = scalers[method].transform(features)
        pred = model.predict(scaled_features)
        predictions[method] = label_encoder.inverse_transform(pred)[0]

    return predictions

# Run real-time prediction
# Records one clip from the default input device, then prints the label
# predicted by each method's model.
audio = record_audio()
predictions = predict(audio)
for method, prediction in predictions.items():
    print(f"Prediction for Method {method}: {prediction}")
