# Feature Extraction

In [None]:
import os
import librosa
import pywt
import numpy as np
from tqdm import tqdm
import scipy.stats

# Function to extract Mel-Spectrogram and MFCC features
def extract_mel_mfcc(file_path, sr=22050, n_mels=128, n_mfcc=20):
    """Load an audio file and compute its Mel-spectrogram and MFCCs.

    Args:
        file_path: Path of the audio file handed to ``librosa.load``.
        sr: Sampling rate requested from ``librosa.load``.
        n_mels: Number of Mel bands in the spectrogram.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        A ``(mel_spec, mfcc)`` tuple: the Mel-spectrogram as returned by
        ``librosa.feature.melspectrogram`` and the MFCCs computed from its
        dB-scaled version.
    """
    signal, sample_rate = librosa.load(file_path, sr=sr, mono=True)
    mel_power = librosa.feature.melspectrogram(
        y=signal, sr=sample_rate, n_mels=n_mels
    )
    # The MFCCs are derived from the dB-scaled spectrogram, not the raw one.
    mel_db = librosa.power_to_db(mel_power)
    mfcc_coeffs = librosa.feature.mfcc(S=mel_db, n_mfcc=n_mfcc)
    return mel_power, mfcc_coeffs

# Function to extract DWT-based statistical features
def extract_dwt_features(file_path, wavelet='db4', level=5, sr=22050):
    """Compute statistical descriptors of a file's wavelet decomposition.

    The signal is decomposed with ``pywt.wavedec`` and, for every returned
    coefficient array, seven statistics are appended in this order: mean,
    variance, standard deviation, mean absolute value, mean square,
    skewness and entropy of the absolute values.

    Args:
        file_path: Path of the audio file handed to ``librosa.load``.
        wavelet: Wavelet name passed to ``pywt.wavedec``.
        level: Decomposition level passed to ``pywt.wavedec``.
        sr: Sampling rate requested from ``librosa.load``. Defaults to
            22050, the value that used to be hard-coded here and that
            ``extract_mel_mfcc`` uses by default.

    Returns:
        A 1-D ``numpy`` array holding 7 values per coefficient array
        (``pywt.wavedec`` yields ``level + 1`` arrays, i.e. 42 values for
        the default ``level=5``). Non-finite statistics are replaced by 0.
    """
    signal, _ = librosa.load(file_path, sr=sr, mono=True)
    coeffs = pywt.wavedec(signal, wavelet, level=level)

    features = []
    for band in coeffs:
        magnitude = np.abs(band)
        features.extend([
            np.mean(band),
            np.var(band),
            np.std(band),
            np.mean(magnitude),
            np.mean(band ** 2),
            scipy.stats.skew(band),
            scipy.stats.entropy(magnitude),
        ])

    features = np.asarray(features, dtype=float)
    # Skewness of a constant band and entropy of an all-zero band are NaN
    # (e.g. digital silence); a single NaN would poison every downstream
    # loss value, so such entries are zeroed instead.
    features[~np.isfinite(features)] = 0.0
    return features

# Function to process training data
def process_training_data(train_dir, save_path):
    """Extract features for every training clip and store them in one archive.

    Each sub-directory of ``train_dir`` is treated as one class: its name
    becomes the label of every ``.wav`` file found directly inside it.

    Args:
        train_dir: Directory whose sub-directories hold the ``.wav`` files.
        save_path: Destination passed to ``np.savez``. The archive contains
            ``features`` (one dict per clip with the keys ``mel_spec``,
            ``mfcc`` and ``dwt``) and ``labels`` (one class name per clip).
    """
    feature_data, labels = [], []

    for class_name in os.listdir(train_dir):
        class_dir = os.path.join(train_dir, class_name)
        if not os.path.isdir(class_dir):
            continue  # stray files next to the class folders are ignored

        for filename in tqdm(os.listdir(class_dir), desc=f"Processing {class_name}"):
            if not filename.endswith(".wav"):
                continue
            wav_path = os.path.join(class_dir, filename)
            mel_spec, mfcc = extract_mel_mfcc(wav_path)
            dwt_features = extract_dwt_features(wav_path)
            feature_data.append(
                {'mel_spec': mel_spec, 'mfcc': mfcc, 'dwt': dwt_features}
            )
            labels.append(class_name)

    np.savez(save_path, features=feature_data, labels=labels)

# Function to process testing data
def process_testing_data(test_dir, save_path):
    """Extract features and multi-label annotations for every test clip.

    For each ``.wav`` file in ``test_dir`` the labels are read from the
    ``.txt`` file with the same base name (one label per line).

    Args:
        test_dir: Directory holding the ``.wav`` / ``.txt`` pairs.
        save_path: Destination passed to ``np.savez``. The archive contains
            ``features`` (one dict per clip with the keys ``mel_spec``,
            ``mfcc`` and ``dwt``) and ``labels`` (a 1-D object array whose
            elements are the per-clip lists of label strings).

    Raises:
        FileNotFoundError: If a ``.wav`` file has no matching ``.txt`` file.
    """
    feature_data = []
    labels = []
    for file in tqdm(os.listdir(test_dir), desc="Processing Test Data"):
        if not file.endswith(".wav"):
            continue
        file_path = os.path.join(test_dir, file)
        txt_file_path = os.path.splitext(file_path)[0] + ".txt"

        # Read the annotation first so a missing label file fails before
        # the (much slower) feature extraction runs. Lines are stripped so
        # trailing whitespace or blank lines do not become bogus labels.
        with open(txt_file_path, "r", encoding="utf-8") as txt_file:
            file_labels = [line.strip() for line in txt_file if line.strip()]

        mel_spec, mfcc = extract_mel_mfcc(file_path)
        dwt_features = extract_dwt_features(file_path)
        feature_data.append({'mel_spec': mel_spec, 'mfcc': mfcc, 'dwt': dwt_features})
        labels.append(file_labels)

    # The per-clip label lists have different lengths. Letting np.savez
    # convert such a ragged list raises ValueError on recent NumPy, and
    # equal-length lists would silently become a 2-D string array. Filling
    # an object array element by element always yields one list per clip.
    labels_array = np.empty(len(labels), dtype=object)
    for idx, file_labels in enumerate(labels):
        labels_array[idx] = file_labels

    np.savez(save_path, features=feature_data, labels=labels_array)

# Dataset locations
train_dir = "./../IRMAS/IRMAS-TrainingData"
test_dir_part1 = "./../IRMAS/IRMAS-TestingData-Part1"
test_dir_part2 = "./../IRMAS/IRMAS-TestingData-Part2"

# Output archives, one per dataset part
save_path_train = "train_features.npz"
save_path_test_part1 = "test_features_part1.npz"
save_path_test_part2 = "test_features_part2.npz"

# Run the extraction: training set first, then both test parts in order
process_training_data(train_dir, save_path_train)
for _test_dir, _test_save_path in (
    (test_dir_part1, save_path_test_part1),
    (test_dir_part2, save_path_test_part2),
):
    process_testing_data(_test_dir, _test_save_path)

# Data Preparation


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Custom Dataset class for IRMAS
class IRMASDataset(Dataset):
    """Dataset over a feature archive written with ``np.savez``.

    The archive must provide ``features`` (one dict per clip with the keys
    ``mel_spec``, ``mfcc`` and ``dwt``) and ``labels`` (either one class
    name per clip, or one list of class names per clip).

    Args:
        feature_file: Path of the ``.npz`` archive.
        train: ``True`` selects the 80% split, ``False`` the remaining 20%
            of a fixed (``random_state=42``) ``train_test_split``.
        transform: Optional callable applied to each of the three feature
            arrays before they are converted to tensors.

    Attributes set in ``__init__``:
        label_mapping: ``{class name: integer id}`` for every class seen.
        data / targets: Features and encoded labels of the selected split.

    NOTE(review): the 80/20 split is applied to whatever archive is given,
    so ``train=False`` on a test archive exposes only 20% of it.
    """

    def __init__(self, feature_file, train=True, transform=None):
        # allow_pickle=True unpickles arbitrary objects: only load archives
        # produced by this project, never untrusted files.
        # The context manager closes the underlying zip file handle.
        with np.load(feature_file, allow_pickle=True) as data:
            self.features = data['features']
            self.labels = data['labels']

        le = LabelEncoder()
        multi_label = len(self.labels) > 0 and isinstance(
            self.labels[0], (list, tuple, np.ndarray)
        )
        if multi_label:
            # Fit ONCE on the union of all class names. Re-fitting per
            # sample would restart the ids at 0 for every clip, making them
            # meaningless across clips and leaving a mapping that only
            # reflects the last clip.
            le.fit([name for sample in self.labels for name in sample])
            self.labels = [le.transform(list(sample)) for sample in self.labels]
        else:  # Single-label data: one class name per clip
            self.labels = le.fit_transform(self.labels)
        self.label_mapping = dict(zip(le.classes_, range(len(le.classes_))))

        # Deterministic split, so separate train/validation instances built
        # from the same file never overlap.
        train_x, val_x, train_y, val_y = train_test_split(
            self.features, self.labels, test_size=0.2, random_state=42)
        if train:
            self.data, self.targets = train_x, train_y
        else:
            self.data, self.targets = val_x, val_y

        self.transform = transform

    def __len__(self):
        """Return the number of clips in the selected split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``((mel_spec, mfcc, dwt), label)`` tensors for clip ``idx``.

        ``mel_spec`` and ``mfcc`` get a leading dimension of size 1 (added
        with ``unsqueeze(0)``); ``dwt`` keeps its stored shape. Features are
        ``float32`` and the label is ``int64``.
        """
        sample = self.data[idx]
        mel_spec = sample['mel_spec']
        mfcc = sample['mfcc']
        dwt = sample['dwt']
        label = self.targets[idx]

        # Apply transformations if provided
        if self.transform:
            mel_spec = self.transform(mel_spec)
            mfcc = self.transform(mfcc)
            dwt = self.transform(dwt)

        # Convert features to tensors
        mel_spec_tensor = torch.tensor(mel_spec, dtype=torch.float32).unsqueeze(0)
        mfcc_tensor = torch.tensor(mfcc, dtype=torch.float32).unsqueeze(0)
        dwt_tensor = torch.tensor(dwt, dtype=torch.float32)

        return (mel_spec_tensor, mfcc_tensor, dwt_tensor), torch.tensor(label, dtype=torch.long)

# Create DataLoaders
def create_dataloaders(feature_file, batch_size=32):
    """Build the training and validation loaders for one feature archive.

    Args:
        feature_file: Path of the archive passed to ``IRMASDataset``.
        batch_size: Batch size used by both loaders.

    Returns:
        ``(train_loader, val_loader, label_mapping)``. Only the training
        loader shuffles; the mapping is taken from the training dataset.
    """
    training_set = IRMASDataset(feature_file, train=True)
    validation_set = IRMASDataset(feature_file, train=False)

    loaders = (
        DataLoader(training_set, batch_size=batch_size, shuffle=True),
        DataLoader(validation_set, batch_size=batch_size, shuffle=False),
    )
    return (*loaders, training_set.label_mapping)

# Build the loaders from the training archive
train_feature_file = "train_features.npz"
batch_size = 32
train_loader, val_loader, label_mapping = create_dataloaders(
    feature_file=train_feature_file,
    batch_size=batch_size,
)

# Show which integer id was assigned to each class name
print("Label mapping:", label_mapping)

# Deep CNN Model


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define the Deep Convolutional Neural Network (CNN)
class IRMASModel(nn.Module):
    """Two-pathway classifier: a CNN for 2-D features plus an MLP for DWT stats.

    Args:
        num_classes: Number of output classes.
        dwt_dim: Length of the DWT feature vector. The default of 42 matches
            ``extract_dwt_features`` with ``level=5`` (7 statistics for each
            of the 6 coefficient arrays); the previous hard-coded 35 did not
            match and made the first forward pass fail.

    ``forward`` returns raw logits of shape ``(batch, num_classes)``. No
    softmax is applied because ``nn.CrossEntropyLoss`` applies log-softmax
    itself; feeding it probabilities squashes the gradients. Apply
    ``torch.softmax(logits, dim=1)`` when probabilities are needed. The
    argmax, and therefore the predicted class, is the same either way.
    """

    def __init__(self, num_classes, dwt_dim=42):
        super().__init__()

        # CNN pathway for Mel-Spectrogram and MFCC
        self.cnn_path = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),

            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.Dropout(0.25),
            nn.AdaptiveAvgPool2d(1)  # Global Average Pooling
        )

        # Dense pathway for DWT-based statistical features
        self.dwt_path = nn.Sequential(
            nn.Linear(dwt_dim, 64),
            nn.ReLU(),
        )

        # Combined dense layers; the last layer emits logits (no Softmax)
        self.fc = nn.Sequential(
            nn.Linear(256 + 64, 512),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.5),

            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Return class logits for a ``(mel_spec, mfcc, dwt)`` batch triple.

        ``mel_spec`` and ``mfcc`` must be 4-D with a single channel (they go
        through ``Conv2d(1, ...)``); ``dwt`` must have ``dwt_dim`` features
        in its last dimension.
        """
        mel_spec, mfcc, dwt = x

        # Both 2-D inputs go through the SAME CNN (shared weights). Global
        # average pooling reduces each to 256 values per sample.
        mel_spec_out = self.cnn_path(mel_spec).flatten(1)
        mfcc_out = self.cnn_path(mfcc).flatten(1)

        # Combine Mel-Spectrogram and MFCC embeddings by element-wise sum
        combined_cnn_out = mel_spec_out + mfcc_out

        # Process DWT features through Dense Pathway
        dwt_out = self.dwt_path(dwt)

        # Combine CNN and DWT features
        combined_features = torch.cat((combined_cnn_out, dwt_out), dim=1)

        # Final classification layers (raw logits)
        return self.fc(combined_features)

# Build the network with one output per class
num_classes = 11  # Number of instrument classes in IRMAS dataset
model = IRMASModel(num_classes=num_classes)

# Show the layer structure
print(model)

# Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import os

# Training function
def _run_epoch(model, loader, criterion, device, optimizer=None, desc=""):
    """Run one pass over ``loader`` and return ``(mean loss, accuracy)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in eval mode and gradients are off.
    The loss is averaged per sample, so a smaller final batch is not
    over-weighted.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.set_grad_enabled(training):
        for inputs, labels in tqdm(loader, desc=desc):
            inputs = [x.to(device) for x in inputs]
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            n_samples = labels.size(0)
            loss_sum += loss.item() * n_samples
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += n_samples

    if total == 0:
        raise ValueError(f"{desc or 'Data'} loader produced no samples")
    return loss_sum / total, correct / total


def train_model(model, train_loader, val_loader, num_epochs, device, save_dir):
    """Train ``model`` and checkpoint it after every epoch.

    Uses ``CrossEntropyLoss``, Adam (lr=0.001) and ``ReduceLROnPlateau`` on
    the validation loss (patience 5).

    Args:
        model: Network called as ``model(inputs)`` with the list of input
            tensors of a batch.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        num_epochs: Number of epochs to run.
        device: Device the model and every batch are moved to.
        save_dir: Directory for ``model_epoch_<n>.pth`` and ``history.pth``;
            created if it does not exist.

    Returns:
        Dict with the per-epoch lists ``train_loss``, ``val_loss``,
        ``train_acc`` and ``val_acc``.

    Raises:
        ValueError: If either loader yields no samples.
    """
    os.makedirs(save_dir, exist_ok=True)

    # Move the model before building the optimizer so the optimizer state is
    # created for the parameters on their final device.
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # `verbose=True` is deprecated in recent PyTorch releases; learning-rate
    # changes are reported explicitly below instead.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5)

    # Track accuracy and loss
    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")

        train_loss, train_acc = _run_epoch(
            model, train_loader, criterion, device, optimizer=optimizer, desc="Training")
        val_loss, val_acc = _run_epoch(
            model, val_loader, criterion, device, desc="Validation")

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)

        # Print results
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Adjust learning rate and report when the scheduler lowered it
        lr_before = optimizer.param_groups[0]["lr"]
        scheduler.step(val_loss)
        lr_after = optimizer.param_groups[0]["lr"]
        if lr_after != lr_before:
            print(f"Learning rate reduced: {lr_before:.2e} -> {lr_after:.2e}")

        # Save model and history after every epoch so an interrupted run
        # keeps its progress
        torch.save(model.state_dict(), os.path.join(save_dir, f"model_epoch_{epoch + 1}.pth"))
        torch.save(history, os.path.join(save_dir, "history.pth"))

    return history

# Pick the GPU when one is available, otherwise fall back to the CPU
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print("Current Training Session is Running on: ", device)

# Checkpoints and the training history are written here
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)

# Rebuild the loaders from the training archive
train_feature_file = "train_features.npz"
batch_size = 32
train_loader, val_loader, label_mapping = create_dataloaders(
    train_feature_file,
    batch_size,
)

# Run the training loop
num_epochs = 20
history = train_model(
    model,
    train_loader,
    val_loader,
    num_epochs,
    device,
    save_dir,
)

# Accuracy and Loss

In [None]:
import torch
import matplotlib.pyplot as plt

# Read back the history dict written during training
history_path = "saved_models/history.pth"
history = torch.load(history_path)

# One figure per metric (loss first, then accuracy), each showing the
# training and validation curves over the epochs
for _metric_key, _metric_name in (("loss", "Loss"), ("acc", "Accuracy")):
    plt.figure(figsize=(10, 5))
    for _split_key, _split_name in (("train", "Train"), ("val", "Validation")):
        plt.plot(
            history[f"{_split_key}_{_metric_key}"],
            label=f"{_split_name} {_metric_name}",
        )
    plt.title(f"{_metric_name} Curve")
    plt.xlabel('Epoch')
    plt.ylabel(_metric_name)
    plt.legend()
    plt.grid()
    plt.show()

# Testset Evaluation

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report

# Load the test features and labels
# NOTE(review): IRMASDataset(train=False) returns only its 20% hold-out split
# of the given file, so this does not evaluate the whole test archive.
# NOTE(review): the test archives store a LIST of labels per clip, while the
# argmax-based metrics below assume one integer label per clip - confirm how
# multi-label clips should be scored before trusting these numbers.
test_feature_file = "test_features_part1.npz"  # Use Part1 or Part2 as needed
test_dataset = IRMASDataset(test_feature_file, train=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Load the trained model
model_path = "saved_models/model_epoch_20.pth"  # Use the best epoch
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host
# (and vice versa) instead of failing while deserializing the tensors.
state_dict = torch.load(model_path, map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)
model.eval()

# Evaluate the model on the test set
all_labels = []
all_preds = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs = [x.to(device) for x in inputs]
        labels = labels.to(device)

        # Forward pass; the predicted class is the index of the largest output
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)

        # Store predictions and true labels
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Calculate performance metrics
precision = precision_score(all_labels, all_preds, average='macro')
recall = recall_score(all_labels, all_preds, average='macro')
f1 = f1_score(all_labels, all_preds, average='macro')

print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1)

# Pass the class ids explicitly: with target_names alone the report raises as
# soon as a class is missing from both the labels and the predictions.
class_ids = list(label_mapping.values())
class_names = list(label_mapping.keys())
print("\nClassification Report:\n", classification_report(
    all_labels, all_preds, labels=class_ids, target_names=class_names, zero_division=0))