In [None]:
!pip install -U torch torchvision --index-url https://download.pytorch.org/whl/cu121

Load Data
- Traverse the Train/Test folders
- Create the X_train/y_train and X_test/y_test vectors
- Load the images into a matrix

In [None]:
# Encoding: 1 represents defective, 0 represents non-defective
#
# NOTE: everything below is wrapped in a triple-quoted string, so none of it is
# executed. As written, it would read every `.jpg` under the Train/ and Test/
# category folders into NumPy arrays, print their shapes, and write them to
# `X_train.npy` and `X_test.npy`. It collects image arrays only; no label
# arrays are built in this snippet.
"""
import os
from PIL import Image
import numpy as np

X_train = []
X_test = []
base_path = os.path.join("pepsico-lab-potato-quality-control", "Pepsico RnD Potato Lab Dataset")

for folder in ['Train', 'Test']:
    for category in ['Non-Defective', 'Defective']:
        folder_path = os.path.join(base_path, folder, category)
        for image_path in os.listdir(folder_path):
            if image_path.endswith('.jpg') is False:
                continue
            img = Image.open(os.path.join(folder_path, image_path))
            if folder == 'Train':
                X_train.append(np.asarray(img))
            else:
                X_test.append(np.asarray(img))

X_train = np.array(X_train)
X_test = np.array(X_test)

print(f'X_test Shape: {X_test.shape}')
print(f'X_train Shape: {X_train.shape}')

with open('X_train.npy', 'wb') as file:
    np.save(file, X_train)

with open('X_test.npy', 'wb') as file:
    np.save(file, X_test)
"""

In [None]:
import os
import random
import torch
from torch.utils.data import DataLoader, Subset
from lenet import LeNet
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from torch.nn.functional import sigmoid
from PIL import Image
import numpy as np


class Dataset(torch.utils.data.Dataset):
    """
    Map-style dataset that lazily loads images from a class-per-folder tree.

    Each item is read from ``<base_path>/<Defective|Non-Defective>/<file name>``,
    converted to RGB, resized to 28x28 and returned as a float32 tensor in
    channel-first layout with values divided by 255.
    """

    def __init__(self, file_ids, labels, base_path):
        """
        Constructor for the Dataset.

        Parameters
        ----------
        file_ids : list[str]
            List of file names (not paths)
        labels : list[int]
            List of class identifiers, corresponding to the list of file IDs
            (1 selects the "Defective" folder, anything else "Non-Defective")
        base_path : str
            Path to the folder containing the category sub-folders that hold
            the files in `file_ids`

        Returns
        -------
        Dataset
        """
        self.file_ids = file_ids
        self.labels = labels
        self.base_path = base_path

    def __len__(self):
        """
        Return the size of the dataset.

        Returns
        -------
        int
        """
        return len(self.file_ids)

    @staticmethod
    def _to_chw_tensor(pixels):
        """
        Convert a (height, width, channels) pixel array to a float32 tensor.

        Parameters
        ----------
        pixels : array-like
            Pixel values laid out as (height, width, channels)

        Returns
        -------
        torch.Tensor : (channels, height, width) tensor with values / 255
        """
        # The channel axis has to be *moved* to the front. A plain reshape to
        # (C, H, W) would only reinterpret the flat buffer and interleave
        # colour values across rows and columns.
        chw = np.ascontiguousarray(np.asarray(pixels).transpose(2, 0, 1))
        return torch.tensor(chw / 255).float()

    def __getitem__(self, index):
        """
        Get a feature and label tuple based on the index.

        Parameters
        ----------
        index : int
            This function does not check the bounds of the Dataset

        Returns
        -------
        (torch.tensor, int) : Image tensor of shape (3, 28, 28) and class
        """
        filename = self.file_ids[index]
        label = self.labels[index]
        category = "Defective" if label == 1 else "Non-Defective"
        # The context manager closes the underlying file handle; convert/resize
        # return new in-memory images, so `resized` stays valid afterwards.
        with Image.open(os.path.join(self.base_path, category, filename)) as img:
            # Force three channels so grayscale/RGBA files do not break the layout.
            resized = img.convert("RGB").resize((28, 28), Image.BILINEAR)
        return self._to_chw_tensor(resized), label


def generate_dataset(is_train):
    """
    Generate the training or testing dataset for the PepsiCo folder structure.

    Lists the `.jpg` files in the "Defective" and "Non-Defective" folders of
    the chosen split and labels them 1 and 0 respectively.

    Parameters
    ----------
    is_train: bool
        True selects the "Train" folder, False selects the "Test" folder

    Returns
    -------
    Dataset
    """
    base_path = os.path.join("pepsico-lab-potato-quality-control", "Pepsico RnD Potato Lab Dataset")
    split_path = os.path.join(base_path, "Train" if is_train else "Test")
    file_ids = []
    labels = []
    for category, label in (("Defective", 1), ("Non-Defective", 0)):
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable so index-based splits are reproducible.
        for filename in sorted(os.listdir(os.path.join(split_path, category))):
            if not filename.endswith(".jpg"):
                continue
            file_ids.append(filename)
            labels.append(label)
    return Dataset(file_ids, labels, split_path)

In [None]:
# Build the lazily-loaded datasets for the Train/ and Test/ folders.
raw_train = generate_dataset(is_train=True)
raw_test = generate_dataset(is_train=False)

# A dedicated, seeded RNG makes the shuffle below repeat exactly on every run
# without touching the global `random` state.
SPLIT_SEED = 42
raw_train_len = len(raw_train)
indices = random.Random(SPLIT_SEED).sample(range(raw_train_len), raw_train_len)

# The first 80% of the shuffled indices train the model; the rest validate it.
split_point = int(raw_train_len * 0.8)
train_indices = indices[:split_point]
val_indices = indices[split_point:]
train = Subset(raw_train, train_indices)
validation = Subset(raw_train, val_indices)

# Pinned host memory only helps host-to-GPU copies, so enable it only when a
# CUDA device is actually available.
use_pinned_memory = torch.cuda.is_available()
train_loader = DataLoader(train, batch_size=32, shuffle=True, pin_memory=use_pinned_memory)
validation_loader = DataLoader(validation, batch_size=32, shuffle=False, pin_memory=use_pinned_memory)

In [None]:
# Seed torch so weight initialisation and batch shuffling repeat across runs.
TRAIN_SEED = 42
torch.manual_seed(TRAIN_SEED)

# Use the GPU when one is present and fall back to the CPU otherwise. The name
# `gpu` is kept so any other cell that refers to it keeps working.
gpu = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# LeNet comes from the local `lenet` module; presumably the arguments are the
# number of input channels and the number of outputs -- TODO confirm.
model = LeNet(3, 1)
model = model.to(device=gpu)
# The model emits one raw logit per image; BCEWithLogitsLoss applies the sigmoid itself.
loss = BCEWithLogitsLoss()
optimizer = Adam(model.parameters())

epochs = 10
for epoch in range(epochs):
    print(f'Epoch: {epoch}')

    # ---- training pass ----
    model.train()
    running_loss = 0.0
    samples_seen = 0
    # each batch contains up to 32 images
    for current_batch, batch in enumerate(train_loader):
        if current_batch % 4 == 0:
            print(f'Batch: {current_batch}')
        # batch[0] contains features, batch[1] the 0/1 labels
        X_batch = batch[0].to(gpu)
        y_batch = batch[1].to(gpu).unsqueeze(1).float()
        # Call the module itself (not .forward) so registered hooks run.
        output = model(X_batch)
        loss_value = loss(output, y_batch)
        optimizer.zero_grad()
        loss_value.backward()
        optimizer.step()
        # The loss is a per-batch mean; weight it by batch size so the epoch
        # figure is a true per-sample average even with a short last batch.
        running_loss += loss_value.item() * X_batch.size(0)
        samples_seen += X_batch.size(0)
    print(f'Train Loss: {running_loss / max(samples_seen, 1):.4f}')

    # ---- validation pass ----
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for val_batch in validation_loader:
            X_validation = val_batch[0].to(gpu)
            y_validation = val_batch[1].to(gpu).unsqueeze(1)
            val_preds = model(X_validation)
            # Probability >= 0.5 => Defective (1), otherwise Non-Defective (0)
            predicted = torch.sigmoid(val_preds) >= 0.5
            correct += int((predicted == y_validation).sum().item())
            total += y_validation.size(0)
    # Guard against an empty validation set instead of dividing by zero.
    acc = correct * 100 / max(total, 1)
    print(f'Validation Accuracy: {acc:.2f}%')
    print()