In [3]:
import torch
import torchvision
from torchvision import transforms
import os

In [4]:
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [5]:
use_cuda = torch.cuda.is_available()
use_cuda

False

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
base_dir = ('/content/drive/MyDrive/2023 spring /cats_and_dogs_small')
train_dir = os.path.join(base_dir, 'train/')
validation_dir = os.path.join(base_dir, 'validation/')
test_dir = os.path.join(base_dir, 'test/')
train_cats_dir = os.path.join(train_dir, 'cats')
train_dogs_dir = os.path.join(train_dir, 'dogs')
validation_cats_dir = os.path.join(validation_dir, 'cats')
validation_dogs_dir = os.path.join(validation_dir, 'dogs')
test_cats_dir = os.path.join(test_dir, 'cats')
test_dogs_dir = os.path.join(test_dir, 'dogs')

In [7]:
print('total training cat images:', len(os.listdir(train_cats_dir)))
print('total training dog images:', len(os.listdir(train_dogs_dir)))
print('total validation cat images:', len(os.listdir(validation_cats_dir)))
print('total validation dog images:', len(os.listdir(validation_dogs_dir)))
print('total validation dog images:', len(os.listdir(validation_dogs_dir)))
print('total test cat images:', len(os.listdir(test_cats_dir)))
print('total test dog images:', len(os.listdir(test_dogs_dir)))

total training cat images: 1000
total training dog images: 1000
total validation cat images: 500
total validation dog images: 500
total validation dog images: 500
total test cat images: 500
total test dog images: 500


In [8]:
def checkImage(path):
    try:
        im = Image.open(path)
        return True
    except:
        return False
    pass

In [9]:
img_transforms1 = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(227),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [10]:
train_data1 = torchvision.datasets.ImageFolder(root = train_dir, transform = img_transforms1, is_valid_file = checkImage)
val_data1 = torchvision.datasets.ImageFolder(root = validation_dir, transform = img_transforms1, is_valid_file = checkImage)
test_data1 = torchvision.datasets.ImageFolder(root = test_dir, transform = img_transforms1, is_valid_file = checkImage)

In [11]:
#By default, PyTorch’s data loaders are set to a batch_size of 1.
BATCH_SIZE = 64
train_data_loader1 = torch.utils.data.DataLoader(train_data1, batch_size = BATCH_SIZE)
val_data_loader1  = torch.utils.data.DataLoader(val_data1, batch_size = BATCH_SIZE) 
test_data_loader1  = torch.utils.data.DataLoader(test_data1, batch_size = BATCH_SIZE)

In [12]:
img_transforms2 = transforms.Compose([
    transforms.Resize(320),
    transforms.RandomCrop(299, 320),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [13]:
train_data2 = torchvision.datasets.ImageFolder(root = train_dir, transform = img_transforms2, is_valid_file = checkImage)
val_data2 = torchvision.datasets.ImageFolder(root = validation_dir, transform = img_transforms2, is_valid_file = checkImage)
test_data2 = torchvision.datasets.ImageFolder(root = test_dir, transform = img_transforms2, is_valid_file = checkImage)

In [14]:
#By default, PyTorch’s data loaders are set to a batch_size of 1.
BATCH_SIZE = 64
train_data_loader2 = torch.utils.data.DataLoader(train_data2, batch_size = BATCH_SIZE)
val_data_loader2  = torch.utils.data.DataLoader(val_data2, batch_size = BATCH_SIZE) 
test_data_loader2  = torch.utils.data.DataLoader(test_data2, batch_size = BATCH_SIZE)

In [15]:
import torch.nn as nn
import torch.nn.functional as F

In [16]:
img_transforms1 = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(227),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [17]:
class AlexNet(nn.Module):
    def __init__(self, num_classes: int = 1000, init_weights: bool = True):
        super(AlexNet, self).__init__()
        self.convnet = nn.Sequential(
            # Input Channel (RGB: 3)
            nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, padding=0, stride=4), # 227 -> 55
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2), # 55 -> 27
            
            nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, padding=2, stride=1), # 27 -> 27
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2), # 27 -> 13
            
            nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.Conv2d(in_channels=384, out_channels=384, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2), # 13 -> 6
        )

        self.fclayer = nn.Sequential(
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes),
        )
    
    def forward(self, x:torch.Tensor):
        x = self.convnet(x)
        x = torch.flatten(x, 1)
        x = self.fclayer(x)
        return x

In [None]:
model1 = AlexNet()
print(model1)

In [19]:
# Depthwise Separable Convolution
class SeparableConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.seperable = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, 3, stride=1, padding=1, bias=False),
            nn.Conv2d(in_channels, out_channels, 1, stride=1, padding=0, bias=False)
        )

    def forward(self, x):
        x = self.seperable(x)
        return x

In [20]:
# EnrtyFlow
class EntryFlow(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, 3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        self.conv2_residual = nn.Sequential(
            SeparableConv(64, 128),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            SeparableConv(128, 128),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(3, stride=2, padding=1)
        )

        self.conv2_shortcut = nn.Sequential(
            nn.Conv2d(64, 128, 1, stride=2, padding=0),
            nn.BatchNorm2d(128)
        )

        self.conv3_residual = nn.Sequential(
            nn.ReLU(),
            SeparableConv(128, 256),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            SeparableConv(256, 256),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(3, stride=2, padding=1)
        )

        self.conv3_shortcut = nn.Sequential(
            nn.Conv2d(128, 256, 1, stride=2, padding=0),
            nn.BatchNorm2d(256)
        )

        self.conv4_residual = nn.Sequential(
            nn.ReLU(),
            SeparableConv(256, 728),
            nn.BatchNorm2d(728),
            nn.ReLU(),
            SeparableConv(728, 728),
            nn.BatchNorm2d(728),
            nn.MaxPool2d(3, stride=2, padding=1)
        )

        self.conv4_shortcut = nn.Sequential(
            nn.Conv2d(256, 728, 1, stride=2, padding=0),
            nn.BatchNorm2d(728)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2_residual(x) + self.conv2_shortcut(x)
        x = self.conv3_residual(x) + self.conv3_shortcut(x)
        x = self.conv4_residual(x) + self.conv4_shortcut(x)
        return x

In [21]:
# MiddleFlow
class MiddleFlow(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv_residual = nn.Sequential(
            nn.ReLU(),
            SeparableConv(728, 728),
            nn.BatchNorm2d(728),
            nn.ReLU(),
            SeparableConv(728, 728),
            nn.BatchNorm2d(728),
            nn.ReLU(),
            SeparableConv(728, 728),
            nn.BatchNorm2d(728)
        )

        self.conv_shortcut = nn.Sequential()

    def forward(self, x):
        return self.conv_shortcut(x) + self.conv_residual(x)

In [22]:
# ExitFlow
class ExitFlow(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()

        self.conv1_residual = nn.Sequential(
            nn.ReLU(),
            SeparableConv(728, 1024),
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            SeparableConv(1024, 1024),
            nn.BatchNorm2d(1024),
            nn.MaxPool2d(3, stride=2, padding=1)
        )

        self.conv1_shortcut = nn.Sequential(
            nn.Conv2d(728, 1024, 1, stride=2, padding=0),
            nn.BatchNorm2d(1024)
        )

        self.conv2 = nn.Sequential(
            SeparableConv(1024, 1536),
            nn.BatchNorm2d(1536),
            nn.ReLU(),
            SeparableConv(1536, 2048),
            nn.BatchNorm2d(2048),
            nn.ReLU()
        )

        self.avg_pool = nn.AdaptiveAvgPool2d((1,1))
    
    def forward(self, x):
        x = self.conv1_residual(x) + self.conv1_shortcut(x)
        x = self.conv2(x)
        x = self.avg_pool(x)
        return x

In [23]:
# Xception
class Xception(nn.Module):
    def __init__(self, num_classes=10, init_weights=True):
        super().__init__()
        self.init_weights = init_weights

        self.entry = EntryFlow()
        self.middle = self._make_middle_flow()
        self.exit = ExitFlow()

        self.linear = nn.Linear(2048, num_classes)

        # weights initialization
        if self.init_weights:
            pass


    def forward(self, x):
        x = self.entry(x)
        x = self.middle(x)
        x = self.exit(x)
        x = x.view(x.size(0), -1)
        x = self.linear(x)
        return x

    def _make_middle_flow(self):
        middle = nn.Sequential()
        for i in range(8):
            middle.add_module('middle_block_{}'.format(i), MiddleFlow())
        return middle

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init_kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init_constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init_constant_(m.weight, 1)
                nn.init_bias_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init_normal_(m.weight, 0, 0.01)
                nn.init_constant_(m.bias, 0)

In [None]:
model2 = Xception()
print(model2)

In [25]:
loss_function = torch.nn.CrossEntropyLoss()
loss_function

CrossEntropyLoss()

In [26]:
import torch.optim as optim
optimizer = optim.Adam(model1.parameters(), lr=0.001)

In [None]:
if torch.cuda.is_available():
    device = torch.device("cuda") 
else:
    device = torch.device("cpu")
model1.to(device)

In [29]:
def train_process_batches(model, train_loader, optimizer, loss_function, verbose = True ):
    train_loss = 0.0
    
    model.train()
    if verbose:
        print(f"Training data batch process: ", end = "")
        
    for batch_idx, (data, target) in enumerate(train_loader):
        # move to GPU
        if use_cuda:
            data, target = data.cuda(), target.cuda()
            
        #we need to set the gradients to zero before starting to do backpropragation 
        #because PyTorch accumulates the gradients on subsequent backward passes
        optimizer.zero_grad()
        
        #forward pass: compute predicted outputs by passing inputs to the model
        output = model(data)
        
        #calculate the batch loss
        loss = loss_function(output, target)
        
        #backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        
        # perform a single optimization step (parameter update)
        optimizer.step()
        
        ## calculate train_loss
        train_loss = train_loss + ((1 / (batch_idx + 1)) * (loss.data - train_loss))
        if batch_idx % 50 == 0:
            if verbose:
                print(f"\t{batch_idx}, {train_loss}", end = "\n")
            else:
                print(f"\t{batch_idx}, ", end = "")
        pass
    
    return train_loss
    pass

In [30]:
def eval_process_batches(model, val_loader, optimizer, loss_function, verbose = True ):
    valid_loss = 0.0
    model.eval()
    if verbose:
        print(f"Test data batch process: ", end = "")
        
    for batch_idx, (data, target) in enumerate(val_loader):
        # move to GPU
        if use_cuda:
            data, target = data.cuda(), target.cuda()
        ## update the average validation loss
        # forward pass: compute predicted outputs by passing inputs to the model
        output = model(data)
        # calculate the batch loss
        loss = loss_function(output, target)
        # update average validation loss 
        valid_loss = valid_loss + ((1 / (batch_idx + 1)) * (loss.data - valid_loss))
            
        if batch_idx % 20 == 0:
            if verbose:
                print(f"\t{batch_idx}, {valid_loss}", end = "\n")
            else:
                print(f"\t{batch_idx}, ", end = "")
        pass
    return valid_loss
    pass

In [37]:
import numpy as np
from sklearn.metrics import f1_score

In [46]:
def validate(model, dataloader):
  with torch.no_grad():

    model.eval()
    val_count = 0
    val_correct = 0

    val_labels = []
    val_predicts = []

    for index, (inputs, labels) in enumerate(dataloader):
      inputs = inputs.to(device)
      labels = labels.to(device)

      outs = model(inputs)
      predicts = torch.argmax(outs, 1)

      val_labels.append(labels.cpu())
      val_predicts.append(predicts.cpu())

      val_correct += (predicts == labels).sum().item()
      val_count += inputs.size(0)

    val_acc = (val_correct / val_count)

    val_labels = np.concatenate(val_labels)
    val_predicts = np.concatenate(val_predicts)
    val_f1_score = f1_score(val_labels, val_predicts, average='macro')

  return val_acc, val_f1_score

In [49]:
def train(start_epochs, n_epochs, model, train_loader, val_loader):
    best_acc = 0
    best_f1 = 0
    for epoch in range(start_epochs, n_epochs+1):
        acc_val, f1_val = validate(model1, val_data_loader1)

        print('-' * 70)
        if best_acc < acc_val:
          best_acc = acc_val

        if best_f1 < f1_val:
          best_f1 = f1_val
          print(f"| best score!! | best accuracy {best_acc:8.3f} | best f1 score {best_f1:8.3f}")
          torch.save(model.state_dict(), f"{base_dir}/best.pth")
          print(f"| end of epoch {epoch:3d} | best accuracy {acc_val:8.3f} | best f1 score {f1_val:8.3f}")
          print('-' * 70)
       # initialize variables to monitor training and validation loss
        valid_loss = 0.0
        
        #train model
        train_loss = train_process_batches(model, train_loader, optimizer, loss_function, verbose = False)
        valid_loss = eval_process_batches(model, val_loader, optimizer, loss_function, verbose = True)
        
          
        print(f"\ntrain_loss = {train_loss}")
        print(f"\nvalid_loss = {valid_loss}")
        
    # return trained model
    return model
train(0, 5, model1, train_data_loader1, val_data_loader1)

----------------------------------------------------------------------
| best score!! | best accuracy    0.500 | best f1 score    0.333
| end of epoch   0 | best accuracy    0.500 | best f1 score    0.333
----------------------------------------------------------------------
	0, Test data batch process: 	0, 1.0212078094482422

train_loss = 2.7880709171295166

valid_loss = 0.7271148562431335
----------------------------------------------------------------------
	0, Test data batch process: 	0, 0.9821245670318604

train_loss = 1.4015069007873535

valid_loss = 0.7300966382026672
----------------------------------------------------------------------
	0, Test data batch process: 	0, 0.9641247987747192

train_loss = 0.7312788367271423

valid_loss = 0.719427227973938
----------------------------------------------------------------------
	0, Test data batch process: 	0, 0.9410831928253174

train_loss = 0.7273628115653992

valid_loss = 0.7152036428451538
----------------------------------------

AlexNet(
  (convnet): Sequential(
    (0): Conv2d(3, 96, kernel_size=(11, 11), stride=(4, 4))
    (1): ReLU(inplace=True)
    (2): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (5): ReLU(inplace=True)
    (6): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
    (7): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
    (11): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): ReLU(inplace=True)
    (13): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
    (14): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): LocalResponseNorm(5, alpha=0.0001, 

In [50]:
model1.load_state_dict(torch.load(
    os.path.join(base_dir, "best.pth")))
model1.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for test_data in test_data_loader1:
        test_images, test_labels = (
            test_data[0].to(device),
            test_data[1].to(device),
        )
        pred = model1(test_images).argmax(dim=1)
        for i in range(len(pred)):
            y_true.append(test_labels[i].item())
            y_pred.append(pred[i].item())

In [51]:
from sklearn.metrics import classification_report
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.00      0.00      0.00       500
           1       0.50      1.00      0.67       500

    accuracy                           0.50      1000
   macro avg       0.25      0.50      0.33      1000
weighted avg       0.25      0.50      0.33      1000



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [62]:
from sklearn.metrics import roc_auc_score

roc_auc_score(y_true, y_pred)

0.5

In [52]:
img = Image.open(test_dir + "dogs/dog.1500.jpg")

In [55]:
img = img_transforms1(img).to(device)
img = torch.unsqueeze(img, 0)

In [None]:
model1.eval()
prediction = F.softmax(model1(img), dim = 1)
prediction

In [58]:
prediction = prediction.argmax()
prediction

tensor(1)

In [59]:
labels = ['cats','dogs']
print(labels[prediction])

dogs
