In [1]:
%%writefile /tmp/LP_detect.py
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import time
from torchvision import datasets, transforms
from torch.autograd import Variable
from LogGabor import LogGabor

# Command-line configuration of the script.
parser = argparse.ArgumentParser(description='PyTorch MNIST detector')
parser.add_argument('--batch_size', type=int, default=100, metavar='N',
                    help='input batch size for training (default: 100)')
parser.add_argument('--eval_batch_size', type=int, default=1000, metavar='N',
                    help='input batch size for evaluation (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
                    help='number of training epochs (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate (default: 0.01)')
# The momentum option is disabled: the optimizer built further down is Adam,
# which is only given a learning rate.
#parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
#                   help='SGM momentum for training (default: 0.5)')
parser.add_argument('--not_cuda', action='store_true', default=False,
                    help='Disables use of GPU during training (default: False)')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random number seed (default: 1)')
args = parser.parse_args()
# Use the GPU only when it has not been disabled AND CUDA is actually available.
args.cuda = not args.not_cuda and torch.cuda.is_available()

# Seed every random number generator the script draws from. NumPy must be
# seeded as well: the random image offsets used for training and evaluation
# are drawn with np.random.randn, not with torch.
torch.manual_seed(args.seed)
np.random.seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)

# Extra DataLoader options, only passed when CUDA is enabled.
kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}

# Defining how the training data will be loaded
def _mnist_loader(is_train, batch_size, download=False):
    """Return a shuffled DataLoader over one MNIST split stored under /tmp/data.

    Every image is converted to a tensor and then normalised with
    mean 0.1307 and standard deviation 0.3081.
    """
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    dataset = datasets.MNIST('/tmp/data',
                             train=is_train,
                             download=download,
                             transform=preprocessing)
    return torch.utils.data.DataLoader(dataset,
                                       batch_size=batch_size,
                                       shuffle=True,
                                       **kwargs)


# Training split: fetched from the network when it is not on disk yet.
train_loader = _mnist_loader(True, args.batch_size, download=True)

# Defining how the test data will be loaded
# (no download requested for this split).
test_loader = _mnist_loader(False, args.eval_batch_size)

# Defining the neural network itself
class Net(nn.Module):
    """Small 1-D convolutional network with a 2-value output.

    Architecture: Conv1d(1 -> 20, k=5) -> max-pool(2) -> ReLU ->
    Conv1d(20 -> 50, k=5) -> dropout -> max-pool(2) -> ReLU ->
    flatten to 5850 features -> Linear(5850 -> 50) -> ReLU -> dropout ->
    Linear(50 -> 2).

    The flatten width 5850 equals 50 channels x 117 positions, which is what a
    single-channel input of length 480 produces (480 -> 476 -> 238 -> 234 -> 117).
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor.
        self.conv1 = nn.Conv1d(1, 20, kernel_size=5)
        self.conv2 = nn.Conv1d(20, 50, kernel_size=5)
        # Dropout applied to the output of the second convolution.
        self.conv2_drop = nn.Dropout()
        # Fully connected head ending in the two output values.
        self.fc1 = nn.Linear(5850, 50)
        self.fc2 = nn.Linear(50, 2)

    def forward(self, x):
        """Map a (batch, 1, length) input to a (batch, 2) output."""
        # Stage 1: convolution, pooling by 2, rectification.
        hidden = self.conv1(x)
        hidden = F.max_pool1d(hidden, 2)
        hidden = F.relu(hidden)

        # Stage 2: convolution, dropout, pooling by 2, rectification.
        hidden = self.conv2(hidden)
        hidden = self.conv2_drop(hidden)
        hidden = F.max_pool1d(hidden, 2)
        hidden = F.relu(hidden)

        # Flatten channels x positions into one feature vector per sample.
        hidden = hidden.view(-1, 5850)

        hidden = F.relu(self.fc1(hidden))
        # Functional dropout, only active while the module is in training mode.
        hidden = F.dropout(hidden, training=self.training)
        return self.fc2(hidden)
    
# Build the network; report whether CUDA is in use and, if so, move the
# network parameters onto the GPU.
model = Net()
print('cuda:', args.cuda)
if args.cuda:
    model = model.cuda()

# Adam optimizer over all network parameters, with the learning rate taken
# from the command line; its gradients are cleared once right after creation.
optimizer = optim.Adam(params=model.parameters(), lr=args.lr)
optimizer.zero_grad()

def mnist_reshape_128(x, i_off=0, j_off=0):
    """Paste a 28x28 image into a blank 128x128 canvas.

    With zero offsets the image is centred: it occupies rows and columns
    50..77 of the canvas. ``i_off`` shifts it along the first axis (rows) and
    ``j_off`` along the second axis (columns).

    Parameters
    ----------
    x : array-like of shape (28, 28)
        Image to embed.
    i_off, j_off : int
        Row / column displacement from the centred position. Each must lie
        in [-50, 50] so that the image stays entirely inside the canvas.

    Returns
    -------
    numpy.ndarray of shape (128, 128)
        Zero-filled canvas containing ``x`` at the requested position.

    Raises
    ------
    ValueError
        If an offset would push the image outside the canvas.
    """
    N_pix = 28
    N_canvas = 128
    assert x.shape == (N_pix, N_pix)

    # Top-left corner of the pasted image; (128 - 28) // 2 = 50 centres it.
    margin = (N_canvas - N_pix) // 2
    top, left = margin + i_off, margin + j_off
    if not (0 <= top <= N_canvas - N_pix and 0 <= left <= N_canvas - N_pix):
        raise ValueError('offsets ({}, {}) place the image outside the canvas'.format(i_off, j_off))

    # The canvas shape must be made of integers, and the whole image (not a
    # single pixel of it) is copied into the target window.
    x_translate = np.zeros((N_canvas, N_canvas))
    x_translate[top:top + N_pix, left:left + N_pix] = x
    return x_translate

def minmax(value, border):
    """Clamp ``value`` between ``-border`` and ``border``.

    The lower bound ``-border`` is applied first, then the upper bound
    ``border``; for a non-negative ``border`` the result therefore lies in
    the interval [-border, border].
    """
    return min(max(value, -border), border)

def vectorization(N_theta, N_orient, N_scale, N_phase, N_X, N_Y):
    """Build a bank of LogGabor filters laid out on a log-polar grid.

    This function does not process an image: it returns the filters
    themselves, one flattened N_X*N_Y vector per filter, to be applied to an
    image by the caller.

    For every combination of indices the filter is configured as follows:

    * ``i_scale`` sets the eccentricity ``ecc = 0.5 ** (N_scale - i_scale)``,
      hence the distance ``r`` of the filter from the image centre (a
      fraction ``ecc`` of the half-diagonal) and its spatial frequency
      ``sf_0 = 0.5 * 0.03 / ecc``;
    * ``i_orient`` sets the angular position on the circle of radius ``r``;
      odd scales are rotated by half an angular step;
    * ``i_theta`` sets the filter orientation ``i_theta * pi / N_theta`` with
      bandwidth ``pi / N_theta / 2``;
    * ``i_phase`` sets the phase ``i_phase * pi / 2``.

    Parameters
    ----------
    N_theta, N_orient, N_scale, N_phase : int
        Number of filter orientations, angular positions, eccentricity
        levels and phases.
    N_X, N_Y : int
        Size of the image the filters are defined on.

    Returns
    -------
    numpy.ndarray of shape (N_theta, N_orient, N_scale, N_phase, N_X*N_Y)
    """
    phi = np.zeros((N_theta, N_orient, N_scale, N_phase, N_X*N_Y))
    # LogGabor is configured from its default parameter file, fetched by URL.
    parameterfile = 'https://raw.githubusercontent.com/bicv/LogGabor/master/default_param.py'
    lg = LogGabor(parameterfile)
    lg.set_size((N_X, N_Y))

    for i_theta in range(N_theta):
        for i_orient in range(N_orient):
            for i_scale in range(N_scale):
                ecc = .5**(N_scale - i_scale)
                r = np.sqrt(N_X**2+N_Y**2) / 2 * ecc # radius
                sf_0 = 0.5 * 0.03 / ecc
                # Angular position; the (i_scale % 2)*.5 term shifts odd
                # scales by half a step.
                angle = (i_orient+(i_scale % 2)*.5)*np.pi*2 / N_orient
                x = N_X/2 + r * np.cos(angle)
                y = N_Y/2 + r * np.sin(angle)
                for i_phase in range(N_phase):
                    params = {'sf_0': sf_0, 'B_sf': lg.pe.B_sf, 'theta': i_theta*np.pi/N_theta, 'B_theta': np.pi/N_theta/2}
                    phase = i_phase * np.pi/2
                    # presumably loggabor() returns the filter in the Fourier
                    # domain and invert() brings it back to image space --
                    # TODO confirm against the LogGabor documentation
                    phi[i_theta, i_orient, i_scale, i_phase, :] = lg.normalize(lg.invert(lg.loggabor(x, y, **params)*np.exp(-1j*phase))).ravel()
    return phi

def train(epoch):
    """Train the model for one pass over ``train_loader``.

    Each MNIST image is pasted at a random offset inside a 128x128 canvas,
    projected on the LogGabor filter bank to obtain a feature vector, and the
    network is trained (MSE loss) to predict the (i_off, j_off) offset from
    that vector. The digit labels provided by the loader are not used.

    Parameters
    ----------
    epoch : int
        Epoch number, only used in the progress messages.
    """
    model.train()

    t0 = time.time()

    # Filter bank flattened to a (n_filters, N_X*N_Y) matrix; 6*8*5*2 = 480 filters.
    N_theta, N_orient, N_scale, N_phase, N_X, N_Y = 6, 8, 5, 2, 128, 128
    phi = vectorization(N_theta, N_orient, N_scale, N_phase, N_X, N_Y)
    phi_vector = phi.reshape((N_theta*N_orient*N_scale*N_phase, N_X*N_Y))
    n_features = phi_vector.shape[0]

    for batch_idx, (data, target) in enumerate(train_loader):
        # The feature extraction below is done with NumPy, so the images are
        # kept on the CPU; only the network input and labels go to the GPU.
        images = data.numpy()
        # Use the real batch length: the last batch may be shorter than
        # args.batch_size.
        n_samples = images.shape[0]
        INPUT = np.zeros((n_samples, 1, n_features))
        LABEL = np.zeros((n_samples, 2))

        for idx in range(n_samples):
            # Random offsets (standard deviation 15 pixels) clamped to +/-50.
            i_off = minmax(int(np.random.randn()*15), 50)
            j_off = minmax(int(np.random.randn()*15), 50)
            LABEL[idx, :] = (i_off, j_off)

            data_reshaped = mnist_reshape_128(images[idx, 0], i_off, j_off)
            # One coefficient per filter.
            INPUT[idx, 0, :] = phi_vector @ np.ravel(data_reshaped)

        INPUT = Variable(torch.FloatTensor(INPUT))
        LABEL = Variable(torch.FloatTensor(LABEL))

        if args.cuda: INPUT, LABEL = INPUT.cuda(), LABEL.cuda()

        # Gradients accumulate across backward() calls, so they must be
        # cleared before each batch.
        optimizer.zero_grad()
        OUTPUT = model(INPUT)
        loss = F.mse_loss(OUTPUT, LABEL, size_average=True)
        loss.backward()  # computes the gradients needed by the optimizer step
        optimizer.step() # update the learned parameters

        # Read the scalar loss with item() where it exists; older PyTorch
        # releases only support indexing the 1-element tensor.
        loss_value = loss.item() if hasattr(loss, 'item') else loss.data[0]
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.4f}, elapsed time: {:.2f} mn'.format(epoch,
                                                                       batch_idx * len(data),
                                                                       len(train_loader.dataset),
                                                                       100. * batch_idx / len(train_loader),
                                                                       loss_value,
                                                                       (time.time() - t0)/60))

        
def eval(test_loader=test_loader):
    """Evaluate the model on ``test_loader`` and print loss and accuracy.

    Images are pasted at random offsets and converted to filter-bank features
    exactly as in training. Two figures are printed:

    * the average loss: sum of squared errors over both offsets and all
      samples, divided by the dataset size;
    * the accuracy: number of samples for which both predicted offsets,
      rounded to the nearest integer, equal the true offsets.

    Parameters
    ----------
    test_loader : DataLoader, optional
        Loader yielding (image batch, label) pairs; defaults to the
        module-level ``test_loader``. The labels are not used.
    """
    model.eval()

    # Filter bank flattened to a (n_filters, N_X*N_Y) matrix; 6*8*5*2 = 480 filters.
    N_theta, N_orient, N_scale, N_phase, N_X, N_Y = 6, 8, 5, 2, 128, 128
    phi = vectorization(N_theta, N_orient, N_scale, N_phase, N_X, N_Y)
    phi_vector = phi.reshape((N_theta*N_orient*N_scale*N_phase, N_X*N_Y))
    n_features = phi_vector.shape[0]

    test_loss, correct = 0, 0

    for batch_idx, (data, target) in enumerate(test_loader):
        # Feature extraction is done with NumPy, so the images stay on the CPU.
        images = data.numpy()
        # Use the real batch length: the last batch may be shorter than
        # args.eval_batch_size.
        n_samples = images.shape[0]
        INPUT = np.zeros((n_samples, 1, n_features))
        LABEL = np.zeros((n_samples, 2))

        for idx in range(n_samples):
            # Random offsets (standard deviation 15 pixels) clamped to +/-50.
            i_off = minmax(int(np.random.randn()*15), 50)
            j_off = minmax(int(np.random.randn()*15), 50)
            LABEL[idx, :] = (i_off, j_off)

            data_reshaped = mnist_reshape_128(images[idx, 0], i_off, j_off)
            INPUT[idx, 0, :] = phi_vector @ np.ravel(data_reshaped)

        # Convert to tensors first, then move them to the GPU: NumPy arrays
        # have no .cuda() method.
        INPUT = Variable(torch.FloatTensor(INPUT))
        LABEL = Variable(torch.FloatTensor(LABEL))
        if args.cuda: INPUT, LABEL = INPUT.cuda(), LABEL.cuda()

        OUTPUT = model(INPUT)
        batch_loss = F.mse_loss(OUTPUT, LABEL, size_average=False)
        # Read the scalar loss with item() where it exists; older PyTorch
        # releases only support indexing the 1-element tensor.
        test_loss += batch_loss.item() if hasattr(batch_loss, 'item') else batch_loss.data[0]

        # Compare the whole batch: a sample is a hit when both rounded
        # predictions match the (integer-valued) true offsets.
        pred = OUTPUT.data.round()
        hits = pred.eq(LABEL.data).cpu().numpy().all(axis=1)
        correct += int(hits.sum())

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss,
                                                                                 correct,
                                                                                 len(test_loader.dataset),
                                                                                 100. * correct / len(test_loader.dataset)))

Overwriting /tmp/LP_detect.py


In [2]:
# Run the script written by the first cell with batch_size=100 and epochs=1.
# The script only sets up the data loaders, the model and the train/eval
# functions; it does not start training by itself. The next cell relies on
# names created by this run (model, args, train, torch).
%run /tmp/LP_detect.py --batch_size=100 --epochs=1

cuda: True


In [None]:
import os
import time

# Checkpoint holding the trained weights; reused when present so the notebook
# does not retrain on every run.
path = "MNIST_detector.pt"

if os.path.isfile(path):
    print('Loading file...')
    # Load every tensor into CPU storage first: a checkpoint saved from a GPU
    # run can then be restored on a machine without CUDA. load_state_dict
    # copies the values into the parameters wherever the model lives.
    # NOTE: torch.load unpickles the file, so only load checkpoints produced
    # by this notebook or another trusted source.
    state_dict = torch.load(path, map_location=lambda storage, loc: storage)
    model.load_state_dict(state_dict)
else:
    print('Training model...')
    t0 = time.time()
    for epoch in range(1, args.epochs+1):
        train(epoch)
    torch.save(model.state_dict(), path)
    print('Done in', time.time()-t0, 'seconds')

Training model...




