### Extract features from the processed and augmented audio files.

In [None]:
# Imports
import os
import random
import logging
import warnings
import gc

import numpy as np
import pandas as pd
from tqdm import tqdm


# Data processing and scientific computing
from scipy.io import wavfile
from scipy.signal import butter, lfilter
from scipy.spatial.distance import cosine

# Audio processing
import librosa
import soundfile as sf

# Visualization
import matplotlib.pyplot as plt
import plotly.express as px

# Set up logging: configure the root logger so INFO-level (and higher) messages are emitted
logging.basicConfig(level=logging.INFO)

In [None]:
# Load the prepared csv into a DataFrame (the path is relative to the current working directory).
# Later cells read its 'processed_file' column to locate the audio files.
augmented_data = pd.read_csv("augmented_data.csv")

In [None]:
# Preview the first rows of the loaded table
augmented_data.head()

In [None]:
def is_valid_audio(audio, sr, min_duration=0.1, silence_threshold=-60):
    """Check if the audio segment is valid (not too short and not silent).

    Args:
        audio: Array of audio samples. Assumed to be floating point with a
            full scale of 1.0, as produced by ``librosa.load`` -- TODO confirm
            for audio coming from other loaders.
        sr: Sampling rate of ``audio`` in Hz.
        min_duration: Minimum accepted duration in seconds.
        silence_threshold: Level in dB below which the audio counts as silent.
            It is applied to the absolute peak level and to the mean level
            measured relative to the peak.

    Returns:
        bool: ``True`` when the audio is long enough and not silent,
        ``False`` otherwise.
    """
    audio = np.asarray(audio)
    if audio.size == 0:
        # Nothing to measure; also avoids taking the max of an empty array below.
        return False

    duration = librosa.get_duration(y=audio, sr=sr)
    if duration < min_duration:
        return False

    magnitude = np.abs(audio)

    # Absolute check. The relative measurement below normalises by the file's
    # own peak, so an all-zero (or uniformly faint) recording averages close
    # to 0 dB there and would otherwise be accepted as "not silent".
    peak_floor = 10.0 ** (silence_threshold / 20.0)
    if magnitude.max() < peak_floor:
        return False

    # Relative check: the audio is mostly silent compared with its loudest sample.
    db = librosa.amplitude_to_db(magnitude, ref=np.max)
    if np.mean(db) < silence_threshold:
        return False

    return True


def extract_features(audio, sr):
    """Compute the per-file audio descriptors used by the pipeline.

    Args:
        audio: Audio samples to analyse.
        sr: Sampling rate of ``audio`` in Hz.

    Returns:
        dict: Maps feature name to the array returned by librosa. The
        mel-spectrogram (128 mel bands) is converted to dB relative to its
        own maximum; MFCCs use 13 coefficients. For the spectral centroid,
        zero-crossing rate and spectral roll-off only the first row of the
        librosa output is kept.
    """
    feat = librosa.feature

    # Mel-spectrogram, later converted from power to dB
    mel_power = feat.melspectrogram(y=audio, sr=sr, n_mels=128)

    return {
        'mel_spectrogram_db': librosa.power_to_db(mel_power, ref=np.max),
        'mfccs': feat.mfcc(y=audio, sr=sr, n_mfcc=13),
        'spectral_centroids': feat.spectral_centroid(y=audio, sr=sr)[0],
        'chroma': feat.chroma_stft(y=audio, sr=sr),
        'zero_crossing_rate': feat.zero_crossing_rate(audio)[0],
        'spectral_rolloff': feat.spectral_rolloff(y=audio, sr=sr)[0],
    }

def summarize_feature(feature):
    """Collapse a feature array into mean / standard deviation / maximum.

    Args:
        feature: NumPy array with one or two dimensions.

    Returns:
        numpy.ndarray: For a 1-D input, ``[mean, std, max]``. For a 2-D
        input, the per-row means, then the per-row standard deviations, then
        the per-row maxima, concatenated into one flat vector.

    Raises:
        ValueError: If ``feature`` is neither 1-D nor 2-D.
    """
    reducers = (np.mean, np.std, np.max)
    dims = feature.ndim

    if dims == 1:
        return np.array([reduce(feature) for reduce in reducers])

    if dims == 2:
        # axis=1 reduces along each row
        return np.hstack([reduce(feature, axis=1) for reduce in reducers])

    raise ValueError(f"Unsupported feature dimension: {feature.ndim}")
    
def save_mel_spectrogram(mel_spec, output_dir, base_filename, sr):
    """Render a mel-spectrogram and save it as a PNG image.

    The image is written to
    ``<output_dir>/<base_filename>_mel_spectrogram.png``.

    Args:
        mel_spec: Spectrogram array to draw; the colour bar is labelled in dB.
        output_dir: Directory for the image. Created if it does not exist.
        base_filename: File name stem used for the image.
        sr: Sampling rate in Hz, passed to librosa for axis scaling.
    """
    # Import the submodule explicitly: a plain `import librosa` does not make
    # `librosa.display` available on every librosa release.
    import librosa.display

    # savefig does not create missing directories.
    os.makedirs(output_dir, exist_ok=True)

    fig = plt.figure(figsize=(10, 4))
    try:
        librosa.display.specshow(mel_spec, sr=sr, x_axis='time', y_axis='mel')
        plt.colorbar(format='%+2.0f dB')
        plt.title('Mel-spectrogram')
        plt.tight_layout()
        fig.savefig(os.path.join(output_dir, f"{base_filename}_mel_spectrogram.png"))
    finally:
        # Always release the figure, even when plotting or saving fails, so
        # open figures do not pile up over a long batch run.
        plt.close(fig)
    

In [None]:
def process_audio_file(file_path, output_dir):
    """Load one audio file, extract its features and save its mel-spectrogram.

    Args:
        file_path: Path of the audio file to load.
        output_dir: Directory handed to ``save_mel_spectrogram`` for the image.

    Returns:
        tuple: ``(feature_vector, features)``. ``feature_vector`` is the
        concatenation of the summarised MFCC, spectral-centroid, chroma,
        zero-crossing-rate and spectral-roll-off features (in that order);
        ``features`` is the dict returned by ``extract_features``.
        ``(None, None)`` is returned when the audio fails validation or when
        any step raises; in both cases a message is printed.
    """
    # Order in which the summaries are concatenated into the final vector
    vector_keys = (
        'mfccs_summary',
        'spectral_centroids_summary',
        'chroma_summary',
        'zero_crossing_rate_summary',
        'spectral_rolloff_summary',
    )

    try:
        # sr=None keeps the file's own sampling rate
        audio, sr = librosa.load(file_path, sr=None)

        if not is_valid_audio(audio, sr):
            print(f"Warning: Audio file {file_path} is too short or silent. Skipping.")
            return None, None

        # Warnings raised during feature extraction are suppressed
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            features = extract_features(audio, sr)

        # Summarise everything except the spectrogram, which is saved as an image
        feature_summary = {
            f"{name}_summary": summarize_feature(values)
            for name, values in features.items()
            if name != 'mel_spectrogram_db'
        }

        feature_vector = np.hstack(
            [feature_summary.get(name, np.array([])) for name in vector_keys]
        )

        stem, _ext = os.path.splitext(os.path.basename(file_path))
        save_mel_spectrogram(features['mel_spectrogram_db'], output_dir, stem, sr)

        return feature_vector, features
    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")
        return None, None

def process_audio_files_batch(df, base_dir, batch_size=1000, output_dir='mel-spectrograms'):
    """Extract a feature vector for every file listed in ``df``.

    Each file name in the ``processed_file`` column is looked up first under
    ``<base_dir>/Augmented Recordings`` and then under
    ``<base_dir>/Processed Recordings``.

    Args:
        df: DataFrame with a ``processed_file`` column of audio file names.
        base_dir: Directory containing the two recordings folders.
        batch_size: Number of rows handled between garbage-collection passes.
        output_dir: Directory for the mel-spectrogram images. Created if it
            does not exist.

    Returns:
        tuple: ``(feature_data, skipped_files)``. ``feature_data`` is a list
        of dicts with keys ``processed_file`` and ``feature_vector``;
        ``skipped_files`` lists the names that were not found or for which
        ``process_audio_file`` returned no feature vector.
    """
    os.makedirs(output_dir, exist_ok=True)

    search_dirs = ('Augmented Recordings', 'Processed Recordings')
    feature_data = []
    skipped_files = []

    for start in range(0, len(df), batch_size):
        batch = df.iloc[start:start + batch_size]
        for _, row in tqdm(batch.iterrows(), total=len(batch)):
            filename = row['processed_file']
            candidates = (os.path.join(base_dir, folder, filename) for folder in search_dirs)
            file_path = next((path for path in candidates if os.path.exists(path)), None)

            if file_path is None:
                print(f"File not found: {filename}")
                skipped_files.append(filename)
                continue

            # process_audio_file returns (feature_vector, features). Only the
            # vector is kept, so the larger per-file arrays can be freed.
            feature_vector, _ = process_audio_file(file_path, output_dir)

            if feature_vector is None:
                skipped_files.append(filename)
                continue

            feature_data.append({
                'processed_file': filename,
                'feature_vector': feature_vector,
            })

        # Garbage collection after each batch
        gc.collect()

    print(f"Total files skipped: {len(skipped_files)}")
    return feature_data, skipped_files

In [None]:
# Smoke test: run the feature-extraction steps on one randomly chosen file
# and print what they produce.

# Select a random file. The column is converted to a plain list so that
# random.choice works on positions and does not depend on the Series index
# or on Series truthiness.
random_file = random.choice(augmented_data['processed_file'].tolist())
possible_paths = [
    os.path.join('Augmented Recordings', random_file),
    os.path.join('Processed Recordings', random_file)
]
file_path = next((path for path in possible_paths if os.path.exists(path)), None)

if file_path is None:
    print(f"Error: File not found - {random_file}")
else:
    print(f"Testing feature extraction on file: {file_path}")

    # Load the audio file at its own sampling rate
    audio, sr = librosa.load(file_path, sr=None)

    # Extract features
    features = extract_features(audio, sr)

    # Print a summary of each feature
    for feature_name, feature_data in features.items():
        if feature_name == 'mel_spectrogram_db':
            print(f"{feature_name} shape: {feature_data.shape}")
        else:
            print(f"{feature_name} shape: {feature_data.shape}, mean: {np.mean(feature_data):.4f}, std: {np.std(feature_data):.4f}")

    # Summarize features (the spectrogram is saved as an image instead)
    feature_summary = {
        f"{key}_summary": summarize_feature(value)
        for key, value in features.items()
        if key != 'mel_spectrogram_db'
    }

    # Print summary of summarized features; summarize_feature returns NumPy arrays
    print("\nSummarized Features:")
    for key, value in feature_summary.items():
        print(f"{key} type: numpy.ndarray, shape: {value.shape}, mean: {np.mean(value):.4f}, std: {np.std(value):.4f}")

    # Create feature vector by concatenating the summaries in a fixed order
    summary_keys = [
        'mfccs_summary',
        'spectral_centroids_summary',
        'chroma_summary',
        'zero_crossing_rate_summary',
        'spectral_rolloff_summary',
    ]
    feature_vector = np.hstack([feature_summary.get(key, np.array([])) for key in summary_keys])

    print(f"\nFinal feature vector shape: {feature_vector.shape}")

    # Save mel-spectrogram as image
    output_dir = 'Test'
    os.makedirs(output_dir, exist_ok=True)
    base_filename = os.path.splitext(os.path.basename(file_path))[0]
    save_mel_spectrogram(features['mel_spectrogram_db'], output_dir, base_filename, sr)

    print(f"Mel-spectrogram saved as: {base_filename}_mel_spectrogram.png in {output_dir}")

print("Feature extraction test complete.")

In [None]:
# Usage: extract features for every file listed in augmented_data and attach
# them to the table.

base_dir = os.getcwd()
output_dir = 'mel-spectrograms'
# Ensure the directory intended for mel-spectrogram images exists
os.makedirs(output_dir, exist_ok=True)

feature_data, skipped_files = process_audio_files_batch(augmented_data, base_dir)

# Convert feature_data to DataFrame. Naming the columns keeps the merge key
# present even when no file was processed successfully.
feature_df = pd.DataFrame(feature_data, columns=['processed_file', 'feature_vector'])

# Merge the new feature DataFrame with the augmented_data DataFrame
final_data = pd.merge(augmented_data, feature_df, on='processed_file', how='left')

# Drop rows corresponding to skipped files
final_data = final_data[~final_data['processed_file'].isin(skipped_files)]

# DataFrame.info() prints its report itself and returns None, so it is not
# wrapped in print().
final_data.info()

print(f"\nTotal files in augmented_data: {len(augmented_data)}")
print(f"Files successfully processed: {len(feature_df)}")
print(f"Files skipped (not found, failed, too short or silent): {len(skipped_files)}")
print(f"Files in final_data after dropping skipped files: {len(final_data)}")

In [None]:
# Review the dataframe: print the column names, non-null counts and dtypes of final_data
final_data.info()

In [None]:
# Save DataFrame to CSV (written to the current working directory, without the index column)
# NOTE(review): if 'feature_vector' holds NumPy arrays, to_csv stores their
# string form rather than numeric columns -- confirm that whatever reads
# final_data.csv parses that format as intended.
final_data.to_csv('final_data.csv', index=False)

print("Processing complete. Summary data saved to 'final_data.csv'.")