In [61]:
import pickle
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torchvision
from IPython.display import Image as IPyImage, display
from matplotlib import pyplot as plt
from PIL import Image as PILImage
from pigeon import annotate
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from tqdm import tqdm, tqdm_notebook

In [10]:
# Probe once for a usable CUDA device and report which hardware will be used.
train_on_gpu = torch.cuda.is_available()

print(
    'CUDA is available!  Training on GPU ...'
    if train_on_gpu
    else 'CUDA is not available.  Training on CPU ...'
)

CUDA is available!  Training on GPU ...


# Labeling

In [56]:
# Modes accepted by EyesDataset.
DATA_MODES = ['train', 'test']
# Fall back to the CPU when no CUDA device is present, so that every
# `.to(DEVICE)` call keeps working on machines without a GPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [57]:
# Root folder of the images; every *.jpg below it (searched recursively)
# is collected into a sorted list.
DATA_DIR = Path('EyesDataset/')
files = sorted(DATA_DIR.rglob('*.jpg'))

In [None]:
from IPython.display import display, Image

In [3]:
# Interactive labelling pass: every file is shown together with one button per option.
# The callback runs later, on each button click, so it must not look up the bare
# name `Image`: that name is rebound to the PIL module further down the notebook,
# which would make `Image(filename)` fail. `IPyImage` always refers to the
# IPython display class.
annotations = annotate(
  files,
  options=['opened', 'closed'],
  display_fn=lambda filename: display(IPyImage(filename=str(filename)))
)

HTML(value='0 examples annotated, 4001 examples left')

HBox(children=(Button(description='opened', style=ButtonStyle()), Button(description='closed', style=ButtonSty…

Output()

Annotation done.


In [6]:
# Store the collected labels on disk so the manual labelling pass does not have to be repeated.
Path('annotations.pkl').write_bytes(pickle.dumps(annotations))

In [3]:
# Restore the labels saved earlier.
# NOTE: unpickling executes code embedded in the file - only load an
# annotations.pkl that was produced by this notebook.
annotations = pickle.loads(Path('annotations.pkl').read_bytes())

In [6]:
# Number of entries in the loaded annotations.
len(annotations)

3955

In [24]:
# 90/10 shuffled split. The seed is fixed so that the held-out set is the same
# after every kernel restart; otherwise test losses are not comparable between runs.
train_ann, test_ann = train_test_split(annotations, test_size=0.1, shuffle=True, random_state=42)

# Dataset class

In [59]:
from PIL import Image 

In [25]:
class EyesDataset(Dataset):
    """Dataset that serves annotated eye images as 24x24 float tensors.

    Every entry of ``annotations`` is indexed as ``entry[0]`` (image file)
    and ``entry[1]`` (label). The label is returned exactly as stored;
    presumably it is one of the option strings chosen during labelling, in
    which case it has to be encoded before it can be fed to a loss - verify
    against the caller.

    Args:
        annotations: indexable collection of ``(file, label)`` entries.
        mode: one of ``DATA_MODES``; ``'train'`` enables random augmentation.

    Raises:
        NameError: if ``mode`` is not listed in ``DATA_MODES`` (the exception
            type is kept for backward compatibility).
    """

    def __init__(self, annotations, mode):
        super().__init__()
        if mode not in DATA_MODES:
            raise NameError(f"{mode} is not correct; correct modes: {DATA_MODES}")

        # list of (file, label) entries to load
        self.annotations = annotations
        # operating mode
        self.mode = mode
        self.len_ = len(self.annotations)

        # The pipelines are built once here instead of on every __getitem__ call.
        self._to_tensor = transforms.ToTensor()
        self._augmentation = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            # Default interpolation is bilinear, the same filter the integer code 2 selected.
            transforms.RandomResizedCrop(24, scale=(0.8, 1), ratio=(0.75, 1.3333333333333333)),
            # Default resampling is nearest-neighbour, the same as the removed `resample=False`.
            transforms.RandomAffine(degrees=20, shear=20),
            transforms.Resize((24, 24)),
        ])

    def __len__(self):
        """Return the number of annotated samples."""
        return self.len_

    def load_sample(self, file):
        """Open ``file`` with PIL, force the pixel data to be read and return the image."""
        # PILImage is used rather than the bare name `Image`, which the notebook
        # also binds to IPython's display class.
        image = PILImage.open(file)
        image.load()
        return image

    def __getitem__(self, index):
        """Return ``(tensor, label)`` for the sample at ``index``.

        In ``'train'`` mode the image is randomly augmented first. Pixel values
        are divided by 255 and stored as float32 before the tensor conversion.
        """
        entry = self.annotations[index]
        image = self.load_sample(entry[0])
        label = entry[1]
        if self.mode == 'train':
            image = self._augmentation(image)
        pixels = self._prepare_sample(image)
        pixels = (pixels / 255).astype('float32')
        return self._to_tensor(pixels), label

    def _prepare_sample(self, image):
        """Resize a PIL image to 24x24 and return it as a numpy array."""
        image = image.resize((24, 24))
        return np.array(image)

In [51]:
# One dataset per split; only the 'train' one applies augmentation.
train_dataset = EyesDataset(annotations=train_ann, mode='train')
test_dataset = EyesDataset(annotations=test_ann, mode='test')

In [60]:
# Batches of 32; the training loader shuffles, the test loader keeps the dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [62]:
# MobileNetV2 built with torchvision's default arguments.
mobilenet = models.mobilenet_v2()

In [80]:
# Replace the first convolution with one that takes a single input channel;
# output channels, kernel, stride and padding are spelled out explicitly.
mobilenet.features[0][0] = nn.Conv2d(
    in_channels=1,
    out_channels=32,
    kernel_size=3,
    stride=2,
    padding=1,
    bias=False,
)

In [87]:
# Display the current classification head.
mobilenet.classifier

Sequential(
  (0): Dropout(p=0.2, inplace=False)
  (1): Linear(in_features=1280, out_features=1000, bias=True)
)

In [88]:
# New head: dropout -> 1280-to-100 linear -> ReLU -> 100-to-1 linear (single output).
mobilenet.classifier = nn.Sequential(
    nn.Dropout(0.2),
    nn.Linear(1280, 100),
    nn.ReLU(),
    nn.Linear(100, 1),
)

In [64]:
def train_epoch(net, train_loader, loss_fn, optimizer):
    """Run one optimisation pass over ``train_loader`` and return the mean batch loss.

    Args:
        net: model called as ``net(x)``; it must return the triple
            ``(mu, logsigma, reconstruction)``.
        train_loader: iterable of batches. A batch is either the input tensor
            itself or a list/tuple whose first element is the input tensor
            (for example ``(x, label)``); any further elements are ignored.
        loss_fn: called as ``loss_fn(x, mu, logsigma, reconstruction)`` and
            must return a scalar tensor.
        optimizer: optimizer that updates the parameters of ``net``.

    Returns:
        float: the sum of the per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: if ``train_loader`` yields no batches.
    """
    net.train()
    train_ep_loss = 0.
    counter = 0
    for batch in train_loader:
        # Loaders built on (x, label) datasets yield sequences; only the inputs are used.
        train_x = batch[0] if isinstance(batch, (list, tuple)) else batch
        train_x = train_x.to(DEVICE)

        # reset the gradients accumulated by the previous step
        optimizer.zero_grad()

        # forward pass
        mu, logsigma, reconstruction = net(train_x)

        # loss, backward pass and parameter update
        loss = loss_fn(train_x, mu, logsigma, reconstruction)
        loss.backward()
        optimizer.step()

        train_ep_loss += loss.item()
        counter += 1
    train_ep_loss /= counter

    return train_ep_loss

In [65]:
def test_epoch(net, test_loader, loss_fn):
    net.eval()
    test_ep_loss = 0.
    counter = 0.
    for test_x in test_loader:
        # get the output from the model
        test_x = test_x.to(DEVICE)
#         test_x = test_x.view(-1, 24 * 24)
        
        mu, logsigma, reconstruction = net(test_x)
        
        # calculate loss
        
        loss = loss_fn(test_x, mu, logsigma, reconstruction)
        test_ep_loss += loss.item() 
        
        counter += 1

    test_ep_loss /= counter
    ind = np.random.randint(len(test_x))
    plot_eyes(test_x[ind], reconstruction[ind])

    return test_ep_loss

In [66]:
def train(net, train_loader, test_loader, loss_fn, optimizer, epochs, scheduler=None):
    """Train ``net`` for ``epochs`` epochs and collect the per-epoch losses.

    Every epoch runs ``train_epoch``, steps ``scheduler`` when one is given,
    and then runs ``test_epoch`` with gradients disabled. A progress line is
    printed on every fifth epoch, starting with the first.

    Returns:
        tuple: ``(train_losses, test_losses)``, two lists with one value per epoch.
    """
    train_history, test_history = [], []

    for epoch_idx in tqdm(range(epochs)):
        epoch_train_loss = train_epoch(net, train_loader, loss_fn, optimizer)
        if scheduler:
            scheduler.step()

        with torch.no_grad():
            epoch_test_loss = test_epoch(net, test_loader, loss_fn)

        train_history.append(epoch_train_loss)
        test_history.append(epoch_test_loss)

        if epoch_idx % 5:
            continue
        print(
            "Epoch: {}/{}...".format(epoch_idx + 1, epochs),
            "Loss: {:.6f}...".format(epoch_train_loss),
            "Test Loss: {:.6f}".format(epoch_test_loss),
        )

    return train_history, test_history