In [1]:
KERNEL_LAUNCH_TIMEOUT = 100

In [2]:
import pandas as pd
from PIL import Image
import numpy as np
import glob
from sklearn.model_selection import train_test_split
from functools import partial

from torchvision.models import resnet18
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torch.nn import Linear, Conv2d, Module, Flatten, Softmax, CrossEntropyLoss, MaxPool2d, AvgPool2d, Dropout, BatchNorm2d, ReLU
from torch.optim import Adam, SGD
from torch import tensor, from_numpy, argmax, BoolStorage, concat
import torch
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

from tqdm import tqdm

EPOCHS = 50
BATCH_SIZE = 20

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
train_dataset = { "images": [], "labels": []}
for i in glob.glob("./Dataset/train/*/*"):
    img = Image.open(i)
    train_dataset["images"].append(img)
    if "2" in i:
        train_dataset["labels"].append(tensor([1]))
    else:
        train_dataset["labels"].append(tensor([0]))
data = pd.DataFrame(data = train_dataset)
train_data, val_data = train_test_split(data, test_size = 0.2, train_size=0.8, random_state=225, shuffle=True, stratify=data["labels"])
train_data.reset_index(drop=True, inplace=True)
val_data.reset_index(drop=True, inplace=True)

In [5]:
class ImageDataset(Dataset):
    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        return self.df.iloc[index]
train_dataset = ImageDataset(train_data)
val_dataset = ImageDataset(val_data)

In [6]:
def collate(preprocess, batch):
    images = None
    labels = None
    for data in batch:
        if images == None:
            images = tensor(preprocess(data["images"]).resize(1,1,224,224).cuda())
            labels = tensor(data["labels"].cuda())
        else:
            images = torch.cat((images, preprocess(data["images"]).resize(1,1,224,224).cuda()), 0)
            labels = torch.cat((labels, data["labels"].cuda()), 0)
    return images, labels

In [7]:
preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize((0.5), (0.5)),
            # transforms.Grayscale()
        ])
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=partial(collate, preprocess), shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=partial(collate, preprocess), shuffle=True, drop_last=True)

In [8]:
class ResNetBlock(Module):
    def __init__(self, in_channel_size, hidden_channel_size):
        super(ResNetBlock, self).__init__()
        self.conv1 = Conv2d(in_channel_size, hidden_channel_size, 1, stride=1)
        self.conv2 = Conv2d(hidden_channel_size, hidden_channel_size, 1, stride=2)
        self.batchnorm = BatchNorm2d(hidden_channel_size)
        self.batchnorm2 = BatchNorm2d(hidden_channel_size)
        self.downsample = Conv2d(in_channel_size, hidden_channel_size, 1, stride=2)
        self.relu = ReLU(inplace=True)

    def forward(self, input):
        x1 = self.relu(self.batchnorm(self.conv1(input)))
        x2 = self.batchnorm2(self.conv2(x1))
        x = x2 + self.downsample(input)
        return self.relu(x)

In [9]:
class GestureNN(Module):
    def __init__(self):
        super(GestureNN, self).__init__()
        self.batch_norm = BatchNorm2d(64)
        self.batch_norm2 = BatchNorm2d(8)
        self.batch_norm3 = BatchNorm2d(10)
        self.dropout = Dropout(0.4)
        self.dropout2 = Dropout(0.4)
        self.res_block = ResNetBlock(64, 64)
        self.res_block2 = ResNetBlock(64, 128)
        self.res_block3 = ResNetBlock(128, 256)
        self.res_block4 = ResNetBlock(256, 512)
        self.relu = ReLU(inplace=True)
        self.avg_pool = MaxPool2d(8,8)
        self.pool = MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.pool2 = MaxPool2d(2,2)
        self.pool3 = MaxPool2d(2,2)
        self.conv = Conv2d(1, 64, 2)
        self.conv2 = Conv2d(6, 8, 4)
        self.conv3 = Conv2d(8, 10, 4)
        self.conv4 = Conv2d(10, 20, 2)
        self.conv_pad1 = Conv2d(10, 10, 2, padding=5)
        self.flatten = Flatten()
        self.linear3 = Linear(256, 2)

    def forward(self, input):
        x = self.conv(input)
        x = self.relu(self.batch_norm(x))
        x = self.pool(x)
        x = self.res_block(x)
        x = self.res_block2(x)
        x = self.res_block3(x)
        #x = self.res_block4(x)
        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.linear3(x)
        return x

# model = resnet18()
# model.conv1 = Conv2d(1, 64, 7, 2)
# num_in_features = model.fc.in_features
# model.fc = Linear(num_in_features, 2)
model = GestureNN()
criterion = CrossEntropyLoss()
if torch.cuda.is_available():
    model = model.cuda()
    criterion = criterion.cuda()
optimizer = Adam(params=model.parameters(), lr=0.07)
#optimizer = SGD(params=model.parameters(), lr=0.07, momentum=0.9)

In [10]:
def test(model, glob_acc):
    val_loss = []
    acc = 0
    for images, labels in val_loader:
        images.cuda()
        labels.cuda()
        output = model(images)
        loss = criterion(output, labels)
        acc += sum((argmax(output, 1) == labels).int())
        val_loss.append(loss.item())
    print(f"Val loss: {sum(val_loss)/len(val_loader.dataset)}")
    print(f"Val accuracy: {acc/len(val_loader.dataset)}")
    glob_acc.append(acc.cpu()/len(val_loader.dataset))
        

In [11]:

total_acc = []
test_acc = []
for i in range(EPOCHS):
    print(f"Epoch: {i}")
    train_loss = 0
    for images, labels in train_loader:
        acc = 0
        optimizer.zero_grad(set_to_none=False)
        images.cuda()
        labels.cuda()
        output = model(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        
        acc += sum((argmax(output, 1) == labels).int())
        train_loss += loss.item()

        print(f"Train loss: {train_loss/BATCH_SIZE}")
        print(f"Train accuracy: {acc/BATCH_SIZE}")
    total_acc.append(acc.cpu()/BATCH_SIZE)
    test(model, test_acc)

Epoch: 0


TypeError: Input image tensor permitted channel values are [3], but found 1

In [None]:
def plot_grad_flow(named_parameters):
    '''Plots the gradients flowing through different layers in the net during training.
    Can be used for checking for possible gradient vanishing / exploding problems.
    
    Usage: Plug this function in Trainer class after loss.backwards() as 
    "plot_grad_flow(self.model.named_parameters())" to visualize the gradient flow'''
    ave_grads = []
    max_grads= []
    layers = []
    for n, p in named_parameters:
        if(p.requires_grad) and ("bias" not in n):
            if p.grad == None:
                layers.append(n)
                ave_grads.append(0)
                max_grads.append(0)
            else:
                layers.append(n)
                ave_grads.append(p.grad.abs().mean())
                max_grads.append(p.grad.abs().max())
    plt.bar(np.arange(len(max_grads)), max_grads, alpha=0.1, lw=1, color="c")
    plt.bar(np.arange(len(max_grads)), ave_grads, alpha=0.1, lw=1, color="b")
    plt.hlines(0, 0, len(ave_grads)+1, lw=2, color="k" )
    plt.xticks(range(0,len(ave_grads), 1), layers, rotation="vertical")
    plt.xlim(left=0, right=len(ave_grads))
    plt.ylim(bottom = -0.001, top=0.02) # zoom in on the lower gradient regions
    plt.xlabel("Layers")
    plt.ylabel("average gradient")
    plt.title("Gradient flow")
    plt.grid(True)
    plt.legend([Line2D([0], [0], color="c", lw=4),
                Line2D([0], [0], color="b", lw=4),
                Line2D([0], [0], color="k", lw=4)], ['max-gradient', 'mean-gradient', 'zero-gradient'])

In [None]:
model = model.cpu()
plot_grad_flow(model.named_parameters())

In [None]:
plt.figure(figsize=(10,5))
plt.title("Training and Validation Loss")
plt.plot(total_acc,label="train")
plt.plot(test_acc,label="val")
plt.xlabel("iterations")
plt.ylabel("accuracy")
plt.legend()
plt.show()