# Convolutional neural network for image classification

In [19]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pickle
from sklearn.model_selection import train_test_split

## Load data

In [20]:
main_path = "../../datasets/shapes/tensors.pt"
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load tensor files that come from a trusted source.
# The file is only read, so plain "rb" is sufficient (no write permission needed),
# and the context manager guarantees the handle is closed after loading.
with open(main_path, "rb") as tensor_file:
  all_tensors = pickle.load(tensor_file)

In [21]:
# sanity check: shape of the tensor stored under the 'square' key
print(all_tensors['square'].shape)

torch.Size([1200, 1, 200, 200])


In [22]:
# label data by the shapes: circle -> 0, square -> 1, star -> 2, triangle -> 3
X_ci, Y_ci = all_tensors['circle'], np.zeros(len(all_tensors['circle']))
X_sq, Y_sq = all_tensors['square'], np.ones(len(all_tensors['square']))
X_st, Y_st = all_tensors['star'], np.full((len(all_tensors['star']),), 2)
X_tr, Y_tr = all_tensors['triangle'], np.full((len(all_tensors['triangle']),), 3)

# fixed seed so that the train/test partition (and therefore the reported metrics)
# is the same on every Restart & Run All
SPLIT_SEED = 42

# split train and test data per shape so every class keeps the same 75/25 proportion
X_ci_train, X_ci_test, Y_ci_train, Y_ci_test = train_test_split(X_ci, Y_ci, test_size=0.25, random_state=SPLIT_SEED)
X_sq_train, X_sq_test, Y_sq_train, Y_sq_test = train_test_split(X_sq, Y_sq, test_size=0.25, random_state=SPLIT_SEED)
X_st_train, X_st_test, Y_st_train, Y_st_test = train_test_split(X_st, Y_st, test_size=0.25, random_state=SPLIT_SEED)
X_tr_train, X_tr_test, Y_tr_train, Y_tr_test = train_test_split(X_tr, Y_tr, test_size=0.25, random_state=SPLIT_SEED)

# stack the per-class pieces in the same class order for features and labels
X_train = torch.cat((X_ci_train, X_sq_train, X_st_train, X_tr_train), dim=0)
Y_train = np.concatenate((Y_ci_train, Y_sq_train, Y_st_train, Y_tr_train))
X_test = torch.cat((X_ci_test, X_sq_test, X_st_test, X_tr_test), dim=0)
Y_test = np.concatenate((Y_ci_test, Y_sq_test, Y_st_test, Y_tr_test))

# converting labels to float32 tensors (they are cast with .long() where the loss is computed)
Y_train = torch.from_numpy(Y_train).type("torch.FloatTensor")
Y_test = torch.from_numpy(Y_test).type("torch.FloatTensor")

# invariants: every image has exactly one label in both partitions
assert len(X_train) == len(Y_train), "train features/labels length mismatch"
assert len(X_test) == len(Y_test), "test features/labels length mismatch"

In [23]:
# confirm the combined training set: feature shape, label shape and feature dtype
print(X_train.shape, Y_train.shape, X_train.dtype)

torch.Size([3600, 1, 200, 200]) torch.Size([3600]) torch.float32


In [24]:
# thin Dataset wrapper around a feature tensor and a label tensor so they can be fed to a DataLoader
class CustomDataset(Dataset):
  """Pairs each entry of `features` with the entry of `labels` at the same index."""

  def __init__(self, features: torch.Tensor, labels: torch.Tensor):
    """Store the feature and label tensors; no copy or validation is performed."""
    super().__init__()
    self._features = features
    self._labels = labels

  def __len__(self):
    """Number of samples, taken from the first dimension of the labels."""
    sample_count = self._labels.shape[0]
    return sample_count

  def __getitem__(self, idx):
    """Return the (feature, label) pair stored at position `idx`."""
    sample = self._features[idx]
    target = self._labels[idx]
    return sample, target

In [25]:
# wrap the train and test tensors so they can be consumed by DataLoader
train_data = CustomDataset(features=X_train, labels=Y_train)
test_data = CustomDataset(features=X_test, labels=Y_test)

In [26]:
# data loaders: training batches are reshuffled each epoch, test batches keep their order

batch_size = 50
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# peek at a single training batch to confirm shapes and dtypes
first_batch = next(iter(train_dataloader), None)
if first_batch is not None:
  X, y = first_batch
  print(f"Shape of X [N, C, H, W]: {X.shape} {X.dtype}")
  print(f"Shape of y: {y.shape} {y.dtype}")

Shape of X [N, C, H, W]: torch.Size([50, 1, 200, 200]) torch.float32
Shape of y: torch.Size([50]) torch.float32


## Model

In [27]:
# Train on a hardware accelerator when one is available:
# prefer CUDA, then MPS, and fall back to the CPU otherwise.

if torch.cuda.is_available():
  device = "cuda"
elif torch.backends.mps.is_available():
  device = "mps"
else:
  device = "cpu"
print(f"Using {device} device")

Using cpu device


In [28]:
# Convolutional neural network as a shape classification model
class ShapeClassifier(nn.Module):
	"""Two CONV => RELU => POOL stages followed by FC => RELU and a log-softmax head.

	Args:
		numChannels: number of channels of the input images.
		classes: number of output classes.
		input_size: (height, width) of the input images. It is only used to size
			the first fully connected layer; the default (200, 200) gives the 800
			input features that were previously hard-coded.

	Raises:
		ValueError: if `input_size` is too small to survive the convolution and
			pooling stages.
	"""

	# constructor
	def __init__(self, numChannels, classes, input_size=(200, 200)):
		super().__init__()
		# initialize first set of CONV => RELU => POOL layers
		self.conv1 = nn.Conv2d(in_channels=numChannels, out_channels=4, kernel_size=(6, 6), stride=(2, 2))
		self.relu1 = nn.ReLU()
		self.maxpool1 = nn.MaxPool2d(kernel_size=(4, 4), stride=(2, 2))
		# initialize second set of CONV => RELU => POOL layers
		self.conv2 = nn.Conv2d(in_channels=4, out_channels=8, kernel_size=(6, 6), stride=(2, 2))
		self.relu2 = nn.ReLU()
		self.maxpool2 = nn.MaxPool2d(kernel_size=(4, 4), stride=(2, 2))
		# initialize first (and only) set of FC => RELU layers;
		# its input width follows from the image size instead of being fixed at 800
		self.fc1 = nn.Linear(in_features=self._flattened_features(input_size), out_features=400)
		self.relu3 = nn.ReLU()
		# initialize our softmax classifier
		self.fc2 = nn.Linear(in_features=400, out_features=classes)
		self.logSoftmax = nn.LogSoftmax(dim=1)

	# number of values left per sample after the two CONV => POOL stages
	def _flattened_features(self, input_size):
		height, width = input_size
		for layer in (self.conv1, self.maxpool1, self.conv2, self.maxpool2):
			kernel_h, kernel_w = layer.kernel_size
			stride_h, stride_w = layer.stride
			# output size of an unpadded, undilated sliding window
			height = (height - kernel_h) // stride_h + 1
			width = (width - kernel_w) // stride_w + 1
			if height < 1 or width < 1:
				raise ValueError(f"input_size {tuple(input_size)} is too small for this network")
		return self.conv2.out_channels * height * width

	# connect layers and generate output
	def forward(self, x):
		"""Return per-class log-probabilities for a batch of images `x`."""
		# pass the input through our first set of convolution -> activation -> pooling layers
		x = self.conv1(x)
		x = self.relu1(x)
		x = self.maxpool1(x)
		# pass the output from the previous layer through the second set of convolution -> activation -> pooling layers
		x = self.conv2(x)
		x = self.relu2(x)
		x = self.maxpool2(x)
		# flatten everything except the batch dimension and pass it through our only set of fully connected -> activation layers
		x = torch.flatten(x, 1)
		x = self.fc1(x)
		x = self.relu3(x)
		# pass the output to our softmax classifier to get our output predictions
		x = self.fc2(x)
		output = self.logSoftmax(x)
		return output

In [29]:
# model: single-channel input, four shape classes, placed on the selected device

model = ShapeClassifier(numChannels=1, classes=4)
model = model.to(device)
model

ShapeClassifier(
  (conv1): Conv2d(1, 4, kernel_size=(6, 6), stride=(2, 2))
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=(4, 4), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(4, 8, kernel_size=(6, 6), stride=(2, 2))
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=(4, 4), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=800, out_features=400, bias=True)
  (relu3): ReLU()
  (fc2): Linear(in_features=400, out_features=4, bias=True)
  (logSoftmax): LogSoftmax(dim=1)
)

## Model training and evaluation

In [30]:
# train model
def train(model, loss_fn, optimizer, train_dataloader):
  """Run one optimisation pass over `train_dataloader`.

  Each batch is moved to the device the model's parameters live on, the loss is
  computed against the labels cast to long, and the optimizer takes one step.
  The loss and the number of samples processed so far are printed every 10th batch.

  Args:
    model: network to train; switched to training mode.
    loss_fn: callable taking (predictions, long labels) and returning a scalar loss.
    optimizer: optimizer built over the model's parameters.
    train_dataloader: iterable of (features, labels) batches with a `dataset` attribute.
  """

  model.train()
  size = len(train_dataloader.dataset)
  # use the model's own device instead of relying on a notebook-level global
  first_param = next(model.parameters(), None)
  model_device = None if first_param is None else first_param.device
  seen = 0  # running sample count; stays correct when the last batch is smaller
  for batch, (X, Y) in enumerate(train_dataloader):
    if model_device is not None:
      X, Y = X.to(model_device), Y.to(model_device)
    pred = model(X)
    loss = loss_fn(pred, Y.long())

    optimizer.zero_grad() # zero out the gradients
    loss.backward() # Backpropagation
    optimizer.step() # update weights

    seen += len(X)
    if batch % 10 == 0:
      loss_value, current = loss.item(), seen
      print(f"loss: {loss_value:>.5f}  [{current:>4d}/{size:>4d}]")


# test model
def test(model, loss_fn, test_dataloader):

  model.eval()
  size = len(test_dataloader.dataset)
  num_batches = len(test_dataloader)
  test_loss, correct = 0, 0

  with torch.no_grad(): # turn off gradient tracking and computation
    for X, Y in test_dataloader:
      X, Y = X.to(device), Y.to(device)    
      pred = model(X)
      test_loss += loss_fn(pred, Y.long()).item()
      correct += (pred.argmax(dim=1) == Y).type(torch.float).sum().item()

  test_loss /= num_batches
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.3f}%, Avg loss: {test_loss:>7f}\n")

In [31]:
# Adam optimizer plus negative log-likelihood loss (the model already outputs log-probabilities)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)
loss_fn = nn.NLLLoss()

# alternate one training pass and one evaluation pass per epoch
epochs = 5
for epoch in range(epochs):
  epoch_number = epoch + 1
  print(f"Epoch {epoch_number}\n-------------------------------")
  train(model, loss_fn, optimizer, train_dataloader)
  test(model, loss_fn, test_dataloader)
print("Done!")

Epoch 1
-------------------------------
loss: 12.93164  [  50/3600]
loss: 1.33714  [ 550/3600]
loss: 0.30539  [1050/3600]
loss: 0.06219  [1550/3600]
loss: 0.03156  [2050/3600]
loss: 0.03578  [2550/3600]
loss: 0.01651  [3050/3600]
loss: 0.08980  [3550/3600]
Test Error: 
 Accuracy: 99.333%, Avg loss: 0.026253

Epoch 2
-------------------------------
loss: 0.02189  [  50/3600]
loss: 0.00784  [ 550/3600]
loss: 0.01912  [1050/3600]
loss: 0.01318  [1550/3600]
loss: 0.01179  [2050/3600]
loss: 0.00667  [2550/3600]
loss: 0.00473  [3050/3600]
loss: 0.02122  [3550/3600]
Test Error: 
 Accuracy: 99.500%, Avg loss: 0.017533

Epoch 3
-------------------------------
loss: 0.00552  [  50/3600]
loss: 0.00324  [ 550/3600]
loss: 0.00671  [1050/3600]
loss: 0.00320  [1550/3600]
loss: 0.02184  [2050/3600]
loss: 0.01116  [2550/3600]
loss: 0.00525  [3050/3600]
loss: 0.00697  [3550/3600]
Test Error: 
 Accuracy: 99.750%, Avg loss: 0.010301

Epoch 4
-------------------------------
loss: 0.00362  [  50/3600]
loss: