In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms

In [None]:
# File the training loop writes the lowest-validation-loss weights to.
PATH = './cifar_net.pth'

# Seed for the train/validation split, so the same 2000 images are held out on every run.
SPLIT_SEED = 0

# Per-channel statistics used by Normalize.
# NOTE(review): these look like the ImageNet statistics rather than values
# computed on CIFAR-10 itself -- confirm this is intended.
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

# Training pipeline: tensor conversion, normalisation, then random flip / padded crop.
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
    transforms.RandomHorizontalFlip(0.5),
    transforms.RandomCrop(32, padding=4)])

# Evaluation pipeline: deterministic, no augmentation.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

batch_size = 64

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=train_transform)
# Second view of the same training images, but with the deterministic
# evaluation transform. The validation subset is taken from this view so that
# random augmentation does not add noise to the validation loss that drives
# checkpoint selection. The files were fetched by the line above.
evalset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                       download=False, transform=test_transform)

# Seeded split: 48000 images for training, 2000 held out for validation.
train, val_split = torch.utils.data.random_split(
    trainset, [48000, 2000],
    generator=torch.Generator().manual_seed(SPLIT_SEED))
# Re-point the held-out indices at the un-augmented view.
val = torch.utils.data.Subset(evalset, val_split.indices)

trainloader = torch.utils.data.DataLoader(train, batch_size=batch_size,
                                          shuffle=True, num_workers=2)
# No shuffling for validation: order does not matter for evaluation, and a
# fixed order keeps the per-batch loss average comparable between epochs.
valloader = torch.utils.data.DataLoader(val, batch_size=batch_size,
                                        shuffle=False, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=test_transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2)

# Human-readable names for the ten label indices.
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:
# Use the first CUDA GPU when one is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

In [None]:
class Bottleneck(nn.Module):
  """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions plus a skip path.

  Each convolution is followed by batch normalisation; ReLU is applied after
  the first two and again after the residual addition.

  Args:
    in_channels: channels of the incoming feature map.
    mid_channels: channels used by the 1x1 reduction and the 3x3 convolution.
    out_channels: channels produced by the final 1x1 convolution.
    conv2stride: stride of the 3x3 convolution (and of the projection
      shortcut when one is created).
    downsample: when true, the skip path is a 1x1 convolution + batch norm
      that maps the input to ``out_channels`` with stride ``conv2stride``;
      otherwise the input is added unchanged, so its shape must already match
      the main path's output.
  """

  def __init__(self, in_channels, mid_channels, out_channels, conv2stride, downsample):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels, mid_channels, 1, stride=1, bias=False)
    self.bn1 = nn.BatchNorm2d(mid_channels)
    self.conv2 = nn.Conv2d(mid_channels, mid_channels, 3, stride=conv2stride, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(mid_channels)
    self.conv3 = nn.Conv2d(mid_channels, out_channels, 1, stride=1, bias=False)
    self.bn3 = nn.BatchNorm2d(out_channels)
    self.relu = nn.ReLU()
    if downsample:
      # Projection shortcut so the skip path matches the main path's shape.
      self.downsample = nn.Sequential(
          nn.Conv2d(in_channels, out_channels, 1, stride=conv2stride, bias=False),
          nn.BatchNorm2d(out_channels))
    else:
      self.downsample = None

  def forward(self, x):
    """Return ``relu(main_path(x) + shortcut(x))``."""
    residual = x
    x = self.relu(self.bn1(self.conv1(x)))
    x = self.relu(self.bn2(self.conv2(x)))
    x = self.bn3(self.conv3(x))
    # Explicit None check: truthiness of a module is not a reliable
    # "is present" test (for nn.Sequential it is decided by its length).
    if self.downsample is not None:
      residual = self.downsample(residual)
    return self.relu(x + residual)

In [None]:
class Resnet50(nn.Module):
  """ResNet-50-style image classifier assembled from ``Bottleneck`` blocks.

  Layout: a 7x7 stride-2 stem convolution with batch norm, ReLU and a 3x3
  stride-2 max-pool, then four stages of 3, 4, 6 and 3 bottleneck blocks
  (256, 512, 1024 and 2048 output channels), global average pooling, and a
  two-layer fully connected head (2048 -> 1024 -> ``num_classes``).

  Args:
    num_classes: number of output logits. Defaults to 10.
  """

  def __init__(self, num_classes=10):
    super().__init__()
    self.conv1 = nn.Conv2d(3, 64, 7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.relu = nn.ReLU()
    self.maxpool = nn.MaxPool2d(3, stride=2, padding=1)
    self.layer1 = self._make_stage(64, 64, 256, num_blocks=3, stride=1)
    self.layer2 = self._make_stage(256, 128, 512, num_blocks=4, stride=2)
    self.layer3 = self._make_stage(512, 256, 1024, num_blocks=6, stride=2)
    self.layer4 = self._make_stage(1024, 512, 2048, num_blocks=3, stride=2)
    # Pool to 1x1 regardless of the incoming spatial size.
    self.avgpool = nn.AdaptiveAvgPool2d(1)
    self.fc1 = nn.Linear(2048, 1024)
    self.fc2 = nn.Linear(1024, num_classes)

  @staticmethod
  def _make_stage(in_channels, mid_channels, out_channels, num_blocks, stride):
    """Build one stage: a projecting first block followed by identity blocks.

    The first block changes the channel count (and applies ``stride``), so it
    gets a projection shortcut; the remaining ``num_blocks - 1`` blocks keep
    the shape and use a plain identity shortcut.
    """
    blocks = [Bottleneck(in_channels, mid_channels, out_channels, stride, True)]
    for _ in range(num_blocks - 1):
      blocks.append(Bottleneck(out_channels, mid_channels, out_channels, 1, False))
    return nn.Sequential(*blocks)

  def forward(self, x):
    """Map a batch of 3-channel images to ``num_classes`` unnormalised logits."""
    x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)
    x = self.avgpool(x)
    # Collapse (N, 2048, 1, 1) to (N, 2048) for the linear head.
    x = torch.flatten(x, 1)
    x = self.relu(self.fc1(x))
    x = self.fc2(x)
    return x

# Build the network and place its parameters on the selected device.
net = Resnet50().to(device)

# Cross-entropy objective, optimised with Adam at a learning rate of 1e-3.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=1e-3)

# Per-epoch loss histories, plus the best validation loss seen so far
# (used to decide when to write a checkpoint).
train_losses = []
val_losses = []
lowest_val_loss = np.inf

In [None]:
# Train for 32 epochs. After each epoch, evaluate on the validation loader and
# checkpoint the weights whenever the mean validation loss improves.
for epoch in range(32):
    epoch_train_losses, epoch_val_losses = [], []

    # Training pass. Re-enable train mode every epoch because the validation
    # pass below switches the network to eval mode.
    net.train()
    for x, y in trainloader:
        x, y = x.to(device), y.to(device)
        pred = net(x)
        loss = criterion(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_train_losses.append(loss.item())

    # Validation pass. eval() makes BatchNorm use its running statistics and
    # stops it from updating them with validation data; no_grad() avoids
    # building autograd graphs that are never back-propagated.
    correct = 0
    total = 0
    net.eval()
    with torch.no_grad():
        for x, y in valloader:
            x, y = x.to(device), y.to(device)
            pred = net(x)
            loss = criterion(pred, y)
            epoch_val_losses.append(loss.item())

            predicted = pred.argmax(dim=1)
            total += y.size(0)
            correct += (predicted == y).sum().item()

    # Mean of the per-batch losses for this epoch.
    avg_train_loss = np.mean(epoch_train_losses)
    avg_val_loss = np.mean(epoch_val_losses)
    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)
    print(f'Epoch: {epoch}, Training Loss: {avg_train_loss}, Validation Loss: {avg_val_loss}, Validation Accuracy: {100 * correct // total} %')
    # Keep only the weights with the lowest validation loss seen so far.
    if avg_val_loss < lowest_val_loss:
        lowest_val_loss = avg_val_loss
        torch.save(net.state_dict(), PATH)

print('Finished Training')

In [None]:
# Plot both per-epoch loss histories on one labelled axis so the training and
# validation curves can be told apart.
fig, ax = plt.subplots()
ax.plot(train_losses, label='Training loss')
ax.plot(val_losses, label='Validation loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Mean cross-entropy loss')
ax.set_title('Training vs. validation loss')
ax.legend()
plt.show();

In [None]:
# Rebuild the network and restore the checkpoint saved by the training loop.
net = Resnet50()
# map_location lets a checkpoint saved from a GPU run load on a CPU-only machine.
net.load_state_dict(torch.load(PATH, map_location=device))
net.to(device)
# Inference mode: BatchNorm uses its stored running statistics.
net.eval();

In [None]:
# Top-1 accuracy of the restored network on the test loader.
correct = 0
total = 0
# A freshly constructed module starts in train mode; switch to eval so
# BatchNorm normalises with its running statistics, not per-batch ones.
net.eval()
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        # Predicted class = index of the largest logit.
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the network on the 10000 test images: {100 * correct // total} %')