<a href="https://colab.research.google.com/github/parikhshyamal1993/vegetable_classification/blob/main/Vegitable_Classification_Experiment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
from torchvision.io import read_image
import torchvision.transforms as transforms
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
from PIL import Image
from tempfile import TemporaryDirectory
# Select the compute device: use the GPU when CUDA is available, else the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print("device",device)

device cuda


In [2]:
# Root folder of the image dataset (a mounted Google Drive path) and the three
# split sub-folders derived from it.
# NOTE(review): the ImageFolder cell further down joins the base path with
# 'val', not 'validation' - confirm which folder name actually exists.
dataset_base_location = "/content/drive/MyDrive/vegetable_images/"
(
    dataset_train_location,
    dataset_test_location,
    dataset_validation_location,
) = (
    os.path.join(dataset_base_location, split_name)
    for split_name in ('train', 'test', 'validation')
)

In [3]:
# Per-split preprocessing pipelines.
#   train: random resized crop + random horizontal flip (augmentation)
#   val:   deterministic resize + centre crop
# Both end with the same per-channel normalisation (the values match the
# widely used ImageNet channel mean / std statistics).
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

data_dir = dataset_base_location

# One ImageFolder per split, read from <data_dir>/<split>; each gets the
# transform pipeline registered under the same key.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), data_transforms[split])
    for split in ('train', 'val')
}

# Only the training split is shuffled. Validation metrics are sums over the
# whole split, so shuffling it adds work without changing the result.
dataloaders = {
    split: torch.utils.data.DataLoader(
        image_datasets[split],
        batch_size=256,
        shuffle=(split == 'train'),
        num_workers=4,
    )
    for split in ('train', 'val')
}

# Sample counts per split; train_model divides the accumulated loss and
# correct-prediction counts by these.
dataset_sizes = {split: len(image_datasets[split]) for split in ('train', 'val')}
class_names = image_datasets['train'].classes

# Rebinds `device` (previously a plain string) to a torch.device object.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")




In [4]:
import torch.nn as nn

class Block(nn.Module):
    """Residual block made of two 3x3 conv + batch-norm stages.

    The block computes ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut)``,
    where ``shortcut`` is the input itself, or the input passed through
    ``identity_downsample`` when one is supplied.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels produced by both convolutions.
        identity_downsample: optional module applied to the input before it is
            added to the main-path output (needed whenever the main path
            changes the tensor shape).
        stride: stride of the first convolution; the second always uses 1.
    """

    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super().__init__()
        # Main path. Creation order is kept (conv1, bn1, conv2, bn2) so that
        # parameter initialisation and state_dict keys are unchanged.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        # Shortcut path (None means "use the input unchanged").
        self.identity_downsample = identity_downsample

    def forward(self, x):
        """Run the block on ``x`` and return the activated residual sum."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        shortcut = x
        if self.identity_downsample is not None:
            shortcut = self.identity_downsample(shortcut)

        out += shortcut
        return self.relu(out)


class ResNet_18(nn.Module):
    """ResNet-18-style image classifier built from ``Block`` residual units.

    Layout: 7x7 stride-2 stem convolution, batch norm, ReLU, 3x3 stride-2 max
    pool, four stages of two ``Block`` units each (64, 128, 256, 512 channels),
    global average pooling and a final linear layer.

    Args:
        image_channels: number of channels of the input images.
        num_classes: size of the output logit vector.
    """

    def __init__(self, image_channels, num_classes):

        super(ResNet_18, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first halves the spatial size
        # and doubles the channel count.
        self.layer1 = self.__make_layer(64, 64, stride=1)
        self.layer2 = self.__make_layer(64, 128, stride=2)
        self.layer3 = self.__make_layer(128, 256, stride=2)
        self.layer4 = self.__make_layer(256, 512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def __make_layer(self, in_channels, out_channels, stride):
        """Build one stage: a (possibly downsampling) Block followed by a plain Block.

        A projection shortcut is created whenever the first block changes the
        tensor shape, i.e. when it strides or changes the channel count.
        Otherwise the residual addition in ``Block.forward`` would fail on
        mismatched shapes.
        """
        identity_downsample = None
        if stride != 1 or in_channels != out_channels:
            # The shortcut must use the same stride as the main path.
            identity_downsample = self.identity_downsample(
                in_channels, out_channels, stride=stride
            )

        return nn.Sequential(
            Block(in_channels, out_channels, identity_downsample=identity_downsample, stride=stride),
            Block(out_channels, out_channels)
        )

    def forward(self, x):
        """Return class logits for a batch of images shaped (N, image_channels, H, W)."""
        # Stem
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # Residual stages
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Head: pool to 1x1, flatten to (N, 512), classify.
        x = self.avgpool(x)
        x = x.view(x.shape[0], -1)
        x = self.fc(x)
        return x

    def identity_downsample(self, in_channels, out_channels, stride=2):
        """Build the projection shortcut used when a stage changes tensor shape.

        Args:
            in_channels: channels of the tensor entering the stage.
            out_channels: channels the stage produces.
            stride: spatial stride of the projection; defaults to 2, the value
                this method previously hard-coded.

        Returns:
            ``nn.Sequential`` of a 3x3 convolution and a batch norm.
        """
        # The 3x3 kernel (rather than the 1x1 projection of the original
        # ResNet paper) is kept on purpose so previously saved state_dicts
        # still load.
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels)
        )



In [5]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25,
                checkpoint_dir='/content/drive/MyDrive/Vegetable_classification_params'):
    """Train ``model`` and return it loaded with its best-validation weights.

    Each epoch runs a 'train' phase (gradients on, optimizer stepped) and a
    'val' phase (gradients off). After every validation phase the weights are
    checkpointed if validation accuracy improved, and the scheduler is stepped
    with the validation loss.

    Relies on the notebook-level globals ``dataloaders``, ``dataset_sizes``
    and ``device``.

    Args:
        model: network to train; expected to already live on ``device``.
        criterion: loss function called as ``criterion(outputs, labels)``.
        optimizer: optimizer over ``model``'s parameters.
        scheduler: scheduler stepped as ``scheduler.step(val_loss)`` once per
            epoch (the call shape used by ``ReduceLROnPlateau``).
        num_epochs: number of train + val passes over the data.
        checkpoint_dir: directory holding ``best_model_params.pt``; it is
            created if missing.

    Returns:
        The same ``model`` object, with the best checkpoint loaded.
    """
    since = time.time()

    # Make sure the checkpoint location exists before the first save;
    # torch.save does not create missing parent directories.
    os.makedirs(checkpoint_dir, exist_ok=True)
    best_model_params_path = os.path.join(checkpoint_dir, 'best_model_params.pt')

    # Save the initial weights so a checkpoint exists even if validation
    # accuracy never rises above 0.
    torch.save(model.state_dict(), best_model_params_path)
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Track autograd history only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # criterion returns a batch mean, so weight it by batch size
                # to obtain a per-sample average at the end of the epoch.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if not is_train:
                # Keep the weights of the best validation epoch on disk.
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)
                scheduler.step(epoch_loss)

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(torch.load(best_model_params_path, weights_only=True))
    return model

In [6]:
# Experiment setup: a 3-channel, 15-class ResNet_18 trained with SGD + momentum
# and a plateau-based learning-rate scheduler.
model = ResNet_18(image_channels=3, num_classes=15).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=0.001, momentum=0.9)
# NOTE(review): this rebinds the name `lr_scheduler`, which the first cell
# imported as the torch.optim.lr_scheduler module - confirm nothing else needs
# the module under that name.
lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, verbose=True)

# Run the full training loop; train_model returns the model with the best
# validation checkpoint loaded.
model_ft = train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=lr_scheduler,
    num_epochs=25,
)



Epoch 0/24
----------
train Loss: 1.8655 Acc: 0.4219
val Loss: 1.0943 Acc: 0.6687

Epoch 1/24
----------
train Loss: 0.9694 Acc: 0.7123
val Loss: 0.7051 Acc: 0.7823

Epoch 2/24
----------
train Loss: 0.6441 Acc: 0.8133
val Loss: 0.4662 Acc: 0.8667

Epoch 3/24
----------
train Loss: 0.4934 Acc: 0.8595
val Loss: 0.3592 Acc: 0.9023

Epoch 4/24
----------
train Loss: 0.3878 Acc: 0.8887
val Loss: 0.5771 Acc: 0.7953

Epoch 5/24
----------
train Loss: 0.3279 Acc: 0.9069
val Loss: 0.2187 Acc: 0.9350

Epoch 6/24
----------
train Loss: 0.2994 Acc: 0.9137
val Loss: 0.2545 Acc: 0.9213

Epoch 7/24
----------
train Loss: 0.2671 Acc: 0.9239
val Loss: 0.1440 Acc: 0.9627

Epoch 8/24
----------
train Loss: 0.2358 Acc: 0.9301
val Loss: 0.1716 Acc: 0.9543

Epoch 9/24
----------
train Loss: 0.2209 Acc: 0.9355
val Loss: 0.1338 Acc: 0.9607

Epoch 10/24
----------
train Loss: 0.1995 Acc: 0.9415
val Loss: 0.1483 Acc: 0.9557

Epoch 11/24
----------
train Loss: 0.1773 Acc: 0.9488
val Loss: 0.1627 Acc: 0.9467

Ep

# Training the model with a hand-written Dataset and training loop, for process understanding


In [None]:
# Map integer class index -> class folder name, using the sub-folders of the
# training directory. os.listdir returns entries in arbitrary order, so the
# index assignment follows whatever order the filesystem reports.
labels = os.listdir(dataset_train_location)
label_map = dict(enumerate(labels))
print(label_map)

In [None]:
from torch.utils.data import DataLoader
# Wrap the train / test folders in the custom Dataset.
# NOTE: CustomDataLoader is defined in the following cell, which must be run
# before this one.
train = CustomDataLoader(dataset_train_location)
test = CustomDataLoader(dataset_test_location)

# Batch both datasets, 64 samples per batch, reshuffled on every pass.
train_dataloader = DataLoader(dataset=train, batch_size=64, shuffle=True)
test_dataloader = DataLoader(dataset=test, batch_size=64, shuffle=True)

# Sanity check: pull a single batch and inspect its contents and shapes.
train_features, train_labels = next(iter(train_dataloader))
print(train_features)
print(f"Feature batch shape: {train_features.shape}")
print(f"Labels batch shape: {train_labels.shape}")

In [None]:
class CustomDataLoader(Dataset):
    """Map-style dataset over an image folder with one sub-folder per class.

    Despite its name this is a ``Dataset``, not a ``DataLoader``; wrap it in
    ``torch.utils.data.DataLoader`` to get batches.

    Class names, and therefore label indices, are always taken from the
    notebook-level ``dataset_train_location`` and not from ``folder``, so
    every split shares one index -> class mapping. Indices follow the order
    returned by ``os.listdir``, which is filesystem dependent.

    Args:
        folder: root directory holding one sub-directory per class.
        transform: optional callable applied to the image tensor (float,
            channels-first, RGB, values in [0, 1]) before normalisation;
            it must return a tensor.
        target_transform: optional callable applied to the label tensor
            (float, shape ``(1,)``); it must return a tensor.
    """

    def __init__(self, folder, transform=None, target_transform=None):
        super().__init__()
        self.img_root_folder = folder
        self.labels = os.listdir(dataset_train_location)
        self.label_map = dict(enumerate(self.labels))
        self.image_dataset = self.data_reader()
        self.transform = transform
        self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.target_transform = target_transform

    def data_reader(self):
        """Return ``[image_path, label_index]`` pairs for every file under each class folder."""
        data_list = []
        for label_index, class_name in self.label_map.items():
            folder_path = os.path.join(self.img_root_folder, class_name)
            print(folder_path)
            data_list.extend(
                [os.path.join(folder_path, image_name), label_index]
                for image_name in os.listdir(folder_path)
            )
        return data_list

    def __len__(self):
        return len(self.image_dataset)

    def __getitem__(self, idx):
        """Load sample ``idx`` as ``(image, label)``, both on the global ``device``.

        Returns:
            image: float32 tensor of shape (3, 224, 224), RGB, normalised.
            label: float32 tensor of shape (1,) holding the class index.

        Raises:
            OSError: if OpenCV cannot read or decode the image file.
        """
        img_path, label = self.image_dataset[idx]

        bgr = cv2.imread(img_path)
        if bgr is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"cv2.imread could not read image: {img_path}")

        # OpenCV loads images as BGR; reverse the channel axis to get RGB.
        rgb = cv2.resize(bgr[:, :, ::-1], (224, 224))
        # Scale to [0, 1] and move channels first: (H, W, C) -> (C, H, W).
        im = torch.tensor(rgb / 255).permute(2, 0, 1)
        if self.transform is not None:
            im = self.transform(im)
        im = self.normalize(im)

        target = torch.tensor([label]).float()
        if self.target_transform is not None:
            target = self.target_transform(target)

        # Tensors are moved to `device` here because the manual training loop
        # in this notebook feeds batches to the model without moving them.
        return im.float().to(device), target.to(device)


In [None]:
import torch.optim as optim
import torch.nn.functional as F

# Fresh model and optimisation setup for the hand-written training loop.
model = ResNet_18(3,15).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)
lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, verbose=True)

model.train()  # make sure batch norm runs in training mode
for epoch in range(10):
    running_loss = 0.0
    for inputs, labels in train_dataloader:
        # No-op when the dataset already returns tensors on `device`.
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        # The dataset yields float labels shaped (batch, 1); CrossEntropyLoss
        # needs integer class indices shaped (batch,).
        loss = criterion(outputs, labels.flatten().long())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean of the per-batch losses over the epoch.
    epoch_loss = running_loss / len(train_dataloader)
    # There is no validation pass in this loop, so the plateau scheduler
    # monitors the training loss.
    lr_scheduler.step(epoch_loss)
    print(f'Epoch {epoch} loss: {epoch_loss}')