In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
import torchvision.transforms as T
import torch.nn.functional as F

from operator import itemgetter
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from skimage import transform
from skimage import filters
from skimage import color
from PIL import Image
import numpy as np
import imageio
import skimage
import glob
import copy

In [None]:
# Global compute settings read by train() and check_accuracy().
dtype = torch.float32
device = torch.device('cpu')

# The pipeline draws random negative crops, random candidate windows and random
# initial weights; fix the seeds so Restart & Run All reproduces the same numbers.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
def load_pics():
    """Read the annotated photos and build the localisation and classifier datasets.

    Each line of 'micheI_sgs4.txt' is split on single spaces into ``name y x r``;
    the image is read from 'SamsungGalaxyS4/<name>.jpg'. ``x`` indexes image rows
    and ``y`` image columns; ``r`` is half the side length of the square iris crop
    (presumably the iris radius -- confirm against the annotation format).

    For every annotated photo:
      * lines with index > 907 contribute a 160x90 downscaled copy and its rescaled
        ``(r, x, y)`` truth to the test lists; lines with index < 778 do the same
        for the train lists; lines in between contribute to neither;
      * the iris crop, resized to 32x32x3, is appended to ``X`` with label 1;
      * four random ``2r x 2r`` crops, resized to 32x32x3, are appended with label 0.

    Progress is printed every 10 photos and sample images are shown every 100.

    Returns:
        tuple: ``(train_set_images, train_set_truths, test_set_images,
        test_set_truths, X, Y)``, all Python lists.
    """
    X = []
    Y = []
    test_set_images = []
    test_set_truths = []
    train_set_images = []
    train_set_truths = []
    num_falses_per_true = 4

    def _downscale(pic, x, y, r):
        # Shrink the photo to 160x90 and rescale the annotation by the height ratio.
        scale_factor = 160.0 / pic.shape[0]
        pic_small = skimage.transform.resize(pic, (160,90,3))
        truth = (int(r * scale_factor), int(x * scale_factor), int(y * scale_factor))
        return pic_small, truth

    counter = 0
    # Context manager so the annotation file is closed even if a read fails.
    with open('micheI_sgs4.txt') as f:
        for idx, l in enumerate(f):

            l = l.strip().split(' ')
            name, y, x, r = l[0], int(l[1]), int(l[2]), int(l[3])
            pic = imageio.imread('SamsungGalaxyS4/' + name + '.jpg')
            h, w, c = pic.shape

            # The two index ranges are disjoint, so at most one branch runs.
            if idx > 907:
                pic_small, truth = _downscale(pic, x, y, r)
                test_set_images.append(pic_small)
                test_set_truths.append(truth)
            elif idx < 778:
                pic_small, truth = _downscale(pic, x, y, r)
                train_set_images.append(pic_small)
                train_set_truths.append(truth)

            # Positive example: the annotated iris square.
            iris = pic[x-r:x+r,y-r:y+r,:]
            iris_scale = skimage.transform.resize(iris, (32,32,3))
            X.append(iris_scale)
            Y.append(1)

            # Negative examples: random squares of the same size.
            for _ in range(num_falses_per_true):
                corner_x = np.random.randint(0, h - 2*r)
                corner_y = np.random.randint(0, w - 2*r)
                # Redraw while the corner is within r of the iris corner on both axes.
                # The redraw uses the same 2*r margin as the first draw so the crop
                # always fits inside the photo (a margin of r let it run off the edge
                # and be silently truncated by the slice).
                while abs(corner_x - (x-r)) < r and abs(corner_y - (y-r)) < r:
                    corner_x = np.random.randint(0, h - 2*r)
                    corner_y = np.random.randint(0, w - 2*r)

                not_iris = pic[corner_x:corner_x+2*r, corner_y:corner_y+2*r,:]
                not_iris_scale = skimage.transform.resize(not_iris, (32,32,3))

                X.append(not_iris_scale)
                Y.append(0)

            counter += 1
            if counter % 10 == 0:
                print(counter)

            # Periodic visual sanity check: photo, iris crop, last negative crop.
            if counter % 100 == 0:
                plt.imshow(pic)
                plt.show()
                plt.imshow(iris_scale)
                plt.show()
                plt.imshow(not_iris_scale)
                plt.show()

    return (train_set_images, train_set_truths, test_set_images, test_set_truths, X, Y)

In [None]:
def gen_sets(X, Y, num_new = 0):
    """Turn example/label lists into train, validation and test tensors.

    Args:
        X: sequence of equally shaped 3-D image arrays.
        Y: sequence of integer labels, one per image.
        num_new: number of leading examples that are always kept in the
            training split; the 60% / 70% cut points are computed on the
            remaining examples and then shifted by this amount.

    Returns:
        tuple: ``(X_train, y_train, X_val, y_val, X_test, y_test)``. Image
        tensors are float with shape ``(n, 1, d3, d1, d2)`` for inputs of shape
        ``(d1, d2, d3)``; label tensors have shape ``(n, 1, 1)``.
    """
    # Stack the Python lists into arrays
    images = np.array(X)
    labels = np.array(Y)
    print("Gen Sets: Original Shapes")
    print(images.shape)
    print(labels.shape)

    # Reshape to the tensor layout the model is fed with.
    # NOTE(review): reshape only reinterprets the flat buffer, it does not move
    # the channel axis the way a transpose would. The evaluation_* functions use
    # the same reshape on their inputs, so the two must be changed together.
    src_shape = images.shape
    Xt = images.reshape((src_shape[0], 1, src_shape[3], src_shape[1], src_shape[2]))
    Yt = labels.reshape(labels.shape[0], 1, 1)
    print("Gen Sets: Reshaped Shapes")
    print(Xt.shape)
    print(Yt.shape)

    # Cut points: first 60% train, next 10% validation, rest test
    remaining = Xt.shape[0] - num_new
    train_end = int(6*remaining/10) + num_new
    val_end = int(7*remaining/10) + num_new
    spans = (slice(None, train_end), slice(train_end, val_end), slice(val_end, None))

    # Convert each split to torch tensors, interleaved as (X, y) pairs
    splits = []
    for span in spans:
        splits.append(torch.tensor(Xt[span]).type(torch.FloatTensor))
        splits.append(torch.tensor(Yt[span]))

    # Report shapes in return order
    for tensor in splits:
        print(tensor.shape)

    return tuple(splits)

In [None]:
def check_accuracy(mode, model, verbose=False):
    """Print the classification accuracy of ``model`` on one of the global splits.

    Args:
        mode: 'val' or 'train' select the matching module-level tensors; any
            other value selects the test tensors.
        model: network returning a single probability per example; it is put
            into eval mode and left there.
        verbose: when true, display every misclassified example.
    """
    # Pick the split from the module-level tensors
    if mode == 'val':
        inputs, targets = X_val, y_val
    elif mode == 'train':
        inputs, targets = X_train, y_train
    else:
        inputs, targets = X_test, y_test

    hits = 0
    seen = 0
    model.eval()
    with torch.no_grad():
        for idx in range(inputs.shape[0]):

            sample = inputs[idx].to(device=device, dtype=dtype)
            label = targets[idx].to(device=device, dtype=dtype)

            # Threshold the predicted probability at 0.5
            if model(sample).item() > 0.5:
                predicted = 1
            else:
                predicted = 0

            truth = label.item()
            hits += (predicted == truth)
            if verbose and truth != predicted:
                # Undo the storage reshape to view the example as an image
                plt.imshow(np.array(sample.detach())[0].reshape((32,32,3)))
                plt.show()
            seen += 1

        acc = float(hits) / seen
        print('Got %d / %d correct (%.2f) for mode %s' % (hits, seen, 100 * acc, mode))

In [None]:
def train(model, optimizer, epochs=20):
    """Fit ``model`` on the module-level training tensors, one example per step.

    Args:
        model: network whose output is a probability compatible with
            binary cross-entropy against ``y_train`` entries.
        optimizer: optimizer already bound to ``model``'s parameters.
        epochs: number of full passes over ``X_train``.

    After every epoch the train and validation accuracies are printed via
    ``check_accuracy``.
    """
    model = model.to(device=device)
    for epoch in range(epochs):
        for step in range(X_train.shape[0]):
            # check_accuracy() leaves the model in eval mode, so switch back
            model.train()

            sample = X_train[step].to(device=device, dtype=dtype)
            target = y_train[step].to(device=device, dtype=dtype)

            # Forward pass and loss for this single example
            loss = F.binary_cross_entropy(model(sample), target)

            # Clear stale gradients, backpropagate, update the weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if step % 500 == 0:
                print('Iteration %d, loss = %.4f' % (step, loss.item()))
                print()

        print('epoch %d' % (epoch))
        for split in ('train', 'val'):
            check_accuracy(split, model)
        print()

In [None]:
def flatten(x):
    """Collapse every dimension after the first into one, keeping dimension 0."""
    leading = x.shape[0]
    return x.view(leading, -1)

In [None]:
class Flatten(nn.Module):
    """Layer form of ``flatten`` so it can be placed inside ``nn.Sequential``."""

    def forward(self, x):
        """Return ``x`` with all dimensions after the first merged into one."""
        flattened = flatten(x)
        return flattened

In [None]:
def train_model():
    """Build the two-convolution iris classifier, train it for 20 epochs and return it.

    The network ends in a sigmoid, so its output is a single probability per
    example. Training uses SGD with Nesterov momentum via ``train``.
    """
    conv1_filters = 32
    conv2_filters = 16

    # For a 32x32 input: 5x5 conv without padding -> 28, pool -> 14,
    # padded 3x3 conv -> 14, pool -> 7, hence 16 * 7 * 7 = 784 features.
    layers = [
        nn.Conv2d(3, conv1_filters, kernel_size=5, padding=0, stride=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
        nn.Conv2d(conv1_filters, conv2_filters, 3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
        Flatten(),
        nn.Linear(784, 100),
        nn.ReLU(),
        nn.Linear(100, 1),
        nn.Sigmoid(),
    ]
    model = nn.Sequential(*layers)

    sgd = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9, nesterov=True)
    train(model, sgd, epochs=20)
    return model

In [None]:
def evaluation_gaussian(num_sampled):
    """Localise the iris in each test image from Gaussian-sampled candidate windows.

    For every image in the module-level ``test_set_images``, draws
    ``num_sampled`` top-left corners from a 2-D normal distribution with mean
    ``(80 - r, 45 - r)`` and variance ``r`` on each axis, where ``r`` comes from
    ``test_set_truths``. Corners whose ``2r x 2r`` window does not fit in the
    160x90 image are discarded (not redrawn). Each remaining window is resized
    to 32x32x3 and scored by the module-level ``model``; the best-scoring
    corner is compared with the ground-truth corner ``(x - r, y - r)``.

    Args:
        num_sampled: number of corners drawn per image.

    Returns:
        list: one Euclidean corner distance per test image.

    Raises:
        ValueError: if every drawn corner of an image was discarded.
    """
    dist = []

    print("Began Iterations")
    for image_idx, pic_small in enumerate(test_set_images):

        if image_idx % 100 == 0:
            print("Processing Image Index: %d" % image_idx)
        (new_r, new_x, new_y) = test_set_truths[image_idx]
        side = 2 * new_r

        # Sampling distribution is the same for every draw on this image.
        mean = [80 - new_r, 45 - new_r]
        cov = [[new_r, 0], [0, new_r]]

        scores = []
        # Pure inference: skip autograd bookkeeping and keep plain floats so no
        # computation graph is retained for each sampled window.
        with torch.no_grad():
            for _ in range(num_sampled):
                i, j = np.random.multivariate_normal(mean, cov)
                i = int(i)
                j = int(j)
                if 0 <= i < 160 - side and 0 <= j < 90 - side:
                    model_in = skimage.transform.resize(pic_small[i:i+side, j:j+side, :], (32,32,3))
                    # Same reshape convention as gen_sets() uses for the training data.
                    score = model(torch.tensor(model_in.reshape((1, 3, 32, 32))).type(torch.FloatTensor))
                    scores.append((i, j, score.item()))

        if not scores:
            raise ValueError(
                "no valid candidate window for test image %d (num_sampled=%d)"
                % (image_idx, num_sampled))

        c_xy = max(scores, key=itemgetter(2))
        c_x, c_y = c_xy[0], c_xy[1]
        dist.append(np.sqrt((c_y - (new_y - new_r))**2 + (c_x - (new_x - new_r))**2))

    return dist

In [None]:
def evaluation_uniform(num_sampled):
    """Localise the iris in each test image from uniformly sampled candidate windows.

    For every image in the module-level ``test_set_images``, draws
    ``num_sampled`` top-left corners uniformly among those whose ``2r x 2r``
    window fits in the 160x90 image (``r`` comes from ``test_set_truths``).
    Each window is resized to 32x32x3 and scored by the module-level ``model``;
    the best-scoring corner is compared with the ground-truth corner
    ``(x - r, y - r)``.

    Args:
        num_sampled: number of windows scored per image.

    Returns:
        list: one Euclidean corner distance per test image.

    Raises:
        ValueError: if ``num_sampled`` yields no candidate for an image.
    """
    dist = []
    print("Began Iterations")
    for image_idx, pic_small in enumerate(test_set_images):

        if image_idx % 100 == 0:
            print("Processing Image Index: %d" % image_idx)
        (new_r, new_x, new_y) = test_set_truths[image_idx]
        side = 2 * new_r

        scores = []
        # Pure inference: skip autograd bookkeeping and keep plain floats so no
        # computation graph is retained for each sampled window.
        with torch.no_grad():
            for _ in range(num_sampled):
                i = np.random.randint(0, 160 - side)
                j = np.random.randint(0, 90 - side)
                model_in = skimage.transform.resize(pic_small[i:i+side, j:j+side, :], (32,32,3))
                # Same reshape convention as gen_sets() uses for the training data.
                score = model(torch.tensor(model_in.reshape((1, 3, 32, 32))).type(torch.FloatTensor))
                scores.append((i, j, score.item()))

        if not scores:
            raise ValueError(
                "no candidate window for test image %d (num_sampled=%d)"
                % (image_idx, num_sampled))

        c_xy = max(scores, key=itemgetter(2))
        c_x, c_y = c_xy[0], c_xy[1]
        dist.append(np.sqrt((c_y - (new_y - new_r))**2 + (c_x - (new_x - new_r))**2))

    return dist

In [None]:
def evaluation_test():
    """Localise the iris in each test image by scoring every candidate window.

    For every image in the module-level ``test_set_images``, slides a
    ``2r x 2r`` window (``r`` from ``test_set_truths``) over every top-left
    corner that keeps it inside the 160x90 image, resizes it to 32x32x3 and
    scores it with the module-level ``model``. The best-scoring corner is
    compared with the ground-truth corner ``(x - r, y - r)``. For each image
    the score map and the image with the true (red) and predicted (blue) boxes
    are displayed.

    Returns:
        list: one Euclidean corner distance per test image.
    """
    dist = []
    print("Began Iterations")
    for image_idx, pic_small in enumerate(test_set_images):

        print("Processing Image Index: %d" % image_idx)
        (new_r, new_x, new_y) = test_set_truths[image_idx]
        side = 2 * new_r
        n_rows = 160 - side
        n_cols = 90 - side

        # scores[i, j] is the model output for the window whose corner is (i, j).
        scores = np.zeros((n_rows, n_cols))
        # Pure inference: skip autograd bookkeeping for the exhaustive scan.
        with torch.no_grad():
            for i in range(n_rows):
                for j in range(n_cols):
                    model_in = skimage.transform.resize(pic_small[i:i+side, j:j+side, :], (32,32,3))
                    # Same reshape convention as gen_sets() uses for the training data.
                    score = model(torch.tensor(model_in.reshape((1, 3, 32, 32))).type(torch.FloatTensor))
                    scores[i, j] = score.item()

        c_x, c_y = np.unravel_index(np.argmax(scores), scores.shape)
        dist.append(np.sqrt((c_y - (new_y - new_r))**2 + (c_x - (new_x - new_r))**2))

        # Score heat map, then the image with both boxes. Rectangle takes its
        # anchor as (horizontal, vertical), hence (column, row) order.
        plt.imshow(scores)
        fig, ax = plt.subplots(1)
        plt.imshow(pic_small)
        rect_true = patches.Rectangle((new_y-new_r, new_x-new_r), side, side, linewidth=6, edgecolor='r', facecolor='none')
        rect_pred = patches.Rectangle((c_y, c_x), side, side, linewidth=1, edgecolor='b', facecolor='none')
        ax.add_patch(rect_true)
        ax.add_patch(rect_pred)
        plt.show()

    return dist

In [None]:
def evaluation_train(num_sampled=500, max_dist=5.0):
    """Localise the iris in each training image and collect hard negatives.

    For every image in the module-level ``train_set_images``, scores
    ``num_sampled`` uniformly drawn ``2r x 2r`` windows (``r`` from
    ``train_set_truths``) with the module-level ``model`` and compares the
    best-scoring corner with the ground-truth corner ``(x - r, y - r)``. When
    the distance exceeds ``max_dist`` the predicted window, resized to
    32x32x3, is prepended to the module-level list ``X`` and a 0 label is
    prepended to ``Y``; both lists must already exist. Each image is displayed
    with the true (red) and predicted (blue) boxes.

    Args:
        num_sampled: number of windows scored per image.
        max_dist: corner distance above which a prediction counts as wrong.

    Returns:
        tuple: ``(num_new, dist)`` -- the number of negatives prepended and the
        list of per-image corner distances.

    Raises:
        ValueError: if ``num_sampled`` yields no candidate for an image.
    """
    dist = []
    num_new = 0
    print("Began Iterations")
    for image_idx, pic_small in enumerate(train_set_images):

        print("Processing Image Index: %d" % image_idx)
        (new_r, new_x, new_y) = train_set_truths[image_idx]
        side = 2 * new_r

        scores = []
        # Pure inference: skip autograd bookkeeping and keep plain floats so no
        # computation graph is retained for each sampled window.
        with torch.no_grad():
            for _ in range(num_sampled):
                i = np.random.randint(0, 160 - side)
                j = np.random.randint(0, 90 - side)
                model_in = skimage.transform.resize(pic_small[i:i+side, j:j+side, :], (32,32,3))
                # Same reshape convention as gen_sets() uses for the training data.
                score = model(torch.tensor(model_in.reshape((1, 3, 32, 32))).type(torch.FloatTensor))
                scores.append((i, j, score.item()))

        if not scores:
            raise ValueError(
                "no candidate window for train image %d (num_sampled=%d)"
                % (image_idx, num_sampled))

        c_xy = max(scores, key=itemgetter(2))
        c_x, c_y = c_xy[0], c_xy[1]
        dist.append(np.sqrt((c_y - (new_y - new_r))**2 + (c_x - (new_x - new_r))**2))

        if dist[-1] > max_dist:
            # The confidently wrong window becomes a new negative example;
            # it goes to the front of the lists.
            num_new = num_new + 1
            print("Classified Wrong (Distance %f)" % float(dist[-1]))
            predicted_image = skimage.transform.resize(pic_small[c_x:c_x+side, c_y:c_y+side, :], (32,32,3))
            X.insert(0, predicted_image)
            Y.insert(0, 0)

        # Rectangle takes its anchor as (horizontal, vertical), hence (column, row).
        fig, ax = plt.subplots(1)
        plt.imshow(pic_small)
        rect_true = patches.Rectangle((new_y-new_r, new_x-new_r), side, side, linewidth=6, edgecolor='r', facecolor='none')
        rect_pred = patches.Rectangle((c_y, c_x), side, side, linewidth=1, edgecolor='b', facecolor='none')
        ax.add_patch(rect_true)
        ax.add_patch(rect_pred)
        plt.show()

    return (num_new, dist)

In [None]:
# Load Initial Datasets
# (downscaled train/test photos with their truths, plus the raw classifier examples)
(
    train_set_images,
    train_set_truths,
    test_set_images,
    test_set_truths,
    X_og,
    Y_og,
) = load_pics()

In [None]:
# Train the Model
# Split the original examples, fit a fresh network, then report test accuracy.
(X_train, y_train, X_val, y_val, X_test, y_test) = gen_sets(X_og, Y_og, num_new=0)
model = train_model()
check_accuracy('test', model, verbose=False)

In [None]:
# Update the Datasets with New Negative Examples
# evaluation_train() prepends the mislocalised windows to the module-level lists
# X and Y, so start them as fresh copies of the original examples. Rebuilding
# them here also keeps the cell idempotent and leaves X_og / Y_og untouched.
X = list(X_og)
Y = list(Y_og)
# evaluation_train() is the function that returns (num_new, dist);
# evaluation_test() returns only the distance list and cannot be unpacked this way.
num_new, dist = evaluation_train()

In [None]:
# Histogram of the per-image corner distances, followed by the raw numbers.
plt.hist(np.asarray(dist))
plt.ylabel('Frequency')
plt.xlabel('Euclidean Distances to Ground-Truth Corner')
plt.title('Training Set Model Analysis')
plt.show()
for value in (num_new, dist):
    print(value)

In [None]:
# Retrain Model
# X and Y are the module-level lists that evaluation_train() prepends to; the
# first num_new entries are forced into the training split by gen_sets().
print(num_new)
(X_train, y_train, X_val, y_val, X_test, y_test) = gen_sets(X, Y, num_new=num_new)
model = train_model()
check_accuracy('test', model, verbose=False)

In [None]:
# Analyze on Test Set
# Exhaustive scan first, then the two sampled strategies with 1000 draws each.
dist = evaluation_test()
dist_uni = evaluation_uniform(num_sampled=1000)
dist_gauss = evaluation_gaussian(num_sampled=1000)

In [None]:
# Histogram of the exhaustive-scan distances, followed by the raw numbers
# for all three search strategies.
plt.hist(np.asarray(dist))
plt.ylabel('Frequency')
plt.xlabel('Euclidean Distances to Ground-Truth Corner')
plt.title('Test Set Model Analysis')
plt.show()
for value in (num_new, dist, dist_uni, dist_gauss):
    print(value)