In [None]:
import torch
import torch.nn as nn
import os
import matplotlib.pyplot as plt
from torch import optim
from torchvision import datasets
from torchvision.transforms import transforms
from PIL import Image
import time
#////ResNet implementation by Aladdin Persson

#//// https://www.youtube.com/watch?v=DkNIBBBvcPs



class block(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super(block, self).__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)

        return x


class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()

        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # resnet
        self.layer1 = self._make_layer(block, layers[0], out_channels=64, stride=1)
        self.layer2 = self._make_layer(block, layers[1], out_channels=128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], out_channels=256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], out_channels=512, stride=2)  # 2048

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []

        if stride != 1 or self.in_channels != out_channels * 4:
            identity_downsample = nn.Sequential(nn.Conv2d(self.in_channels, out_channels * 4, kernel_size=1,
                                                          stride=stride),
                                                nn.BatchNorm2d(out_channels * 4))
        layers.append(block(self.in_channels, out_channels, identity_downsample, stride))
        self.in_channels = out_channels * 4

        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)


def ResNet50(img_channels=3, num_classes=2):
    return ResNet(block, [3, 4, 6, 3], img_channels, num_classes)


def ResNet101(img_channels=3, num_classes=2):
    return ResNet(block, [3, 4, 23, 3], img_channels, num_classes)


def ResNet152(img_channels=3, num_classes=2):
    return ResNet(block, [3, 8, 36, 3], img_channels, num_classes)


def test():
    net = ResNet50()
    x = torch.rand(2, 3, 224, 224)
    y = net(x).to(('cuda' if torch.cuda.is_available() else 'cpu'))
    print(y.shape)


test()

In [2]:
from torch.utils.data import Dataset
import time
import numpy as np


In [3]:
base_dir = '/example/example_media/example_run/ex/Datasets' # Main Dataset which includes Train and Test folder
if os.path.exists(base_dir): # Generation of train and test path, if the train and test folders exist in the main dataset and are named 'Test' and 'Train'
    print('Found the data directory, now loading the data...')
    train_dir = os.path.join(base_dir, 'Train')
    test_dir = os.path.join(base_dir, 'Test')
else:
    print('Could not find the data directory')
    exit()





Found the data directory, now loading the data...


In [4]:
# augmentation function for the data
train_dataset = datasets.ImageFolder(train_dir, transform=transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomVerticalFlip(p=0.7),
    transforms.Resize(256),
    transforms.RandomHorizontalFlip(p=0.7),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
]))
test_dataset = datasets.ImageFolder(test_dir, transform=transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
]))
for image, label in train_dataset:
    print(image.shape, label)
    break

# data loader for the data
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)
# summary of the data
dataset_sizes = {'Train': len(train_dataset), 'Test': len(test_dataset)}
print('Dataset sizes:', dataset_sizes)

torch.Size([3, 256, 256]) 0
Dataset sizes: {'Train': 8109, 'Test': 2382}


In [None]:
# showing an example image
def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

imshow(image)





In [6]:
# empty cuda cache to free up memory, if in use
torch.cuda.empty_cache()

In [7]:
# choose the ResNet model to use
net = input('Enter the network you want to use (50, 101, 152): ')
if net == '50':
    net = ResNet50()
elif net == '101':
    net = ResNet101()
elif net == '152':
    net = ResNet152()
else:
    print('Invalid network')
    exit()

# cuda if available
net = net.to(('cuda' if torch.cuda.is_available() else 'cpu'))
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [8]:
# train function
# shows the progress of the training
# after 1 epoch test on the test data
def train_model():
    epochs = int(input("Enter the number of epochs you want to train: "))
    train_sample_num = dataset_sizes['Train']
    test_sample_num = dataset_sizes['Test']
    train_cost, val_cost = [], []

    for epoch in range(epochs):
        start = time.time()
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(('cuda' if torch.cuda.is_available() else 'cpu')), labels.to(('cuda' if torch.cuda.is_available() else 'cpu'))
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if i % 10 == 9:
                print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))
                running_loss = 0.0
        end = time.time()
        print('Epoch %d finished in %.2f seconds' % (epoch + 1, end - start))
        train_cost.append(running_loss / 10)
        with torch.no_grad():
            correct = 0
            total = 0
            for data in test_loader:
                images, labels = data
                images, labels = images.to(('cuda' if torch.cuda.is_available() else 'cpu')), labels.to(('cuda' if torch.cuda.is_available() else 'cpu'))
                outputs = net(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
            val_cost.append(1 - correct / total)
            print(f'Accuracy of the network on the {len(test_dataset)} test images: %d %%' % (100 * correct / total))
    return train_cost, val_cost

In [None]:
train_model()