In [2]:
# import necessary packages
import os

from tqdm import tqdm

import torch as t
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch.utils.data as DT
from torchvision import transforms as TM
import torchvision

In [3]:
# data
data_path = './data/cat-and-dog/'
train_path = os.path.join(data_path, 'training_set')
test_path = os.path.join(data_path, 'test_set')

# File extensions (lower-case) that are accepted as images when scanning a split folder.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif')


def list_images(folder):
    """Return the sorted paths of the image files located directly in ``folder``.

    ``os.listdir`` yields entries in arbitrary order and also returns things that
    are not images (hidden files, sub-directories, ...). Sorting makes the sample
    order reproducible between runs, and the filter keeps non-image entries from
    reaching ``Image.open`` later on.

    Args:
        folder: Directory to scan (not recursive).

    Returns:
        List of ``os.path.join(folder, name)`` strings, sorted by file name.

    Raises:
        OSError: Propagated from ``os.listdir`` if ``folder`` cannot be read.
    """
    return [
        os.path.join(folder, name)
        for name in sorted(os.listdir(folder))
        if name.lower().endswith(IMAGE_EXTENSIONS)
        and os.path.isfile(os.path.join(folder, name))
    ]


train_img = list_images(train_path)
test_img = list_images(test_path)


# pytorch Data Loader
class CatDogDataLoader(DT.Dataset):
    """Map-style dataset yielding ``(image_tensor, label)`` pairs for cat/dog images.

    The image paths come from the module-level ``train_img`` / ``test_img`` lists.
    The label is derived from the file name: 1 when the first dot-separated part
    of the name contains ``'dog'``, otherwise 0.
    """

    def __init__(self, train=True):
        """Select the split and build the preprocessing pipeline.

        Args:
            train: Use ``train_img`` when true, ``test_img`` otherwise.
        """
        self.imgs = train_img if train else test_img
        # Resize -> center crop to 224x224 -> tensor -> per-channel (x - 0.5) / 0.5.
        self.transformer = TM.Compose(
            [
                TM.Resize(224),
                TM.CenterCrop(224),
                TM.ToTensor(),
                TM.Normalize(mean=[.5, .5, .5], std=[.5, .5, .5])
            ]
        )

    @staticmethod
    def _label_from_path(img_path):
        """Return 1 for a dog image and 0 otherwise, judged by the file name only.

        Only the base name is inspected, so directory names that happen to contain
        ``'dog'`` (e.g. ``cat-and-dog``) or dots (e.g. a leading ``./``) cannot
        influence the label.
        """
        file_name = os.path.basename(img_path)
        return 1 if 'dog' in file_name.split('.')[0] else 0

    def __getitem__(self, item):
        """Load, preprocess and label the image at index ``item``.

        Returns:
            Tuple ``(img_array, img_label)``: the transformed image tensor and a
            0-dim ``long`` tensor holding the class index.
        """
        img_path = self.imgs[item]
        # The context manager closes the underlying file once the pixels are read.
        with Image.open(img_path) as img:
            # Force three channels: grayscale / RGBA / palette files would otherwise
            # not match the 3-channel Normalize statistics.
            img_array = self.transformer(img.convert('RGB'))
        img_label = t.tensor(self._label_from_path(img_path)).long()
        return img_array, img_label

    def __len__(self):
        """Return the number of images in the selected split."""
        return len(self.imgs)

# demo
def demo(img_path):
    """Print the given image path, then open the image and call its ``show()``.

    Args:
        img_path: Path of the image file to display.
    """
    print(f'image name is {img_path}')
    picture = Image.open(img_path)
    picture.show()

In [4]:
# model: ResNet
class BasicBlock(t.nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages added to a shortcut branch.

    Args:
        block_in_channels: Channels of the incoming feature map.
        block_out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        shortcut: Optional module applied to the input before the addition;
            when ``None`` the input itself is added.
    """

    def __init__(self, block_in_channels, block_out_channels, stride=1, shortcut=None):
        super().__init__()
        nn = t.nn
        self.block_shortcut = shortcut
        # Stages are created in execution order so parameter initialisation
        # consumes the random generator in the same sequence as the layers run.
        stages = [
            nn.Conv2d(block_in_channels, block_out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(block_out_channels),
            nn.ReLU(),
            nn.Conv2d(block_out_channels, block_out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(block_out_channels),
        ]
        self.block_residual = nn.Sequential(*stages)

    def forward(self, x):
        """Return ``relu(residual(x) + shortcut(x))``."""
        out = self.block_residual(x)
        if self.block_shortcut is None:
            skip = x
        else:
            skip = self.block_shortcut(x)
        return t.relu(out + skip)


class ResNet34(t.nn.Module):
    """ResNet-style image classifier with 3/4/6/3 ``BasicBlock`` stages.

    The network returns class *probabilities* (softmax is applied in ``forward``),
    not logits. Losses that apply their own (log-)softmax, such as
    ``CrossEntropyLoss``, must therefore not be fed this output directly.

    NOTE(review): the stage widths used here are 128/256/512/512; confirm this is
    intended, as it does not look like the usual ResNet-34 layout.

    Args:
        in_channels: Number of channels of the input images.
        out_classes: Number of output classes.
        shortcut: Stored on the instance but not used by the network itself.
    """

    def __init__(self, in_channels, out_classes=2, shortcut=None):
        super(ResNet34, self).__init__()
        self.in_channels = in_channels
        self.out_classes = out_classes
        self.shortcut = shortcut
        # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max pool.
        self.pre_conv = t.nn.Sequential(
            t.nn.Conv2d(in_channels=self.in_channels, out_channels=64, kernel_size=7, stride=2, padding=3),
            t.nn.BatchNorm2d(64),
            t.nn.ReLU(),
            t.nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.layer1 = self.make_layer(in_channels=64, out_channels=128, num_of_blocks=3)
        self.layer2 = self.make_layer(in_channels=128, out_channels=256, num_of_blocks=4, stride=2)
        self.layer3 = self.make_layer(in_channels=256, out_channels=512, num_of_blocks=6, stride=2)
        self.layer4 = self.make_layer(in_channels=512, out_channels=512, num_of_blocks=3, stride=2)
        # Global average pooling. A fixed 7x7 window only matches 224x224 inputs;
        # the adaptive variant yields a 1x1 map for any input size, so the flattened
        # vector always has the 512 features the classifier expects. It is
        # parameter-free, hence existing checkpoints still load.
        self.out_pooling = t.nn.AdaptiveAvgPool2d(output_size=1)
        self.out_fc = t.nn.Linear(512, self.out_classes)

    def make_layer(self, in_channels, out_channels, num_of_blocks, stride=1):
        """Build one stage made of ``num_of_blocks`` residual blocks.

        The first block changes channel count / resolution and uses a 1x1
        convolution + batch-norm shortcut; the remaining blocks keep the shape
        and use the identity shortcut.

        Args:
            in_channels: Channels entering the stage.
            out_channels: Channels leaving the stage.
            num_of_blocks: Number of blocks in the stage (at least 2).
            stride: Stride of the first block and of its shortcut convolution.

        Returns:
            ``t.nn.Sequential`` containing the blocks in order.
        """
        assert num_of_blocks >= 2
        shortcut = t.nn.Sequential(
            t.nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride),
            t.nn.BatchNorm2d(num_features=out_channels)
        )
        layers = [
            BasicBlock(block_in_channels=in_channels, block_out_channels=out_channels, stride=stride,
                       shortcut=shortcut)
        ]
        for _ in range(1, num_of_blocks):
            layers.append(BasicBlock(block_in_channels=out_channels, block_out_channels=out_channels))
        return t.nn.Sequential(*layers)

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, out_classes)``."""
        x = self.pre_conv(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.out_pooling(x)
        # Collapse everything except the batch dimension.
        x = x.view(x.size()[0], -1)
        y = self.out_fc(x)
        y = t.nn.functional.softmax(y, dim=-1)
        return y

In [None]:
## prepare for training
# define hyperparameters
batch_size = 64
max_epoch = 150
learning_rate = 0.001
weight_decay = 1e-4
eval_step = 1
gpu = True

# Use CUDA only when it is requested *and* available, so the cell also runs on
# CPU-only machines instead of failing inside ``.cuda()``.
device = t.device('cuda' if gpu and t.cuda.is_available() else 'cpu')

# define data
train_data = DT.DataLoader(
    dataset=CatDogDataLoader(train=True),
    batch_size=batch_size,
    shuffle=True
)

# Evaluation does not depend on sample order, so the test split is not shuffled.
test_data = DT.DataLoader(
    dataset=CatDogDataLoader(train=False),
    batch_size=batch_size,
    shuffle=False
)

# define model
model = ResNet34(in_channels=3).to(device)

# define optimizer
optimizer = t.optim.Adam(
    params=model.parameters(),
    lr=learning_rate,
    weight_decay=weight_decay
)

# define loss
# The model already returns softmax probabilities, so CrossEntropyLoss (which
# applies log-softmax itself) would apply softmax twice. NLLLoss on the log of
# the probabilities is the matching cross-entropy.
criterion = t.nn.NLLLoss()
# Lower bound applied to probabilities before taking the log, to avoid log(0).
PROB_EPS = 1e-12


def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    Args:
        model: Network returning class probabilities.
        loader: Iterable of ``(X, y)`` batches.
        optimizer: Optimizer updating ``model``'s parameters.
        criterion: Loss taking ``(log_probabilities, targets)``.
        device: Device the batches are moved to.

    Returns:
        Mean of the per-batch losses as a Python float (0.0 for an empty loader).
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for X, y in tqdm(loader, position=0):
        X = X.to(device)
        y = y.to(device)
        optimizer.zero_grad()
        pred = model(X)
        loss = criterion(t.log(pred.clamp_min(PROB_EPS)), y)
        loss.backward()
        optimizer.step()
        # .item() detaches the value; summing the tensors themselves would keep
        # every batch's autograd history alive for the whole epoch.
        total_loss += loss.item()
        num_batches += 1
    return total_loss / max(num_batches, 1)


def evaluate(model, loader, device):
    """Return the fraction of correctly classified samples in ``loader``.

    Runs in eval mode and without gradient tracking so that batch-norm uses its
    running statistics and is not updated by test data. Accuracy is computed over
    samples (correct / total), so a smaller final batch is weighted correctly.
    """
    model.eval()
    correct = 0
    total = 0
    with t.no_grad():
        for X, y in tqdm(loader, position=0):
            X = X.to(device)
            y = y.to(device)
            pred = t.argmax(model(X), dim=-1)
            correct += (pred == y).sum().item()
            total += y.size(0)
    return correct / max(total, 1)


# train
print('*' * 10 + ' Start Training ' + '*' * 10)
for epoch in range(max_epoch):
    print('Epoch ', epoch)
    epoch_loss = train_one_epoch(model, train_data, optimizer, criterion, device)
    print('loss at epoch {} is {}'.format(epoch, epoch_loss))

    if (epoch + 1) % eval_step == 0:
        acc = evaluate(model, test_data, device)
        print('accuracy is {}'.format(acc))