In [None]:
# Mount Google Drive at /content/drive; later cells read a class file and image
# folders from paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the oidv6 command-line tool that the download cells invoke.
!pip install oidv6

In [None]:
# Make sure the newest published oidv6 release is the one installed.
!pip install --upgrade oidv6

In [None]:
# Download the "train" split for five classes into drive/MyDrive/foldr
# (--limit 400 presumably caps the number of images per class - TODO confirm).
# NOTE(review): this list contains Tomato but not Lemon, while the `classes` list
# used by the rest of the notebook contains lemon but not tomato - confirm where
# the lemon training images come from.
!oidv6 downloader en --dataset drive/MyDrive/foldr --type_data train --classes Tomato Pumpkin Carrot Cucumber Broccoli --limit 400


In [None]:
# Download the "test" split for the Lemon class only into drive/MyDrive/foldr.
# NOTE(review): the validation dataset built later reads all five class folders
# under foldr/test - confirm the other four classes were downloaded separately.
!oidv6 downloader en --dataset drive/MyDrive/foldr --type_data test --classes Lemon --limit 100

In [None]:
#@title Class mapping
classes = ["broccoli", "carrot", "cucumber", "lemon", "pumpkin"]
number_of_classes = len(classes)

# Class names available in the dataset, one per line of the class file.
with open('/content/drive/MyDrive/pics/classes.txt', 'r') as f:
    class_names = [ln.strip() for ln in f]

# Map every class-file name that is also one of the predefined `classes`
# (compared case-insensitively) to its position in `classes`.
# The position in `classes` - not the line number in the class file - is used
# because the rest of the notebook treats the index as a network output slot
# (0 .. number_of_classes - 1) and as a row/column of the confusion matrix,
# whose labels are taken from `classes`.
_classes_lower = [c.lower() for c in classes]
class_mapping = {}
for class_name in class_names:
    key = class_name.lower()
    if key in _classes_lower:
        class_mapping[key] = {'index': _classes_lower.index(key)}

print(class_mapping)

{'broccoli': {'index': 0}, 'carrot': {'index': 1}, 'cucumber': {'index': 2}, 'lemon': {'index': 3}, 'pumpkin': {'index': 4}}


In [None]:
import torch
# Show whether PyTorch can see a CUDA device in this runtime.
torch.cuda.is_available()

False

In [None]:
#@title Dataset
import numpy as np
import re
import glob
import torch
import torchvision
from PIL import Image
import PIL
from torchvision import transforms
import os

class ImageDataset(torch.utils.data.Dataset):
    """Image-classification dataset read from per-class folders.

    For every name in the module-level ``classes`` list, images are collected
    from ``<image_dir>/<class>/*.jpg``. The label of an image is read from the
    text file with the same base name in ``<image_dir>/<class>/labels/``: the
    first space-separated token of its first line is the class name, which is
    turned into an integer through the module-level ``class_mapping``. The
    label therefore comes from the label file, not from the folder name.

    Images that have no matching label file are skipped.

    Args:
        image_dir: Root folder that contains one sub-folder per class.
        transforms: Optional callable applied to each RGB PIL image in
            ``__getitem__``. When ``None`` the PIL image is returned unchanged.

    Attributes:
        files: Image paths in shuffled order.
        labels: Integer class indices, aligned with ``files``.
        order: The permutation that was applied to produce ``files``/``labels``.
        label_paths: Label-file paths in pre-shuffle order.
        class_names: Class names read from the label files, pre-shuffle order.

    Raises:
        ValueError: If a label file names a class missing from ``class_mapping``.
    """

    def __init__(self, image_dir, transforms=None):
        self.image_dir = image_dir
        self.transforms = transforms
        self.class_names = []

        # Pair every image with its own label file by base name. Globbing the
        # images and the labels separately and matching them by list position
        # is unsafe: directory listing order is arbitrary and the two counts
        # may differ, which would attach labels to the wrong images.
        image_files = []
        label_paths = []
        for class_name in classes:
            class_dir = os.path.join(self.image_dir, class_name.lower())
            for image_path in sorted(glob.glob(os.path.join(class_dir, "*.jpg"))):
                stem = os.path.splitext(os.path.basename(image_path))[0]
                label_path = os.path.join(class_dir, "labels", stem + ".txt")
                if not os.path.isfile(label_path):
                    continue
                image_files.append(image_path)
                label_paths.append(label_path)
        self.label_paths = label_paths

        # Read the class name from each label file and convert it to its index.
        labels = []
        for label_path in label_paths:
            with open(label_path, 'r') as f:
                label_name = f.readline().strip().split(' ')[0]
            label_data = class_mapping.get(label_name.lower())
            if label_data is None:
                raise ValueError(
                    "Label file {!r} names class {!r}, which is not in class_mapping".format(
                        label_path, label_name))
            self.class_names.append(label_name)
            labels.append(label_data['index'])

        # Shuffle images and labels with one shared permutation so they stay aligned.
        self.order = [x for x in np.random.permutation(len(labels))]
        self.files = [image_files[x] for x in self.order]
        self.labels = [labels[x] for x in self.order]

    def __len__(self):
        """Return the number of (image, label) pairs."""
        return len(self.labels)

    def __getitem__(self, index):
        """Load image ``index`` as RGB, apply the transform, return ``(img, label)``."""
        img = Image.open(self.files[index]).convert("RGB")
        if self.transforms is not None:
            img = self.transforms(img)
        return img, self.labels[index]


# Preprocessing pipelines for the two datasets.
# Both resize to 224x224, convert to a tensor and normalise each channel with the
# same mean/std (presumably the usual ImageNet statistics - TODO confirm).
# The training pipeline additionally applies a random horizontal flip, random
# grayscale conversion and a random rotation of up to 20 degrees.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

transformTrain = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomGrayscale(),
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    normalize,
])

transformVal = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    normalize,
])

# Create the training and validation datasets.
# The training set uses the augmenting pipeline (transformTrain).
train_image_dir = "/content/drive/MyDrive/foldr/train"
train_dataset = ImageDataset(image_dir=train_image_dir, transforms=transformTrain)

# The validation set is read from the "test" folder (a "validation" folder was
# used previously) and uses the non-augmenting pipeline (transformVal).
val_image_dir = "/content/drive/MyDrive/foldr/test"
val_dataset = ImageDataset(image_dir=val_image_dir, transforms=transformVal)

In [None]:
# Number of samples in the training dataset.
len(train_dataset)

2000

In [None]:
# Number of samples in the validation dataset.
len(val_dataset)

470

In [None]:
from datetime import datetime
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

# Use the first CUDA device when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f'Device: {device}')

Device: cpu


In [None]:
#@title ConvNetS
#everything is the same as in ConvNet, only a softmax is added after the last fully connected layer
class ConvNetS(torch.nn.Module):
    """Three conv blocks plus three fully connected layers, ending in a softmax.

    Each conv block is Conv2d(3x3, 'same' padding) -> BatchNorm2d -> ReLU ->
    2x2 max-pool with stride 2, with 16, 32 and 64 output channels. The
    flattened features go through Linear(256) and Linear(128), each followed by
    BatchNorm1d and ReLU, then Linear(num_classes) and a softmax over dim 1.

    Args:
        input_shape: Indexable as (channels, height, width). Height and width
            are integer-divided by 8 (three 2x2 pools) to size the first
            fully connected layer.
        num_classes: Number of output probabilities per sample.

    NOTE(review): forward() returns probabilities, not logits. The training
    cell pairs this model with torch.nn.CrossEntropyLoss, which applies its own
    log-softmax. The softmax is kept because it is this class's stated
    difference from ConvNet.
    """

    def __init__(self, input_shape, num_classes):
        super().__init__()
        nn = torch.nn
        in_channels = input_shape[0]
        pooled_height = input_shape[1] // 8
        pooled_width = input_shape[2] // 8

        # Layers are created in the same order as before so that seeded
        # initialisation yields the same parameters.
        self.conv1 = nn.Conv2d(in_channels, 16, (3, 3), padding='same')
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, (3, 3), padding='same')
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, (3, 3), padding='same')
        self.bn3 = nn.BatchNorm2d(64)
        self.fc1 = nn.Linear(64 * pooled_height * pooled_width, 256)
        self.bn4 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.bn5 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return per-class probabilities of shape (batch, num_classes)."""
        F = torch.nn.functional
        conv_blocks = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        y = x
        for conv, norm in conv_blocks:
            y = F.relu(norm(conv(y)))
            y = F.max_pool2d(y, kernel_size=(2, 2), stride=(2, 2))

        # Keep the batch dimension, flatten everything else.
        y = torch.flatten(y, start_dim=1)
        y = F.relu(self.bn4(self.fc1(y)))
        y = F.relu(self.bn5(self.fc2(y)))
        return F.softmax(self.fc3(y), dim=1)


In [None]:
#@title Train and validate
# Training runs for 15 epochs by default; overfitting usually starts after about 10 epochs.
# Notebook-wide accumulators: validate() appends every predicted index and every
# ground-truth index to these lists on each call, so they grow across all
# validation passes rather than holding only the latest one.
predicted_list = []
ground_truth_list = []

def train(model, train_dataloader, optimizer, loss_func, epoch_count=15, print_freq=1):
    """Train ``model`` and run a validation pass after every epoch.

    Validation uses the module-level ``val_dataloader``. Per-class metrics and
    a confusion matrix for the current validation pass are printed after the
    first epoch and after every fifth epoch.

    Args:
        model: Network to optimise; batches are moved to the module-level ``device``.
        train_dataloader: Iterable of ``(images, labels)`` training batches.
        optimizer: Optimiser already bound to ``model``'s parameters.
        loss_func: Callable ``loss_func(preds, labels)`` returning a scalar tensor.
        epoch_count: Number of epochs to run.
        print_freq: Loss/accuracy lines are printed every ``print_freq`` epochs.

    Returns:
        ``(epoch_loss, epoch_acc)`` of the last training epoch: mean loss per
        batch and fraction of correctly classified training samples.
        ``(0.0, 0.0)`` when ``epoch_count`` is 0.
    """
    # Defined up front so the final return is valid even if no epoch runs.
    epoch_loss = 0.0
    epoch_acc = 0.0

    for epoch in range(epoch_count):
        # validate() switches the model to eval mode, so training mode must be
        # restored at the start of every epoch. Otherwise every epoch after the
        # first would train with BatchNorm in inference mode.
        model.train()
        total_loss = 0
        total_correct = 0
        total_samples = 0

        for images, labels in train_dataloader:
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            preds = model(images)
            loss = loss_func(preds, labels)
            total_loss += loss.item()
            _, predicted = torch.max(preds, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            loss.backward()
            optimizer.step()

        epoch_loss = total_loss / len(train_dataloader)
        epoch_acc = total_correct / total_samples
        should_print = (epoch + 1) % print_freq == 0
        if should_print:
            print(f"Epoch [{epoch+1}/{epoch_count}], Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}")

        val_loss, val_acc, predicted_list_local, ground_truth_list_local = validate(model, val_dataloader, loss_func)
        if should_print:
            print(f"Epoch [{epoch+1}/{epoch_count}], Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Detailed per-class report on the first epoch and every fifth epoch.
        if (epoch + 1) % 5 == 0 or (epoch + 1) == 1:
            tp_local, fp_local, fn_local = calcMetrics(ground_truth_list_local, predicted_list_local)
            for i in range(0, number_of_classes):
                class_key = classes[i].lower()
                metrics = calculate_metrics(tp_local[class_key], fp_local[class_key], fn_local[class_key])
                print(class_key, ': ', metrics)
            print_confusion_matrix(ground_truth_list_local, predicted_list_local, classes)

    return epoch_loss, epoch_acc

def validate(model, val_dataloader, loss_func):
    """Evaluate ``model`` on ``val_dataloader`` without tracking gradients.

    Switches the model to eval mode and leaves it there. Every predicted index
    and ground-truth index is appended both to the lists returned from this
    call and to the module-level ``predicted_list`` / ``ground_truth_list``.

    Args:
        model: Network to evaluate; batches are moved to the module-level ``device``.
        val_dataloader: Iterable of ``(images, labels)`` batches.
        loss_func: Callable ``loss_func(preds, labels)`` returning a scalar tensor.

    Returns:
        ``(mean loss per batch, accuracy, predicted indices, ground-truth indices)``
        for this pass.
    """
    model.eval()

    running_loss = 0
    hits = 0
    seen = 0
    predicted_list_local = []
    ground_truth_list_local = []

    with torch.no_grad():
        for images, labels in val_dataloader:
            images, labels = images.to(device), labels.to(device)
            preds = model(images)
            running_loss += loss_func(preds, labels).item()

            # Index of the highest score per sample.
            _, predicted = preds.max(dim=1)
            hits += (predicted == labels).sum().item()
            seen += labels.size(0)

            batch_truth = labels.tolist()
            batch_predicted = predicted.tolist()
            # Record in the per-call lists and in the notebook-wide accumulators.
            ground_truth_list.extend(batch_truth)
            ground_truth_list_local.extend(batch_truth)
            predicted_list.extend(batch_predicted)
            predicted_list_local.extend(batch_predicted)

    mean_loss = running_loss / len(val_dataloader)
    accuracy = hits / seen

    return mean_loss, accuracy, predicted_list_local, ground_truth_list_local
    

In [None]:
#@title TP FP FN function

#Obtaining TP FP FN values to calculate other metrics
def calcMetrics(ground_truth_list, predicted_list):
    """Count per-class true positives, false positives and false negatives.

    A correct prediction is a true positive for that class. A wrong prediction
    is a false positive for the predicted class and a false negative for the
    ground-truth class.

    Args:
        ground_truth_list: Ground-truth class indices.
        predicted_list: Predicted class indices, aligned with ``ground_truth_list``.

    Returns:
        ``(tp, fp, fn)``: three dicts keyed by the lower-cased names in the
        module-level ``classes`` list.

    Raises:
        IndexError: If an index has no entry in the module-level ``class_mapping``.
    """
    # One counter per configured class, however many classes there are.
    tp_local = {name.lower(): 0 for name in classes}
    fp_local = {name.lower(): 0 for name in classes}
    fn_local = {name.lower(): 0 for name in classes}

    # Build the index -> name table once instead of scanning class_mapping for
    # every sample. setdefault keeps the first name if an index repeats.
    index_to_name = {}
    for name, entry in class_mapping.items():
        index_to_name.setdefault(entry['index'], name)

    def _name_of(class_index):
        if class_index not in index_to_name:
            raise IndexError(f"class index {class_index} is not present in class_mapping")
        return index_to_name[class_index]

    for i, current_ground_truth in enumerate(ground_truth_list):
        current_predicted = predicted_list[i]
        gt_class_name = _name_of(current_ground_truth)
        pr_class_name = _name_of(current_predicted)

        if current_predicted == current_ground_truth:
            tp_local[gt_class_name] += 1
        else:
            fp_local[pr_class_name] += 1
            fn_local[gt_class_name] += 1

    return tp_local, fp_local, fn_local


In [None]:
#@title Metrics of each class and overall metrics

def calculate_metrics(tp, fp, fn):
    """Compute accuracy, recall, precision and F1 from TP/FP/FN counts.

    True negatives are fixed to 0, so 'accuracy' here equals
    ``tp / (tp + fp + fn)``. Any ratio whose denominator is zero is reported
    as 0.0.

    Args:
        tp: Number of true positives.
        fp: Number of false positives.
        fn: Number of false negatives.

    Returns:
        Dict with keys 'accuracy', 'recall', 'precision' and 'f1', in that order.
    """
    tn = 0

    def _ratio(numerator, denominator):
        # A zero denominator yields 0.0 instead of an exception.
        try:
            return numerator / denominator
        except ZeroDivisionError:
            return 0.0

    accuracy = _ratio(tp + tn, tp + fp + tn + fn)
    recall = _ratio(tp, tp + fn)
    precision = _ratio(tp, tp + fp)
    f1 = _ratio(2 * (precision * recall), precision + recall)

    return {
        'accuracy': accuracy,
        'recall': recall,
        'precision': precision,
        'f1': f1,
    }

#Adding up separate class statistics
def add_up_statistics(t_p, f_p, f_n):
    """Sum per-class TP/FP/FN counts into overall totals.

    Every class contributes its own count. (Reading only the first class for
    each of the per-class iterations would report that class's counts
    multiplied by the number of classes.)

    Args:
        t_p: Dict mapping class name to its true-positive count.
        f_p: Dict mapping class name to its false-positive count.
        f_n: Dict mapping class name to its false-negative count.

    Returns:
        Dict with keys 'tp', 'tn', 'fp' and 'fn'. 'tn' is always 0 because true
        negatives are not tracked per class.
    """
    return {
        'tp': sum(t_p.values()),
        'tn': 0,
        'fp': sum(f_p.values()),
        'fn': sum(f_n.values()),
    }

def calculate_overall_metrics(conjoined_statistics):
    """Compute accuracy, recall, precision and F1 from summed counts.

    Args:
        conjoined_statistics: Dict with integer counts under the keys 'tp',
            'tn', 'fp' and 'fn'.

    Returns:
        Dict with keys 'accuracy', 'recall', 'precision' and 'f1'. A ratio
        whose denominator is zero is reported as 0.0, matching
        ``calculate_metrics``, instead of raising ZeroDivisionError.
    """
    tp = conjoined_statistics['tp']
    tn = conjoined_statistics['tn']
    fp = conjoined_statistics['fp']
    fn = conjoined_statistics['fn']

    def _ratio(numerator, denominator):
        # Empty or degenerate statistics must not crash the report.
        try:
            return numerator / denominator
        except ZeroDivisionError:
            return 0.0

    metrics = {}
    metrics['accuracy'] = _ratio(tp + tn, tp + fp + tn + fn)
    metrics['recall'] = _ratio(tp, tp + fn)
    metrics['precision'] = _ratio(tp, tp + fp)
    metrics['f1'] = _ratio(2 * (metrics['precision'] * metrics['recall']),
                           metrics['precision'] + metrics['recall'])

    return metrics


In [None]:
#@title Confusion matrix function
def print_confusion_matrix(ground_truth_list, predicted_list, classes):
    """Print a confusion matrix as a left-aligned text table.

    Rows are ground-truth classes and columns are predicted classes. Both
    lists hold integer indices into ``classes``.

    Args:
        ground_truth_list: Ground-truth class indices.
        predicted_list: Predicted class indices, aligned with ``ground_truth_list``.
        classes: List of class names used for the row and column headers.
    """
    size = len(classes)
    counts = [[0] * size for _ in range(size)]
    for position, truth in enumerate(ground_truth_list):
        counts[truth][predicted_list[position]] += 1

    # First table row is the header; each following row is a class name plus its counts.
    table = [[" "] + classes]
    for label, row_counts in zip(classes, counts):
        table.append([label] + row_counts)

    # Every column is as wide as the longest cell plus two spaces of padding.
    column_width = 2 + max(len(str(cell)) for line in table for cell in line)

    print("Confusion Matrix:")
    for line in table:
        print("".join(str(cell).ljust(column_width) for cell in line))

In [None]:
#@title Dataloader
# Mini-batch sizes for the two loaders.
train_batch_size = 200
val_batch_size = 30

# Training batches are reshuffled on every pass; validation keeps a fixed order.
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=train_batch_size,
    shuffle=True,
)
val_dataloader = torch.utils.data.DataLoader(
    dataset=val_dataset,
    batch_size=val_batch_size,
    shuffle=False,
)


In [None]:
# Zero-initialised per-class counters, keyed by the lower-cased names of the
# first five entries of `classes`. Each counter is its own dict object.
_class_keys = [classes[position].lower() for position in range(5)]
tp = dict.fromkeys(_class_keys, 0)
fp = dict.fromkeys(_class_keys, 0)
fn = dict.fromkeys(_class_keys, 0)

# Class indices registered in class_mapping (entries with index -1 are left out).
index_list = [entry['index'] for entry in class_mapping.values() if entry['index'] != -1]
print(index_list)

[0, 1, 2, 3, 4]


In [None]:
# Build the network from the shape of one transformed training sample, with 5 output classes.
model = ConvNetS(train_dataset[0][0].shape, 5).to(device)
print(f'Parameter count: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}')
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)
# NOTE(review): ConvNetS.forward already ends in a softmax, and CrossEntropyLoss
# applies its own log-softmax on top - confirm this pairing is intended.
loss_func = torch.nn.CrossEntropyLoss()
# 15 epochs, printing loss/accuracy every epoch; the returned
# (train loss, train accuracy) of the last epoch is shown as the cell result.
train(model, train_dataloader, optimizer, loss_func, 15, 1)

Parameter count: 12,903,429
Epoch [1/15], Train Loss: 1.5455, Train Acc: 0.3400
Epoch [1/15], Val Loss: 1.5754, Val Acc: 0.2489
broccoli :  {'accuracy': 0.0, 'recall': 0.0, 'precision': 0.0, 'f1': 0.0}
carrot :  {'accuracy': 0.0, 'recall': 0.0, 'precision': 0.0, 'f1': 0.0}
cucumber :  {'accuracy': 0.0, 'recall': 0.0, 'precision': 0.0, 'f1': 0.0}
lemon :  {'accuracy': 0.2315035799522673, 'recall': 0.97, 'precision': 0.23317307692307693, 'f1': 0.375968992248062}
pumpkin :  {'accuracy': 0.16806722689075632, 'recall': 0.23529411764705882, 'precision': 0.37037037037037035, 'f1': 0.28776978417266186}
Confusion Matrix:
          broccoli  carrot    cucumber  lemon     pumpkin   
broccoli  0         0         0         98        2         
carrot    0         0         0         59        26        
cucumber  0         0         0         97        3         
lemon     0         0         0         97        3         
pumpkin   0         0         0         65        20        
Epoch [2/15], 

(1.327048110961914, 0.5745)

In [None]:
# Print the confusion matrix over the notebook-wide ground_truth_list / predicted_list.
# validate() appends to these lists on every call, so this matrix covers every
# validation pass run so far, not only the last epoch.
print_confusion_matrix(ground_truth_list, predicted_list, classes)

Confusion Matrix:
          broccoli  carrot    cucumber  lemon     pumpkin   
broccoli  822       131       299       187       61        
carrot    52        637       70        81        435       
cucumber  412       132       662       244       50        
lemon     197       174       324       649       156       
pumpkin   33        285       63        124       770       


In [None]:
# Per-class TP / FP / FN counts over the notebook-wide accumulated lists.
tp, fp, fn = calcMetrics(ground_truth_list, predicted_list)

In [None]:
# Print accuracy, recall, precision and F1 for each class, computed from its TP / FP / FN counts.
for i in range(0, number_of_classes):
  metrics = calculate_metrics(tp[classes[i].lower()], fp[classes[i].lower()], fn[classes[i].lower()])
  print(classes[i].lower(), ': ', metrics)

broccoli :  {'accuracy': 0.37465815861440294, 'recall': 0.548, 'precision': 0.5422163588390502, 'f1': 0.5450928381962864}
carrot :  {'accuracy': 0.3189784677015523, 'recall': 0.4996078431372549, 'precision': 0.4687270051508462, 'f1': 0.4836750189825361}
cucumber :  {'accuracy': 0.29343971631205673, 'recall': 0.44133333333333336, 'precision': 0.46685472496473907, 'f1': 0.45373543522960935}
lemon :  {'accuracy': 0.3038389513108614, 'recall': 0.43266666666666664, 'precision': 0.5050583657587548, 'f1': 0.4660682226211849}
pumpkin :  {'accuracy': 0.3894790085988872, 'recall': 0.6039215686274509, 'precision': 0.5230978260869565, 'f1': 0.5606115762650162}


In [None]:
# Number of per-class entries; not referenced again in the visible cells.
metrics_len = len(tp)

# Totals of TP / FP / FN as returned by add_up_statistics ('tn' stays 0).
conjoined_statistics = add_up_statistics(tp, fp, fn)
print(conjoined_statistics)

# Overall accuracy, recall, precision and F1 computed from those totals.
overall_metrics = calculate_overall_metrics(conjoined_statistics)
print(overall_metrics)

{'tp': 4110, 'tn': 0, 'fp': 3470, 'fn': 3390}
{'accuracy': 0.37465815861440294, 'recall': 0.548, 'precision': 0.5422163588390502, 'f1': 0.5450928381962864}


In [None]:
# Manual check of print_confusion_matrix on small hand-written index lists
# (rows are ground truth, columns are predictions).
predicted_list1 = [0, 1, 2, 3, 4, 1, 1, 2, 0, 3, 3, 4, 0, 4, 0, 1, 2, 2, 3, 4]
ground_truth_list1 = [0, 1, 2, 3, 4, 0, 0, 4, 0, 3, 4, 1, 1, 4, 2, 1, 2, 2, 4, 4]
print_confusion_matrix(ground_truth_list1, predicted_list1, classes)

Confusion Matrix:
          broccoli  carrot    cucumber  lemon     pumpkin   
broccoli  2         2         0         0         0         
carrot    1         2         0         0         1         
cucumber  1         0         3         0         0         
lemon     0         0         0         2         0         
pumpkin   0         0         1         2         3         
