In [None]:
import os

from tqdm import tqdm
from json import dumps
from ujson import load as json_load

import numpy as np
import matplotlib as plt
import random
import torch
import torch.utils.data as data
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as sched

from PIL import Image
from torch import from_numpy
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms


In [None]:
# Pick the compute device: the first visible CUDA GPU when there is one, else the CPU.
# `gpu_ids` ends up listing every visible CUDA device index (empty on CPU-only hosts).
gpu_ids = []
if not torch.cuda.is_available():
    device = torch.device('cpu')
else:
    gpu_ids.extend(range(torch.cuda.device_count()))
    device = torch.device(f'cuda:{gpu_ids[0]}')
    # Make the chosen GPU the process-wide default CUDA device.
    torch.cuda.set_device(device)

In [None]:
# ---- shared constants -------------------------------------------------------
GRID_DIMS = (18, 11)
batch_size = 64

# Directory holding the precomputed mean / std images of the 2017 training set.
imageDirName = "../DATA/image"


def _load_image_statistic(file_name):
    """Load one ``.npy`` statistic image from ``imageDirName``.

    The array is wrapped in a one-element list and averaged over that new leading
    axis, so the values are those of the stored image.
    """
    return np.mean([np.load(os.path.join(imageDirName, file_name))], axis=0)


def _mean_over_pixels(image):
    """Average ``image`` over all pixels, treating the last axis as 3 colour channels."""
    return np.mean(np.reshape(image, (-1, 3)), axis=0)


def _resize_and_crop():
    """Return fresh instances of the two geometric steps shared by every pipeline."""
    return [
        transforms.Resize(256),                                         # an int size rescales the shorter side to 256
        transforms.Lambda(lambda img: img.crop(box=(0, 0, 256, 384))),  # keep the top-left 256 (w) x 384 (h) region
    ]


# Train-image statistics, then their per-channel averages.
mean_image = _load_image_statistic("mean_train_img_2017.npy")
std_image = _load_image_statistic("std_train_img_2017.npy")
MEAN_PIX = _mean_over_pixels(mean_image)    # on the 0-255 pixel scale
STD_PIX = _mean_over_pixels(std_image)      # on the 0-255 pixel scale; not used by the pipelines below

# TRAIN pipeline: geometry, random horizontal flip, tensor conversion (values scaled to
# 0-1), then mean subtraction. The std passed to Normalize is (1, 1, 1), so it only centres.
train_transformer = transforms.Compose(
    _resize_and_crop()
    + [
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(MEAN_PIX/255, (1,1,1)),
    ]
)

# EVAL pipeline: identical to TRAIN minus the random flip.
eval_transformer = transforms.Compose(
    _resize_and_crop()
    + [
        transforms.ToTensor(),
        transforms.Normalize(MEAN_PIX/255, (1,1,1)),
    ]
)

# VISUALISATION pipeline: geometry and tensor conversion only (no normalisation).
visual_transformer = transforms.Compose(_resize_and_crop() + [transforms.ToTensor()])

In [None]:
class ClimbBinaryDataset(Dataset):
    """Map-style dataset over the ``X_<split>.npy`` / ``y_<split>.npy`` arrays of one split.

    Args:
        data_dir: directory containing the two ``.npy`` files.
        split: split name substituted into the file names (e.g. ``'train'``).
        n_dev: accepted for call-site compatibility; this class never reads it.
    """

    def __init__(self, data_dir, split, n_dev=3*64):
        features_path = os.path.join(data_dir, f"X_{split}.npy")
        labels_path = os.path.join(data_dir, f"y_{split}.npy")

        # Features are reshaped so that each example is one GRID_DIMS-shaped grid.
        self.X = np.reshape(np.load(features_path), (-1, *GRID_DIMS))
        self.y = np.load(labels_path)
        print(self.X.shape, self.y.shape, self.y.size)

    def __len__(self):
        # Number of examples == total number of label entries.
        return self.y.size

    def __getitem__(self, idx):
        """Return ``(grid, label)``: the grid as an int64 tensor, the label as stored in ``y``."""
        grid = from_numpy(self.X[idx])
        return grid.long(), self.y[idx]

In [None]:
# Build one DataLoader per split and record how many examples each split holds.
data_path = "../DATA/binary"

data_loaders = {}
n_examples = {}

for split in ('train', 'val', 'test'):
    # Only the training split is reshuffled; evaluation splits keep their stored order.
    shuffle = (split == 'train')

    dataset = ClimbBinaryDataset(data_path, split, n_dev=batch_size)

    n_examples[split] = len(dataset)
    data_loaders[split] = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=4,
    )

(16025, 18, 11) (16025,) 16025
(2009, 18, 11) (2009,) 2009
(2009, 18, 11) (2009,) 2009


In [None]:

batch_size = 64

# Convenience aliases for the per-split loaders built earlier in the notebook.
train_dataloader, val_dataloader, test_dataloader = (
    data_loaders['train'],
    data_loaders['val'],
    data_loaders['test'],
)

GRID_DIMS = (18, 11)  # (rows, columns) of one input grid



class NeuralNetwork(nn.Module):
    """Small fully-convolutional classifier for single-channel 2-D grids.

    Five ``Conv3x3 -> BatchNorm -> ReLU`` blocks with channel widths
    ``n_channels * (1, 2, 4, 8, 8)``; the first three are each followed by a 2x2
    max-pool. A global average pool then collapses the remaining spatial extent
    and a 1x1 convolution maps the features to ``n_classes`` logits.

    Args:
        n_classes: number of output logits. The default of 15 is the literal the
            notebook author placed in the signature -- TODO confirm it matches
            the label set (it must be strictly greater than the largest label).
        n_channels: width of the first convolution; later blocks scale from it.

    Input:  ``(N, H, W)`` tensor of any numeric dtype, with ``H >= 8`` and
            ``W >= 8`` so that three 2x2 poolings leave at least one cell.
    Output: ``(N, n_classes)`` float logits.
    """

    def __init__(self, n_classes=15, n_channels=8):
        super().__init__()

        def conv_block(in_ch, out_ch):
            # padding=1 keeps H and W unchanged, so only the pooling layers shrink the grid.
            return [
                nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ]

        c = n_channels
        # Shape comments assume the notebook's 18x11 grids.
        self.network = nn.Sequential(
            *conv_block(1, c), nn.MaxPool2d(2),             # (N,  c, 9, 5)
            *conv_block(c, 2 * c), nn.MaxPool2d(2),         # (N, 2c, 4, 2)
            *conv_block(2 * c, 4 * c), nn.MaxPool2d(2),     # (N, 4c, 2, 1)
            *conv_block(4 * c, 8 * c),                      # (N, 8c, 2, 1)
            *conv_block(8 * c, 8 * c),                      # (N, 8c, 2, 1)
            # A fixed 2x2 pool cannot run on a 2x1 map; a global pool works for any size.
            nn.AdaptiveAvgPool2d(1),                        # (N, 8c, 1, 1)
            nn.Conv2d(8 * c, n_classes, kernel_size=1),     # (N, n_classes, 1, 1)
        )

        # initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class logits of shape ``(N, n_classes)`` for a batch of grids ``(N, H, W)``."""
        # Create the channel dimension and cast to float: the dataset yields int64
        # grids, while the convolution weights are floating point.
        x = torch.unsqueeze(x, 1).float()       # (N, 1, H, W)

        # forward pass through the network
        logits = self.network(x)                # (N, n_classes, 1, 1)

        # Flatten only the trailing singleton dims; a bare squeeze() would also drop
        # the batch axis when N == 1 and break argmax(1) downstream.
        return torch.flatten(logits, start_dim=1)   # (N, n_classes)

# Instantiate the classifier on the selected device, with a cross-entropy
# objective and an Adam optimiser (learning rate 1e-3, weight decay 1e-5).
model = NeuralNetwork().to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-5,
)

def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Puts ``model`` in training mode and, for every ``(inputs, targets)`` batch, moves
    the batch to the module-level ``device``, computes the loss, and takes one
    optimiser step. Progress is printed on every 100th batch (starting with the first).

    Returns:
        ``(loss_list, accuracy_list)``: per-batch loss values and per-batch accuracies
        (fractions in [0, 1]), in iteration order.
    """
    model.train()
    batch_losses = []
    batch_accuracies = []

    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass
        logits = model(inputs)
        loss = loss_fn(logits, targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accuracy uses the logits computed before this step's update.
        n_hits = (logits.argmax(1) == targets).type(torch.float).sum().item()
        batch_accuracy = n_hits / inputs.shape[0]
        loss_value = loss.item()

        batch_losses.append(loss_value)
        batch_accuracies.append(batch_accuracy)

        if step % 100 == 0:
            print(f"loss: {loss_value:>7f}, accuracy: {batch_accuracy * 100:>0.1f}%")

    return batch_losses, batch_accuracies


def test(dataloader, model):
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= len(dataloader)
    accuracy = correct / len(dataloader.dataset)
    return test_loss, accuracy

# Epoch loop: one training pass followed by an evaluation on the test loader per epoch.
# The four lists collect one value per epoch for the plots further down.
epochs = 10
train_loss = []
train_accuracy = []
test_loss = []
test_accuracy = []

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}\n-------------------------------")

    epoch_train_loss, epoch_train_accuracy = train(train_dataloader, model, loss_fn, optimizer)
    epoch_test_loss, epoch_test_accuracy = test(test_dataloader, model)

    # Training metrics arrive per batch and are averaged; evaluation metrics are already scalars.
    train_loss.append(np.mean(epoch_train_loss))
    train_accuracy.append(np.mean(epoch_train_accuracy))
    test_loss.append(epoch_test_loss)
    test_accuracy.append(epoch_test_accuracy)

# Plotting loss and accuracy
# The notebook's import cell binds `plt` to the top-level `matplotlib` package, which
# does not provide pyplot's plotting functions (subplot, plot, show, ...). Import the
# pyplot interface here so this cell runs regardless of that earlier binding.
import matplotlib.pyplot as plt

epoch_axis = range(1, epochs + 1)
fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(12, 5))

# Left panel: loss per epoch
loss_ax.plot(epoch_axis, train_loss, label="Training Loss")
loss_ax.plot(epoch_axis, test_loss, label="Testing Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.legend()
loss_ax.set_title("Loss over Epochs")

# Right panel: accuracy per epoch
accuracy_ax.plot(epoch_axis, train_accuracy, label="Training Accuracy")
accuracy_ax.plot(epoch_axis, test_accuracy, label="Testing Accuracy")
accuracy_ax.set_xlabel("Epoch")
accuracy_ax.set_ylabel("Accuracy")
accuracy_ax.legend()
accuracy_ax.set_title("Accuracy over Epochs")

fig.tight_layout()
plt.show()

TypeError: NeuralNetwork.__init__() missing 1 required positional argument: 'n_classes'