In [12]:
from glob import glob
import os
from time import time
import zipfile
import shutil
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable

### Prepare data
It unzips *ballpatches_scaled.zip* and creates training and validation sets.

<span style="color:red">It's not necessary to run this, just unpack data.7z to get a subset of validation data.</span>

In [None]:
validation_matches = ['34678777f7369acd1dcc4ccdaf1945ac', 
                      'ab6f78be30ce517d61422f41118c2eb4',
                      '648b5d642ffc96cc23693b0bbe9c6cd7',
                      'c9a72e59e447a605014c933547c4ecc7']
os.mkdir('scaled')
os.chdir('scaled')
os.mkdir('training')
os.mkdir('validation')
os.chdir('training')
myzip = zipfile.ZipFile('../../ballpatches_scaled.zip', 'r')
myzip.extractall()
for match in validation_matches:
    shutil.move(match, '../validation/%s' % match)
os.chdir('../..')

### Constants

In [13]:
image_width = 224
pixels_count = image_width**2
input_channels = 3
output_channels = 1
cuda = torch.cuda.is_available()

### Parameters that are same for all the models

In [3]:
kernel_size = 17
momentum = 0.1

### Dataset

In [14]:
class SyntheticDataset(Dataset):
	def __init__(self, setname):
		self.filenames = glob('scaled/%s/*/*.png' % setname)
	def __len__(self):
		return len(self.filenames)
	def __getitem__(self, idx):
		data = ndimage.imread(self.filenames[idx], flatten=False)
		data = np.array(data, dtype=np.float)
		data = np.swapaxes(data, 0, 2)
		array = self.filenames[idx].split('_') # read correct ball coordinates that are written in file name
		x = float(array[-3])
		y = float(array[-2])
		target = torch.FloatTensor([x, y])
		return data, target

### Model

In [41]:
class Net(nn.Module):
	def __init__(self, hidden_layers, hidden_channels):
		global cuda
		super(Net, self).__init__()
		self.layers = []
		self.drops = []
		self.layers.append(nn.Conv2d(input_channels,  hidden_channels, kernel_size=kernel_size, padding = (kernel_size - 1)//2))
		self.drops.append(nn.Dropout2d(p = 0.1))
		if cuda:
			self.layers[-1] = self.layers[-1].cuda()
			self.drops[-1] = self.drops[-1].cuda()
		for i in range(hidden_layers - 2):
			self.layers.append(nn.Conv2d(hidden_channels,  hidden_channels, kernel_size=kernel_size, padding = (kernel_size - 1)//2))
			self.drops.append(nn.Dropout2d(p = 0.1))
			if cuda:
				self.layers[-1] = self.layers[-1].cuda()
				self.drops[-1] = self.drops[-1].cuda()
		self.last_layer = nn.Conv2d(hidden_channels, output_channels, kernel_size=kernel_size, padding = (kernel_size - 1)//2)
		self.indices = Variable(torch.from_numpy(np.arange(image_width, dtype=np.float)).float(), requires_grad=False)
		if cuda:
			self.indices = self.indices.cuda()

	def forward(self, x):
		for i in range(len(self.layers)):
			x = F.relu(self.drops[i](self.layers[i](x)))
		x = self.last_layer(x)
		x = x.view(-1, pixels_count)
		x = F.softmax(x, dim=1)
		x = x.view(-1, image_width, image_width)

		xprob = torch.sum(x, dim=1) # sum probabilites to a vector
		yprob = torch.sum(x, dim=2)
		xcoord = torch.matmul(xprob, self.indices).view(-1) # get coordinate from weighted sum
		ycoord = torch.matmul(yprob, self.indices).view(-1)
		return torch.stack([xcoord, ycoord], dim=1)

### Validation function definition

In [42]:
saved_accuracy = 0
def test(model, model_name, do_save, loss_function, batch_size, dosave):
	global saved_accuracy
    
	txt_file = open('%s.txt' % model_name, 'w', 1) # 1 = line buffering
	test_loader = DataLoader(SyntheticDataset('validation'), batch_size=batch_size)

	model.eval()
	test_loss = 0
	correct = 0
	for batch_idx, (data, target) in enumerate(test_loader):
		if cuda:
			data, target = data.cuda(), target.cuda()
		data, target = Variable(data, volatile=True), Variable(target.float())
		output = model(data.float())
		cur_test_loss = loss_function(output, target, size_average=False).data[0] # sum up batch loss
		test_loss += cur_test_loss
		correct += output.eq(target).cpu().sum()
        
		# save txt file for plots
		o = output.cpu().data.numpy()
		t = target.cpu().data.numpy()
		for i in range(len(o)):
			print('%d %d %d %d' % (o[i, 0], o[i, 1], t[i, 0], t[i, 1]), file=txt_file)

	test_loss /= len(test_loader.dataset)
	correct = correct.data.numpy()[0]
	accuracy = 100. * correct / len(test_loader.dataset)
	print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset), accuracy))
    
	# Save model if it not worse
	if accuracy >= saved_accuracy and dosave:
		torch.save(model.state_dict(), model_filename)
		saved_accuracy = accuracy
		print ('Model saved')


### Training

In [43]:
def train(model_name, hidden_layers, hidden_channels, batch_size, learning_rate, loss_function):
    global saved_accuracy
    
    model = Net(hidden_layers, hidden_channels)
    if cuda:
        model.cuda()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    train_loader = DataLoader(SyntheticDataset('training'), batch_size=batch_size, shuffle=True)

    saved_accuracy = 0
    while True:
        try:
            model.train()
            start_time = time()
            for batch_idx, (data, target) in enumerate(train_loader):
                if cuda:
                    data, target = data.cuda(), target.cuda()
                data, target = Variable(data), Variable(target.float())
                optimizer.zero_grad()
                output = model(data.float())
                loss = loss_function(output, target)
                loss.backward()
                optimizer.step()
                assert(not np.isnan(loss.cpu().data.numpy()))
                print('Train Epoch: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.data[0]))
            print (time() - start_time, 's')

            test(model, model_name, True, loss_function, batch_size, True)
        except KeyboardInterrupt:
            break

NB! These overwrite the saved weights files.

In [None]:
#train('A', 4, 16, 8, 0.0001, F.l1_loss)

In [45]:
#train('B', 10, 64, 32, 0.01, F.l1_loss)

In [46]:
#train('C', 10, 64, 32, 0.01, F.mse_loss)

### Validation of already saved models and plot results

In [47]:
def test_saved(model_name, hidden_layers, hidden_channels, batch_size, loss_function):
	model = Net(hidden_layers, hidden_channels)
	if cuda:
		model.cuda()
	test(model, model_name, False, loss_function, batch_size, False)

	scale = 10 # how many pixels in one bin
	data = np.loadtxt('%s.txt' % model_name)
    
	output_x = data[:, 0]
	output_y = data[:, 1]
	target_x = data[:, 2]
	target_y = data[:, 3]
	x = np.arange(0, image_width + 1)

	plt.hist2d(output_x, target_x, range=[[0, image_width-1], [0, image_width-1]], bins=[image_width//scale, image_width//scale])
	plt.plot(x, x, 'r')
	plt.xlabel('x output')
	plt.ylabel('x target')
	plt.colorbar()
	plt.savefig('%s_x.pdf' % model_name)
	plt.close()

	plt.hist2d(output_y, target_y, range=[[0, image_width-1], [0, image_width-1]], bins=[image_width//scale, image_width//scale])
	plt.plot(x, x, 'r')
	plt.xlabel('y output')
	plt.ylabel('y target')
	plt.colorbar()
	plt.savefig('%s_y.pdf' % model_name)
	plt.close()

	distance = np.sqrt(np.square(output_x - target_x) + np.square(output_y - output_y))
	plt.hist(distance, bins=image_width // scale)
	plt.xlabel('Distance')
	plt.ylabel('Count')
	plt.savefig('%s_distance.pdf' % model_name)
	plt.close()

In [48]:
test_saved('A', 4, 16, 8, F.l1_loss)
# results are in A_x.pdf, A_y.pdf and A_distance.pdf

`imread` is deprecated in SciPy 1.0.0.
Use ``matplotlib.pyplot.imread`` instead.
  import sys



Test set: Average loss: 97.6653, Accuracy: 0/471 (0%)



In [None]:
test_saved('B', 10, 64, 32, F.l1_loss)
# results are in B_x.pdf, B_y.pdf and B_distance.pdf

In [None]:
test_saved('C', 10, 64, 32, F.mse_loss)
# results are in C_x.pdf, C_y.pdf and C_distance.pdf