In [1]:
#a good example is provided here
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
from torch.autograd import Variable
from PIL import Image
from os import listdir
import json
import numpy as np
from random import shuffle

# globals 
disp = False

def loadImages(path):
    # return array of images

    imagesList = listdir(path)
    loadedImages = []
    for image in imagesList:
        img = Image.open(path + image)
        loadedImages.append(img)

    return loadedImages

#define prepro transform
'''
from:
https://github.com/pytorch/examples/blob/409a7262dcfa7906a92aeac25ee7d413baa88b67/imagenet/main.py#L108-L113
https://github.com/pytorch/examples/blob/409a7262dcfa7906a92aeac25ee7d413baa88b67/imagenet/main.py#L94-L95
'''
normalize = transforms.Normalize(
   mean=[0.485, 0.456, 0.406],
   std=[0.229, 0.224, 0.225]
)
preprocess = transforms.Compose([
   transforms.Scale(40),    
   transforms.RandomHorizontalFlip(),   
   transforms.RandomCrop(32),
   transforms.ToTensor(),
   normalize
])

#provide path to true and fake images
true_path= "raw/person-1/"
fake_path= "raw/person-2/"

# load images in the dir
true_images = loadImages(true_path)
fake_images = loadImages(fake_path)

#be sure the images are rightly loaded
if disp:
    true_images[0].show()
    fake_images[0].show()

# Now preprocess the image

#define tensors to hold the images in memory
fake_tensors, true_tensors = [], []

for imgs in true_images:
    true_temp = preprocess(imgs)
    true_tensors.append(true_temp)

for imgs in fake_images:
    fake_temp = preprocess(imgs)
    fake_tensors.append(fake_temp)

#shuffle the images
shuffle(true_images)
shuffle(fake_images)

# fakenreal_tensors = fake_tensors + true_tensors
# print(fakenreal_tensors[240])
 
# #be sure the images are properly loaded in memory
print("Total # of true tensors: {}, true images size: {}", len(true_tensors),  true_tensors[0].size())
print("Total # of fake tensors: {}, fake images size: {}", len(fake_tensors), fake_tensors[0].size())
print("True Var Size {}".format(true_tensors[0].size()))
print("Fake Var Size {}".format(fake_tensors[0].size()))

# #load labels file
# labels_file = open('labels.json').read()
# labels = np.fromiter(json.loads(labels_file),int)

# #take labels to tensor
# labels = torch.from_numpy(labels)
# labels = Variable(labels)

# print('labels: ', labels)  

Total # of true tensors: {}, true images size: {} 100 torch.Size([3, 32, 32])
Total # of fake tensors: {}, fake images size: {} 170 torch.Size([1, 32, 32])
True Var Size torch.Size([3, 32, 32])
Fake Var Size torch.Size([1, 32, 32])
labels:  Variable containing:
 0
 1
[torch.LongTensor of size 2]



In [8]:
#Now separate true and fake to training and testing sets
#we'll do 80:20 ratio
X_tr = int(0.8*len(true_tensors))
X_te = len(true_tensors) - X_tr

# allocate tensors memory
train_X = torch.LongTensor(X_tr, true_tensors[0].size(0), true_tensors[0].size(1),
                          true_tensors[0].size(2))
test_X = torch.LongTensor(X_te, true_tensors[0].size(0), true_tensors[0].size(1),
                          true_tensors[0].size(2))

#Now copy tensors over 
train_X = torch.stack(true_tensors[:X_tr], 0)
train_Y = torch.from_numpy(np.array([1] * train_X.size(0)))

#testing set
test_X = torch.stack(true_tensors[(X_tr):], 0)
test_Y = torch.from_numpy(np.array([1]*X_te))
                                      
#check size of slices
print('train_X and train_Y sizes: {} | {}'.format(train_X.size(), train_Y.size()))
print('test_X and test_Y sizes: {} | {}'.format(test_X.size(), test_Y.size()))
# print(train_Y)

train_X and train_Y sizes: torch.Size([80, 3, 32, 32]) | torch.Size([80])
test_X and test_Y sizes: torch.Size([20, 3, 32, 32]) | torch.Size([20])


In [9]:
# 3x3 Convolution
def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3, 
                     stride=stride, padding=1, bias=False)

# Residual Block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out


# ResNet Module
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=2):
        super(ResNet, self).__init__()
        self.in_channels = 16
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[0], 2)
        self.layer3 = self.make_layer(block, 64, layers[1], 2)
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)
        
    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [None]:
resnet = ResNet(ResidualBlock, [3, 3, 3])
resnet.cuda()

# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
lr = 0.001
num_iter = 10
optimizer = torch.optim.Adam(resnet.parameters(), lr=lr)

# Training 
for iter in range(num_iter):
    images = Variable(train_X.cuda())
    labels = Variable(train_Y.cuda())
    
    print(type(labels.data))
        
    # Forward + Backward + Optimize
    optimizer.zero_grad()
    outputs = resnet(images)
    print(outputs.size(), labels.size())
    loss = criterion(outputs, labels)
    print('loss: ', loss)
    loss.backward()
    optimizer.step()

    if (iter+1) % 10 == 0:
        print ("Iter [%d/%d] Loss: %.4f" %(iter+1, 500, loss.data[0]))

    # Decaying Learning Rate
    if (iter+1) % 20 == 0:
        lr /= 3
        optimizer = torch.optim.Adam(resnet.parameters(), lr=lr) 
        
# Test
correct = 0
total = 0
for images, labels in test_loader:
    images = Variable(images.cuda())
    outputs = resnet(images)
    _, predicted = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted.cpu() == labels).sum()

print('Accuracy of the model on the test images: %d %%' % (100 * correct / total))

# Save the Model
torch.save(resnet.state_dict(), 'resnet.pkl')