In [113]:
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn.functional as F
from torch import Tensor, nn
from torchvision import transforms
from torchvision.transforms import Compose, Resize, ToTensor

from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn import preprocessing

import torch
import torchvision
from torchvision import datasets
from torchvision import transforms as T # for simplifying the transforms
from torch import nn, optim
from torch.utils.data import DataLoader, sampler, random_split
from torchvision import models

from xgboost import XGBClassifier

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

import sys
from tqdm import tqdm
import time
import copy

import timm
from timm.loss import LabelSmoothingCrossEntropy # This is better than normal nn.CrossEntropyLoss

# remove warnings
import warnings
warnings.filterwarnings("ignore")

import matplotlib.pyplot as plt
%matplotlib inline

device = "cuda" if torch.cuda.is_available() else "cpu"

In [2]:
def get_classes(data_dir):
    all_data = datasets.ImageFolder(data_dir)
    return all_data.classes

In [33]:
dataset_path = "../Data"
classes = get_classes(dataset_path + "/train" + "/images_original")
print(classes, len(classes))

['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock'] 10


In [83]:
df = pd.read_csv(f'{dataset_path}/features_3_sec.csv')
df.head()

Unnamed: 0,filename,length,chroma_stft_mean,chroma_stft_var,rms_mean,rms_var,spectral_centroid_mean,spectral_centroid_var,spectral_bandwidth_mean,spectral_bandwidth_var,...,mfcc16_var,mfcc17_mean,mfcc17_var,mfcc18_mean,mfcc18_var,mfcc19_mean,mfcc19_var,mfcc20_mean,mfcc20_var,label
0,blues.00000.0.wav,66149,0.335406,0.091048,0.130405,0.003521,1773.065032,167541.630869,1972.744388,117335.771563,...,39.687145,-3.24128,36.488243,0.722209,38.099152,-5.050335,33.618073,-0.243027,43.771767,blues
1,blues.00000.1.wav,66149,0.343065,0.086147,0.112699,0.00145,1816.693777,90525.690866,2010.051501,65671.875673,...,64.748276,-6.055294,40.677654,0.159015,51.264091,-2.837699,97.03083,5.784063,59.943081,blues
2,blues.00000.2.wav,66149,0.346815,0.092243,0.132003,0.00462,1788.539719,111407.437613,2084.565132,75124.921716,...,67.336563,-1.76861,28.348579,2.378768,45.717648,-1.938424,53.050835,2.517375,33.105122,blues
3,blues.00000.3.wav,66149,0.363639,0.086856,0.132565,0.002448,1655.289045,111952.284517,1960.039988,82913.639269,...,47.739452,-3.841155,28.337118,1.218588,34.770935,-3.580352,50.836224,3.630866,32.023678,blues
4,blues.00000.4.wav,66149,0.335579,0.088129,0.143289,0.001701,1630.656199,79667.267654,1948.503884,60204.020268,...,30.336359,0.664582,45.880913,1.689446,51.363583,-3.392489,26.738789,0.536961,29.146694,blues


In [99]:
train_filenames, val_filenames, test_filenames = [], [], []
train_path = os.path.join(dataset_path, "train", "genres_original")
val_path = os.path.join(dataset_path, "val", "genres_original")
test_path = os.path.join(dataset_path, "test", "genres_original")

# Iterate through all the files in the train, val, test directories and put them in their respective lists
for root, dirs, files in os.walk(train_path):
    for name in files:
        train_filenames.append(name)

print(f'There are {len(train_filenames)} training images.')

for root, dirs, files in os.walk(val_path):
    for name in files:
        val_filenames.append(name)

print(f'There are {len(val_filenames)} validation images.')

for root, dirs, files in os.walk(test_path):
    for name in files:
        test_filenames.append(name)

print(f'There are {len(test_filenames)} test images.')

There are 800 training images.
There are 100 validation images.
There are 99 test images.


In [104]:
# Go through the filenames in column "filename" 
# and if it can be found in train_filenames, val_filenames or test_filenames,
# then put the corresponding path in the column "path"
train_features_3_sec = pd.DataFrame(columns=df.columns)
val_features_3_sec = pd.DataFrame(columns=df.columns)
test_features_3_sec = pd.DataFrame(columns=df.columns)

for index, row in df.iterrows():
    splits = row['filename'].split('.')
    row['filename'] = splits[0] + "." + splits[1] + "." + splits[3] 
    if row['filename'] in train_filenames:
        new_df = pd.DataFrame([row], columns=df.columns)
        train_features_3_sec = pd.concat([train_features_3_sec, new_df], ignore_index=True)
    elif row['filename'] in val_filenames:
        new_df = pd.DataFrame([row], columns=df.columns)
        val_features_3_sec = pd.concat([val_features_3_sec, new_df], ignore_index=True)
    elif row['filename'] in test_filenames:
        new_df = pd.DataFrame([row], columns=df.columns)
        test_features_3_sec = pd.concat([test_features_3_sec, new_df], ignore_index=True)

train_features_3_sec.to_csv(f'{dataset_path}/train_features_3_sec.csv', index=False)
val_features_3_sec.to_csv(f'{dataset_path}/val_features_3_sec.csv', index=False)
test_features_3_sec.to_csv(f'{dataset_path}/test_features_3_sec.csv', index=False)

print(f'There are {len(train_features_3_sec)} training features.')
print(f'There are {len(val_features_3_sec)} validation features.')
print(f'There are {len(test_features_3_sec)} test features.')

There are 7993 training features.
There are 999 validation features.
There are 988 test features.


In [117]:
# Apply xgboost to the dataset
df = pd.read_csv(f'{dataset_path}/train_features_3_sec.csv')

genre_list = df.iloc[:, -1]
encoder = preprocessing.LabelEncoder()

df = df.iloc[0:, 1:] 

# Create a target variable 'y' by selecting the 'label' column from the DataFrame
y_train = encoder.fit_transform(genre_list)

# Create a feature matrix 'X' by selecting all columns from the DataFrame except 'label'
X_train = df.loc[:, df.columns != 'label']

# Normalize the feature matrix 'X' using Min-Max scaling
cols = X_train.columns
min_max_scaler = preprocessing.MinMaxScaler()
np_scaled = min_max_scaler.fit_transform(X_train)
X_train = pd.DataFrame(np_scaled, columns = cols)

In [121]:
df = pd.read_csv(f'{dataset_path}/val_features_3_sec.csv')

genre_list = df.iloc[:, -1]
encoder = preprocessing.LabelEncoder()

df = df.iloc[0:, 1:] 

# Create a target variable 'y' by selecting the 'label' column from the DataFrame
y_val = encoder.fit_transform(genre_list)

# Create a feature matrix 'X' by selecting all columns from the DataFrame except 'label'
X_val = df.loc[:, df.columns != 'label']

# Normalize the feature matrix 'X' using Min-Max scaling
cols = X_val.columns
min_max_scaler = preprocessing.MinMaxScaler()
np_scaled = min_max_scaler.fit_transform(X_val)
X_val = pd.DataFrame(np_scaled, columns = cols)

In [122]:
df = pd.read_csv(f'{dataset_path}/test_features_3_sec.csv')

genre_list = df.iloc[:, -1]
encoder = preprocessing.LabelEncoder()

df = df.iloc[0:, 1:] 

# Create a target variable 'y' by selecting the 'label' column from the DataFrame
y_test = encoder.fit_transform(genre_list)

# Create a feature matrix 'X' by selecting all columns from the DataFrame except 'label'
X_test = df.loc[:, df.columns != 'label']

# Normalize the feature matrix 'X' using Min-Max scaling
cols = X_test.columns
min_max_scaler = preprocessing.MinMaxScaler()
np_scaled = min_max_scaler.fit_transform(X_test)
X_test = pd.DataFrame(np_scaled, columns = cols)

In [123]:
def model_assess(X_train, y_train, X_test, y_test, model, title = "Default"):
    model.fit(X_train, y_train)
    preds = model.predict(X_test.values)
    print('Accuracy', title, ':', round(accuracy_score(y_test, preds), 5), '\n')

In [126]:
xgbmodel = XGBClassifier(n_estimators=1000, learning_rate=0.05)
X_train = pd.concat([X_train, X_val], ignore_index=True)
y_train = np.concatenate((y_train, y_val), axis=0)
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)
model_assess(X_train, y_train, X_test, y_test, xgbmodel, "Cross Gradient Booster")

(9991, 58) (9991,)
(988, 58) (988,)
Accuracy Cross Gradient Booster : 0.47368 



In [53]:
def get_data_loaders(data_dir, batch_size):
    transform = T.Compose([ # We dont need augmentation for test transforms
        T.Resize(256),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), # imagenet means
    ])
    test_data = datasets.ImageFolder(os.path.join(data_dir, "test/images_original/"), transform=transform)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)
    return test_loader, len(test_data)

In [54]:
(test_loader, test_data_len) = get_data_loaders(dataset_path, 32)

In [55]:
def load_swin_transformer_model(hub_url, model_name, num_classes, device, checkpoint_path=None):
    # Load pre-trained Swin Transformer model
    model = torch.hub.load(hub_url, model_name, pretrained=True)
    
    # Freeze the model parameters
    for param in model.parameters():
        param.requires_grad = False

    # Modify the model head
    n_inputs = model.head.in_features
    model.head = nn.Sequential(
        nn.Linear(n_inputs, 512),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(512, num_classes)
    )
    model = model.to(device)

    # Initialize criterion, optimizer, and scheduler
    criterion = LabelSmoothingCrossEntropy()
    criterion = criterion.to(device)
    optimizer = optim.Adam(model.head.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.97)

    # Load model from checkpoint
    if checkpoint_path:
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        best_acc = checkpoint['best_acc']
        train_losses = checkpoint['train_losses']
        train_accuracies = checkpoint['train_accuracies']
        val_losses = checkpoint['val_losses']
        val_accuracies = checkpoint['val_accuracies']

    return model, criterion, optimizer, scheduler

In [56]:
checkpoint_path = "../Models/SwinT/_epoch_49.pth"
HUB_URL = "SharanSMenon/swin-transformer-hub:main"
MODEL_NAME = "swin_tiny_patch4_window7_224"
swin_model, _, _, _ = load_swin_transformer_model(
    hub_url = HUB_URL,
    model_name = MODEL_NAME,
    checkpoint_path=checkpoint_path,
    num_classes=len(classes),
    device=device
)

Using cache found in /home/sanyam/.cache/torch/hub/SharanSMenon_swin-transformer-hub_main


In [57]:
test_loss = 0.0
class_correct = list(0 for i in range(len(classes)))
class_total = list(0 for i in range(len(classes)))
swin_model.eval()

for data, target in tqdm(test_loader):
    data, target = data.to(device), target.to(device)
    with torch.no_grad(): # turn off autograd for faster testing
        output = swin_model(data)
    _, pred = torch.max(output, 1)
    correct_tensor = pred.eq(target.data.view_as(pred))
    correct = np.squeeze(correct_tensor.cpu().numpy())
    if len(target) == 32:
        for i in range(32):
            label = target.data[i]
            class_correct[label] += correct[i].item()
            class_total[label] += 1

for i in range(len(classes)):
    if class_total[i] > 0:
        print("Test Accuracy of %5s: %2d%% (%2d/%2d)" % (
            classes[i], 100*class_correct[i]/class_total[i], np.sum(class_correct[i]), np.sum(class_total[i])
        ))
    else:
        print("Test accuracy of %5s: NA" % (classes[i]))
print("Test Accuracy of %2d%% (%2d/%2d)" % (
            100*np.sum(class_correct)/np.sum(class_total), np.sum(class_correct), np.sum(class_total)
        ))

100%|██████████| 4/4 [00:07<00:00,  1.81s/it]

Test Accuracy of blues: 50% ( 5/10)
Test Accuracy of classical: 100% (10/10)
Test Accuracy of country: 80% ( 8/10)
Test Accuracy of disco: 30% ( 3/10)
Test Accuracy of hiphop: 80% ( 8/10)
Test Accuracy of  jazz: 88% ( 8/ 9)
Test Accuracy of metal: 90% ( 9/10)
Test Accuracy of   pop: 80% ( 8/10)
Test Accuracy of reggae: 90% ( 9/10)
Test Accuracy of  rock: 57% ( 4/ 7)
Test Accuracy of 75% (72/96)





In [58]:
def get_data_loaders_2(data_dir, batch_size, train=False):
    test_transforms = Compose([
        ToTensor(),
        transforms.Normalize(mean=[0.4931, 0.9151, 0.9960], std=[
                             0.4495, 0.1716, 0.0602])
    ])
    test_dataset = datasets.ImageFolder(os.path.join(data_dir, "test/images_original/"), transform=test_transforms)
    test_dataloader = torch.utils.data.DataLoader(
        dataset=test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4
    )
    return test_dataloader, len(test_dataset)


In [59]:
(test_loader, test_data_len) = get_data_loaders_2(dataset_path, 32, train=False)

In [60]:
def load_resnet18_model(checkpoint_path, classes, device):
    # Load a Pretrained ResNet18 Model
    resnet = models.resnet18(pretrained=True)
    in_features = resnet.fc.in_features
    fc = nn.Linear(in_features=in_features, out_features=len(classes))
    resnet.fc = fc
    resnet = resnet.to(device)

    # Load the checkpoint
    checkpoint = torch.load(checkpoint_path, map_location=device)

    # Restore the model state
    resnet.load_state_dict(checkpoint['model_state_dict'])

    # If you need to continue training, also restore the optimizer state
    optimizer = optim.Adam(resnet.fc.parameters(), lr=0.001)
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Other data from the checkpoint, if needed
    train_losses = checkpoint.get('train_losses', [])
    val_losses = checkpoint.get('val_losses', [])

    return resnet, optimizer, train_losses, val_losses

In [61]:
checkpoint_path = "../Models/Resnet18/checkpoint_50.pth"
resnet18_model, optimizer, train_losses, val_losses = load_resnet18_model(checkpoint_path, classes, device)

In [62]:
y_test = []
y_pred = []

# Iterate through the test dataloader
for img, label in test_loader:
    img = img.to(device)
    resnet18_model.eval()
    with torch.no_grad():
        prediction = resnet18_model(img)
    
    final_preds = torch.max(prediction, dim=1)[1]
    y_test.extend(label.tolist())
    y_pred.extend(final_preds.cpu().tolist())

# Map predicted indices back to class names
y_pred_labels = [classes[pred] for pred in y_pred]

# Calculate class-wise accuracy
class_correct = list(0. for i in range(len(classes)))
class_total = list(0. for i in range(len(classes)))
classwise_accuracy = {}

for i in range(len(y_test)):
    label = y_test[i]
    pred = y_pred[i]
    if label == pred:
        class_correct[label] += 1
    class_total[label] += 1

for i in range(len(classes)):
    classwise_accuracy[classes[i]] = 100 * class_correct[i] / class_total[i] if class_total[i] > 0 else 0

# Print Class-wise accuracy
for class_name, accuracy in classwise_accuracy.items():
    print(f'Accuracy of {class_name} : {accuracy:.2f} %')

print("Accuracy:",(100*(np.array(y_test) == np.array(y_pred)).sum()/len(y_test)))

Accuracy of blues : 30.00 %
Accuracy of classical : 70.00 %
Accuracy of country : 50.00 %
Accuracy of disco : 50.00 %
Accuracy of hiphop : 70.00 %
Accuracy of jazz : 88.89 %
Accuracy of metal : 60.00 %
Accuracy of pop : 80.00 %
Accuracy of reggae : 30.00 %
Accuracy of rock : 60.00 %
Accuracy: 58.58585858585859


In [80]:
batch_size = 32
def weighted_ensemble_predictions(resnet_model, swin_model, resnet_loader, swin_loader, device, resnet_weight=0.3, swin_weight=0.7):
    assert resnet_weight + swin_weight == 1, "Weights must sum up to 1."

    resnet_model.eval()
    swin_model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for (resnet_batch, swin_batch) in zip(resnet_loader, swin_loader):
            resnet_inputs, labels = resnet_batch
            swin_inputs, _ = swin_batch

            resnet_inputs, swin_inputs = resnet_inputs.to(device), swin_inputs.to(device)
            labels = labels.to(device)

            # Get predictions from both models
            outputs_resnet = nn.functional.softmax(resnet_model(resnet_inputs), dim=1)
            outputs_swin = nn.functional.softmax(swin_model(swin_inputs), dim=1)

            # Weighted average of outputs
            averaged_outputs = (outputs_resnet * resnet_weight) + (outputs_swin * swin_weight)

            # Final prediction
            _, preds = torch.max(averaged_outputs, 1)

            all_preds.extend(preds.view(-1).cpu().numpy())
            all_labels.extend(labels.view(-1).cpu().numpy())

    return all_preds, all_labels

dataset_path = "../Data"

# Example usage
swin_dataloader, _ = get_data_loaders(dataset_path, batch_size)
resnet_dataloader, _ = get_data_loaders_2(dataset_path, batch_size)

predictions, labels = weighted_ensemble_predictions(resnet18_model, swin_model, resnet_dataloader, swin_dataloader, device, resnet_weight=0.2, swin_weight=0.8)

In [81]:
def per_class_accuracy(predictions, labels, class_names):
    """
    Compute per-class accuracy given predictions and labels.

    :param predictions: List or array of model predictions.
    :param labels: List or array of ground truth labels.
    :param class_names: List of class names.
    :return: Dictionary of per-class accuracy.
    """
    # Compute the confusion matrix
    cm = confusion_matrix(labels, predictions)
    
    # Calculate per-class accuracies
    class_accuracies = {}
    for i, class_name in enumerate(class_names):
        if i < len(cm):
            true_positives = cm[i, i]
            total = sum(cm[i, :])
            class_accuracy = true_positives / total if total > 0 else 0
            class_accuracies[class_name] = class_accuracy
    
    return class_accuracies

# Example usage
class_accuracies = per_class_accuracy(predictions, labels, classes)

for class_name, accuracy in class_accuracies.items():
    print(f"Accuracy for {class_name}: {accuracy:.2f}")

print("Accuracy:",(100*(np.array(labels) == np.array(predictions)).sum()/len(labels)))

Accuracy for blues: 0.60
Accuracy for classical: 1.00
Accuracy for country: 0.80
Accuracy for disco: 0.40
Accuracy for hiphop: 0.80
Accuracy for jazz: 0.89
Accuracy for metal: 0.90
Accuracy for pop: 0.80
Accuracy for reggae: 0.90
Accuracy for rock: 0.50
Accuracy: 75.75757575757575


In [77]:
def prediction_analysis(resnet_model, swin_model, resnet_loader, swin_loader, device):
    resnet_model.eval()
    swin_model.eval()

    correct_by_both = 0
    correct_by_resnet_only = 0
    correct_by_swin_only = 0
    incorrect_by_both = 0
    total_samples = 0

    with torch.no_grad():
        for (resnet_batch, swin_batch) in zip(resnet_loader, swin_loader):
            resnet_inputs, labels = resnet_batch
            swin_inputs, _ = swin_batch

            # Move to device
            resnet_inputs, swin_inputs, labels = resnet_inputs.to(device), swin_inputs.to(device), labels.to(device)

            # Get predictions
            outputs_resnet = resnet_model(resnet_inputs)
            outputs_swin = swin_model(swin_inputs)

            _, preds_resnet = torch.max(outputs_resnet, 1)
            _, preds_swin = torch.max(outputs_swin, 1)

            # Update counts
            correct_resnet = (preds_resnet == labels)
            correct_swin = (preds_swin == labels)

            correct_by_both += torch.sum(correct_resnet & correct_swin).item()
            correct_by_resnet_only += torch.sum(correct_resnet & ~correct_swin).item()
            correct_by_swin_only += torch.sum(~correct_resnet & correct_swin).item()
            incorrect_by_both += torch.sum(~correct_resnet & ~correct_swin).item()

            total_samples += labels.size(0)

    return {
        "correct_by_both": correct_by_both,
        "correct_by_resnet_only": correct_by_resnet_only,
        "correct_by_swin_only": correct_by_swin_only,
        "incorrect_by_both": incorrect_by_both,
        "total_samples": total_samples
    }

# Example usage
device = "cuda" if torch.cuda.is_available() else "cpu"
analysis_results = prediction_analysis(resnet18_model, swin_model, resnet_dataloader, swin_dataloader, device)

for key, value in analysis_results.items():
    print(f"{key}: {value}")

correct_by_both: 50
correct_by_resnet_only: 8
correct_by_swin_only: 23
incorrect_by_both: 18
total_samples: 99
