In [3]:
import pandas as pd
import torch
from torch import nn
from torchvision import models, transforms
import numpy as np
import time
import copy
from torchvision import datasets, models, transforms

In [4]:
import cv2
#from google.colab.patches import cv2_imshow

def get_frames_from_video(video_path):
    frames = []
    cap = cv2.VideoCapture(video_path)
    if cap.isOpened():
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        res=(int(width), int(height))
        while True:
            try:
                is_success, frame = cap.read()
            except cv2.error:
                print("err")
                continue
            if not is_success:
                print("can't receive frame")
                break 
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(image)
    cap.release()
    return frames

def get_labels_from_csv(csv_path):
    df = pd.read_csv(csv_path)
    df.drop(['p', 'w', 'h'], axis=1, inplace=True)
    opened = [1 if x>0 else 0 for x in df['x']]
    df.rename(columns={"x":"pupil_center_x_coord", "y":"pupil_center_y_coord", "a":"corneal_reflection_x_coord", "b":"corneal_reflection_y_coord"}, inplace=True)
    df['opened'] = opened
    return df

In [5]:
class PupilCoreDatasetIfOpened(torch.utils.data.Dataset):
    def __init__(self, eye0_video_path, eye0_labels_path, eye1_video_path, eye1_labels_path) -> None:
        super().__init__()
        self.eye0_video_path = eye0_video_path
        self.eye0_labels_path = eye0_labels_path
        self.eye1_video_path = eye1_video_path
        self.eye1_labels_path = eye1_labels_path
        self.eye0_labels_df = get_labels_from_csv(self.eye0_labels_path)
        self.eye0_frames = get_frames_from_video(self.eye0_video_path)
        self.eye1_labels_df = get_labels_from_csv(self.eye1_labels_path)
        self.eye1_frames = get_frames_from_video(self.eye1_video_path)

    def __getitem__(self, idx):
        opened = self.eye0_labels_df.at[idx, 'opened']
        image = self.eye0_frames[idx]
        T = transforms.Compose([transforms.ToTensor()])
        image = T(image)
        return image, opened

    def __len__(self):
        return len(self.eye0_frames)
        

In [None]:
class PupilCoreDatasetCoords(torch.utils.data.Dataset):
    def __init__(self, eye0_video_path, eye0_labels_path, eye1_video_path, eye1_labels_path) -> None:
        super().__init__()
        self.eye0_video_path = eye0_video_path
        self.eye0_labels_path = eye0_labels_path
        self.eye1_video_path = eye1_video_path
        self.eye1_labels_path = eye1_labels_path
        self.eye0_labels_df = get_labels_from_csv(self.eye0_labels_path)
        self.eye0_frames = get_frames_from_video(self.eye0_video_path)
        self.eye1_labels_df = get_labels_from_csv(self.eye1_labels_path)
        self.eye1_frames = get_frames_from_video(self.eye1_video_path)

    def __getitem__(self, idx):
        pupil_coords = (self.eye0_labels_df.at[idx, 'pupil_center_x_coord'], self.eye0_labels_df.at[idx, 'pupil_center_y_coord'])
        image = self.eye0_frames[idx]
        T = transforms.Compose([transforms.ToTensor()])
        image = T(image)
        return image, pupil_coords

    def __len__(self):
        return len(self.eye0_frames)
        
        

In [6]:
dataset = PupilCoreDatasetIfOpened(
    "PupilCoreDataset/video5_eye0_video.avi",
    'PupilCoreDataset/video5_eye0_pupildata.csv',
    "PupilCoreDataset/video5_eye1_video.avi",
    'PupilCoreDataset/video5_eye1_pupildata.csv'
)


can't receive frame
can't receive frame


In [7]:
#dataset.eye0_frames
indices = torch.randperm(len(dataset)).tolist()
train_part = int(0.8 * len(dataset))
train_dataset = torch.utils.data.Subset(dataset, indices[:train_part])
test_dataset = torch.utils.data.Subset(dataset, indices[train_part:])
dataset_sizes = {
    'train': len(train_dataset),
    'test': len(test_dataset)
}
#dataset.eye0_labels_df['opened']

In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
# TODO pobrac i wkomponowac CUDA

device(type='cpu')

In [9]:
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=8, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=8, shuffle=True)
dataloaders = {
    "train": train_dataloader,
    "test": test_dataloader
}

In [50]:
def train_first_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                print(type(inputs))
                inputs = inputs.to(device)
                pupil_coords, opened = labels
                # pupil_coords = torch.Tensor(pupil_coords)
                print(type(pupil_coords), opened)
                #pupil_coords.dtype = torch.long64()
                

                opened = opened.to(device)
                opened = opened.to(torch.long)
                pupil_coords = pupil_coords.long()

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, opened)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == opened.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [51]:
# 1st model for checking if eye is opened
num_classes = 2
model = models.squeezenet1_1(pretrained=True)
final_conv = nn.Conv2d(512, num_classes, kernel_size=1)
classifier = nn.Sequential(
    nn.Dropout(p=0.5), final_conv, nn.ReLU(inplace=True), nn.AdaptiveAvgPool2d((1, 1))
)
model.classifier = classifier
model.to(device)




SqueezeNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
    (3): Fire(
      (squeeze): Conv2d(64, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (4): Fire(
      (squeeze): Conv2d(128, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (5): MaxPool2d

In [52]:
# TODO model wykrywajacy zamkniete lub otwarte oko
from torch.optim import lr_scheduler
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

model = train_first_model(model, criterion, optimizer, exp_lr_scheduler)


Epoch 0/24
----------
<class 'torch.Tensor'>
<class 'list'> tensor([1, 1, 1, 1, 1, 1, 1, 1])


AttributeError: 'list' object has no attribute 'long'

In [None]:
# TODO model do detekcji źrenicy