In [None]:
# Imports
import os
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.datasets import ImageFolder
import matplotlib.pyplot as plt
import pandas as pd
import torch.optim as optim
from tqdm.notebook import tqdm
import tempfile
import shutil
import random
from sklearn.model_selection import KFold

mount drive if running in google colab


In [None]:
try:
    import google.colab
    IN_COLAB = True
except:
    IN_COLAB = False

print("In Colab: {}".format(IN_COLAB))

if IN_COLAB:
    from image_utils import get_sample_image_size
    from csv_utils import split_classes, how_many, split_classes_threshold
    from file_utils import train_test_split, num_images
    from google.colab import drive
    drive.mount('/content/gdrive')

else:
    import sys
    sys.path.append("..")
    from src.utils.image_utils import get_sample_image_size
    from src.utils.csv_utils import split_classes, how_many, split_classes_threshold
    from src.utils.file_utils import train_test_split, num_images

set computation device to gpu if available


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Computation device: {device}")

set seed for reproducibility


In [None]:
torch.manual_seed(0)
random.seed(0)

set hyper parameters


In [None]:
EPOCHS = 10
BATCH = 64
FOLDS = 5

# percent of all data to go towards testing and validation
# TRACKS_PER_CLASS = 500
PERCENT_TESTING = 0.15
PERCENT_VALIDATION = 0.15
LEARNING_RATE = 0.001

# spectrogram parameters
RESOLUTION = "low"
TRACK_DURATION = 15

# popularity thresholds
TRACKS_PER_CLASS = 1000
HIGH_THRESHOLD = 500000
LOW_THRESHOLD = 500000

#### load the train and test data


In [None]:
# Setting paths
base_path = f"../data" if not IN_COLAB else f"/content/gdrive/MyDrive/AI-Project/data"
dir_name = f"ch_{RESOLUTION}_{TRACK_DURATION}_all"
spectrograms_path = f"{base_path}/spectrograms/{dir_name}"
csv_path = f"{base_path}/audio_features.csv"
df = pd.read_csv(csv_path)

print(f"Csv Length: {len(df)}")
print(f"Number of spectrograms: {num_images(spectrograms_path)}")

num_viral = how_many(
    csv_path, 'number_of_videos', HIGH_THRESHOLD, 'above')
num_not_viral = how_many(
    csv_path, 'number_of_videos', LOW_THRESHOLD, 'below')

print(f"Number of tracks above threshold: {num_viral}")
print(f"Number of tracks below threshold: {num_not_viral}")

All images should be the same size but we resize them according to the first image just to be safe.


In [None]:
image_size = get_sample_image_size(spectrograms_path)
HEIGHT, WIDTH = image_size[0], image_size[1]
print(f"height: {image_size[0]}, width: {image_size[1]}")

In [None]:
transform = transforms.Compose(
    [
        transforms.Resize((HEIGHT, WIDTH)),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
    ]
)


def get_datasets():

    # create temporary folder to store spectrograms
    root_dir = "../data/temp" if not IN_COLAB else './temp'
    os.makedirs(root_dir, exist_ok=True)
    temp_dir = tempfile.mkdtemp(dir=root_dir)

    # split into two classes based on popularity thresholds
    out_path = f"{temp_dir}/"
    # split_classes(
    #     csv_path, spectrograms_path, out_path, ["viral", "notviral"], TRACKS_PER_CLASS
    # )
    split_classes_threshold(csv_path, spectrograms_path, out_path, [
                            "viral", "notviral"], HIGH_THRESHOLD, LOW_THRESHOLD)

    # copy spectrograms to temporary folder split into train and test directories
    data_dir = tempfile.mkdtemp(prefix=dir_name + '_', dir=root_dir)
    print(f"Data directory: {data_dir}")
    train_dir, test_dir = train_test_split(temp_dir, data_dir, PERCENT_TESTING)
    train_dataset = ImageFolder(train_dir, transform=transform)
    test_dataset = ImageFolder(test_dir, transform=transform)

    # delete temporary folder
    shutil.rmtree(temp_dir)
    return train_dataset, test_dataset, data_dir

In [None]:
train_dataset, test_dataset, data_dir = get_datasets()

print(f"number of training images: {len(train_dataset)}")
print(f"number of testing images: {len(test_dataset)}")

#### Looking at the dataset


In [None]:
img, label = train_dataset[0]
print(img.shape, label)
print("classes : \n", train_dataset.classes)

# num images per class
print("num images per class")
print(train_dataset.targets.count(0))
print(train_dataset.targets.count(1))

display the first image in the dataset


In [None]:
def display_img(img, label):
    print(f"Label : {train_dataset.classes[label]}")
    plt.imshow(img.permute(1, 2, 0))


display_img(*train_dataset[0])

load the train and validation into batches.


In [None]:
from torch.utils.data.dataloader import DataLoader
from torch.utils.data import random_split

val_size = int(len(train_dataset) * PERCENT_VALIDATION)
train_size = len(train_dataset) - val_size

train_data, val_data = random_split(train_dataset, [train_size, val_size])
print(f"Length of Train Data : {len(train_data)}")
print(f"Length of Validation Data : {len(val_data)}")

# train_dl = DataLoader(train_data, BATCH, shuffle = True, num_workers = 4, pin_memory = True)
# val_dl = DataLoader(val_data, BATCH*2, num_workers = 4, pin_memory = True)

train_dl = DataLoader(train_data, batch_size=BATCH, shuffle=True)
val_dl = DataLoader(val_data, batch_size=BATCH)
test_dl = DataLoader(test_dataset, batch_size=BATCH)

visualize a single batch of images


In [None]:
from torchvision.utils import make_grid


def show_batch(dl):
    """Plot images grid of single batch"""
    for images, labels in dl:
        fig, ax = plt.subplots(figsize=(16, 12))
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(make_grid(images, nrow=8).permute(1, 2, 0))
        break


# show_batch(train_dl)

In [None]:
import torch.nn as nn


class VGG11(nn.Module):
    def __init__(self, in_channels=1, num_classes=2):
        super(VGG11, self).__init__()
        self.in_channels = in_channels
        self.num_classes = num_classes

        # convolutional layers
        self.conv_layers = nn.Sequential(
            nn.Conv2d(self.in_channels, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Calculate the size of the feature maps after convolutional layers
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv_output_size = self.calculate_conv_output_size(
            in_channels, WIDTH, HEIGHT
        )

        # fully connected layers
        self.fc_layers = nn.Sequential(
            nn.Linear(in_features=self.conv_output_size, out_features=4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(in_features=4096, out_features=self.num_classes),
        )

    def calculate_conv_output_size(self, in_channels, width, height):
        with torch.no_grad():
            dummy_input = torch.zeros(1, in_channels, width, height)
            dummy_output = self.conv_layers(dummy_input)
            print(f"Dummy output shape: {dummy_output.shape}")
            return dummy_output.view(1, -1).size(1)

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x

Initialize model:


In [None]:
model = VGG11(in_channels=1, num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

## Cross Validation


In [None]:
num_epochs = 10
batch_size = 64
learning_rate = 0.001
num_folds = 5

kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

for fold, (train_indices, val_indices) in enumerate(kf.split(train_dataset)):
    print(f'FOLD {fold}')
    print('--------------------------------')
    train_sampler = torch.utils.data.SubsetRandomSampler(train_indices)
    val_sampler = torch.utils.data.SubsetRandomSampler(val_indices)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)
    val_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=val_sampler)

    model = VGG11()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        # Training loop
        model.train()
        for inputs, labels in tqdm(train_loader, total=len(train_loader), desc="Training"):
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backpropagation and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation loop
        model.eval()
        total_correct = 0
        total_samples = 0
        for inputs, labels in tqdm(train_loader, total=len(train_loader), desc="Training"):
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total_samples += labels.size(0)
            total_correct += (predicted == labels).sum().item()

        val_accuracy = 100 * total_correct / total_samples
        print(
            f'Fold {fold + 1}, Epoch [{epoch + 1}/{num_epochs}], Validation Accuracy: {val_accuracy:.2f}%')

    # Optionally, save model checkpoints or other information for each fold