In [1]:
import torch
import torch.nn as nn

In [12]:
class BasicBlock(nn.Module):
    """Basic Block for resnet 18 and resnet 34

    """

    #BasicBlock and BottleNeck block
    #have different output size
    #we use class attribute expansion
    #to distinct
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        #residual function
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels * BasicBlock.expansion)
        )

        #shortcut
        self.shortcut = nn.Sequential()

        #the shortcut output dimension is not the same with residual function
        #use 1*1 convolution to match the dimension
        if stride != 1 or in_channels != BasicBlock.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * BasicBlock.expansion)
            )

    def forward(self, x):
        return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x))

class BottleNeck(nn.Module):
    """Residual block for resnet over 50 layers

    """
    expansion = 4
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, stride=stride, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels * BottleNeck.expansion, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels * BottleNeck.expansion),
        )

        self.shortcut = nn.Sequential()

        if stride != 1 or in_channels != out_channels * BottleNeck.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BottleNeck.expansion, stride=stride, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels * BottleNeck.expansion)
            )

    def forward(self, x):
        return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x))

class ResNet(nn.Module):

    def __init__(self, block, num_block, num_classes=100):
        super().__init__()

        self.in_channels = 64

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        #we use a different inputsize than the original paper
        #so conv2_x's stride is 1
        self.conv2_x = self._make_layer(block, 64, num_block[0], 1)
        self.conv3_x = self._make_layer(block, 128, num_block[1], 2)
        self.conv4_x = self._make_layer(block, 256, num_block[2], 2)
        self.conv5_x = self._make_layer(block, 512, num_block[3], 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """make resnet layers(by layer i didnt mean this 'layer' was the
        same as a neuron netowork layer, ex. conv layer), one layer may
        contain more than one residual block

        Args:
            block: block type, basic block or bottle neck block
            out_channels: output depth channel number of this layer
            num_blocks: how many blocks per layer
            stride: the stride of the first block of this layer

        Return:
            return a resnet layer
        """

        # we have num_block blocks per layer, the first block
        # could be 1 or 2, other blocks would always be 1
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x):
        output = self.conv1(x)
        output = self.conv2_x(output)
        output = self.conv3_x(output)
        output = self.conv4_x(output)
        output = self.conv5_x(output)
        output = self.avg_pool(output)
        output = output.view(output.size(0), -1)
        output = self.fc(output)

        return output

def resnet18():
    """ return a ResNet 18 object
    """
    return ResNet(BasicBlock, [2, 2, 2, 2])

def resnet34():
    """ return a ResNet 34 object
    """
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=2)

def resnet50():
    """ return a ResNet 50 object
    """
    return ResNet(BottleNeck, [3, 4, 6, 3])

def resnet101():
    """ return a ResNet 101 object
    """
    return ResNet(BottleNeck, [3, 4, 23, 3])

def resnet152():
    """ return a ResNet 152 object
    """
    return ResNet(BottleNeck, [3, 8, 36, 3])

model = resnet34()

In [13]:
import os
import numpy as np
from PIL import Image
from tqdm import tqdm

X = []
y = []
manipulated_path = './data/manipulated'
original_path = './data/original'
print("processing fake datasets")
for img_name in tqdm(os.listdir(manipulated_path)):
    X.append(np.array(Image.open(f'{manipulated_path}/{img_name}')))
    y.append([1, 0])
print("processing true datasets")
for img_name in tqdm(os.listdir(original_path)):
    X.append(np.array(Image.open(f'{original_path}/{img_name}')))
    y.append([0, 1])
X = np.array(X).swapaxes(3, 1).swapaxes(3, 2)
y = np.array(y)
print(X.shape)
print(y.shape)

processing fake datasets


100%|██████████| 8000/8000 [00:19<00:00, 412.72it/s]


processing true datasets


100%|██████████| 4000/4000 [00:09<00:00, 413.46it/s]


(12000, 3, 299, 299)
(12000, 2)


In [14]:
np.random.seed(42)
indices = np.random.permutation(X.shape[0])
X_shuffled = []
y_shuffled = []
for i in range(X.shape[0]):
    X_shuffled.append(X[indices[i]])
    y_shuffled.append(y[indices[i]])
X_shuffled = np.array(X_shuffled, dtype=np.float32)
y_shuffled = np.array(y_shuffled, dtype=np.float32)
train_X = X_shuffled[:int(X.shape[0] * 0.8)]
train_y = y_shuffled[:int(X.shape[0] * 0.8)]
valid_X = X_shuffled[int(X.shape[0] * 0.8):int(X.shape[0] * 0.9)]
valid_y = y_shuffled[int(X.shape[0] * 0.8):int(X.shape[0] * 0.9)]
test_X = X_shuffled[int(X.shape[0] * 0.9):]
test_y = y_shuffled[int(X.shape[0] * 0.9):]
print(train_X.shape[0])
print(valid_X.shape[0])
print(test_X.shape[0])

9600
1200
1200


In [15]:
import math

def process_epoch(X, y, model, opt=None, batch_size=100):
    if opt:
        model.train()
    else:
        model.eval()

    loss_func = nn.CrossEntropyLoss()
    correct, loss_sum, n_step, n_samples = 0, 0, 0, 0

    iter = math.ceil(X.shape[0] / batch_size)
    for batch in tqdm(range(iter)):
        X_batch = X[batch * batch_size:min((batch + 1) * batch_size, X.shape[0])][:, :, :224, :224]
        y_batch = y[batch * batch_size:min((batch + 1) * batch_size, X.shape[0])]
        if opt:
            opt.zero_grad()
        pred_y = model(torch.tensor(X_batch).cuda())
        loss = loss_func(pred_y, torch.tensor(y_batch).cuda())
        correct += np.sum(np.array(pred_y.cpu().detach().numpy().argmax(axis=1) == y_batch.argmax(axis=1), dtype=np.int64))
        if opt:
            loss.backward()
            opt.step()
        loss_sum += loss.cpu().detach().numpy()
        n_step += 1
        n_samples += X_batch.shape[0]

    return 1 - correct / n_samples, loss_sum / n_step

In [16]:
def train(
        batch_size=5,
        epochs=10,
        optimizer=torch.optim.Adam,
        lr=0.001,
        weight_decay=0.001,
        model=None):

    if(model == None):
        print("Please choose a type of ResNet!")
        return

    opt = optimizer(model.parameters(), lr=lr, weight_decay=weight_decay)

    for epoch in range(epochs):
        print(f"Epoch {epoch}: ")
        train_acc, train_loss = process_epoch(train_X, train_y, model, batch_size=batch_size, opt=opt)
        test_acc, test_loss = process_epoch(test_X, test_y, model, batch_size=batch_size)
        print(f"Train Accuracy: {round(train_acc, 3)} Test Accuracy: {round(test_acc, 3)}")

In [17]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train(model=model.to(device))

Epoch 0: 


100%|██████████| 1920/1920 [04:24<00:00,  7.26it/s]
100%|██████████| 240/240 [00:09<00:00, 26.39it/s]


Train Accuracy: 0.343 Test Accuracy: 0.322
Epoch 1: 


100%|██████████| 1920/1920 [04:26<00:00,  7.19it/s]
100%|██████████| 240/240 [00:09<00:00, 26.49it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 2: 


100%|██████████| 1920/1920 [04:22<00:00,  7.30it/s]
100%|██████████| 240/240 [00:09<00:00, 26.58it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 3: 


100%|██████████| 1920/1920 [04:22<00:00,  7.31it/s]
100%|██████████| 240/240 [00:08<00:00, 26.72it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 4: 


100%|██████████| 1920/1920 [04:22<00:00,  7.31it/s]
100%|██████████| 240/240 [00:08<00:00, 26.73it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 5: 


100%|██████████| 1920/1920 [04:18<00:00,  7.42it/s]
100%|██████████| 240/240 [00:08<00:00, 27.30it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 6: 


100%|██████████| 1920/1920 [04:17<00:00,  7.45it/s]
100%|██████████| 240/240 [00:08<00:00, 27.36it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 7: 


100%|██████████| 1920/1920 [04:17<00:00,  7.46it/s]
100%|██████████| 240/240 [00:08<00:00, 27.39it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 8: 


100%|██████████| 1920/1920 [04:17<00:00,  7.46it/s]
100%|██████████| 240/240 [00:08<00:00, 27.36it/s]


Train Accuracy: 0.332 Test Accuracy: 0.322
Epoch 9: 


100%|██████████| 1920/1920 [04:17<00:00,  7.45it/s]
100%|██████████| 240/240 [00:08<00:00, 27.41it/s]

Train Accuracy: 0.332 Test Accuracy: 0.322



