In [1]:
# Import relevant packages
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets


import os
import time
import warnings
warnings.filterwarnings("ignore", category = FutureWarning)

ModuleNotFoundError: No module named 'torch'

In [0]:
# Flags
# When True, get_default_device() selects the CPU even if CUDA is available.
DISABLE_CUDA = False

In [0]:
# Hyperparameters: candidate values for the experiments
input_dim = [
    16,
    32,
    64,
    128,
]
lr = [1e-3, 1e-4, 1e-5]
train_test_ratio = 0.8

# Declare important file paths, all anchored at the directory that holds the notebook
notebook_path = os.path.abspath("Custom_CNN.ipynb")
_notebook_dir = os.path.dirname(notebook_path)
data_path = _notebook_dir + '/project/data/columbia-prcg-datasets/'
model_path = _notebook_dir + '/project/model.pth'

In [0]:
# Select accelerator device
def get_default_device():
    """Choose the torch device to run on.

    Returns:
        tuple: ``(device, is_cuda)``. The CUDA device and True when
        ``DISABLE_CUDA`` is false and ``torch.cuda.is_available()`` reports
        CUDA; otherwise the CPU device and False.
    """
    cuda_selected = (not DISABLE_CUDA) and torch.cuda.is_available()
    if not cuda_selected:
        print("Running on CPU!")
        return torch.device('cpu'), False

    print("Running on CUDA!")
    return torch.device('cuda'), True


device, using_cuda = get_default_device()

In [0]:
# Run one full experiment for the given hyperparameters and return its training history
def run_experiment(input_dim, lr, crop=False, train_test_ratio=0.8):
    """Train a small CNN on the image folder at ``data_path`` and return its history.

    Args:
        input_dim (int): Side length, in pixels, of the square images fed to
            the network.
        lr (float): Learning rate passed to the Adam optimizer.
        crop (bool): If True, take a random ``input_dim`` x ``input_dim`` crop
            of each image; otherwise resize each image to
            ``input_dim`` x ``input_dim`` (aspect ratio is not preserved).
        train_test_ratio (float): Fraction of the dataset used for training.
            The remainder is split into validation and test subsets of
            (nearly) equal size.

    Returns:
        tuple: ``(loss_list, train_accuracy_list, val_accuracy_list)`` exactly
        as returned by ``train_model`` (one entry per epoch, 30 epochs).

    Note:
        Relies on the notebook-level names ``data_path``, ``device`` and
        ``train_model``; the cells defining them must be run before calling
        this function.
    """
    # Resize needs an explicit (height, width) pair to produce square images;
    # a bare int only rescales the shorter edge, which would leave non-square
    # inputs that the fixed-size linear layer below cannot accept.
    if crop:
        spatial_transform = transforms.RandomCrop(input_dim)
    else:
        spatial_transform = transforms.Resize((input_dim, input_dim))
    transform = transforms.Compose([
        spatial_transform,
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    full_set = datasets.ImageFolder(root=data_path, transform=transform)

    # random_split requires integer lengths that add up to len(full_set),
    # hence floor division here and the remainder going to the test subset.
    total_size = len(full_set)
    train_size = int(train_test_ratio * total_size)
    val_size = (total_size - train_size) // 2
    test_size = total_size - train_size - val_size
    # The test subset is held out and intentionally not used by this function.
    train_set, val_set, _test_set = torch.utils.data.random_split(
        full_set, [train_size, val_size, test_size])

    train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, num_workers=1)
    val_loader = torch.utils.data.DataLoader(val_set, shuffle=False, num_workers=1)

    # Each 2x2 max-pool halves (and floors) the spatial size. Two of them sit
    # in front of the linear layer, which sees 64 channels per location.
    pooled_dim = input_dim // 2 // 2
    flat_features = pooled_dim * pooled_dim * 64

    # Declare our model architecture
    class ConvNet(nn.Module):  # Convolutional Neural Network
        """Two conv/ReLU/max-pool stages followed by one sigmoid output unit."""

        def __init__(self):
            super(ConvNet, self).__init__()
            self.layer1 = nn.Sequential(
                nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2))
            self.layer2 = nn.Sequential(
                nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2))
            # in_features must be an int; the float arithmetic used previously
            # (input_dim * input_dim * 0.25 * 0.25 * 64) made nn.Linear fail.
            self.fc1 = nn.Linear(flat_features, 1)
            self.sigmoid = nn.Sigmoid()

        def forward(self, x):
            """Return one probability per sample, shape ``(batch, 1)``."""
            out = self.layer1(x)
            out = self.layer2(out)
            out = out.reshape(out.size(0), -1)  # flatten all but the batch axis
            out = self.fc1(out)
            return self.sigmoid(out)

    model = ConvNet()
    model.to(device)

    loss_fn = torch.nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    loss_list, train_accuracy_list, val_accuracy_list = train_model(
        model, loss_fn, optimizer, train_loader, val_loader, num_epochs=30)
    return loss_list, train_accuracy_list, val_accuracy_list

In [0]:
# NOTE(review): run_experiment calls train_model, which is defined in a later
# cell of this notebook; that cell must be executed before this one, otherwise
# this call raises NameError on a fresh kernel.
run_experiment(input_dim=16, lr=0.001, crop=False, train_test_ratio=0.8)

In [0]:
# Declare our model architecture
class ConvNet(nn.Module):  # Convolutional Neural Network
    """Two conv/ReLU/max-pool stages followed by a single sigmoid output unit.

    Args:
        input_dim (int): Side length, in pixels, of the square 3-channel input
            images. Defaults to 512, which gives the original
            ``128 * 128 * 64`` input size for the linear layer.
    """

    def __init__(self, input_dim=512):
        super(ConvNet, self).__init__()
        # Shape comments below are (height, width, channels) for the default
        # 512-pixel input.
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),  # (512, 512, 32)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))  # (256, 256, 32)
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),  # (256, 256, 64)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))  # (128, 128, 64)
        # Each 2x2 max-pool halves (and floors) the spatial size, so the
        # flattened feature count follows from input_dim instead of being
        # hard-coded for 512-pixel images.
        pooled_dim = input_dim // 2 // 2
        self.fc1 = nn.Linear(pooled_dim * pooled_dim * 64, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one probability per sample, shape ``(batch, 1)``.

        ``x`` must be a ``(batch, 3, input_dim, input_dim)`` tensor for the
        flattened features to match ``fc1``.
        """
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)  # flatten all but the batch axis
        out = self.fc1(out)
        out = self.sigmoid(out)
        return out

# Instantiate the network and move it to the device chosen by get_default_device().
model = ConvNet()
model.to(device)

In [0]:
# Define the loss function and optimizer
# BCELoss expects probabilities, which matches the sigmoid output of ConvNet.
loss_fn = torch.nn.BCELoss()
# The notebook-level `lr` is a list of candidate learning rates, but Adam needs
# a single number; use the first candidate when a list/tuple is given.
learning_rate = lr[0] if isinstance(lr, (list, tuple)) else lr
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [0]:
def train_model(model, loss_fn, optimizer, train_loader, val_loader, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs, validating after every epoch.

    Args:
        model: Network producing one probability per sample; its output is
            flattened to shape ``(batch,)`` before use.
        loss_fn: Callable ``loss_fn(probs, targets)`` returning a scalar tensor.
        optimizer: Optimizer updating ``model``'s parameters.
        train_loader: Iterable of batches whose first two items are
            ``(inputs, labels)``; used for training.
        val_loader: Iterable of batches in the same format; used for
            validation.
        num_epochs (int): Number of passes over ``train_loader``.

    Returns:
        tuple: ``(loss_list, train_accuracy_list, val_accuracy_list)``, each a
        list with one Python float per epoch. A ``loss_list`` entry is the sum
        of that epoch's batch losses. An accuracy is ``nan`` when its loader
        yielded no samples.

    Note:
        Sets ``torch.backends.cudnn.benchmark = True`` and leaves ``model`` in
        eval mode after the final validation pass.
    """
    loss_list = []
    train_accuracy_list = []
    val_accuracy_list = []

    # Use the device the model already lives on instead of a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    threshold = 0.5  # probabilities above this count as the positive class
    torch.backends.cudnn.benchmark = True  # make training faster on Cuda

    def _accuracy(correct, total):
        # Fraction of correct predictions; nan when nothing was evaluated.
        return correct / total if total else float('nan')

    for epoch in range(num_epochs):
        # Train the model
        model.train()  # validation below switches to eval mode, so reset each epoch
        running_loss = 0.0
        train_correct = train_total = 0
        for batch in train_loader:
            inputs = batch[0].to(device, non_blocking=True)
            # Flatten targets and probabilities to the same (batch,) shape:
            # BCELoss rejects mismatched shapes and `==` would otherwise
            # broadcast (N, 1) against (N,) into an (N, N) matrix.
            targets = batch[1].to(device, non_blocking=True).float().reshape(-1)
            probs = model(inputs).reshape(-1)

            loss = loss_fn(probs, targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            # Training accuracy from the same forward pass used for the loss.
            predictions = (probs.detach() > threshold).float()
            train_total += targets.numel()
            train_correct += int((predictions == targets).sum().item())
        train_accuracy = _accuracy(train_correct, train_total)

        # Evaluate the current version of the model on the validation data
        model.eval()
        val_correct = val_total = 0
        with torch.no_grad():
            for batch in val_loader:
                inputs = batch[0].to(device, non_blocking=True)
                targets = batch[1].to(device, non_blocking=True).float().reshape(-1)
                probs = model(inputs).reshape(-1)
                predictions = (probs > threshold).float()
                val_total += targets.numel()
                val_correct += int((predictions == targets).sum().item())
        val_accuracy = _accuracy(val_correct, val_total)

        loss_list.append(running_loss)
        train_accuracy_list.append(train_accuracy)
        val_accuracy_list.append(val_accuracy)

    return loss_list, train_accuracy_list, val_accuracy_list