<a href="https://colab.research.google.com/github/rajlm10/D2L-Torch/blob/main/D2L_Conv_Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [106]:
import torch
from torch import nn
from torch.utils import data
import torchvision
from torchvision import transforms

# Basic Operations in a CNN 

In [107]:
def corr2d(X,K):
  """Compute the 2-D cross-correlation of `X` with the kernel `K`.

  The kernel is slid over every position where it fits entirely inside `X`
  (no padding, stride 1), so the result has shape
  `(X.shape[0] - h + 1, X.shape[1] - w + 1)` for a kernel of shape `(h, w)`.

  Args:
    X: 2-D input tensor.
    K: 2-D kernel tensor.

  Returns:
    A tensor created with `torch.zeros` and filled with one window sum per
    output position.
  """
  kernel_h, kernel_w = K.shape
  out_h = X.shape[0] - kernel_h + 1
  out_w = X.shape[1] - kernel_w + 1
  Y = torch.zeros(out_h, out_w)

  for row in range(out_h):
    for col in range(out_w):
      # Element-wise product of the current window with the kernel, then summed.
      window = X[row:row + kernel_h, col:col + kernel_w]
      Y[row, col] = torch.sum(window * K)

  return Y

In [108]:
# Worked example: a 3x3 input and a 2x2 kernel give a 2x2 cross-correlation output.
X = torch.tensor([[0.0, 1.0, 2.0], [3.0, 4.0, 5.0], [6.0, 7.0, 8.0]]) 
K = torch.tensor([[0.0, 1.0], [2.0, 3.0]])
print(X.shape,K.shape)
corr2d(X, K)

torch.Size([3, 3]) torch.Size([2, 2])


tensor([[19., 25.],
        [37., 43.]])

In [109]:
class Conv2D(nn.Module):
  """Single-channel 2-D cross-correlation layer with a learnable kernel and scalar bias."""
  def __init__(self,kernel_size):
    """Create the layer.

    Args:
      kernel_size: shape passed to `torch.rand` to draw the initial kernel.
    """
    super().__init__()
    initial_kernel = torch.rand(kernel_size)
    self.weight = nn.Parameter(initial_kernel)
    # One bias value, broadcast over the whole output in `forward`.
    self.bias = nn.Parameter(torch.zeros(1))

  def forward(self,X):
    """Cross-correlate `X` with the kernel via `corr2d` and add the bias."""
    correlated = corr2d(X, self.weight)
    return correlated + self.bias

In [110]:
# 6x8 "image": all ones except a band of zeros in columns 2-5, which creates
# two vertical edges (1 -> 0 and 0 -> 1).
X = torch.ones((6, 8)) 
X[:, 2:6] = 0
X

tensor([[1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.],
        [1., 1., 0., 0., 0., 0., 1., 1.]])

In [111]:
# 1x2 kernel: each output is an element minus its right-hand neighbour, so it
# is non-zero only where horizontally adjacent values differ.
K = torch.tensor([[1.0, -1.0]])

In [112]:
# Edge map of X: 1 marks the 1 -> 0 edge, -1 the 0 -> 1 edge, 0 everywhere else.
Y = corr2d(X, K)
Y

tensor([[ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.],
        [ 0.,  1.,  0.,  0.,  0., -1.,  0.]])

In [113]:
# Learning a kernel through gradient descent.
# Uses X (the 6x8 input) and Y (its 6x7 edge map) defined by the earlier cells.

# Construct a two-dimensional convolutional layer with 1 input channel,
# 1 output channel and a kernel of shape (1, 2).
# For the sake of simplicity, we ignore the bias here.
conv2d = nn.Conv2d(1,1, kernel_size=(1, 2), bias=False)
# The two-dimensional convolutional layer uses four-dimensional input and
# output in the format of (num_example, channel, height, width), where the batch
# size (number of examples in the batch) and the number of channels are both 1.
X = X.reshape((1, 1, 6, 8))
Y = Y.reshape((1, 1, 6, 7))
lr = 3e-2 # Learning rate


for i in range(10):
  Y_hat=conv2d(X)
  # Element-wise squared error; it is summed into a scalar before backprop.
  l=(Y_hat-Y)**2
  # Clear the gradient left over from the previous iteration.
  conv2d.zero_grad()
  l.sum().backward()

  # Update the kernel with one manual gradient-descent step, written through
  # `.data` so the update itself is not tracked by autograd.
  conv2d.weight.data[:] -= lr * conv2d.weight.grad 
  # Reports the loss computed before this iteration's update.
  print(f'epoch {i + 1}, loss {l.sum():.3f}')

epoch 1, loss 27.388
epoch 2, loss 13.092
epoch 3, loss 6.561
epoch 4, loss 3.455
epoch 5, loss 1.906
epoch 6, loss 1.095
epoch 7, loss 0.650
epoch 8, loss 0.395
epoch 9, loss 0.244
epoch 10, loss 0.153


In [114]:
def corr2d_multi_in(X, K):
  """Cross-correlate a multi-channel input with a multi-channel kernel.

  `X` and `K` are iterated together along their 0th (channel) dimension;
  each channel pair goes through `corr2d` and the per-channel results are
  added into a single output.

  Args:
    X: input whose 0th dimension indexes channels.
    K: kernel whose 0th dimension indexes the same channels.

  Returns:
    The sum of the per-channel `corr2d` results.
  """
  total = 0
  for channel_input, channel_kernel in zip(X, K):
    total = total + corr2d(channel_input, channel_kernel)
  return total

In [115]:
# Two-channel 3x3 input and a matching two-channel 2x2 kernel (channel axis first).
X = torch.tensor([[[0.0, 1.0, 2.0], [3.0, 4.0, 5.0], [6.0, 7.0, 8.0]], [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]])
K = torch.tensor([[[0.0, 1.0], [2.0, 3.0]], [[1.0, 2.0], [3.0, 4.0]]])
print(X.shape,K.shape) # X is (2, 3, 3) and K is (2, 2, 2): same number of channels
corr2d_multi_in(X, K)

torch.Size([2, 3, 3]) torch.Size([2, 2, 2])


tensor([[ 56.,  72.],
        [104., 120.]])

In [116]:
def corr2d_multi_in_out(X, K):
  """Cross-correlate a multi-channel input with a bank of multi-channel kernels.

  Each entry along the 0th dimension of `K` is one kernel; it is applied to
  the whole input `X` with `corr2d_multi_in`, and the per-kernel results are
  stacked along a new leading dimension.

  Args:
    X: multi-channel input, passed unchanged to `corr2d_multi_in`.
    K: kernels, iterated along the 0th dimension.

  Returns:
    A tensor with one slice per kernel in `K`.
  """
  per_kernel_outputs = []
  for kernel in K:
    per_kernel_outputs.append(corr2d_multi_in(X, kernel))
  return torch.stack(per_kernel_outputs, dim=0)

In [117]:
# Stack K, K + 1 and K + 2 along a new leading axis to get three kernels,
# one per output channel.
# NOTE: this rebinds K, so re-running the cell stacks the already-stacked kernel again.
K = torch.stack((K, K + 1, K + 2), 0) 
K.shape

torch.Size([3, 2, 2, 2])

In [118]:
# One 2x2 output map per stacked kernel; the first map matches the single-kernel result above.
corr2d_multi_in_out(X, K)

tensor([[[ 56.,  72.],
         [104., 120.]],

        [[ 76., 100.],
         [148., 172.]],

        [[ 96., 128.],
         [192., 224.]]])

In [119]:
# 1x1 convolution
X = torch.normal(0, 1, (3, 3, 3)) # C x H x W
K = torch.normal(0, 1, (2, 3, 1, 1)) # C_out x C_in x Hk x Wk

corr2d_multi_in_out(X, K),corr2d_multi_in_out(X, K).shape

# The channel count drops from 3 to 2 while the 3x3 spatial dimensions are unchanged.

(tensor([[[-4.7996, -2.9807, -1.5040],
          [-0.3910,  0.8925,  0.9623],
          [-2.1591, -1.5493,  1.4726]],
 
         [[ 2.6735,  0.6189, -0.7479],
          [-0.0580, -0.2176, -1.2712],
          [ 0.6884,  0.3185, -0.5092]]]), torch.Size([2, 3, 3]))

In [120]:
#Pooling

def pool2d(X, pool_size, mode='max'):
  """Apply 2-D pooling to `X` with stride 1 and no padding.

  Args:
    X: 2-D input tensor.
    pool_size: `(height, width)` of the pooling window.
    mode: `'max'` for maximum pooling or `'avg'` for average pooling.

  Returns:
    A tensor of shape `(X.shape[0] - h + 1, X.shape[1] - w + 1)` holding the
    pooled value of every window.

  Raises:
    ValueError: if `mode` is neither `'max'` nor `'avg'`. An unknown mode
      previously fell through both branches and returned an all-zero tensor.
  """
  if mode not in ('max', 'avg'):
    raise ValueError(f"unsupported pooling mode {mode!r}; expected 'max' or 'avg'")
  use_max = mode == 'max'

  h, w = pool_size
  Y = torch.zeros((X.shape[0] - h + 1, X.shape[1] - w + 1))
  for i in range(Y.shape[0]):
    for j in range(Y.shape[1]):
      window = X[i: i + h, j: j + w]
      Y[i, j] = window.max() if use_max else window.mean()

  return Y

In [121]:
# 2x2 max pooling over a 3x3 input: each output is the largest value in its window.
X = torch.tensor([[0.0, 1.0, 2.0], [3.0, 4.0, 5.0], [6.0, 7.0, 8.0]]) 
print(X.shape)
pool2d(X, (2, 2))

torch.Size([3, 3])


tensor([[4., 5.],
        [7., 8.]])

When processing multi-channel input data, the pooling layer pools each input channel separately, rather than summing the inputs up over channels as in a convolutional layer. This means that the number of output channels for the pooling layer is the same as the number of input channels.

# LeNet 

In [122]:
import torch
# LeNet-style network. The shapes in the comments are those printed by the
# shape-check cell below for a (1, 1, 28, 28) input.
net = nn.Sequential(
  nn.Conv2d(1, 6, kernel_size=5, padding=2), # in channels, out channels -> (1, 6, 28, 28)
  nn.Sigmoid(), 
  nn.AvgPool2d(kernel_size=2, stride=2), # -> (1, 6, 14, 14)
  nn.Conv2d(6, 16, kernel_size=5), # no padding -> (1, 16, 10, 10)
  nn.Sigmoid(), 
  nn.AvgPool2d(kernel_size=2, stride=2), # -> (1, 16, 5, 5)
  nn.Flatten(), # -> (1, 400)
  nn.Linear(16 * 5 * 5, 120), # 16 channels * 5 * 5 = 400 input features
  nn.Sigmoid(),
  nn.Linear(120, 84), 
  nn.Sigmoid(),
  nn.Linear(84, 10)) # final layer produces 10 outputs

In [123]:
# Shape check: push a random (1, 1, 28, 28) input through the network one
# layer at a time and print the output shape after each layer.
X = torch.rand(size=(1, 1, 28, 28), dtype=torch.float32) 
for layer in net: # Iterate through the network's layers in order
  X = layer(X)
  print(layer.__class__.__name__,'output shape: \t',X.shape)

Conv2d output shape: 	 torch.Size([1, 6, 28, 28])
Sigmoid output shape: 	 torch.Size([1, 6, 28, 28])
AvgPool2d output shape: 	 torch.Size([1, 6, 14, 14])
Conv2d output shape: 	 torch.Size([1, 16, 10, 10])
Sigmoid output shape: 	 torch.Size([1, 16, 10, 10])
AvgPool2d output shape: 	 torch.Size([1, 16, 5, 5])
Flatten output shape: 	 torch.Size([1, 400])
Linear output shape: 	 torch.Size([1, 120])
Sigmoid output shape: 	 torch.Size([1, 120])
Linear output shape: 	 torch.Size([1, 84])
Sigmoid output shape: 	 torch.Size([1, 84])
Linear output shape: 	 torch.Size([1, 10])


In [124]:
import multiprocessing
def get_workers():
  """Return the number of CPUs reported by `multiprocessing.cpu_count()`.

  Used as the `num_workers` value for the data loaders.
  """
  cpu_total = multiprocessing.cpu_count()
  return cpu_total

In [125]:
class Accumulator:
  """Keep running float totals for `n` quantities."""
  def __init__(self, n):
    """Start with `n` totals, all zero."""
    self.data = [0.0 for _ in range(n)]

  def add(self, *args):
    """Add each positional argument (converted with `float`) to the total in the same slot.

    Totals and arguments are paired with `zip`, so pairing stops at the
    shorter of the two.
    """
    updated = []
    for running_total, increment in zip(self.data, args):
      updated.append(running_total + float(increment))
    self.data = updated

  def reset(self):
    """Set every total back to zero, keeping the current number of slots."""
    self.data = [0.0 for _ in self.data]

  def __getitem__(self, idx):
    """Return the total stored at position `idx`."""
    return self.data[idx]

In [126]:
# Mini-batch size used for both the training and the test data loader.
batch_size = 256

def load_fashion_mnist(batch_size,resize=None):
  """Download Fashion-MNIST into `../data` and return `(train_loader, test_loader)`.

  Args:
    batch_size: batch size for both loaders.
    resize: if truthy, passed to `transforms.Resize`, which is applied
      before the conversion to a tensor.

  Returns:
    A tuple of two `DataLoader`s: the training split (shuffled) and the
    test split (not shuffled), each using `get_workers()` worker processes.
  """
  if resize:
    steps = [transforms.Resize(resize), transforms.ToTensor()]
  else:
    steps = [transforms.ToTensor()]
  pipeline = transforms.Compose(steps)

  train_set = torchvision.datasets.FashionMNIST(root="../data", train=True, transform=pipeline, download=True)
  test_set = torchvision.datasets.FashionMNIST(root="../data", train=False, transform=pipeline, download=True)

  train_loader = data.DataLoader(train_set, batch_size, shuffle=True, num_workers=get_workers())
  test_loader = data.DataLoader(test_set, batch_size, shuffle=False, num_workers=get_workers())
  return train_loader, test_loader

# Build the Fashion-MNIST loaders without a resize step (resize keeps its default of None).
train_iter, test_iter = load_fashion_mnist(batch_size=batch_size)

In [127]:
def accuracy(y_hat,y):
  """Compute the number of correct predictions.

  Args:
    y_hat: either a 2-D tensor of per-class scores (one row per example) or
      a tensor of already-predicted class labels.
    y: tensor of true labels.

  Returns:
    The count of positions where the prediction equals the label, as a float.
  """
  # Scores need an argmax only when `y_hat` is a matrix with more than one
  # column. The check is on rank rather than on the number of rows, so a
  # batch holding a single example is still reduced to a class index and a
  # 1-D tensor of labels is accepted as-is.
  if y_hat.ndim > 1 and y_hat.shape[1] > 1:
    y_hat = y_hat.argmax(dim=1)
  # Cast predictions to the label dtype before comparing.
  cmp = y_hat.type(y.dtype) == y
  return float(cmp.type(y.dtype).sum())

In [128]:
def evaluate_accuracy_gpu(net,test_iter,device=None):
  """Compute the accuracy for a model on a dataset using a GPU.

  Args:
    net: the model. If it is an `nn.Module` it is switched to evaluation
      mode, and when no `device` is given the device of its first parameter
      is used.
    test_iter: iterable of `(X, y)` batches; `X` may be a tensor or a list
      of tensors.
    device: device to move each batch to. When it is left unset and cannot
      be taken from `net` (a plain callable, or a module without
      parameters), batches are used where they already are.

  Returns:
    Correct predictions divided by the total number of labels.

  Raises:
    ZeroDivisionError: if `test_iter` yields no labels.
  """
  # Only the mode switch and device lookup depend on `net` being a Module.
  # The evaluation below runs for any callable, so it always returns a number.
  if isinstance(net, nn.Module):
    net.eval()
    if not device:
      first_param = next(iter(net.parameters()), None)
      if first_param is not None:
        device = first_param.device

  def _to_device(tensor):
    # Leave the tensor in place when no device could be determined.
    return tensor if device is None else tensor.to(device)

  # No. of correct predictions, no. of predictions
  metric = Accumulator(2)

  with torch.no_grad():
    for X, y in test_iter:
      if isinstance(X, list):
        # Some models take a list of input tensors; move each one.
        X = [_to_device(x) for x in X]
      else:
        X = _to_device(X)

      y = _to_device(y)
      metric.add(accuracy(net(X), y), y.numel())

  return metric[0] / metric[1]

  

In [129]:
def train(net,train_iter,test_iter,loss,optimizer,num_epochs,learning_rate,device):
  """Train `net` and print per-epoch loss, training accuracy and test accuracy.

  The weights of every `nn.Linear` and `nn.Conv2d` layer are re-initialised
  with Xavier-uniform values at the start of each call, and the model is
  moved to `device` before training.

  Args:
    net: the model to train.
    train_iter: iterable of `(X, y)` training batches.
    test_iter: iterable of `(X, y)` batches used for the test accuracy.
    loss: callable `loss(y_hat, y)` returning a scalar tensor.
    optimizer: optimizer already bound to `net`'s parameters.
    num_epochs: number of passes over `train_iter`.
    learning_rate: not used here (the step size is whatever `optimizer` was
      built with); kept so existing callers keep working.
    device: device on which the model and the batches are placed.

  Returns:
    None.

  Raises:
    ZeroDivisionError: if `train_iter` yields no examples.
  """
  def init_weights(layer):
    if isinstance(layer, (nn.Linear, nn.Conv2d)):
      nn.init.xavier_uniform_(layer.weight)
  net.apply(init_weights)
  print('training on', device)
  net.to(device)

  for epoch in range(num_epochs):
    metric = Accumulator(3) # Sum of training loss, sum of training accuracy, no. of examples
    net.train()

    for X, y in train_iter:
      X, y = X.to(device), y.to(device)
      optimizer.zero_grad()
      y_hat = net(X)
      l = loss(y_hat, y)
      l.backward()
      optimizer.step()

      with torch.no_grad():
        # Weighting by the batch size assumes `loss` returns the batch mean
        # (true for nn.CrossEntropyLoss defaults) - confirm for other losses.
        metric.add(l * X.shape[0], accuracy(y_hat, y), X.shape[0])

    # Epoch averages are computed once, after the last batch, rather than on
    # every iteration.
    train_l = metric[0] / metric[2]
    train_acc = metric[1] / metric[2]

    test_acc = evaluate_accuracy_gpu(net, test_iter)

    print(f'epoch:{epoch+1} 'f'loss {train_l:.3f}, train acc {train_acc:.3f},'f'test acc {test_acc:.3f}')
  



In [130]:
def try_gpu(i=0):
  """Return the device `cuda:i` when at least `i + 1` GPUs are visible, else the CPU device."""
  visible_gpus = torch.cuda.device_count()
  if visible_gpus < i + 1:
    return torch.device('cpu')
  return torch.device(f'cuda:{i}')

In [None]:
# Train the network for 10 epochs with plain SGD (learning rate 0.9) and
# cross-entropy loss, on the device chosen by try_gpu().
lr, num_epochs = 0.9, 10
loss=nn.CrossEntropyLoss()
optimizer=torch.optim.SGD(net.parameters(),lr)
# train() has no return statement, so `_` only receives None; progress is printed per epoch.
_=train(net, train_iter, test_iter,loss,optimizer, num_epochs, lr, try_gpu())

training on cuda:0
