<a href="https://colab.research.google.com/github/mehrshad-sdtn/DeepLearning/blob/master/PyTorch/3_Pytorch_ResNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# prompt: import all the necessary packages for common pytorch programs

import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models
import os


In [2]:
class Block(nn.Module):
  def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
    super(Block, self).__init__()
    self.expansion = 4
    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1, padding=0)
    self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
    self.relu = nn.ReLU()
    self.identity_downsample = identity_downsample

  def forward(self, x):
    identity = x
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.conv2(x)
    x = self.bn2(x)
    x = self.relu(x)
    x = self.conv3(x)
    x = self.bn3(x)
    if self.identity_downsample is not None:
      identity = self.identity_downsample(identity)
    x += identity
    x = self.relu(x)
    return x


class ResNet(nn.Module):
  def __init__(self, Block, layers, image_channels, num_classes):
    super(ResNet, self).__init__()
    self.in_channels = 64
    self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
    self.bn1 = nn.BatchNorm2d(64)
    self.relu = nn.ReLU()
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    # ResNet layers
    self.layer1 = self._make_layer(Block, layers[0], out_channels=64, stride=1)
    self.layer2 = self._make_layer(Block, layers[1], out_channels=128, stride=2)
    self.layer3 = self._make_layer(Block, layers[2], out_channels=256, stride=2)
    self.layer4 = self._make_layer(Block, layers[3], out_channels=512, stride=2)

    self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
    self.fc = nn.Linear(512 * 4, num_classes)

  def forward(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)

    x = self.avgpool(x)
    x = x.reshape(x.shape[0], -1)
    x = self.fc(x)
    return x


  def _make_layer(self, Block, num_residual_blocks, out_channels, stride):
    identity_downsample = None
    layers = []

    if stride != 1 or self.in_channels != out_channels * 4:
      identity_downsample = nn.Sequential(
          nn.Conv2d(self.in_channels, out_channels * 4, kernel_size=1, stride=stride),
          nn.BatchNorm2d(out_channels * 4)
      )

    layers.append(Block(self.in_channels, out_channels, identity_downsample, stride))
    self.in_channels = out_channels * 4

    for i in range(num_residual_blocks - 1):
      layers.append(Block(self.in_channels, out_channels))

    return nn.Sequential(*layers)

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [4]:
def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
  print("=> Saving checkpoint")
  torch.save(state, filename)


def load_checkpoint(checkpoint, model):
  print("=> Loading checkpoint")
  model.load_state_dict(checkpoint['state_dict'])
  optimizer.load_state_dict(checkpoint['state_dict'])



In [5]:
def ResNet50(image_channels=3, num_classes=1000):
  return ResNet(Block, [3, 4, 6, 3], image_channels, num_classes)

def ResNet101(image_channels=3, num_classes=1000):
  return ResNet(Block, [3, 4, 23, 3], image_channels, num_classes)

def ResNet152(image_channels=3, num_classes=1000):
  return ResNet(Block, [3, 4, 36, 3], image_channels, num_classes)

In [6]:
def test():
  net = ResNet50()
  x = torch.randn(64, 3, 224, 224)
  y = net(x).to(device)
  print(y.shape)

In [7]:
test()

torch.Size([64, 1000])


In [11]:
transform_train = transforms.Compose([
    transforms.Resize(224),
    #transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

transform_val = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load the CIFAR10 training and validation datasets
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
train_loader = DataLoader(trainset, batch_size=32, shuffle=True, num_workers=2)

valset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_val)
val_loader = DataLoader(valset, batch_size=32, shuffle=False, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


In [28]:
model = ResNet50()

model = model.to(device)

# Define a loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 3

In [29]:
def check_accuracy_and_loss(data_loader, model, criterion, device=device):
    num_correct = 0
    num_samples = 0
    total_loss = 0.0
    model.eval()
    with torch.no_grad():
        for x, y in data_loader:
            x = x.to(device=device)
            y = y.to(device=device)
            scores = model(x)
            _, predictions = scores.max(1)
            total_loss += criterion(scores, y).item()
            num_correct += (predictions == y).sum().item()
            num_samples += predictions.size(0)
    avg_loss = total_loss / num_samples
    accuracy = (num_correct / num_samples) * 100
    return accuracy, avg_loss


for epoch in range(num_epochs):
    print(f'Epoch {epoch + 1}/{num_epochs}')
    print('-' * 10)

    model.train()
    train_loss = 0.0
    num_batches = len(train_loader)

    for batch_idx, (data, targets) in enumerate(train_loader):
        if batch_idx % 50 == 0:
          print('=', end='')
        # 1) data and targets -> device
        data = data.to(device)
        targets = targets.to(device)

        # 2) optimizer init
        optimizer.zero_grad()

        # 3) forward prop
        outputs = model(data)
        loss = criterion(outputs, targets)

        # 4) backward prop
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Calculate average training loss
    avg_train_loss = train_loss / num_batches

    # Calculate accuracy and loss for training and validation data
    train_acc, train_avg_loss = check_accuracy_and_loss(train_loader, model, criterion, device=device)
    validation_acc, validation_avg_loss = check_accuracy_and_loss(val_loader, model, criterion, device=device)
    print('\n')
    print(f'Train Accuracy: {train_acc:.2f}% - Validation Accuracy: {validation_acc:.2f}%')
    print(f'Train Loss: {avg_train_loss:.4f} - Validation Loss: {validation_avg_loss:.4f}')





Epoch 1/3
----------

Train Accuracy: 51.08% - Validation Accuracy: 51.09%
Train Loss: 1.7405 - Validation Loss: 0.0425
Epoch 2/3
----------

Train Accuracy: 63.59% - Validation Accuracy: 62.57%
Train Loss: 1.2016 - Validation Loss: 0.0333
Epoch 3/3
----------

Train Accuracy: 71.23% - Validation Accuracy: 69.60%
Train Loss: 0.9641 - Validation Loss: 0.0272
