In [None]:
import os
import copy
import random
import glob

import cv2
import torch
import numpy as np
from torch import nn
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from ipywidgets import interact

random_seed = 2022

random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
import glob
import os
from torch.utils.data import Dataset
import cv2
import torch

data_dir = "/content/drive/MyDrive/lettuce/train"

def image_rad():

  all_image_path = []

  all_image_path.extend(glob.glob(data_dir + '/**/*.jpg'))
  all_image_path.extend(glob.glob(data_dir + '/**/*.png'))
  all_image_path.extend(glob.glob(data_dir + '/**/*.jpeg'))

  print(f"Total images: {len(all_image_path)}")
  print(all_image_path[:5])

  return all_image_path

def image_path_lbale():
  image_path_lable = []

  all_image_path = image_rad()

  for path in all_image_path:
    label = os.path.basename(os.path.dirname(path))
    image_path_lable.append((path, label))

  return image_path_lable

train_data_dir = "/content/drive/MyDrive/lettuce/train/"

class Chest_dataset(Dataset):
  def __init__(self, data_dir, transform=None):
    self.files_path = image_path_lbale()
    self.transform = transform

  def __len__(self):
    return len(self.files_path)

  def __getitem__(self, index):
    image_file = self.files_path[index][0]
    image = cv2.imread(image_file)
    image = cv2.cvtColor(image, cv2.COLOR_VGR2GB)

    target = self.files_path[index][1]

    if self.transform:
      image = self.transform(image)
      target = torch.Tensor([target]).long()

    return {"image": image, "target": target}

dset = Chest_dataset(train_data_dir)
print(len(dset))

In [None]:
transformer = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5])
])

train_dset = Chest_dataset(train_data_dir, transformer)
index = 200
image = train_dset[index]["image"]
label = train_dset[index]["target"]

print(image.shape, label)

In [None]:
def build_dataloader(train_data_dir, val_data_dir):
    dataloaders = {}
    train_dset = Chest_dataset(train_data_dir, transformer)
    dataloaders["train"] = DataLoader(train_dset, batch_size=4, shuffle=True, drop_last=True)
    val_dset = Chest_dataset(val_data_dir, transformer)
    dataloaders["val"] = DataLoader(val_dset, batch_size=1, shuffle=False, drop_last=False)
    return dataloaders

In [None]:
train_data_dir = "/content/drive/MyDrive/lettuce/train/"
val_data_dir = "/content/drive/MyDrive/lettuce/test/"
dataloaders = build_dataloader(train_data_dir, val_data_dir)

In [None]:
class InvertedBottleneck(nn.Module):
    def __init__(self, in_channels, out_channels, t, stride = 1):
        super().__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.stride = stride

        expand = nn.Sequential(
            nn.Conv2d(in_channels, in_channels * t, 1, bias = False),
            nn.BatchNorm2d(in_channels * t),
            nn.ReLU6(inplace = True),
        )
        depthwise = nn.Sequential(
            nn.Conv2d(in_channels * t, in_channels * t, 3, stride = stride, padding = 1, groups = in_channels * t, bias = False),
            nn.BatchNorm2d(in_channels * t),
            nn.ReLU6(inplace = True),
        )
        pointwise = nn.Sequential(
            nn.Conv2d(in_channels * t, out_channels, 1, bias = False),
            nn.BatchNorm2d(out_channels),
        )

        residual_list = []
        if t > 1:
            residual_list += [expand]
        residual_list += [depthwise, pointwise]
        self.residual = nn.Sequential(*residual_list)

    def forward(self, x):
        if self.stride == 1 and self.in_channels == self.out_channels:
            out = self.residual(x) + x
        else:
            out = self.residual(x)

        return out

class MobileNetV2(nn.Module):
    def __init__(self, n_classes = 1000):
        super().__init__()

        self.first_conv = nn.Sequential(
            nn.Conv2d(3, 32, 3, stride = 2, padding = 1, bias = False),
            nn.BatchNorm2d(32),
            nn.ReLU6(inplace = True)
        )

        self.bottlenecks = nn.Sequential(
            self.make_stage(32, 16, t = 1, n = 1),
            self.make_stage(16, 24, t = 6, n = 2, stride = 2),
            self.make_stage(24, 32, t = 6, n = 3, stride = 2),
            self.make_stage(32, 64, t = 6, n = 4, stride = 2),
            self.make_stage(64, 96, t = 6, n = 3),
            self.make_stage(96, 160, t = 6, n = 3, stride = 2),
            self.make_stage(160, 320, t = 6, n = 1)
        )

        self.last_conv = nn.Sequential(
            nn.Conv2d(320, 1280, 1, bias = False),
            nn.BatchNorm2d(1280),
            nn.ReLU6(inplace = True)
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Sequential(
        	nn.Dropout(0.2),
            nn.Linear(1280, n_classes)
        )

    def forward(self, x):
        x = self.first_conv(x)
        x = self.bottlenecks(x)
        x = self.last_conv(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1) # (N, C, 1, 1) -> (N, C)
        x = self.fc(x)
        return x

    def make_stage(self, in_channels, out_channels, t, n, stride = 1):
        layers = [InvertedBottleneck(in_channels, out_channels, t, stride)]
        in_channels = out_channels
        for _ in range(n-1):
            layers.append(InvertedBottleneck(in_channels, out_channels, t))

        return nn.Sequential(*layers)

model = MobileNetV2()

In [None]:
loss_func = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.SGD(model.parameters(), lr= 1E-3, momentum=0.9)

@torch.no_grad()
def get_accuracy(image, target, model):
    batch_size = image.shape[0]
    prediction = model(image)
    _, pred_label = torch.max(prediction, dim=1)
    is_correct = (pred_label == target)
    return is_correct.cpu().numpy().sum() / batch_size

In [None]:
device = torch.device("cuda")
loss_func = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.SGD(model.parameters(), lr= 1E-3, momentum=0.9)

In [None]:
def train_one_epoch(dataloaders, model, optimizer, loss_func, device):
    losses = {}
    accuracies = {}
    for phase in ["train", "val"]:
        running_loss = 0.0
        running_correct = 0

        if phase == "train":
            model.train()
        else:
            model.eval()

        for index, batch in enumerate(dataloaders[phase]):
            image = batch["image"].to(device)
            target = batch["target"].to(device).view(-1)

            optimizer.zero_grad()

            with torch.set_grad_enabled(phase == "train"):
                prediction = model(image)
                loss = loss_func(prediction, target)

                if phase == "train":
                    loss.backward()
                    optimizer.step()

            running_loss += loss.item()
            running_correct += get_accuracy(image, target, model)

            if phase == "train" and index % 10 == 0:
                print(f"{index}/{len(dataloaders[phase])} - Running Loss: {loss.item()}")

        losses[phase] = running_loss / len(dataloaders[phase])
        accuracies[phase] = running_correct / len(dataloaders[phase])
    return losses, accuracies

In [None]:
num_epochs = 10
model = MobileNetV2().to(device)

for epoch in range(num_epochs):
    losses, accuracies = train_one_epoch(dataloaders, model, optimizer, loss_func, device)
    print(f"{epoch+1}/{num_epochs}-Train Loss: {losses['train']}, Val Loss: {losses['val']}")
    print(f"{epoch+1}/{num_epochs}-Train Acc: {accuracies['train']}, Val Acc: {accuracies['val']}")

    if accuracies["val"] > best_acc:
        best_acc = accuracies["val"]
        best_model = copy.deepcopy(model.state_dict())

save_best_model(best_model, "best_model.pth")
print("Best model saved successfully.")