# Image Classification

In [2]:
import cv2
import pandas
import sklearn
import torch
import torch.nn as nn
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from sklearn.model_selection import KFold
import numpy as np

In [3]:
# CNN model for grayscale image
class val_cnn_1D(nn.Module):
    def __init__(self):
        super(val_cnn_1D, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5) #1, 10
        self.pool  = nn.MaxPool2d(2,2)
        self.conv2 = nn.Conv2d(6, 20, kernel_size=5) #10, 20
        self.conv2_drop = nn.Dropout2d(0.25)
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return x
        

In [7]:
# CNN model for color image
class val_cnn_3D(nn.Module):
    def __init__(self):
        super(val_cnn_3D, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, kernel_size=5) #1, 10
        self.pool  = nn.MaxPool2d(2,2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5) #10, 20
        self.conv2_drop = nn.Dropout2d(.25)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [8]:
# LeNet model
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(5*5*50, 500)
        self.fc1_drop = nn.Dropout2d(.5)
        #self.fc2 = nn.Linear(120, 84)
        self.fc2 = nn.Linear(1250, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = x.view(-1, 5*5*50)
        x = F.relu(self.fc1_drop(x))
        #x = F.relu(self.fc2(x))
        x = self.fc2(x)
        return x

In [11]:
import math
# VGG Model
class VGG(nn.Module):
    '''
    VGG model 
    '''
    def __init__(self, features):
        super(VGG, self).__init__()
        self.features = features
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(512, 512),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(512, 512),
            nn.ReLU(True),
            nn.Linear(512, 10),
        )
         # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                m.bias.data.zero_()


    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x


def make_layers(cfg, batch_norm=False):
    layers = []
    in_channels = 3
    for v in cfg:
        if v == 'M':
            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        else:
            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
            if batch_norm:
                layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
            else:
                layers += [conv2d, nn.ReLU(inplace=True)]
            in_channels = v
    return nn.Sequential(*layers)


cfg = {
    'A': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'B': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'D': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'E': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 
          512, 512, 512, 512, 'M'],
}


def vgg11():
    """VGG 11-layer model (configuration "A")"""
    return VGG(make_layers(cfg['A']))


def vgg11_bn():
    """VGG 11-layer model (configuration "A") with batch normalization"""
    return VGG(make_layers(cfg['A'], batch_norm=True))


def vgg13():
    """VGG 13-layer model (configuration "B")"""
    return VGG(make_layers(cfg['B']))


def vgg13_bn():
    """VGG 13-layer model (configuration "B") with batch normalization"""
    return VGG(make_layers(cfg['B'], batch_norm=True))


def vgg16():
    """VGG 16-layer model (configuration "D")"""
    return VGG(make_layers(cfg['D']))


def vgg16_bn():
    """VGG 16-layer model (configuration "D") with batch normalization"""
    return VGG(make_layers(cfg['D'], batch_norm=True))


def vgg19():
    """VGG 19-layer model (configuration "E")"""
    return VGG(make_layers(cfg['E']))


def vgg19_bn():
    """VGG 19-layer model (configuration 'E') with batch normalization"""
    return VGG(make_layers(cfg['E'], batch_norm=True))

In [12]:
# 3x3 convolution
def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3, 
                     stride=stride, padding=1, bias=False)
# Residual block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

# ResNet
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 16
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])


def ResNet34():
    return ResNet(BasicBlock, [3, 4, 6, 3])


def ResNet50():
    return ResNet(Bottleneck, [3, 4, 6, 3])


def ResNet101():
    return ResNet(Bottleneck, [3, 4, 23, 3])


def ResNet152():
    return ResNet(Bottleneck, [3, 8, 36, 3])

In [13]:
# Pretrained AlexNet Model
def alexnet():
    alex_model=models.alexnet(pretrained=True)
    for params in alex_model.features.parameters():
        params.requires_grad=False
    n_inputs = alex_model.classifier[6].in_features
    print(n_inputs)
    last_layer = nn.Linear(n_inputs, 10)
    alex_model.classifier[6] = last_layer
    return alex_model

In [4]:
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

# Train and test model
def train(network, device, train_loader, optimizer, epoch):
    running_loss = 0.0
    running_acc = 0.0
    train_losses = []
    train_acc=[]
    network.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        output = network(data)
        output = F.log_softmax(output)
        loss = F.nll_loss(output, target)
        new_target = F.one_hot(target, num_classes=output.shape[1])
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        running_acc += torch.sum(output == new_target)
    else:
        epoch_loss = running_loss/len(train_loader)
        epoch_acc = running_acc/len(train_loader)*100
        train_losses.append(epoch_loss)
        train_acc.append(epoch_acc)
        print('Training loss: ' + str(epoch_loss))
        #torch.save(network.state_dict(), '/results/model.pth')
        #torch.save(optimizer.state_dict(), '/results/optimizer.pth')
    '''if batch_idx % log_interval == 0:
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), 
                                                                       len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item()))
        train_losses.append(loss.item())
        train_counter.append((batch_idx*64) + ((epoch-1)*len(train_loader.dataset)))
        torch.save(network.state_dict(), '/results/model.pth')
        torch.save(optimizer.state_dict(), '/results/optimizer.pth')'''
    return train_losses

def test(network, device, test_loader, epoch):
    test_losses=[]
    train_acc=[]
    network.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            #data = data.to(device)
            #target = target.to(device)
            output = network(data)
            output = F.log_softmax(output)
            test_loss += F.nll_loss(output, target, size_average=False).item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()
            test_loss /= len(test_loader.dataset)
            test_losses.append(test_loss)
            test_acc =  100. * correct / len(test_loader.dataset)
        print('\nTest set: epoch: {:.4f}, Test set: Avg. loss: {:.4f}, Accuracy: ({:.0f}%)\n'.format(epoch, test_loss, test_acc))
    return test_losses, test_acc



In [5]:
# Evaluate MNIST model
def evaluate_image_model(train_img, test_img, lr, n_epochs):
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    #model= ResNet(ResidualBlock, [2, 2, 2]).to(device)
    model=val_cnn_1D()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.01)
    for epoch in range(1, n_epochs + 1):
        train_loss = train(model, device, train_img, optimizer, epoch)
        test_loss, test_acc = test(model, device, test_img, epoch)
        scheduler.step()

    return test_acc

In [6]:
from sklearn.model_selection import train_test_split
n_epochs = 3
batch_size_train = 100
batch_size_test = 1000
learning_rate = 0.1
momentum = 0.5
log_interval = 10

random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

# Load MNIST dataset
dftransform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5), (0.5))])
train_ds = datasets.MNIST(root = './data', train=True, download=True, transform=dftransform)
train_data = DataLoader(dataset=train_ds, batch_size=batch_size_train, shuffle=True)
test_ds = datasets.MNIST(root = './data', train=False, download=True, transform=dftransform)
test_data = DataLoader(dataset=train_ds, batch_size=batch_size_train, shuffle=True)
dataiter=iter(train_data)
#images, labels = dataiter.next()
test_a = evaluate_image_model(train_data, test_data, learning_rate, n_epochs)
test_a

  app.launch_new_instance()


Training loss: 0.5900100059558948





Test set: epoch: 1.0000, Test set: Avg. loss: 0.0004, Accuracy: (95%)

Training loss: 0.347133049753805

Test set: epoch: 2.0000, Test set: Avg. loss: 0.0001, Accuracy: (97%)

Training loss: 0.2960183087363839

Test set: epoch: 3.0000, Test set: Avg. loss: 0.0002, Accuracy: (97%)



tensor(96.7067)

In [18]:
def evaluate_image_model2(train_img, test_img, lr, n_epochs):
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    #model= ResNet(ResidualBlock, [2, 2, 2]).to(device)
    model=alexnet()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)#, momentum=0.9)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.01)
    for epoch in range(1, n_epochs + 1):
        train_loss = train(model, device, train_img, optimizer, epoch)
        test_loss, test_acc = test(model, device, test_img, epoch)
        scheduler.step()

    return test_acc

In [None]:
from sklearn.model_selection import train_test_split
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
n_epochs = 9
batch_size_train = 100
batch_size_test = 1000
learning_rate = 0.01
momentum = 0.5
log_interval = 10

random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

# Load 3D color pictures
dftransform = transforms.Compose([transforms.Resize(256), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
#dftransform = transforms.Compose([transforms.Resize(32), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
train_ds = datasets.CIFAR10(root = './data', train=True, download=True, transform=dftransform)
train_data = DataLoader(dataset=train_ds, batch_size=batch_size_train, shuffle=True)
test_ds = datasets.CIFAR10(root = './data', train=False, download=True, transform=dftransform)
test_data = DataLoader(dataset=train_ds, batch_size=batch_size_train, shuffle=False)
dataiter=iter(train_data)
#images, labels = dataiter.next()
test_a = evaluate_image_model2(train_data, test_data, learning_rate, n_epochs)

Files already downloaded and verified
Files already downloaded and verified
4096


  app.launch_new_instance()


Training loss: 4.989968449831009





Test set: epoch: 1.0000, Test set: Avg. loss: 0.0033, Accuracy: (40%)

Training loss: 1.6812016954421998

Test set: epoch: 2.0000, Test set: Avg. loss: 0.0031, Accuracy: (45%)

Training loss: 1.630280685186386

Test set: epoch: 3.0000, Test set: Avg. loss: 0.0031, Accuracy: (45%)

Training loss: 1.6318878059387207

Test set: epoch: 4.0000, Test set: Avg. loss: 0.0031, Accuracy: (45%)

Training loss: 1.6356476933956146

Test set: epoch: 5.0000, Test set: Avg. loss: 0.0031, Accuracy: (45%)

Training loss: 1.629455483675003
