In [1]:
# Importing packages
import torch
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, sampler
from torchsummary import summary

import copy
import numpy as np
import cv2
import matplotlib.pyplot as plt
from PIL import Image

import cv2
import numpy as np
from PIL import Image, ImageFilter, ImageEnhance, ImageDraw

## Define functions

In [2]:
def add_gaussian_noise(img, mean=0, std=20):
    """Return a copy of ``img`` corrupted with additive Gaussian noise.

    Works for images of any shape (e.g. H x W grayscale or H x W x C colour);
    one independent noise sample is drawn per element.

    Args:
        img: Array-like image. Values are combined with the noise in float64.
        mean: Mean of the Gaussian noise.
        std: Standard deviation of the Gaussian noise (must be >= 0).

    Returns:
        ``np.ndarray`` of dtype ``uint8`` with the same shape as ``img``.
        The noisy values are clipped to [0, 255] before the cast, so the
        function is meant for images on a 0-255 intensity scale.
    """
    pixels = np.asarray(img, dtype=np.float64)

    # One noise value per pixel/channel, drawn from the global NumPy RNG
    noise = np.random.normal(mean, std, pixels.shape)

    # Clip first so the uint8 cast cannot wrap around
    return np.clip(pixels + noise, 0, 255).astype(np.uint8)

def add_shot_noise(img, lam=100):
    """Return a copy of ``img`` with an additive Poisson noise sample.

    Works for images of any shape (e.g. H x W grayscale or H x W x C colour).
    Note that the noise is drawn as ``Poisson(lam)`` independently of the
    pixel values and then *added* to the image, so on average every pixel is
    brightened by about ``lam`` before clipping.

    Args:
        img: Array-like image. Values are combined with the noise in float64.
        lam: Expected value (rate) of the Poisson distribution (must be >= 0).

    Returns:
        ``np.ndarray`` of dtype ``uint8`` with the same shape as ``img``.
        The noisy values are clipped to [0, 255] before the cast.
    """
    pixels = np.asarray(img, dtype=np.float64)

    # One Poisson count per pixel/channel, drawn from the global NumPy RNG
    noise = np.random.poisson(lam, pixels.shape)

    # Clip first so the uint8 cast cannot wrap around
    return np.clip(pixels + noise, 0, 255).astype(np.uint8)

# Create rotation function 
def rotate(image, degree=1):
    """Rotate a channel-first image by ``degree`` quarter turns.

    The (C, H, W) input is rearranged to (H, W, C), rotated in the H-W plane
    with ``np.rot90`` and rearranged back to (C, H, W).

    Args:
        image: 3-D channel-first image.
        degree: Number of 90-degree turns passed to ``np.rot90``.

    Returns:
        A new ``torch.Tensor`` holding the rotated image.
    """
    channels_last = np.transpose(image, (1, 2, 0))
    turned = np.rot90(channels_last, k=degree)
    channels_first = np.transpose(turned, (2, 0, 1))
    # Materialize the rotated view into its own buffer before handing it to torch
    return torch.Tensor(channels_first.copy())

# Define my dataset
class MyDataset(Dataset):
    """Map-style dataset that pairs pre-computed samples with their labels.

    ``X`` and ``y`` are stored as given (no copy, no length check); item
    ``idx`` is the tuple ``(X[idx], y[idx])``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # The dataset size is defined by the sample container alone
        n_samples = len(self.X)
        return n_samples

    def __getitem__(self, idx):
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

In [3]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [None]:
# Pre-processing pipeline applied to every image drawn from `testset`.
# NOTE(review): RandomCrop / RandomHorizontalFlip are random augmentations, so
# repeated passes over the test split see different pixels — confirm this is
# intended for evaluation.
transform = transforms.Compose(
    [
        # Pad 4 px on each side, then take a random 96x96 crop
        transforms.RandomCrop(96, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        # Per-channel normalization with mean 0.5 and std 0.5
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# Load the STL10 test split (downloaded into ../datasets when missing)
testset = datasets.STL10(
    root='../datasets',
    split='test',
    transform=transform,
    download=True,
)

Downloading http://ai.stanford.edu/~acoates/stl10/stl10_binary.tar.gz to ../datasets\stl10_binary.tar.gz


  0%|          | 0/2640397119 [00:00<?, ?it/s]

In [None]:
# Clean test set: one image per batch, reshuffled on every pass, two loader workers
testloader = DataLoader(
    testset,
    batch_size=1,
    shuffle=True,
    num_workers=2,
)

In [None]:
# Sanity check: print the image-tensor shape and the label of the first
# 11 batches (indices 0-10), then stop iterating.
for num, batch in enumerate(testloader):
    if num > 10:
        break
    # Each batch unpacks into an (images, labels) pair
    images, labels = batch
    print(images.shape, labels)

In [None]:
# Keep direct handles on the dataset's `data` and `labels` attributes
test_x, test_y = testset.data, testset.labels

In [None]:
# Build a Gaussian-noise copy of the test set.
# The noise helper clips to [0, 255] and returns uint8, so it must see the raw
# 0-255 image, not the normalized tensor produced by `transform`. Noise is
# therefore added to the raw pixels first and `transform` is applied afterwards,
# which gives the model float tensors in the same format as the clean test set.
# assumes `testset.data` holds uint8 images in channel-first (C, H, W) layout — TODO confirm
X_test_gauss = []
y_test_gauss = []

for raw_chw, y in zip(testset.data, testset.labels):
    # The noise helper and PIL both expect channel-last (H, W, C) images
    noisy_hwc = add_gaussian_noise(np.transpose(raw_chw, (1, 2, 0)))
    X_test_gauss.append(transform(Image.fromarray(noisy_hwc)))
    y_test_gauss.append(int(y))

In [None]:
# The noisy samples already live in memory, so they are served from the main
# process: worker processes would only add overhead, and on platforms that
# spawn workers (e.g. Windows) they cannot unpickle a Dataset class that is
# defined inside the notebook.
testloader_g = DataLoader(MyDataset(X_test_gauss, y_test_gauss), batch_size=1, num_workers=0, shuffle=True)

In [None]:
# Build a shot-noise copy of the test set.
# The noise helper clips to [0, 255] and returns uint8, so it must see the raw
# 0-255 image, not the normalized tensor produced by `transform`. Noise is
# therefore added to the raw pixels first and `transform` is applied afterwards,
# which gives the model float tensors in the same format as the clean test set.
# assumes `testset.data` holds uint8 images in channel-first (C, H, W) layout — TODO confirm
X_test_shot = []
y_test_shot = []

for raw_chw, y in zip(testset.data, testset.labels):
    # The noise helper and PIL both expect channel-last (H, W, C) images
    noisy_hwc = add_shot_noise(np.transpose(raw_chw, (1, 2, 0)))
    X_test_shot.append(transform(Image.fromarray(noisy_hwc)))
    y_test_shot.append(int(y))

In [None]:
# The noisy samples already live in memory, so they are served from the main
# process: worker processes would only add overhead, and on platforms that
# spawn workers (e.g. Windows) they cannot unpickle a Dataset class that is
# defined inside the notebook.
testloader_s = DataLoader(MyDataset(X_test_shot, y_test_shot), batch_size=1, num_workers=0, shuffle=True)

## Load models

In [None]:
class ResBlock(nn.Module):
    """Stack of ``num_block`` residual units, each with a learned 1x1 shortcut.

    ``self.layers`` is a flat ``ModuleList``: entry ``2*i`` is the shortcut of
    unit ``i`` (a 1x1 convolution) and entry ``2*i + 1`` is its main branch
    (conv3x3 -> norm -> ReLU -> conv3x3 -> norm -> ReLU). The outputs of both
    are summed and passed through a final ReLU.

    Only the first unit changes the channel count (``in_channel`` ->
    ``out_channel``) and applies ``stride``; the remaining units keep
    ``out_channel`` channels and use stride 1.

    NOTE(review): the first unit ends with ``GroupNorm`` while every other
    normalization layer is ``BatchNorm2d``. The module order and layer types
    determine the ``state_dict`` layout, so changing them would make
    previously saved weights incompatible — confirm before "fixing" this.

    Args:
        in_channel: Number of input channels.
        out_channel: Number of output channels; also used to derive the
            GroupNorm group count (``out_channel // 4``).
        stride: Stride of the first unit's shortcut and first 3x3 convolution.
        num_block: Number of residual units in the stack.
    """
    def __init__(self, in_channel, out_channel, stride=1, num_block=1):
        super(ResBlock, self).__init__()
        self.num_block = num_block
        # Flat list of (shortcut, main branch) pairs, indexed as described above
        self.layers = nn.ModuleList([])
        
        # Unit 0, shortcut: 1x1 convolution matching channels and stride
        self.layers.append(nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride)
        ))
        # Unit 0, main branch: BatchNorm after the first conv, GroupNorm after the second
        self.layers.append(nn.Sequential  (
            nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, stride=stride),
#             nn.GroupNorm(out_channel//4, out_channel),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, stride=1),
            nn.GroupNorm(out_channel//4, out_channel),
            nn.ReLU(inplace=True)
        ))
        
        # Units 1 .. num_block-1: same layout at constant width, BatchNorm throughout
        for _ in range(1, num_block):
            self.layers.append(nn.Sequential(
                nn.Conv2d(out_channel, out_channel, kernel_size=1, stride=1)
            ))
            self.layers.append(nn.Sequential(
                nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, stride=1),
#                 nn.GroupNorm(out_channel//4, out_channel),
                nn.BatchNorm2d(out_channel),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, stride=1),
#                 nn.GroupNorm(out_channel//4, out_channel),
                nn.BatchNorm2d(out_channel),
                nn.ReLU(inplace=True)
            ))
    
    def forward(self, x):
        """Run ``x`` through all residual units in order and return the result."""
        for i in range(self.num_block):
            # Shortcut and main branch both read the same input of unit i
            x_identity = self.layers[2*i](x)
            x = self.layers[2*i+1](x)
            # Residual sum followed by the unit's output activation
            x = F.relu(x + x_identity, inplace=True) 
        
        return x

class ResNet(nn.Module):
    """Single-head residual network: 3-channel image in, 10 logits out.

    Layout: 3x3 conv stem (32 channels) with BatchNorm and ReLU, four
    ``ResBlock`` stages (32 -> 32 -> 64 -> 128 -> 256 channels, the last three
    with stride 2), global average pooling and a linear layer with 10 outputs.
    """

    def __init__(self):
        super().__init__()

        # Stem
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        # Residual stages
        self.layers1 = ResBlock(32, 32, num_block=4)
        self.layers2 = ResBlock(32, 64, stride=2, num_block=2)
        self.layers3 = ResBlock(64, 128, stride=2, num_block=2)
        self.layers4 = ResBlock(128, 256, stride=2, num_block=2)

        # Head
        self.avgpool1 = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(256, 10)

    def forward(self, x, theta = None):
        """Return the logits for ``x``; ``theta`` is accepted but not used."""
        features = F.relu(self.bn1(self.conv1(x)), inplace=True)

        for stage in (self.layers1, self.layers2, self.layers3, self.layers4):
            features = stage(features)

        # Global pooling leaves one value per channel; flatten to (batch, 256)
        pooled = self.avgpool1(features).view(-1, 256)
        return self.fc1(pooled)


class JointResNet(nn.Module):
    """Residual network with a shared trunk and two output heads.

    The trunk is a 3x3 conv stem (64 channels) with BatchNorm and ReLU followed
    by three ``ResBlock`` stages (64 -> 128 -> 256 channels). Each head then
    has its own final stage (256 -> 512, stride 2), global average pooling and
    a linear layer: ``fc1`` produces 10 outputs and ``fc2`` produces 4.
    NOTE(review): the 4-way head presumably predicts one of four rotations for
    the auxiliary task — confirm against the training code.
    """

    def __init__(self):
        super().__init__()

        # Shared stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(64)

        # Shared residual stages, then one final stage per head
        self.layers1 = ResBlock(64, 64, num_block=2)
        self.layers2 = ResBlock(64, 128, stride=2, num_block=2)
        self.layers3 = ResBlock(128, 256, stride=2, num_block=2)
        self.layers4_1 = ResBlock(256, 512, stride=2, num_block=2)
        self.layers4_2 = ResBlock(256, 512, stride=2, num_block=2)

        # Head 1: 10 outputs
        self.avgpool1 = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(512, 10)

        # Head 2: 4 outputs
        self.avgpool2 = nn.AdaptiveAvgPool2d((1, 1))
        self.fc2 = nn.Linear(512, 4)

    def forward(self, x, theta = None):
        """Return ``(head1_logits, head2_logits)``; ``theta`` is accepted but not used."""
        shared = F.relu(self.bn1(self.conv1(x)), inplace=True)
        for stage in (self.layers1, self.layers2, self.layers3):
            shared = stage(shared)

        # Head 1 on the shared features
        head1 = self.avgpool1(self.layers4_1(shared)).view(-1, 512)
        head1 = self.fc1(head1)

        # Head 2 on the same shared features
        head2 = self.avgpool2(self.layers4_2(shared)).view(-1, 512)
        head2 = self.fc2(head2)

        return head1, head2

In [None]:
# Define the loss functions
# (two separate criteria — presumably one per JointResNet output head; confirm)
criterion1 = nn.CrossEntropyLoss()
criterion2 = nn.CrossEntropyLoss()

# Initialize the model and define optimizer
model2 = JointResNet().to(device)
optimizer2 = optim.SGD(model2.parameters(), lr=0.1, momentum=0.9, weight_decay=3e-4)
# `StepLR` is not among the notebook's imports, so it is referenced through the
# already-imported `optim` namespace; the bare name raised a NameError.
# The schedule multiplies the learning rate by 0.1 every 30 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer2, step_size=30, gamma=0.1)

In [None]:
# Load the saved model and optimizer state.
# `map_location=device` remaps the stored tensors onto the device selected
# earlier, so a checkpoint written on a GPU machine also loads when the
# notebook falls back to the CPU.
# NOTE: torch.load unpickles the file — only load checkpoints from a trusted source.
checkpoint = torch.load('../models/stl_jointresnet.pt', map_location=device)
model2.load_state_dict(checkpoint['model_state_dict'])
optimizer2.load_state_dict(checkpoint['optimizer_state_dict'])

## Test-time training