In [None]:
import os
from time import time

import deeplake
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms


In [None]:
def LoadData(DATA, batch_size):
    """Build train/test DataLoaders for one of the supported datasets.

    Args:
        DATA: Dataset key. One of "CIFAR10", "CIFAR100", "MNIST" or
            "FASION_MNIST" (the misspelling is kept on purpose: callers and
            the on-disk data directory use exactly this string).
        batch_size: Batch size used for both the train and the test loader.

    Returns:
        Tuple ``(trainloader, testloader, num_classes)``.

    Raises:
        ValueError: If ``DATA`` is not one of the supported keys. Previously
            this case fell through every branch and failed with an
            UnboundLocalError at the return statement.
    """
    supported = ("CIFAR10", "CIFAR100", "MNIST", "FASION_MNIST")
    # Validate before touching torchvision so a typo fails fast and clearly.
    if DATA not in supported:
        raise ValueError(
            f"Unknown dataset {DATA!r}; expected one of: {', '.join(supported)}."
        )

    if DATA == "CIFAR10":
        root = '../Data/CIFAR10'
        num_classes = 10
        dataset_cls = torchvision.datasets.CIFAR10
        num_workers = 2

        train_transform = transforms.Compose([
            # NOTE(review): without padding this looks like a no-op on 32x32
            # images -- confirm whether padding=4 was intended.
            transforms.RandomCrop(32),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])
        test_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])

    elif DATA == "CIFAR100":
        root = '../Data/CIFAR100'
        num_classes = 100
        dataset_cls = torchvision.datasets.CIFAR100
        num_workers = 2

        train_transform = transforms.Compose([
            # Simulate scale variations.
            # NOTE(review): `scale` is a fraction of the source image area, so
            # the part of the range above 1.0 can presumably never be
            # satisfied -- confirm the intended upper bound.
            transforms.RandomResizedCrop(32, scale=(0.8, 1.2)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(degrees=15),  # vary object orientation
            transforms.RandomGrayscale(p=0.2),  # colour augmentation
            transforms.ToTensor(),
            # Generic rescaling with mean 0.5 / std 0.5 per channel.
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])
        test_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])

    elif DATA == "MNIST":
        root = '../Data/MNIST'
        num_classes = 10
        dataset_cls = torchvision.datasets.MNIST
        num_workers = 0  # DataLoader default, as in the original MNIST branch

        train_transform = transforms.Compose([
            transforms.RandomRotation(10),  # improve robustness
            # NOTE(review): mirroring digits may not be label-preserving --
            # confirm this augmentation is wanted.
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ])
        test_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ])

    else:  # DATA == "FASION_MNIST"
        root = '../Data/FASION_MNIST'
        num_classes = 10
        dataset_cls = torchvision.datasets.FashionMNIST
        num_workers = 0  # DataLoader default, as in the original branch

        train_transform = transforms.Compose([
            transforms.RandomRotation(15),  # improve robustness
            transforms.RandomHorizontalFlip(),  # augment data
            transforms.RandomResizedCrop(28),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,)),
        ])
        test_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,)),
        ])

    # download=True is now passed for every dataset (the CIFAR branches used
    # to omit it), so a fresh checkout without ../Data works for all four.
    train = dataset_cls(root=root, train=True, transform=train_transform, download=True)
    test = dataset_cls(root=root, train=False, transform=test_transform, download=True)

    trainloader = torch.utils.data.DataLoader(
        train, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )
    testloader = torch.utils.data.DataLoader(
        test, batch_size=batch_size, shuffle=False, num_workers=num_workers
    )
    print(DATA + " successfully loaded.")
    return trainloader, testloader, num_classes

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
class Bottleneck(nn.Module):
    """Residual block made of a 1x1, a 3x3 and a 1x1 convolution.

    The last convolution widens the output to ``out_channels * expansion``
    channels. The block input (optionally passed through ``i_downsample``)
    is added to that result before the final ReLU.

    Args:
        in_channels: Number of channels of the block input.
        out_channels: Channel width of the first two convolutions.
        i_downsample: Optional module applied to the input before the
            residual addition; ``None`` adds the input as it is.
        stride: Stride of the 3x3 convolution.
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super().__init__()
        expanded_channels = out_channels * self.expansion

        # 1x1 convolution: in_channels -> out_channels
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)

        # 3x3 convolution: the only layer that applies `stride`
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        # 1x1 convolution: out_channels -> out_channels * expansion
        self.conv3 = nn.Conv2d(out_channels, expanded_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(expanded_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the three conv/BN stages and add the shortcut branch."""
        # Work on a copy for the shortcut so the caller's tensor is untouched.
        shortcut = x.clone()

        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.batch_norm2(out)
        out = self.relu(out)

        out = self.batch_norm3(self.conv3(out))

        # Transform the shortcut when a downsample module was supplied.
        if self.i_downsample is not None:
            shortcut = self.i_downsample(shortcut)

        out += shortcut
        return self.relu(out)

In [None]:
class ResNet(nn.Module):
    """ResNet-style classifier assembled from a configurable residual block.

    Args:
        ResBlock: Block class. It must expose an ``expansion`` class attribute
            and accept ``(in_channels, out_channels, i_downsample=, stride=)``.
        layer_list: Number of blocks in each of the four stages (indices 0-3).
        num_classes: Size of the final linear layer's output.
        num_channels: Number of channels of the input images.
    """

    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super().__init__()
        # Running channel count; _make_layer reads and updates it per stage.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU, 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first uses stride 2.
        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)

        # Head: global average pool followed by a linear classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * ResBlock.expansion, num_classes)

    def forward(self, x):
        """Run stem, the four stages and the classifier head; returns logits."""
        features = self.conv1(x)
        features = self.batch_norm1(features)
        features = self.relu(features)
        features = self.max_pool(features)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = self.avgpool(features)
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.fc(flat)

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Build one stage of ``blocks`` residual blocks as an nn.Sequential.

        Only the first block receives ``stride`` and, when the shape of the
        stage input differs from the stage output, a 1x1-conv + batch-norm
        module for its shortcut. ``self.in_channels`` is updated to the
        stage's output width.
        """
        stage_width = planes * ResBlock.expansion

        shortcut = None
        shape_is_preserved = stride == 1 and self.in_channels == stage_width
        if not shape_is_preserved:
            shortcut = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1, stride=stride),
                nn.BatchNorm2d(stage_width),
            )

        stage = [ResBlock(self.in_channels, planes, i_downsample=shortcut, stride=stride)]
        self.in_channels = stage_width

        # The remaining blocks keep the resolution and channel width.
        for _ in range(blocks - 1):
            stage.append(ResBlock(self.in_channels, planes))
        return nn.Sequential(*stage)

In [None]:
class Layer_Ensemble(ResNet):
    """Two ResNets fused by averaging their activations after every stage.

    Each wrapped model runs its own stem and ``layer1`` on the input. From
    there on, both models receive the same averaged tensor, apply their own
    next stage, and the two results are averaged again. The returned logits
    are the mean of the two ``fc`` outputs.

    Args:
        modelA: First model; must expose conv1, batch_norm1, relu, max_pool,
            layer1..layer4, avgpool and fc.
        modelB: Second model with the same attributes.
    """

    def __init__(self, modelA, modelB):
        # Initialise only nn.Module: ResNet.__init__ is skipped on purpose so
        # the ensemble does not build a third set of layers of its own.
        nn.Module.__init__(self)

        self.modelA = modelA
        self.modelB = modelB

    def forward(self, x):
        """Return the averaged logits of the layer-wise fused models."""
        members = (self.modelA, self.modelB)

        # Stem, evaluated separately per member on the raw input.
        stems = [
            net.max_pool(net.relu(net.batch_norm1(net.conv1(x))))
            for net in members
        ]

        # First residual stage, still separate per member, then fused.
        first_a, first_b = [net.layer1(feat) for net, feat in zip(members, stems)]
        fused = (first_a + first_b) / 2

        # Remaining stages and the pooling: both members consume the same
        # fused tensor and their outputs are averaged again.
        for stage_name in ("layer2", "layer3", "layer4", "avgpool"):
            branch_a, branch_b = [getattr(net, stage_name)(fused) for net in members]
            fused = (branch_a + branch_b) / 2

        fused = fused.reshape(fused.shape[0], -1)

        # Mean of the two classifier outputs.
        logits_a, logits_b = [net.fc(fused) for net in members]
        return (logits_a + logits_b) / 2

In [None]:
def ResNet50(num_classes, channels=3):
    """Build a ResNet from Bottleneck blocks with stage depths [3, 4, 6, 3].

    Args:
        num_classes: Number of output classes of the classifier.
        channels: Number of channels of the input images.

    Returns:
        A freshly initialised ``ResNet`` instance.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(
        ResBlock=Bottleneck,
        layer_list=stage_depths,
        num_classes=num_classes,
        num_channels=channels,
    )

In [None]:
def train(model, dataloader, criterion, optimizer, device=None):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module to optimise. It is switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable, called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to. When ``None`` (default) the
            device of the model's first parameter is used, so the function
            does not depend on a notebook-level global.

    Returns:
        Tuple ``(train_loss, train_accuracy)``: the mean loss per sample and
        the accuracy in percent over all samples seen.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        # A parameter-free model gives no hint; leave the batches where they are.
        device = None if first_param is None else first_param.device

    # Criteria with reduction="sum" already return a per-batch sum; the
    # default "mean" reduction must be re-weighted by the batch size.
    loss_is_summed = getattr(criterion, "reduction", "mean") == "sum"

    # Do not rely on the caller: a preceding evaluation leaves eval mode on.
    model.train()

    total = 0
    correct = 0
    loss_sum = 0.0
    for inputs, labels in dataloader:
        if device is not None:
            inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        _, predicted = outputs.max(1)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        total += batch_size
        correct += predicted.eq(labels).sum().item()
        # Accumulate a per-sample sum. The previous code added batch means
        # and then divided by the sample count, which under-reported the
        # loss by roughly a factor of the batch size.
        loss_sum += loss.item() if loss_is_summed else loss.item() * batch_size

    if total == 0:
        raise ValueError("train() received a dataloader that yielded no samples.")

    train_accuracy = 100 * correct / total
    train_loss = loss_sum / total

    return train_loss, train_accuracy

In [None]:
def test(model, dataloader, criterion):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total += labels.size(0)

            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            running_loss += loss.item()
    test_accuracy = 100 * correct / total
    test_loss = running_loss / total

    return test_loss, test_accuracy

In [None]:
# Train a layer-wise ensemble of two ResNet-50s on every dataset and store the
# per-epoch metrics as JSON under ./History.
Data_List = ["CIFAR10", "CIFAR100", "MNIST", "FASION_MNIST"]

# Create the output directory up front so a missing folder cannot discard the
# results of a finished training run when the history is written.
os.makedirs('./History', exist_ok=True)

for DATA in Data_List:
    trainloader, testloader, num_classes = LoadData(DATA, 128)

    # Read the channel count from one transformed sample (channel-first
    # tensor). The models used to be built with 1 input channel for every
    # dataset, which breaks on the 3-channel CIFAR images.
    sample_image, _ = trainloader.dataset[0]
    num_channels = sample_image.shape[0]

    first_model = ResNet50(num_classes, num_channels).to(device)
    second_model = ResNet50(num_classes, num_channels).to(device)

    model = Layer_Ensemble(first_model, second_model).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=0.0001)

    last_epoch = 0
    num_epochs = 100
    history = []

    for epoch in range(last_epoch, num_epochs):
        t0 = time()
        # test() leaves the model in eval mode, so switch back every epoch.
        model.train(True)
        train_loss, train_accuracy = train(model, trainloader, criterion, optimizer)
        t1 = time()
        test_loss, test_accuracy = test(model, testloader, criterion)
        t2 = time()

        data = {
            "Epoch": epoch + 1,
            "Train Loss": train_loss,
            "Train Accuracy": train_accuracy,
            "Train Time": t1 - t0,
            "Test Loss": test_loss,
            "Test Accuracy": test_accuracy,
            "Test Time": t2 - t1,
        }
        print(data)
        history.append(data)

    pd.DataFrame(history).to_json(f'./History/{DATA}-Layer-new.json')