In [1]:

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets

In [2]:
# import torch
# import torch.nn as nn

# class OpenEyesClassificator(nn.Module):
#     def __init__(self, dropout_rate=0.6):
#         super(OpenEyesClassificator, self).__init__()
#         self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
#         self.bn1 = nn.BatchNorm2d(32)
#         self.relu1 = nn.ReLU()
#         self.dropout1 = nn.Dropout(dropout_rate)
        
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
#         self.bn2 = nn.BatchNorm2d(64)
#         self.relu2 = nn.ReLU()
#         self.dropout2 = nn.Dropout(dropout_rate)
        
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
#         self.fc1 = nn.Linear(64 * 28 * 28, 128)
#         self.bn3 = nn.BatchNorm1d(128)
#         self.relu3 = nn.ReLU()
#         self.dropout3 = nn.Dropout(dropout_rate)
        
#         self.fc2 = nn.Linear(128, 1)
#         self.sigmoid = nn.Sigmoid()

#     def forward(self, x):
#         x = self.pool(self.relu1(self.bn1(self.conv1(x))))
#         x = self.dropout1(x)
        
#         x = self.pool(self.relu2(self.bn2(self.conv2(x))))
#         x = self.dropout2(x)
        
#         print(x.shape)
#         # x = x.view(-1, 64 * 28 * 28)
#         x = x.view(x.size(0), -1)
#         print(x.shape)
#         x = self.relu3(self.bn3(self.fc1(x)))
#         x = self.dropout3(x)
        
#         x = self.sigmoid(self.fc2(x))
#         return x

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

class OpenEyesClassifier(nn.Module):
    def __init__(self):
        super(OpenEyesClassifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding='same')
        self.leaky_relu1 = nn.LeakyReLU(0.1)
        self.max_pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout(0.25)
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding='same')
        self.leaky_relu2 = nn.LeakyReLU(0.1)
        self.max_pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout(0.25)
        
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding='same')
        self.leaky_relu3 = nn.LeakyReLU(0.1)
        self.max_pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout3 = nn.Dropout(0.4)
        
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(1152, 128) 
        self.leaky_relu4 = nn.LeakyReLU(0.1)
        self.dropout4 = nn.Dropout(0.3)
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()
        self.softmax = nn.Softmax(dim=-1)
        # self.logsoftmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        x = self.leaky_relu1(self.conv1(x))
        x = self.max_pool1(x)
        x = self.dropout1(x)

        x = self.leaky_relu2(self.conv2(x))
        x = self.max_pool2(x)
        x = self.dropout2(x)

        x = self.leaky_relu3(self.conv3(x))
        x = self.max_pool3(x)
        x = self.dropout3(x)

        x = self.flatten(x)
        x = self.leaky_relu4(self.fc1(x))
        x = self.dropout4(x)

        # x = self.sigmoid(self.fc2(x))
        x = self.fc2(x)
        # return self.logsoftmax(x)
        return self.sigmoid(x)
        # return self.softmax(x) 


In [4]:
from sklearn.metrics import roc_curve, auc
import numpy as np

def compute_eer(labels, scores):
    fpr, tpr, thresholds = roc_curve(labels, scores, pos_label=1)
    fnr = 1 - tpr
    eer_index = np.argmin(np.abs(fnr - fpr))
    eer = (fpr[eer_index] + fnr[eer_index]) / 2
    thresh = thresholds[eer_index]
    return eer, thresh

In [94]:
def compute_eer_(labels, scores):
    fpr, tpr, thresholds = roc_curve(labels, scores, pos_label=1)
    frr = 1 - tpr
    abs_diffs = np.abs(fpr - frr)
    min_index = np.argmin(abs_diffs)
    eer = (fpr[min_index]+ frr[min_index])/2
    
    return eer

In [161]:
def compute_accuracy(preds, labels):
    correct = (preds == labels).sum().item()
    total = preds.shape[0]

    return correct/total
    

In [155]:
from sklearn.metrics import roc_curve, auc

def evaluate_model(model, criterion, val_loader, device=None):
    if device is None:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.eval()
    val_loss = 0.0

    val_labels_list = []
    val_preds_list = []
    with torch.no_grad():
        for it in val_loader:
            inputs, labels = it['image'].to(device), it['label'].to(device)
            outputs = model(inputs)

            loss = criterion(outputs.permute(1,0), labels.unsqueeze(0))

            val_loss += loss.item()

            val_labels_list += labels.unsqueeze(0).cpu().detach().numpy().tolist()
            val_preds_list += outputs.permute(1,0).cpu().detach().numpy().tolist()

        val_eer = compute_eer_(val_labels_list[0], val_preds_list[0])
        preds = (torch.tensor(val_labels_list[0]) >= 0.5).float()

        val_acc = compute_accuracy(preds, torch.tensor(val_labels_list[0]))

    return val_loss, val_acc, val_eer



def train_model(model, train_loader, val_loader, criterion, optimizer, ckpts_path, num_epochs=1, scheduler=None, device=None):
    if device is None:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)

    min_eer = np.inf
    for epoch in range(num_epochs):
        # --- Training phase ---
        model.train()
        train_loss = 0.0
        correct = 0.0
        total = 0.0

        train_labels_list = []
        train_preds_list = []
        
        for it in train_loader:
            inputs, labels = it['image'].to(device), it['label'].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            # print(outputs.permute(1,0).shape, labels.unsqueeze(0).shape)
            loss = criterion(outputs.permute(1,0), labels.unsqueeze(0))
            # eer_train = compute_eer_(labels.unsqueeze(0).cpu().detach().numpy(), outputs.permute(1,0).cpu().detach().numpy())

            loss.backward()
            optimizer.step()
            train_loss += loss.item()


            train_labels_list += labels.unsqueeze(0).cpu().detach().numpy().tolist()
            train_preds_list += outputs.permute(1,0).cpu().detach().numpy().tolist()


        train_eer = compute_eer_(train_labels_list[0], train_preds_list[0])
        preds = (torch.tensor(train_preds_list[0]) >= 0.5).float()

        train_acc = compute_accuracy(preds, torch.tensor(train_labels_list[0]))


        print(train_acc, train_eer)

        val_loss, val_acc, val_eer = evaluate_model(model, criterion, val_loader, device=device)

        wandb.log({"epoch": epoch, "loss": val_loss, "train_accuracy": train_acc, "train_eer": train_eer, \
                   "val_loss": val_loss, "val_accuracy": val_acc, "val_eer": val_eer})
        
        if scheduler:
            scheduler.step()

        if val_eer < min_eer or val_eer < 0.01:
            min_eer = val_eer
            torch.save(model.state_dict(), ckpts_path)
            print(f"Saved model with validation accuracy = {val_acc:.4f} and eer = {val_eer:.4f}")

In [156]:
open_dir = '/home/sadevans/space/CloseEyesClassifier/data/clustered_auto_tcne_CHECKED/open'
close_dir = '/home/sadevans/space/CloseEyesClassifier/data/clustered_auto_tcne_CHECKED/close'

In [157]:
import random
from sklearn.model_selection import train_test_split


def load_image_paths_and_labels(open_dir, close_dir):
    open_images = [os.path.join(open_dir, img) for img in os.listdir(open_dir) if img.endswith(('png', 'jpg', 'jpeg'))]
    close_images = [os.path.join(close_dir, img) for img in os.listdir(close_dir) if img.endswith(('png', 'jpg', 'jpeg'))]
    images = open_images + close_images
    labels = [1] * len(open_images) + [0] * len(close_images)
    return images, labels

# Перемешать пути изображений и метки классов
def shuffle_data(images, labels):
    combined = list(zip(images, labels))
    random.shuffle(combined)
    images[:], labels[:] = zip(*combined)
    return images, labels

# Разделить данные на обучающую, валидационную и тестовую выборки
def split_data(images, labels):
    train_images, temp_images, train_labels, temp_labels = train_test_split(images, labels, test_size=0.4, stratify=labels, random_state=42)
    val_images, test_images, val_labels, test_labels = train_test_split(temp_images, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42)
    return train_images, val_images, test_images, train_labels, val_labels, test_labels


In [158]:
import numpy as np
import torch


seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
# Загрузить данные
images, labels = load_image_paths_and_labels(open_dir, close_dir)

# Перемешать данные
images, labels = shuffle_data(images, labels)

# Разделить данные
train_images, val_images, test_images, train_labels, val_labels, test_labels = split_data(images, labels)

print(f"Train: {len(train_images)} images")
print(f"Validation: {len(val_images)} images")
print(f"Test: {len(test_images)} images")

Train: 2400 images
Validation: 800 images
Test: 800 images


In [159]:
from dataset import ImageTransform, EyeDataset
import cv2
from torch.utils.data import Dataset, DataLoader


# seed = 42

train_dataset = EyeDataset(train_images, train_labels, transform=ImageTransform('train'))
val_dataset = EyeDataset(val_images, val_labels, transform=ImageTransform('val'))
test_dataset = EyeDataset(test_images, test_labels, transform=ImageTransform('test'))


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [160]:
model = OpenEyesClassifier()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=5, device=None)

[1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0]
[0.4893954396247864, 0.4759499430656433, 0.4773009419441223, 0.4791807532310486, 0.48713383078575134, 0.4869005084037781, 0.4673452377319336, 0.48795607686042786, 0.4844638705253601, 0.4944532513618469, 0.4771513044834137, 0.5044295191764832, 0.504009485244751, 0.4867953360080719, 0.4812712073326111, 0.4881991446018219, 0.4882332980632782, 0.4848036468029022, 0.4834272861480713, 0.47826460003852844, 0.4839402735233307, 0.4751614034175873, 0.48781877756118774, 0.4845618009567261, 0.48473361134529114, 0.4863566756248474, 0.44750821590423584, 0.4907439351081848, 0.47800448536872864, 0.4870065450668335, 0.47488510608673096, 0.49149301648139954]
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
0.40625 0.3765182186234818
[0.0, 0.0, 0.0, 1.