In [1]:
import os
from pathlib import Path
import pandas as pd
import librosa
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader



# file_path
audio_path = Path("dataset") / "audio"
csv_path = Path("dataset") / "esc50.csv"

metadata = pd.read_csv(csv_path)

In [2]:
"""
sr = 22050
duration = 5 # ESC50 has 5 sec duration
n_mfcc = 13
n_mels = 128

# Output
mfcc_dir = Path("processed_data") / "mfcc"
mel_dir = Path("processed_data") / "mel_spectrogram"
mfcc_dir.mkdir(parents=True, exist_ok=True)
mel_dir.mkdir(parents=True, exist_ok=True)

# Function to save MFCC and Mel Spectrogram
def save_features(audio_path, filename, sr=22050):
    # Load audio
    y, _ = librosa.load(audio_path, sr=sr, duration=duration)

    # Compute MFCC and Mel_spectrogram
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)

    # Save as .npy
    np.save(f"processed_data/mfcc/{filename}_mfcc.npy", mfcc)
    np.save(f"processed_data/mel_spectrogram/{filename}_mel_spectrogram.npy", mel_spectrogram)

# Save features
for _, row in metadata.iterrows():
    audio_file = f"dataset/audio/{row['filename']}"
    filename = row["filename"].split(".")[0]
    save_features(audio_file, filename) """

'\nsr = 22050\nduration = 5 # ESC50 has 5 sec duration\nn_mfcc = 13\nn_mels = 128\n\n# Output\nmfcc_dir = Path("processed_data") / "mfcc"\nmel_dir = Path("processed_data") / "mel_spectrogram"\nmfcc_dir.mkdir(parents=True, exist_ok=True)\nmel_dir.mkdir(parents=True, exist_ok=True)\n\n# Function to save MFCC and Mel Spectrogram\ndef save_features(audio_path, filename, sr=22050):\n    # Load audio\n    y, _ = librosa.load(audio_path, sr=sr, duration=duration)\n\n    # Compute MFCC and Mel_spectrogram\n    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)\n    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)\n\n    # Save as .npy\n    np.save(f"processed_data/mfcc/{filename}_mfcc.npy", mfcc)\n    np.save(f"processed_data/mel_spectrogram/{filename}_mel_spectrogram.npy", mel_spectrogram)\n\n# Save features\nfor _, row in metadata.iterrows():\n    audio_file = f"dataset/audio/{row[\'filename\']}"\n    filename = row["filename"].split(".")[0]\n    save_features(audi

In [None]:
# Load metadata
metadata = pd.read_csv("dataset/esc50.csv")

# Function to load features and flatten them
def load_features(feature_type="mfcc"):
    feature_dir = Path(f"processed_data/{feature_type}")
    X, y = [], []
    for _, row in metadata.iterrows():
        class_label = row["category"]
        file_name = row["filename"].split(".")[0]
        
        # Load .npy file
        feature_path = feature_dir / f"{file_name}_{feature_type}.npy"
        features = np.load(feature_path)
        
        # Flatten the features to 1D for simple models
        X.append(features.flatten())
        y.append(class_label)
        
    return np.array(X), np.array(y)

# Load and split data for MFCC
X_mfcc, y_mfcc = load_features("mfcc")
X_train_mfcc, X_test_mfcc, y_train_mfcc, y_test_mfcc = train_test_split(X_mfcc, y_mfcc, test_size=0.2, random_state=12)

# Load and split data for Mel Spectrogram
X_mel, y_mel = load_features("mel_spectrogram")
X_train_mel, X_test_mel, y_train_mel, y_test_mel = train_test_split(X_mel, y_mel, test_size=0.2, random_state=12)



In [4]:
# ML Models 
"""
# Initialize models
knn = KNeighborsClassifier(n_neighbors=4)
rf = RandomForestClassifier(n_estimators=100, random_state=12)
svm = SVC(kernel="linear")

# Model List
models = {"KNN": knn, "Random Forest": rf, "SVM": svm}

# Function to train and evaluate models
def train_and_evaluate(X_train, X_test, y_train, y_test, feature_type):
    print(f"\nEvaluating models for {feature_type} features:\n")
    for name, model in models.items():
        # Train the model
        model.fit(X_train, y_train)
        
        y_pred = model.predict(X_test)
        
        # Evaluation
        accuracy = accuracy_score(y_test, y_pred)
        print(f"{name} Model Accuracy: {accuracy:.2f}")

# MFCC
train_and_evaluate(X_train_mfcc, X_test_mfcc, y_train_mfcc, y_test_mfcc, "MFCC")

# Mel Spectrogram
train_and_evaluate(X_train_mel, X_test_mel, y_train_mel, y_test_mel, "Mel Spectrogram") """

'\n# Initialize models\nknn = KNeighborsClassifier(n_neighbors=4)\nrf = RandomForestClassifier(n_estimators=100, random_state=12)\nsvm = SVC(kernel="linear")\n\n# Model List\nmodels = {"KNN": knn, "Random Forest": rf, "SVM": svm}\n\n# Function to train and evaluate models\ndef train_and_evaluate(X_train, X_test, y_train, y_test, feature_type):\n    print(f"\nEvaluating models for {feature_type} features:\n")\n    for name, model in models.items():\n        # Train the model\n        model.fit(X_train, y_train)\n        \n        y_pred = model.predict(X_test)\n        \n        # Evaluation\n        accuracy = accuracy_score(y_test, y_pred)\n        print(f"{name} Model Accuracy: {accuracy:.2f}")\n\n# MFCC\ntrain_and_evaluate(X_train_mfcc, X_test_mfcc, y_train_mfcc, y_test_mfcc, "MFCC")\n\n# Mel Spectrogram\ntrain_and_evaluate(X_train_mel, X_test_mel, y_train_mel, y_test_mel, "Mel Spectrogram") '

In [5]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)

        dummy_input = torch.zeros(1, 1, 128, 216)  # (batch_size, channels, height, width)
        out = self.pool(self.relu(self.conv1(dummy_input)))
        out = self.pool(self.relu(self.conv2(out)))
        self.flattened_size = out.numel()

        # Fully connected layers
        self.fc1 = nn.Linear(self.flattened_size, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        # Convolutional layers
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)

        # Flatten the output
        x = x.view(x.size(0), -1)

        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

In [6]:
class AudioDataset(Dataset):
    def __init__(self, metadata, feature_type, feature_dir, num_classes):
        self.metadata = metadata
        self.feature_type = feature_type
        self.feature_dir = feature_dir
        self.num_classes = num_classes
        self.label_map = {label: idx for idx, label in enumerate(metadata['category'].unique())}

    def __len__(self):
        return len(self.metadata)

    def __getitem__(self, idx):
        row = self.metadata.iloc[idx]
        file_name = row["filename"].split(".")[0]
        feature_path = f"{self.feature_dir}/{file_name}_{self.feature_type}.npy"
        features = np.load(feature_path)

        # Normalize features and add channel dimension
        features = (features - np.mean(features)) / np.std(features)
        features = torch.tensor(features, dtype=torch.float32).unsqueeze(0)

        label = self.label_map[row["category"]]
        label = torch.tensor(label, dtype=torch.long)
        return features, label


In [61]:


def train_model(model, train_loader, test_loader, device, num_epochs=25):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        correct_predictions = 0
        total_predictions = 0
        
        for features, labels in train_loader:
            features, labels = features.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)  # Get the predicted class (index of max logit)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

        epoch_accuracy = (correct_predictions / total_predictions) * 100  # in percentage
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss / len(train_loader):.4f}, Accuracy: {epoch_accuracy:.2f}%")

    print("Training complete!")


In [9]:
def evaluate_model(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for features, labels in data_loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy

In [63]:
class ImprovedCRNN(nn.Module):
    def __init__(self, input_channels=1, img_height=128, img_width=256, num_classes=50,
                 map_to_seq_hidden=128, rnn_hidden_size=256, num_rnn_layers=3, dropout=0.3):
        super(ImprovedCRNN, self).__init__()
        
        # CNN backbone
        self.cnn, (output_channels, output_height, output_width) = self._cnn_backbone(
            input_channels, img_height, img_width
        )
        
        # Map CNN output to sequence
        self.map_to_seq = nn.Linear(output_channels * output_height, map_to_seq_hidden)
        
        # Recurrent layers
        self.rnn1 = nn.LSTM(
            map_to_seq_hidden,
            rnn_hidden_size,
            num_layers=num_rnn_layers,
            bidirectional=True,
            dropout=dropout,
            batch_first=False
        )
        
        # Fully connected layer
        self.fc = nn.Linear(rnn_hidden_size * 2, num_classes)

    def _cnn_backbone(self, input_channels, img_height, img_width):
        
        cnn = nn.Sequential(
            # Block 1
            nn.Conv2d(input_channels, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),  # Halves height and width
            nn.Dropout(0.2),

            # Block 2
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),  # Halves height and width
            nn.Dropout(0.3),

            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 1)),  # Halves height, keeps width
            nn.Dropout(0.3),

            # Block 4
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 1)),  # Halves height, keeps width
            nn.Dropout(0.3)
        )

        # Calculate output shape
        final_height = img_height // (2 * 2 * 2 * 2)  # Height halved 4 times
        final_width = img_width // (2 * 2)  # Width halved 2 times
        output_shape = (512, final_height, final_width)

        return cnn, output_shape


    def forward(self, x):
        
        x = self.cnn(x)
        batch_size, channels, height, width = x.shape
        x = x.view(batch_size, channels * height, width).permute(2, 0, 1)
        x = self.map_to_seq(x)
        x, _ = self.rnn1(x)
        x = self.fc(x[-1])
        return x



In [64]:

if __name__ == "__main__":

    feature_type = "mel_spectrogram"
    feature_dir = f"processed_data/{feature_type}"
    metadata = pd.read_csv("dataset/esc50.csv")
    num_classes = len(metadata["category"].unique())
    num_epochs = 10

    # Create dataset and dataloaders
    dataset = AudioDataset(metadata, feature_type, feature_dir, num_classes)
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Initialize model, loss, and optimizer
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ImprovedCRNN(input_channels=1, num_classes=num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)

    
    """
    # Train SimpleCNN
    print("Training SimpleCNN...")
    cnn_model = SimpleCNN(num_classes=num_classes).to(device)
    train_model(cnn_model, train_loader, test_loader, device, num_epochs)

    # Evaluate SimpleCNN
    cnn_accuracy = evaluate_model(cnn_model, test_loader, device)
    print(f"SimpleCNN Test Accuracy: {cnn_accuracy:.2f}%")
    """
    # Train CRNN
    print("\nTraining CRNN...")
    crnn_model = ImprovedCRNN(input_channels=1, num_classes=num_classes).to(device)
    train_model(crnn_model, train_loader, test_loader, device, num_epochs)

    # Evaluate CRNN
    crnn_accuracy = evaluate_model(crnn_model, test_loader, device)
    print(f"CRNN Test Accuracy: {crnn_accuracy:.2f}%")
    


Training CRNN...
Epoch [1/10], Loss: 3.6861, Accuracy: 5.38%
Epoch [2/10], Loss: 3.2596, Accuracy: 10.56%
Epoch [3/10], Loss: 3.0771, Accuracy: 13.00%
Epoch [4/10], Loss: 2.9714, Accuracy: 16.19%
Epoch [5/10], Loss: 2.8264, Accuracy: 19.19%
Epoch [6/10], Loss: 2.7035, Accuracy: 20.94%
Epoch [7/10], Loss: 2.6218, Accuracy: 23.56%
Epoch [8/10], Loss: 2.5252, Accuracy: 25.12%
Epoch [9/10], Loss: 2.4960, Accuracy: 25.06%
Epoch [10/10], Loss: 2.4147, Accuracy: 27.81%
Training complete!
CRNN Test Accuracy: 29.00%


In [None]:
def probabilities_to_text(probabilities, class_labels, top_n=3):
    # Ensure probabilities are a 2D tensor
    if len(probabilities.shape) != 2:
        raise ValueError("Probabilities should be a 2D tensor (batch_size, num_classes).")
    
    # Get the top N indices for the first item in the batch
    top_indices = probabilities[0].argsort(descending=True)[:top_n]

    # Ensure indices don't exceed class_labels length
    assert len(class_labels) >= probabilities.size(1), (
        "Number of class labels does not match the number of classes in the model output."
    )
    
    # Construct the description
    description = "The audio likely contains "
    for i, idx in enumerate(top_indices):
        prob = probabilities[0][idx].item() * 100
        description += f"{class_labels[idx]} ({prob:.1f}%)"
        if i < top_n - 1:
            description += ", "
        else:
            description += "."
    return description



In [None]:
# Generate description with top predictions
description = probabilities_to_text(probabilities, class_labels)
print(description)


The audio likely contains shuffling (2.2%), printer (2.1%), whistling (2.1%).


In [None]:
# Load GPT-2 model and tokenizer
model_name = "gpt2"
gpt2_model = GPT2LMHeadModel.from_pretrained(model_name)
gpt2_tokenizer = GPT2Tokenizer.from_pretrained(model_name)
gpt2_model.eval()  # Set model to evaluation mode

# Function to generate description from probabilities
# Function to generate description from probabilities
def probabilities_to_text(probabilities, class_labels, top_n=3):
    # Ensure probabilities is a 2D tensor with shape (batch_size, num_classes)
    # Select the first (and only) sample from the batch (index 0)
    probs = probabilities[0]  # This is the probability vector for the first sample

    # Sort probabilities and get top_n
    top_indices = torch.argsort(probs, descending=True)[:top_n]
    description = "Based on the classification, the audio likely contains: "

    for i, idx in enumerate(top_indices):
        prob = probs[idx].item() * 100  # Use the selected probability
        description += f"{class_labels[idx]} ({prob:.1f}%)"
        if i < top_n - 1:
            description += ", "
    
    return description


# Function to generate text interpretation from GPT-2 based on classification results
def generate_text_with_gpt2(classification_description):
    prompt = f"""
Here is the analysis from an audio classification model:
The audio contains sounds like dog barking (30%) and bird chirping (10%).

The classification suggests that the audio likely contains the following sounds:
1. Rain - 2.0%
2. Footsteps - 2.0%
3. Cat - 2.0%

Please provide:
1. A detailed explanation of what might be happening in the audio based on the classification results above.
2. Specific recommendations or actions to take if this audio is being monitored in an environmental monitoring system. For example, what should be done if the audio suggests human presence, or an anomaly in environmental conditions.
"""

    
    # Encode the input prompt
    inputs = gpt2_tokenizer(prompt, return_tensors="pt")

    # Generate output
    with torch.no_grad():
        outputs = gpt2_model.generate(inputs['input_ids'], max_length=200, num_return_sequences=1, no_repeat_ngram_size=2)

    # Decode and return the generated text
    return gpt2_tokenizer.decode(outputs[0], skip_special_tokens=True)

# Example audio input (replace this with actual audio input processing)
dummy_audio_input = torch.randn(1, 1, 128, 216)  # Example input (batch_size, channels, height, width)

# Example class labels (replace with actual class labels for your dataset)
class_labels = class_labels = [
    "dog", "cat", "car_horn", "chirping_birds", "airplane", "rain",
    "siren", "engine", "crying_baby", "crow", 
    "footsteps", "laughing", "brushing_teeth", "clapping", "keyboard_typing",
    "coughing", "sneezing", "knocking", "phone_ringing", "door_wood_knock",
    "washing_machine", "vacuum_cleaner", "glass_breaking", "fireworks", "door_wood_creak",
    "helicopter", "chainsaw", "mouse_click", "cup_filling", "printer", 
    "thunderstorm", "wind", "dripping_water", "toilet_flush", "drill",
    "camera", "camera_shutter", "snoring", "speech", "harmonica",
    "guitar", "flute", "drums", "violin", "double_bass", 
    "fart", "belch", "whistling", "shouting", "shuffling"
]  # Add all 50 labels as needed

# Initialize the CRNN model and run a forward pass
crnn_model = CRNN(num_classes=len(class_labels))
output = crnn_model(dummy_audio_input)

# Convert logits to probabilities
probabilities = F.softmax(output, dim=1)

# Get a text description of the classification results
classification_description = probabilities_to_text(probabilities, class_labels)

# Generate a text interpretation from GPT-2
generated_text = generate_text_with_gpt2(classification_description)

print(generated_text)

NameError: name 'GPT2LMHeadModel' is not defined