In [26]:
# import necessary libraries
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import copy
import time
from torch.cuda.amp import GradScaler, autocast

In [27]:
# Per-split preprocessing pipelines.
# Every split is resized to 128x128, converted to a tensor and normalized with
# the same per-channel mean/std (these look like the usual ImageNet statistics
# used with torchvision's pretrained models -- confirm if the backbone changes).
# Only the 'train' split additionally gets a random horizontal flip.
data_transforms = {
    split: transforms.Compose(
        [transforms.Resize((128, 128))]
        + ([transforms.RandomHorizontalFlip()] if split == 'train' else [])
        + [
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    )
    for split in ('train', 'val', 'test')
}

In [28]:
# Load datasets
# Expects data/<split>/<class_name>/... for each of the three splits
# (the directory layout ImageFolder reads).
data_dir = 'data'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
                  for x in ['train', 'val', 'test']}
# Only the training split is reshuffled every epoch; 'val' and 'test' are
# iterated in a fixed order so evaluation passes are deterministic.
dataloaders = {x: DataLoader(image_datasets[x], batch_size=128, shuffle=(x == 'train'), num_workers=8)
               for x in ['train', 'val', 'test']}
# Number of samples per split, used to turn running sums into per-sample averages.
dataset_sizes = {x: len(ds) for x, ds in image_datasets.items()}
# Class labels as discovered by ImageFolder on the training split.
class_names = image_datasets['train'].classes

In [29]:
# Pick the compute device: first CUDA GPU when one is visible, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [30]:
# Initialize the model
# Start from a pretrained ResNet-18 and replace its final fully connected
# layer with a fresh head for this task.
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
# Size the head from the classes found in the training folder instead of a
# hard-coded 2, so the output width always matches the label range.
model.fc = nn.Linear(num_ftrs, len(class_names))
model = model.to(device)

In [31]:
# Training objective: cross-entropy over the model's class logits
criterion = nn.CrossEntropyLoss()

# SGD with momentum over every model parameter (the whole network is fine-tuned)
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Multiply the learning rate by 0.1 after every 7 scheduler steps
exp_lr_scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=7,
    gamma=0.1,
)

# Loss scaler used by the mixed-precision training loop
scaler = GradScaler()



In [32]:
# Function to train and evaluate the model
def train_model(model, criterion, optimizer, scheduler, num_epochs=20):
    """Train ``model`` and return it loaded with the best-validation weights.

    Each epoch runs a 'train' phase followed by a 'val' phase. The weights
    that reach the highest validation accuracy are snapshotted and restored
    into ``model`` before it is returned.

    Reads the module-level ``dataloaders``, ``dataset_sizes``, ``device`` and
    ``scaler`` objects defined in earlier cells.

    Args:
        model: Network to optimize; modified in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch (through ``scaler``).
        scheduler: LR scheduler stepped once after each training phase.
        num_epochs: Number of train/val epochs to run.

    Returns:
        ``model`` with the best-validation-accuracy state dict loaded.
    """
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            # train(True) == train mode, train(False) == eval mode
            model.train(is_train)

            running_loss = 0.0
            running_corrects = 0  # plain int so it works even if the loader yields nothing

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                if is_train:
                    optimizer.zero_grad()

                # Track gradients only while training; the validation pass
                # must not build an autograd graph it will never use.
                with torch.set_grad_enabled(is_train):
                    with autocast():  # Use mixed precision
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                    if is_train:
                        # Scale the loss before backward, then let the scaler
                        # drive the optimizer step and update its scale factor.
                        scaler.scale(loss).backward()
                        scaler.step(optimizer)
                        scaler.update()

                # loss is a batch mean; weight by batch size to get a per-sample sum
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            if is_train:
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Snapshot the weights whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    # '.4f' (precision), not '4f' (minimum width), to match the per-epoch lines
    print(f'Best val Acc: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)
    return model

In [33]:
# Run the 20-epoch training loop; train_model hands back the model with its
# best-validation weights already loaded
model = train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=exp_lr_scheduler,
    num_epochs=20,
)

# Persist only the weights (state dict) of that best model
torch.save(model.state_dict(), 'fatigue_detection_model.pth')

Epoch 0/19
----------




train Loss: 0.5898 Acc: 0.6394
val Loss: 0.5400 Acc: 0.6787

Epoch 1/19
----------
train Loss: 0.5534 Acc: 0.6632
val Loss: 0.5531 Acc: 0.6294

Epoch 2/19
----------
train Loss: 0.5428 Acc: 0.6676
val Loss: 0.5196 Acc: 0.6491

Epoch 3/19
----------
train Loss: 0.5302 Acc: 0.6710
val Loss: 0.4329 Acc: 0.8147

Epoch 4/19
----------
train Loss: 0.5206 Acc: 0.6752
val Loss: 0.5577 Acc: 0.5768

Epoch 5/19
----------
train Loss: 0.5185 Acc: 0.6746
val Loss: 0.3800 Acc: 0.8712

Epoch 6/19
----------
train Loss: 0.5075 Acc: 0.6805
val Loss: 0.4063 Acc: 0.7752

Epoch 7/19
----------
train Loss: 0.4930 Acc: 0.6931
val Loss: 0.4324 Acc: 0.7286

Epoch 8/19
----------
train Loss: 0.4890 Acc: 0.6918
val Loss: 0.4090 Acc: 0.7906

Epoch 9/19
----------
train Loss: 0.4888 Acc: 0.6934
val Loss: 0.4273 Acc: 0.7275

Epoch 10/19
----------
train Loss: 0.4867 Acc: 0.6911
val Loss: 0.4132 Acc: 0.7648

Epoch 11/19
----------
train Loss: 0.4868 Acc: 0.6886
val Loss: 0.4088 Acc: 0.7736

Epoch 12/19
----------
t

In [34]:
# Function to test the model
def test_model(model):
    """Print the accuracy of ``model`` on the 'test' split.

    Puts the model in eval mode, runs every batch of ``dataloaders['test']``
    without gradient tracking, and prints the fraction of correct predictions
    relative to ``dataset_sizes['test']``. Nothing is returned.
    """
    model.eval()
    n_correct = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloaders['test']:
            batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

            logits = model(batch_inputs)
            # Predicted class = index of the largest logit in each row
            predicted = torch.max(logits, 1).indices
            n_correct += (predicted == batch_labels.data).sum()

    test_acc = n_correct.double() / dataset_sizes['test']
    print(f'Test Acc: {test_acc:.4f}')

In [35]:
# Report accuracy of the best-validation model on the held-out test split
test_model(model=model)

Test Acc: 0.8735
