In [19]:
import os
import torch
from torch import nn
import random
from PIL import Image
from torchvision import transforms, models
from torch.utils.data import DataLoader, Dataset, random_split
from PIL import ImageFile

In [20]:
# Let Pillow decode image files whose data is cut short instead of raising,
# so a few damaged files do not abort a whole epoch.
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [21]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

device(type='cuda')

In [22]:
class MultiFolderDataset(Dataset):
    """Image-classification dataset backed by one sub-folder per class.

    Every immediate sub-directory of ``root_dir`` is one class. Classes are
    numbered ``0 .. len(class_names) - 1`` in sorted folder-name order, so
    ``class_names[label]`` is always the folder a sample came from, even when
    ``root_dir`` also contains stray non-directory entries.

    Args:
        root_dir: Directory whose immediate sub-directories hold the images.
        transform: Optional callable applied to each loaded RGB PIL image.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.images = []       # file path of every sample
        self.labels = []       # integer class index, parallel to self.images
        self.class_names = []  # class_names[label] -> folder name

        for subdir in sorted(os.listdir(root_dir)):
            subdir_path = os.path.join(root_dir, subdir)
            if not os.path.isdir(subdir_path):
                # Stray files must not consume a label index, otherwise labels
                # drift out of step with class_names (and can exceed the
                # number of classes the model head was built for).
                continue

            label = len(self.class_names)
            self.class_names.append(subdir)

            # Sorted so the sample order, and therefore any seeded split of
            # this dataset, is the same on every machine and run.
            for filename in sorted(os.listdir(subdir_path)):
                file_path = os.path.join(subdir_path, filename)
                if os.path.isfile(file_path):
                    self.images.append(file_path)
                    self.labels.append(label)

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        If the file cannot be opened or decoded, the error is printed and the
        next sample (wrapping around) is returned in its place, together with
        that sample's own label.

        Raises:
            IndexError: If ``idx`` is out of range.
            RuntimeError: If no image in the dataset can be loaded.
        """
        current = idx
        # At most one attempt per sample: bounded, unlike unbounded recursion.
        attempts = max(len(self.images), 1)

        for _ in range(attempts):
            img_path = self.images[current]  # IndexError for out-of-range idx

            try:
                # The context manager closes the file handle; convert() returns
                # an independent, fully loaded RGB image.
                with Image.open(img_path) as img:
                    image = img.convert('RGB')
            except OSError as e:
                print(f"Error loading image {img_path}: {e}")
                current = (current + 1) % len(self.images)
                continue

            label = self.labels[current]

            if self.transform:
                image = self.transform(image)

            return image, label

        raise RuntimeError(
            f"No loadable image found in {self.root_dir!r} "
            f"after trying all {len(self.images)} files"
        )


In [23]:
# Per-channel normalisation constants.
# NOTE(review): these look like the usual ImageNet mean/std that pretrained
# torchvision models expect — confirm against the weights in use.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

# Resize to a fixed 224x224 input, convert to a tensor, then normalise.
preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
]
data_transforms = transforms.Compose(preprocessing_steps)

In [24]:
# Folder that holds one sub-folder per class (relative to the working directory).
root_dir = 'images/'

In [25]:
# Index every image path under root_dir; pixels are only read in __getitem__.
dataset = MultiFolderDataset(root_dir, transform=data_transforms)

In [26]:
# 70 % train, 15 % validation; the test split takes whatever remains so the
# three sizes always add up to len(dataset).
total_size = len(dataset)
train_size = int(0.7 * total_size)
val_size = int(0.15 * total_size)
test_size = total_size - (train_size + val_size)

In [27]:
# A fixed seed keeps the train/val/test assignment identical between runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=split_generator,
)

In [28]:
# Only the training split is shuffled; validation and test keep a fixed order.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [29]:
# Sanity check: the three split sizes should follow the 70/15/15 ratio.
print(f"Train Dataset Size: {len(train_dataset)}")
print(f"Validation Dataset Size: {len(val_dataset)}")
print(f"Test Dataset Size: {len(test_dataset)}")

Train Dataset Size: 11116
Validation Dataset Size: 2382
Test Dataset Size: 2383


In [30]:
# Folder names discovered as classes, in sorted order.
print(f"Class Names: {dataset.class_names}")

Class Names: ['body', 'body-art', 'face', 'face-art', 'hand', 'hand-art']


In [31]:
# Start from a ResNet-50 with torchvision's pretrained weights.
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of
# `weights=` — confirm the installed version before switching.
model = models.resnet50(pretrained=True)
# Input width of the existing classifier head; reused when the head is replaced.
num_features = model.fc.in_features

In [32]:
# Swap the final fully connected layer for one with one output per class folder.
model.fc = nn.Linear(num_features, len(dataset.class_names))

In [33]:
# Move the model to the selected device before the optimizer is created.
model = model.to(device)

In [34]:
# Cross-entropy over the raw logits; Adam updates every parameter of the
# network (backbone and the new head alike).
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [35]:
import copy


def _run_epoch(model, loader, criterion, run_device, optimizer=None):
    """Run one pass over ``loader``: train if ``optimizer`` is given, else evaluate.

    Args:
        model: Network to run; switched to train or eval mode accordingly.
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        run_device: Device the batches are moved to.
        optimizer: Optimizer to step after each batch, or ``None`` to evaluate.

    Returns:
        ``(mean_loss, accuracy)`` as Python floats, averaged over the samples
        actually seen. Both are ``0.0`` when the loader yields nothing.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    total_correct = 0
    total_seen = 0

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(is_training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(run_device), labels.to(run_device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            if is_training:
                loss.backward()
                optimizer.step()

            batch_count = inputs.size(0)
            # The criterion returns a batch mean; re-weight by batch size so a
            # smaller final batch does not skew the epoch average.
            total_loss += loss.item() * batch_count
            total_correct += (preds == labels).sum().item()
            total_seen += batch_count

    if total_seen == 0:
        return 0.0, 0.0
    return total_loss / total_seen, total_correct / total_seen


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=25):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs one training pass and one validation pass, printing the
    loss and accuracy of both. The weights from the epoch with the highest
    validation accuracy are restored before returning; the first completed
    epoch always counts as a candidate, so the result is never silently reset
    to the untrained weights.

    Args:
        model: Network to train (modified in place).
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of train + validation passes to run.

    Returns:
        The same ``model`` object, holding the best weights seen.
    """
    # Use the device the model already lives on rather than a notebook global,
    # so the function works regardless of which cells have been run.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    have_best = False

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Training phase
        epoch_loss, epoch_acc = _run_epoch(
            model, train_loader, criterion, run_device, optimizer
        )
        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        # Validation phase
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, run_device)
        print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Snapshot the weights whenever validation accuracy improves.
        if not have_best or val_acc > best_acc:
            have_best = True
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())

    print(f'Best val Acc: {best_acc:.4f}')
    model.load_state_dict(best_model_wts)
    return model

In [37]:
# Fine-tune for 10 epochs; the returned model carries the weights restored by
# train_model from its best-validation-accuracy epoch.
# NOTE(review): the saved output below ends in KeyboardInterrupt, so in that
# session `trained_model` was never assigned — re-run to completion before
# running the evaluation cells.
trained_model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10)

Epoch 0/9
----------
Train Loss: 0.3420 Acc: 0.8811
Val Loss: 0.3500 Acc: 0.8766
Epoch 1/9
----------
Train Loss: 0.2549 Acc: 0.9103
Val Loss: 0.4994 Acc: 0.8300
Epoch 2/9
----------
Train Loss: 0.2034 Acc: 0.9274
Val Loss: 0.2986 Acc: 0.8841
Epoch 3/9
----------


KeyboardInterrupt: 

In [None]:
from sklearn.metrics import classification_report, confusion_matrix


def evaluate_model(model, test_loader, class_names=None):
    """Print a classification report and confusion matrix for ``model``.

    Args:
        model: Trained network; switched to eval mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        class_names: Names for labels ``0 .. len(class_names) - 1``. Defaults
            to the notebook-level ``dataset.class_names``.

    Returns:
        None. Both reports are printed.
    """
    if class_names is None:
        class_names = dataset.class_names
    class_names = list(class_names)
    # Passing the full label set keeps the report and matrix aligned with
    # class_names even when a class is missing from this split; without it
    # classification_report rejects the target_names length mismatch.
    label_ids = list(range(len(class_names)))

    # Run on the device the model already lives on instead of a global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(run_device))
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    print(classification_report(all_labels, all_preds, labels=label_ids, target_names=class_names))
    print(confusion_matrix(all_labels, all_preds, labels=label_ids))


In [None]:
# Final check on the held-out test split, which is not used during training
# or best-epoch selection.
evaluate_model(trained_model, test_loader)