In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import torch.nn.init as init
import matplotlib.pyplot as plt
from IPython.display import Audio
import librosa.display
import librosa
import zipfile
import os
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from torchvision import transforms
from google.colab import drive
from torchvision import datasets, models, transforms
from torch.optim import lr_scheduler
import time
import os
import copy
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import time
import copy
from collections import Counter
drive.mount('/content/drive')
# FILEPATH: /Users/kmpandey/Desktop/3-2/Deep Learning/Project/Notebooks/Project_1 (1).ipyn


Mounted at /content/drive


In [None]:
# Root folder on the mounted Google Drive; load_data() joins each class name onto this path.
data_dir = '/content/drive/My Drive/audio_dataset/audio_dataset/train'
# Every entry found directly under data_dir is treated as a class name by load_data().
classes = os.listdir(data_dir)

In [None]:
def print_duration_info(data):
    """Print summary statistics for the durations of a collection of waveforms.

    Args:
        data: Iterable of audio sample arrays, one per clip. Each is passed to
            ``librosa.get_duration(y=...)``; no sampling rate is supplied, so
            librosa's default rate is used for the conversion to seconds.

    Returns:
        None. The ``DataFrame.describe()`` table is printed.
    """
    # Use the argument that was passed in (the original read an unrelated
    # global called ``train_data`` and ignored ``data`` entirely).
    durations = [librosa.get_duration(y=waveform) for waveform in data]

    # Build the frame in one go with an explicit float dtype so .describe()
    # always reports numeric statistics (count/mean/std/...).
    duration_df = pd.DataFrame({'Duration': pd.Series(durations, dtype='float64')})

    summary_stats = duration_df.describe()

    print(summary_stats)

def load_audio(file_path):
  """Load a clip, trim silent edges and force it to a fixed number of samples.

  The clip is read at its native sampling rate, trimmed with
  ``librosa.effects.trim(top_db=60)`` and then either truncated to, or
  zero-padded (centred) up to, ``hyper_params['duration']`` samples.

  Args:
    file_path: Path of the audio file to read.

  Returns:
    Tuple ``(waveform, sample_rate)`` where ``waveform`` has exactly
    ``hyper_params['duration']`` samples.
  """
  samples, native_rate = librosa.load(file_path, sr=None)
  samples, _ = librosa.effects.trim(samples, top_db=60)

  # NOTE(review): the target is a sample count (seconds * 22050 in the
  # hyper-parameter cell) while the file is loaded at its native rate, so it
  # only corresponds to 4 s when the file is sampled at 22050 Hz - confirm.
  target_len = hyper_params['duration']
  missing = target_len - len(samples)

  if missing < 0:
    # Too long: keep the leading target_len samples.
    return samples[:target_len], native_rate

  # Too short (or exact): centre the clip, extra sample goes on the right.
  left = missing // 2
  right = missing - left
  return np.pad(samples, (left, right), 'constant'), native_rate

def create_melspec(params, audio_data, sampling_rate):
  """Compute a dB-scaled mel spectrogram as a float32 array.

  Args:
    params: Mapping providing 'n_mels', 'hop_length', 'n_fft' and 'fmin'.
    audio_data: Audio samples handed to librosa as ``y``.
    sampling_rate: Sampling rate of ``audio_data``; the upper mel frequency is
      set to ``sampling_rate // 2``.

  Returns:
    ``librosa.power_to_db(..., ref=np.max)`` of the mel spectrogram, cast to
    ``np.float32``.
  """
  mel_kwargs = dict(
      y=audio_data,
      sr=sampling_rate,
      n_mels=params['n_mels'],
      hop_length=params['hop_length'],
      n_fft=params['n_fft'],
      fmin=params['fmin'],
      fmax=(sampling_rate // 2),
  )
  power_spec = librosa.feature.melspectrogram(**mel_kwargs)

  # Reference is the spectrogram's own maximum, so the loudest bin maps to 0 dB.
  return librosa.power_to_db(power_spec, ref=np.max).astype(np.float32)

def load_data(data_dir):
    """Load every .wav file under ``data_dir`` as a mel spectrogram.

    For each name in the module-level ``classes`` list, the folder
    ``data_dir/<class_name>`` is scanned; each .wav file is loaded with
    ``load_audio`` and converted with ``create_melspec`` using the global
    ``hyper_params``.

    Args:
        data_dir: Folder containing one sub-folder per class.

    Returns:
        Tuple ``(data, labels)``: a list of spectrograms and a parallel list
        holding the class name of each one.
    """
    data = []
    labels = []

    for class_name in classes:
        class_dir = os.path.join(data_dir, class_name)

        # ``classes`` is a raw directory listing, so it can contain stray files
        # (e.g. .DS_Store) or classes absent from this split; skip those
        # instead of crashing inside os.listdir().
        if not os.path.isdir(class_dir):
            continue

        # Sorted so the sample order is reproducible across runs/filesystems.
        for file_name in sorted(os.listdir(class_dir)):
            # Accept .WAV / .Wav as well as .wav.
            if not file_name.lower().endswith('.wav'):
                continue

            file_path = os.path.join(class_dir, file_name)
            waveform, sample_rate = load_audio(file_path)
            data.append(create_melspec(hyper_params, waveform, sample_rate))
            labels.append(class_name)

    return data, labels

def display_audio(audio_file_path):
  """Plot the waveform of an audio file.

  Args:
    audio_file_path: Path of the file; it is loaded at its native sampling rate.
  """
  samples, native_rate = librosa.load(audio_file_path, sr=None)

  # Amplitude-vs-time plot on a fresh 10x4 figure.
  figure_size = (10, 4)
  plt.figure(figsize=figure_size)
  librosa.display.waveshow(samples, sr=native_rate)
  plt.title('Waveform of Audio File')
  plt.xlabel('Time (s)')
  plt.ylabel('Amplitude')
  plt.show()

def display_spec(mel_spec):
  """Plot a mel spectrogram with a colour bar.

  Args:
    mel_spec: 2-D spectrogram array passed to ``librosa.display.specshow``.
      The colour bar is labelled in dB, so dB-scaled values are presumably
      expected (e.g. the output of create_melspec) - confirm with callers.
  """
  figure_size = (10, 4)
  plt.figure(figsize=figure_size)

  # Mel-scaled frequency axis; the x axis is drawn without ticks or labels.
  axis_options = {'x_axis': 'off', 'y_axis': 'mel'}
  librosa.display.specshow(mel_spec, **axis_options)

  plt.colorbar(format='%+2.0f dB')
  plt.title('Mel Spectrogram')
  plt.show()

In [None]:
class CNNModel1(nn.Module):
  """Small CNN: two (5x5 conv -> ReLU -> 2x2 max-pool) stages and a two-layer MLP head.

  Args:
    in_channels_1: Number of channels of the input image.
    num_classes: Number of output logits.
  """

  def __init__(self, in_channels_1 = 3, num_classes = 10):
    super().__init__()

    self.conv1 = nn.Conv2d(in_channels_1, 10, kernel_size=5, stride=1)
    self.conv2 = nn.Conv2d(10, 10, kernel_size=5, stride=1)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    # 28090 = 10 channels * 53 * 53, i.e. the flattened feature size produced
    # by the two conv/pool stages for 224x224 inputs; other input sizes will
    # not match this layer.
    self.lin1 = nn.Linear(28090, 100)
    self.lin2 = nn.Linear(100, num_classes)

  def forward(self, x):
    """Return class logits for a batch of images."""
    feats = self.pool(F.relu(self.conv1(x)))
    feats = self.pool(F.relu(self.conv2(feats)))

    # Keep the batch axis, flatten everything else for the linear head.
    flat = torch.flatten(feats, 1)
    hidden = F.relu(self.lin1(flat))
    return self.lin2(hidden)


class ConvBlock(nn.Module):
    """Two 3x3 convolutions followed by 2x2 average pooling.

    Stage 1 is conv -> BatchNorm -> ReLU, stage 2 is conv -> ReLU -> Dropout(0.5).
    Both convolutions use padding 1, so only the final pooling halves the
    spatial size.

    NOTE(review): a second ``class ConvBlock`` later in this file rebinds the
    same name; keep the two in sync or remove one of them.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by both convolutions.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        first_stage = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        second_stage = [
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Dropout(0.5),
        ]
        self.conv1 = nn.Sequential(*first_stage)
        self.conv2 = nn.Sequential(*second_stage)

        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal conv weights, zero biases, unit BatchNorm scale."""
        for layer in self.modules():
            if isinstance(layer, nn.BatchNorm2d):
                nn.init.constant_(layer.weight, 1)
                nn.init.zeros_(layer.bias)
                continue
            if not isinstance(layer, nn.Conv2d):
                continue
            nn.init.kaiming_normal_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Apply both conv stages, then halve height and width."""
        return F.avg_pool2d(self.conv2(self.conv1(x)), 2)

class Classifier(nn.Module):
    """Four stacked ConvBlocks (3 -> 64 -> 128 -> 256 -> 512 channels) with an MLP head.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=13):
        super().__init__()

        channel_plan = (3, 64, 128, 256, 512)
        self.conv = nn.Sequential(*[
            ConvBlock(in_channels=c_in, out_channels=c_out)
            for c_in, c_out in zip(channel_plan, channel_plan[1:])
        ])

        head_layers = [
            nn.Dropout(0.4),
            nn.Linear(512, 128),
            nn.PReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return logits of shape (batch, num_classes)."""
        feats = self.conv(x)
        # Collapse the feature map to one value per channel:
        # average over the last axis, then take the maximum over the remaining one.
        feats = torch.mean(feats, dim=3)
        feats = torch.max(feats, dim=2)[0]
        return self.fc(feats)


class ResNet(nn.Module):
  """ResNet-style classifier assembled from a configurable residual block.

  NOTE(review): a second ``class ResNet`` later in this file rebinds the same
  name; keep the two in sync or remove one of them.

  Args:
    block: Residual block class. It must expose an integer ``expansion`` class
      attribute and accept ``(in_channels, out_channels, stride=...,
      downsample=...)``.
    layers: Sequence of four ints - the number of blocks in each stage.
    num_classes: Number of output logits.
  """

  def __init__(self, block, layers, num_classes):

    super().__init__()

    # Running channel count; _make_layer reads it and advances it per stage.
    self.in_channels = 64

    # resnet stem
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(num_features=self.in_channels)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    # res-blocks
    self.layer1 = self._make_layer(block, 64, layers[0])
    self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
    self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
    self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

    # classifier block
    # Pool to 1x1 so the flattened vector has exactly 512 * expansion features,
    # which is what the linear layer below expects. (A 2x2 pool yields four
    # times as many features and fails in the classifier.)
    self.adppool = nn.AdaptiveAvgPool2d((1, 1))
    self.classifier = nn.Linear(in_features=512 * block.expansion, out_features=num_classes)

  def _make_layer(self, block, out_channels, blocks, stride=1):
    """Build one stage of ``blocks`` residual blocks.

    Only the first block of a stage may change resolution/width; it receives a
    1x1 conv + BatchNorm projection for the skip path when needed.
    """
    downsample = None

    if stride != 1 or self.in_channels != out_channels * block.expansion:
      downsample = nn.Sequential(
          nn.Conv2d(in_channels=self.in_channels, out_channels=out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
          nn.BatchNorm2d(num_features=out_channels * block.expansion),
      )

    # Pass stride/downsample by keyword: the block constructors in this file
    # take ``kernel_size`` as their third positional parameter, so positional
    # passing would feed stride into kernel_size and downsample into stride.
    stage = [block(self.in_channels, out_channels, stride=stride, downsample=downsample)]

    self.in_channels = out_channels * block.expansion

    stage.extend(block(self.in_channels, out_channels) for _ in range(1, blocks))

    return nn.Sequential(*stage)

  def forward(self, x):
    """Return logits of shape (batch, num_classes)."""
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)
    x = self.adppool(x)
    x = torch.flatten(x, 1)

    return self.classifier(x)


class ClassifierWResNet(nn.Module):
    """ConvBlock feature extractor, optionally followed by one residual stage, with an MLP head.

    The original constructor referenced the undefined names ``block`` and
    ``layers`` (and an unset ``self.in_channels``), so it raised NameError on
    instantiation. They are now optional keyword arguments: with the defaults
    the model is the plain ConvBlock classifier; when both are supplied, one
    residual stage with 64 base channels is appended after the ConvBlocks and
    the head's input width follows it.

    Args:
        num_classes: Number of output logits.
        block: Optional residual block class exposing ``expansion`` and
            accepting ``(in_channels, out_channels, stride=..., downsample=...)``.
        layers: Optional sequence whose first entry is the number of residual
            blocks to build. Ignored unless ``block`` is given.
    """

    def __init__(self, num_classes=10, block=None, layers=None):
        super().__init__()

        self.conv = nn.Sequential(
            ConvBlock(in_channels=3, out_channels=64),
            ConvBlock(in_channels=64, out_channels=128),
            ConvBlock(in_channels=128, out_channels=256),
            ConvBlock(in_channels=256, out_channels=512),
        )

        # Channel count leaving self.conv; _make_layer reads it and advances it.
        self.in_channels = 512

        self.layer1 = None
        if block is not None and layers:
            self.layer1 = self._make_layer(block, 64, layers[0])

        # self.in_channels is 512 without a residual stage, 64 * expansion with one.
        self.fc = nn.Sequential(
            nn.Dropout(0.4),
            nn.Linear(self.in_channels, 128),
            nn.PReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes),
        )

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks starting from ``self.in_channels``."""
        downsample = None

        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(in_channels=self.in_channels, out_channels=out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(num_features=out_channels * block.expansion),
            )

        # Keyword arguments: the block constructors in this file take
        # ``kernel_size`` as their third positional parameter.
        stage = [block(self.in_channels, out_channels, stride=stride, downsample=downsample)]

        self.in_channels = out_channels * block.expansion

        stage.extend(block(self.in_channels, out_channels) for _ in range(1, blocks))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return logits of shape (batch, num_classes)."""
        x = self.conv(x)
        if self.layer1 is not None:
            x = self.layer1(x)
        # One value per channel: mean over the last axis, max over the remaining one.
        x = torch.mean(x, dim=3)
        x, _ = torch.max(x, dim=2)
        x = self.fc(x)
        return x


class ConvBlock(nn.Module):
    """Two 3x3 convolutions followed by 2x2 average pooling.

    Stage 1 is conv -> BatchNorm -> ReLU, stage 2 is conv -> ReLU -> Dropout(0.5).

    NOTE(review): a ``ConvBlock`` class is also defined earlier in this file;
    this later definition is the one the name refers to once the cell has run.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by both convolutions.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        conv_kwargs = dict(kernel_size=3, stride=1, padding=1)

        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, **conv_kwargs),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, **conv_kwargs),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal conv weights, zero biases, unit BatchNorm scale."""
        for module in self.modules():
            is_conv = isinstance(module, nn.Conv2d)
            is_norm = isinstance(module, nn.BatchNorm2d)
            if is_conv:
                nn.init.kaiming_normal_(module.weight)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            elif is_norm:
                nn.init.constant_(module.weight, 1)
                nn.init.zeros_(module.bias)

    def forward(self, x):
        """Apply both conv stages, then halve height and width."""
        hidden = self.conv1(x)
        hidden = self.conv2(hidden)
        return F.avg_pool2d(hidden, 2)

# RESNETS:
class BasicBlock(nn.Module):
  """Two-convolution residual block (3x3 conv -> BN -> ReLU -> 3x3 conv -> BN, plus skip).

  Args:
    in_channels: Channels of the input feature map.
    out_channels: Channels produced by the block.
    kernel_size: Accepted for signature compatibility only; both convolutions
      are fixed at 3x3.
    stride: Stride of the first convolution (spatial down-sampling factor).
    downsample: Optional module applied to the block input so the skip branch
      matches the main branch in shape.
  """

  expansion=1 # the block's output width equals out_channels (no expansion factor)

  def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, downsample=None):

    super().__init__()

    # The first conv carries the stride so the main branch is down-sampled by
    # the same factor as the ``downsample`` projection on the skip branch.
    self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
    self.bn1 = nn.BatchNorm2d(num_features=out_channels)
    self.relu = nn.ReLU(inplace=True)
    self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, padding=1, stride=1, bias=False)
    self.bn2 = nn.BatchNorm2d(num_features=out_channels)
    self.downsample = downsample

  def forward(self, x):
    """Return relu(main_branch(x) + skip(x))."""
    identity = x

    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu(out)

    out = self.conv2(out)
    out = self.bn2(out)

    # Project the *input* (not the transformed tensor) for the skip branch.
    if self.downsample is not None:
      identity = self.downsample(identity)

    out = out + identity

    return self.relu(out)


class BottleNeckBlock(nn.Module):
  """Bottleneck residual block: 1x1 reduce -> 3x3 -> 1x1 expand, plus skip.

  The block outputs ``out_channels * expansion`` channels.

  Args:
    in_channels: Channels of the input feature map.
    out_channels: Width of the inner 3x3 convolution.
    kernel_size: Accepted for signature compatibility only; kernel sizes are
      fixed at 1x1 / 3x3 / 1x1.
    stride: Stride of the 3x3 convolution (spatial down-sampling factor).
    downsample: Optional module applied to the block input so the skip branch
      matches the main branch in shape.
  """

  expansion = 4

  def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, downsample=None):

    super().__init__()

    base_width = 64

    width = int(out_channels * (base_width / 64.)) * 1

    # 1x1 convolutions need no padding and must not stride: padding a 1x1
    # kernel grows the map by 2 pixels per side-pair, and striding all three
    # convs would down-sample by stride**3 instead of stride. Only the 3x3
    # convolution carries the stride.
    self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=width, kernel_size=1, stride=1, padding=0, bias=False)
    self.bn1 = nn.BatchNorm2d(num_features=width)
    self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(num_features=width)
    self.conv3 = nn.Conv2d(in_channels=width, out_channels=width * self.expansion, kernel_size=1, stride=1, padding=0, bias=False)
    self.bn3 = nn.BatchNorm2d(num_features=width * self.expansion)
    self.relu = nn.ReLU(inplace=True)
    self.downsample = downsample

  def forward(self, x):
    """Return relu(main_branch(x) + skip(x))."""
    identity = x

    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu(out)
    out = self.conv2(out)
    out = self.bn2(out)
    out = self.relu(out)
    out = self.conv3(out)
    out = self.bn3(out)

    # Project the *input* (not the transformed tensor) for the skip branch.
    if self.downsample is not None:
      identity = self.downsample(identity)
    out = out + identity

    return self.relu(out)

class ResNetBlock(nn.Module):
    """Residual block with two 3x3 convolutions and a built-in shortcut.

    When the stride is not 1 or the channel count changes, the shortcut is a
    1x1 conv + BatchNorm projection; otherwise it is an empty ``nn.Sequential``
    (identity).

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the block.
        stride: Stride of the first convolution and of the shortcut projection.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            # Empty Sequential returns its input unchanged.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return relu(main_branch(x) + shortcut(x))."""
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))
        return F.relu(main + self.shortcut(x))

class ResNet(nn.Module):
  """ResNet-style classifier assembled from a configurable residual block.

  NOTE(review): a ``ResNet`` class is also defined earlier in this file; this
  later definition is the one the name refers to once the cell has run.

  Args:
    block: Residual block class. It must expose an integer ``expansion`` class
      attribute and accept ``(in_channels, out_channels, stride=...,
      downsample=...)``.
    layers: Sequence of four ints - the number of blocks in each stage.
    num_classes: Number of output logits.
  """

  def __init__(self, block, layers, num_classes):

    super().__init__()

    # Running channel count; _make_layer reads it and advances it per stage.
    self.in_channels = 64

    # resnet stem
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(num_features=self.in_channels)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    # res-blocks
    self.layer1 = self._make_layer(block, 64, layers[0])
    self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
    self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
    self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

    # classifier block
    # Pool to 1x1 so the flattened vector has exactly 512 * expansion features,
    # which is what the linear layer below expects. (A 2x2 pool yields four
    # times as many features and fails in the classifier.)
    self.adppool = nn.AdaptiveAvgPool2d((1, 1))
    self.classifier = nn.Linear(in_features=512 * block.expansion, out_features=num_classes)

  def _make_layer(self, block, out_channels, blocks, stride=1):
    """Build one stage of ``blocks`` residual blocks.

    Only the first block of a stage may change resolution/width; it receives a
    1x1 conv + BatchNorm projection for the skip path when needed.
    """
    downsample = None

    if stride != 1 or self.in_channels != out_channels * block.expansion:
      downsample = nn.Sequential(
          nn.Conv2d(in_channels=self.in_channels, out_channels=out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
          nn.BatchNorm2d(num_features=out_channels * block.expansion),
      )

    # Pass stride/downsample by keyword: the block constructors in this file
    # take ``kernel_size`` as their third positional parameter, so positional
    # passing would feed stride into kernel_size and downsample into stride.
    stage = [block(self.in_channels, out_channels, stride=stride, downsample=downsample)]

    self.in_channels = out_channels * block.expansion

    stage.extend(block(self.in_channels, out_channels) for _ in range(1, blocks))

    return nn.Sequential(*stage)

  def forward(self, x):
    """Return logits of shape (batch, num_classes)."""
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)
    x = self.adppool(x)
    x = torch.flatten(x, 1)

    return self.classifier(x)


class ExtendedResNetClassifier(nn.Module):
    """ResNet-style classifier built from ResNetBlock with 3/4/6/3 blocks per stage.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Stem: 7x7 stride-2 convolution followed by a stride-2 max-pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first halves the resolution.
        self.layer1 = self.make_layer(64, 64, 3)
        self.layer2 = self.make_layer(64, 128, 4, stride=2)
        self.layer3 = self.make_layer(128, 256, 6, stride=2)
        self.layer4 = self.make_layer(256, 512, 3, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, in_channels, out_channels, num_blocks, stride=1):
        """Stack ``num_blocks`` ResNetBlocks; only the first one uses ``stride``."""
        stage = [ResNetBlock(in_channels, out_channels, stride)]
        stage += [ResNetBlock(out_channels, out_channels) for _ in range(num_blocks - 1)]
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return logits of shape (batch, num_classes)."""
        feats = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feats = stage(feats)

        pooled = torch.flatten(self.avgpool(feats), 1)
        return self.fc(pooled)


In [None]:
# HYPERPARAMETERS
# Clip length in seconds and the sampling rate used to turn it into a sample count.
duration_seconds = 4
sample_rate = 22050
# Audio / spectrogram settings read by load_audio() and create_melspec().
hyper_params = {
    # Target clip length in *samples* (4 * 22050 = 88200), not seconds.
    'duration': duration_seconds*sample_rate,
     'n_mels': 128,
    'hop_length': 512,
    'n_fft': 2048,
    'fmin': 20
}

# Training settings.
# NOTE(review): nothing in the visible notebook code reads this dict (the
# dataloaders, optimizer and scheduler below use literal values), and the
# class-count key is spelled 'num_clases' - confirm before relying on it.
model_params = {
    'num_epochs': 50,
    'batch_size': 64,
    'learning_rate': 0.001,
    'num_clases': 10,
    'eta_min': 1e-5,
    't_max': 10,
    'num_workers': 0
}

In [None]:
# Per-split preprocessing pipelines, keyed by the split names used for the datasets below.
# Both resize to 224x224 and normalise per channel; only 'train' adds a random flip.
# NOTE(review): ToPILImage expects a tensor or numpy array as input; confirm that the
#   dataset these pipelines are attached to yields that and not an already-decoded PIL image.
# NOTE(review): the mean/std values look like the commonly used ImageNet statistics -
#   confirm they are appropriate for spectrogram images.
# NOTE(review): if the images are spectrograms, a horizontal flip presumably reverses the
#   time axis - confirm this augmentation is intended.
data_transforms = {
    'train': transforms.Compose([
        transforms.ToPILImage(),  # Convert numpy array to PIL Image
        transforms.Resize((224, 224)),  # Resize image
        transforms.RandomHorizontalFlip(),  # Apply random horizontal flip
        transforms.ToTensor(),  # Convert PIL Image to tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize
    ]),
    'val': transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ]),
}

In [None]:
# Define a custom dataset class to calculate mean and standard deviation
class CustomDataset(datasets.ImageFolder):
    """ImageFolder that can also report per-channel statistics of its samples."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def compute_mean_std(self, batch_size=64):
        """Average the per-sample, per-channel mean and std over the whole dataset.

        Samples are taken through the dataset's own transform, so the result
        describes the *transformed* data.

        Args:
            batch_size: Unused; kept so existing calls keep working.

        Returns:
            Tuple ``(mean, std)`` of 3-element tensors. ``std`` is the mean of
            the per-sample standard deviations, which is not the same as the
            standard deviation of all pixels pooled together.
        """
        mean = torch.zeros(3)
        std = torch.zeros(3)
        for inputs, _ in self:
            # Iterating the dataset yields one sample at a time - assumed to be
            # a 3-channel tensor laid out channel-first with no batch axis - so
            # flatten everything after the channel axis and reduce over it.
            # (Reducing over dims (0, 2, 3) addresses an axis such a sample
            # does not have.)
            per_channel = inputs.reshape(inputs.shape[0], -1)
            mean += per_channel.mean(dim=1)  # Calculate mean per channel
            std += per_channel.std(dim=1)  # Calculate std per channel
        mean /= len(self)
        std /= len(self)
        return mean, std



In [None]:
# CALCULATE THE MEAN & STD OF THE DISTRIBUTIONS [ONLY RUN ONCE; HARD-CODED FURTHER INTO THE TRANSFORMER]
# NOTE(review): data_transforms[x] already ends with Normalize, so the statistics computed
#   here describe the normalised images, not the raw ones - confirm that is intended
#   before hard-coding the results into the transform.
# NOTE(review): data_dir is assigned in more than one cell of this notebook; which
#   folder this reads depends on cell execution order.
meantrain, meanval, stdtrain, stdval = 0, 0, 0, 0

# One dataset per split; keep the resulting statistics under split-specific names.
for x in ['train', 'val']:
    dataset = CustomDataset(os.path.join(data_dir, x), transform=data_transforms[x])
    mean, std = dataset.compute_mean_std()

    if x == 'train':
      meantrain = mean
      stdtrain = std

    if x == "val":
      meanval = mean
      stdval = std

In [None]:
meantrain, stdtrain, meanval, stdval

(tensor([102.9763,  63.6342,  96.5561]),
 tensor([53.5803, 55.6223, 55.1867]),
 tensor([87.1124, 54.9829, 89.5216]),
 tensor([56.6368, 61.6945, 60.1764]))

In [None]:
meantrain, stdtrain, meanval, stdval

(tensor([1.6516, 1.0081, 1.5352]),
 tensor([1.0014, 1.0021, 1.0013]),
 tensor([0.0079, 0.0043, 0.0062]),
 tensor([0.0056, 0.0049, 0.0043]))

In [None]:
# Build the image datasets / dataloaders (the transforms themselves are defined in data_transforms)


# Root folder expected to contain 'train' and 'val' sub-folders, each with one folder per class
data_dir = '/content/drive/My Drive/DLproject1'

# Create datasets: one ImageFolder per split, each with its own transform pipeline
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), transform=data_transforms[x])
                  for x in ['train', 'val']}

# Create dataloaders (both splits are shuffled and use batches of 64)
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=64, shuffle=True, num_workers=2)
               for x in ['train', 'val']}

# Check if CUDA is available and set device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


FileNotFoundError: Found no valid file for the classes Gunshot_and_gunfire, Hi-hat, Knock, Laughter, Shatter, Snare_drum, Splash_and_splatter, car_horn, dog_barking, drilling, siren. Supported extensions are: .jpg, .jpeg, .png, .ppm, .bmp, .pgm, .tif, .tiff, .webp

In [None]:
# Number of samples per split; train_model / train_ensemble divide by these to average loss and accuracy.
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
# Class names as discovered by ImageFolder for the training split.
class_names = image_datasets['train'].classes

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and return it with the best-validation-accuracy weights loaded.

    Each epoch runs a 'train' phase followed by a 'val' phase over the
    module-level ``dataloaders`` dict; batches are moved to the module-level
    ``device`` and averages are taken over ``dataset_sizes[phase]``.

    Args:
        model: Network to optimise. It is expected to already live on ``device``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        scheduler: LR scheduler; stepped once after every training phase.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object, with the weights from the epoch that reached
        the highest validation accuracy.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            # Plain Python int, so the accuracy below is well defined even
            # when a loader yields no batches.
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; track history only in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics: the loss is a batch mean, so weight it by the batch size
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # deep copy the model weights for the model which has the highest acc.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Build the ConvBlock-based classifier (randomly initialised - no pretrained weights are
# loaded here) and move it to `device` before the optimizer is created, so that the
# parameters live on the same device as the batches train_model sends there.
model_ft = Classifier(num_classes = len(class_names)).to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
# optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
optimizer_ft = optim.Adam(model_ft.parameters(), lr=0.001)

# Multiply the learning rate by 0.9 each time the scheduler is stepped
# (train_model steps it once per epoch).
# Alternative: decay LR by a factor of 0.1 every 7 epochs
# exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
exp_lr_scheduler = lr_scheduler.ExponentialLR(optimizer_ft, gamma=0.9)

In [None]:
# Short 2-epoch run; train_model returns the model with its best-validation weights loaded.
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,
                       num_epochs=2)

Epoch 0/1
----------
HELLO
STARTING ITERATION
LABELS tensor([ 3, 10,  1, 12, 11,  1, 11,  0, 11,  9,  7,  6, 11,  1,  0,  0,  4,  9,
        10,  3,  7,  1, 11, 12,  0,  8, 11,  7,  7,  1, 11,  1,  1,  1,  0, 10,
         7, 10,  7,  5, 12,  9, 10,  2,  7, 11,  1,  2,  6, 10, 12, 12, 10, 11,
        12,  3, 11,  1,  2,  9, 11,  5,  9,  3])
Input shape: torch.Size([64, 3, 224, 224])
64
BATCH NUMBER =  0
tensor([[ 6.3982e-01,  1.0960e+00,  5.2815e-01, -2.3888e-01, -1.7210e-01,
         -4.5508e-02,  3.5425e-01,  3.8809e-01, -1.9117e-01, -3.3324e-01,
         -5.4340e-01,  1.4023e-01, -1.9887e-01],
        [-7.8204e-01, -4.0670e-01,  1.6881e+00,  1.8120e+00, -6.2531e-01,
          6.7571e-01, -4.9838e-02,  2.4799e-01, -4.2835e-01,  2.2924e-01,
         -2.3335e-01, -1.0890e-01, -1.7919e-01],
        [-1.2012e+00, -1.8451e+00, -1.6526e-01,  7.8668e-01,  2.0505e+00,
         -2.3812e-01, -8.4001e-01,  8.8268e-01, -1.2398e+00,  1.4196e+00,
          7.0223e-01, -6.4508e-01,  7.4688e-01],
   

KeyboardInterrupt: 

In [None]:
len(class_names)

8

In [None]:
class_names

['Fart',
 'Guitar',
 'Gunshot_and_gunfire',
 'Hi-hat',
 'Knock',
 'car_horn',
 'dog_barking',
 'drilling']

In [None]:

train_dir = '/content/drive/My Drive/DLproject/train'

# Report how many entries each class folder holds (every directory entry is counted).
for class_name in os.listdir(train_dir):
    class_path = os.path.join(train_dir, class_name)
    # Anything that is not a folder is not a class.
    if not os.path.isdir(class_path):
        continue
    num_images = len(os.listdir(class_path))
    print(f"Class: {class_name}, Number of Images: {num_images}")

Class: dog_barking, Number of Images: 640
Class: car_horn, Number of Images: 344
Class: Fart, Number of Images: 291
Class: Guitar, Number of Images: 548
Class: drilling, Number of Images: 560
Class: Gunshot_and_gunfire, Number of Images: 448
Class: Hi-hat, Number of Images: 171
Class: Knock, Number of Images: 168
Class: Splash_and_splatter, Number of Images: 174
Class: Snare_drum, Number of Images: 449
Class: Shatter, Number of Images: 212
Class: Laughter, Number of Images: 295
Class: siren, Number of Images: 560


In [None]:
# ENSEMBLE TRAINING AND PREDICTION


def train_ensemble(models, criterion, optimizers, schedulers, num_epochs=25):
    """Train several models independently on the same data and restore each one's best weights.

    Uses the module-level ``dataloaders``, ``dataset_sizes`` and ``device``.
    Within every epoch all models complete the 'train' phase before any of
    them is validated.

    Args:
        models: List of networks.
        criterion: Loss shared by all models.
        optimizers: One optimizer per model, in the same order as ``models``.
        schedulers: One LR scheduler per model; stepped after each training phase.
        num_epochs: Number of epochs to run.

    Returns:
        The ``models`` list, each model loaded with the weights of its best
        validation epoch.
    """
    started = time.time()

    snapshots = [copy.deepcopy(net.state_dict()) for net in models]
    top_accs = [0.0] * len(models)

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            training = phase == 'train'

            for model_idx, model in enumerate(models):
                # train(True) is train mode, train(False) is eval mode.
                model.train(training)

                optimizer = optimizers[model_idx]
                loss_sum = 0.0
                hits = 0

                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    optimizer.zero_grad()

                    # Autograd history is only needed while training.
                    with torch.set_grad_enabled(training):
                        outputs = model(inputs)
                        preds = torch.max(outputs, 1)[1]
                        loss = criterion(outputs, labels)

                        if training:
                            loss.backward()
                            optimizer.step()

                    # The loss is a batch mean, so weight it by the batch size.
                    loss_sum += loss.item() * inputs.size(0)
                    hits += torch.sum(preds == labels.data)

                if training:
                    schedulers[model_idx].step()

                n_samples = dataset_sizes[phase]
                epoch_loss = loss_sum / n_samples
                epoch_acc = hits.double() / n_samples

                print(f'{phase} Loss for Model {model_idx}: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                # Remember the weights of each model's best validation epoch.
                if not training and epoch_acc > top_accs[model_idx]:
                    top_accs[model_idx] = epoch_acc
                    snapshots[model_idx] = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - started
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')

    # Restore every model to its own best snapshot.
    for model, best_state in zip(models, snapshots):
        model.load_state_dict(best_state)

    return models

def ensemble_predict(models, dataloader):
    """Predict class indices by majority vote over several models.

    Args:
        models: List of networks; each is switched to eval mode.
        dataloader: Iterable of ``(inputs, labels)`` batches; labels are ignored.
            Inputs are moved to the module-level ``device``.

    Returns:
        Flat list with one predicted class index per sample, in dataloader
        order. On a tie, the class predicted by the earliest model in
        ``models`` among the tied ones wins.
    """
    predictions = []

    # Evaluation mode only needs to be set once, not once per batch.
    for model in models:
        model.eval()

    # Inference only: disable autograd so no graph is built or kept in memory.
    with torch.no_grad():
        for inputs, _ in dataloader:
            inputs = inputs.to(device)

            # One array of predicted indices per model for this batch.
            votes = [torch.max(model(inputs), 1)[1].cpu().numpy() for model in models]

            # zip(*votes) regroups the votes per sample; take the most common one.
            predictions.extend(
                Counter(sample_votes).most_common(1)[0][0] for sample_votes in zip(*votes)
            )

    return predictions

# Usage:
# 1. Define your models, optimizers, and schedulers.
# 2. Call train_ensemble with your models, criterion, optimizers, and schedulers.
# 3. Use ensemble_predict to make predictions using the ensemble of models.


In [None]:
# HYPERPARAMETER TUNING
import itertools

# Define the hyperparameters to tune (candidate values for the search loops below)
learning_rates = [0.001, 0.01, 0.1]
momentums = [0.9, 0.95]
hidden_sizes = [64, 128]
# Epochs per trial; read as a global inside train_and_validate().
num_epochs = 10

# Train one configuration and report its validation accuracy
def train_and_validate(learning_rate, momentum, hidden_size):
    """Train a fresh model with the given hyperparameters and return its validation accuracy.

    NOTE(review): ``YourModel``, ``input_size``, ``output_size`` and
    ``validate_model`` are template placeholders that are not defined in the
    visible notebook code; they must be supplied before this can run.

    Args:
        learning_rate: SGD learning rate.
        momentum: SGD momentum.
        hidden_size: Hidden width handed to the model constructor.

    Returns:
        Whatever ``validate_model`` returns for the trained model on the
        validation dataloader.
    """
    net = YourModel(input_size, hidden_size, output_size).to(device)
    sgd = torch.optim.SGD(net.parameters(), lr=learning_rate, momentum=momentum)
    loss_fn = nn.CrossEntropyLoss()

    # Cut the learning rate to a tenth every 5 epochs.
    step_decay = torch.optim.lr_scheduler.StepLR(sgd, step_size=5, gamma=0.1)

    # The epoch count comes from the module-level ``num_epochs``.
    fitted = train_model(net, loss_fn, sgd, step_decay, num_epochs=num_epochs)

    return validate_model(fitted, dataloaders['val'])

# Perform grid search or random search
# Best result seen so far; both stay at their initial values until one of the
# search loops below is uncommented and run.
best_accuracy = 0
best_hyperparameters = {}

# Uncomment one of the following options: grid search or random search
# (both call train_and_validate, which still relies on placeholder names)

# Option 1: Grid Search - tries every combination of the candidate values
# for lr, momentum, hidden_size in itertools.product(learning_rates, momentums, hidden_sizes):
#     accuracy = train_and_validate(lr, momentum, hidden_size)
#     if accuracy > best_accuracy:
#         best_accuracy = accuracy
#         best_hyperparameters = {'lr': lr, 'momentum': momentum, 'hidden_size': hidden_size}

# Option 2: Random Search - samples num_iterations random combinations
# (needs `import random`, which the visible notebook code does not do)
# num_iterations = 5
# for _ in range(num_iterations):
#     lr = random.choice(learning_rates)
#     momentum = random.choice(momentums)
#     hidden_size = random.choice(hidden_sizes)
#     accuracy = train_and_validate(lr, momentum, hidden_size)
#     if accuracy > best_accuracy:
#         best_accuracy = accuracy
#         best_hyperparameters = {'lr': lr, 'momentum': momentum, 'hidden_size': hidden_size}

# Print the best hyperparameters and accuracy
print("Best Hyperparameters:", best_hyperparameters)
print("Best Accuracy:", best_accuracy)
