In [None]:
import os
import pandas as pd
import glob
import torch.optim as optim
import torch
import PIL
import pytz
import time,copy
import numpy as np
from torchvision.io import read_image
from torch.utils.data import Dataset
from torchvision.transforms.transforms import ToTensor
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import cv2 as cv
from matplotlib import pyplot as plt
from torch import nn
from torchvision import datasets
from torchvision import models
from torch.optim import lr_scheduler as schedulers
import tqdm.notebook as tq
from datetime import datetime

## Data Class and Loaders

In [None]:
class CoinImageDataset(Dataset):
  """Dataset of coin images described by a CSV annotations file.

  Column 0 of the CSV is read as the image file name without extension
  ('.jpg' is appended when loading) and column 1 as the label.

  Args:
    annotations_file: path of the CSV file, read with ``pd.read_csv``.
    img_dir: directory that contains the '.jpg' images.
    transform: optional callable applied to the loaded RGB image array.
    target_transform: optional callable applied to the label.
  """

  def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
    self.img_dir = img_dir
    self.img_lables = pd.read_csv(annotations_file)
    self.transform = transform
    self.target_transform = target_transform
    # The attribute used to be stored only under this misspelled name; keep it
    # as an alias so code that reads it keeps working.
    self.traget_transform = target_transform

  def __len__(self):
    """Number of annotated images."""
    return len(self.img_lables)

  def __getitem__(self, idx):
    """Return ``{"image": ..., "label": ...}`` for row `idx`.

    Raises:
      FileNotFoundError: if the image cannot be read from disk.
    """
    img_path = os.path.join(self.img_dir, self.img_lables.iloc[idx, 0]) + ".jpg"

    image = cv.imread(img_path)
    # cv.imread signals failure by returning None instead of raising.
    if image is None:
      raise FileNotFoundError("Could not read image: {}".format(img_path))
    # OpenCV decodes to BGR; matplotlib and ImageNet-pretrained models expect RGB.
    image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

    label = self.img_lables.iloc[idx, 1]
    if self.transform:
      image = self.transform(image)
    if self.target_transform:
      label = self.target_transform(label)

    return {"image": image, "label": label}



In [None]:
# Colab only: mount Google Drive at /content/drive. The dataset and results
# paths used by the following cells all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Locations of the resized images and their CSV annotations on the mounted Drive.
PROJECT_ROOT = "/content/drive/MyDrive/Coin-classification-project"
ANNOTATIONS_ROOT = f"{PROJECT_ROOT}/annotation_files"

training_data = f"{PROJECT_ROOT}/resized_train"
test_data = f"{PROJECT_ROOT}/resized_test"

training_annotations = f"{ANNOTATIONS_ROOT}/train_annotations_3classes.csv"
test_annotations = f"{ANNOTATIONS_ROOT}/test_annotations_3classes.csv"

# Initialize DataLoaders

In [None]:

# Build one dataset + shuffled loader per split; both use ToTensor() as the
# only image transform and the same batch size.
_loader_batch_size = 32

dataset_train = CoinImageDataset(training_annotations, training_data, ToTensor())
dataset_test = CoinImageDataset(test_annotations, test_data, ToTensor())

dataloader_train = DataLoader(dataset=dataset_train, batch_size=_loader_batch_size, shuffle=True)
dataloader_test = DataLoader(dataset=dataset_test, batch_size=_loader_batch_size, shuffle=True)

View a Sample from the Data:

In [None]:
# Pull one batch from the training loader and display its first image with its label.
batch = next(iter(dataloader_train))
img = batch["image"][0]
label = batch["label"][0]
# ToTensor() produces (C, H, W); imshow expects (H, W, C). permute(2, 1, 0)
# would also swap height and width and show the image transposed.
plt.imshow(img.permute(1, 2, 0))
# .item() prints the label value; `label.numpy` without a call only prints the bound method.
print(label.item())
plt.show()


# Data Augmentation with Rotation Technique





In [None]:
def get_image_rotations(img):
  """Return five randomly rotated versions of `img`.

  A separate ``transforms.RandomRotation`` is built for each of the
  ``degrees`` values 10, 20, 30, 40 and 50 and applied once to `img`; the
  results are returned as a list in that order.
  """
  rotated_imgs = []
  for max_degrees in (10, 20, 30, 40, 50):
    rotate = transforms.RandomRotation(degrees=max_degrees)
    rotated_imgs.append(rotate(img))
  return rotated_imgs

In [None]:
def augment_batch(inputs,labels):
  """Append rotated copies of every image in a batch.

  Args:
    inputs: batch tensor whose first dimension indexes the images.
    labels: tensor with one label per image.

  Returns:
    ``(inputs, labels)`` where the original batch comes first, followed by the
    rotations of image 0, then those of image 1, and so on. Every rotated
    image carries the label of the image it came from. Labels are returned as
    a ``torch.long`` tensor.
  """
  original_labels = labels.tolist()
  rotated_stacks = []
  rotated_labels = []
  for img, label in zip(inputs, original_labels):
    rotated_imgs = get_image_rotations(img)
    rotated_stacks.append(torch.stack(rotated_imgs))
    rotated_labels.extend([label] * len(rotated_imgs))

  # Concatenate once at the end instead of re-copying the growing batch on
  # every iteration.
  inputs = torch.cat([inputs] + rotated_stacks, 0)
  # Build the integer tensor directly; going through a float32 tensor first
  # can silently alter large integer labels.
  labels = torch.tensor(original_labels + rotated_labels, dtype=torch.long)
  return inputs, labels

# Define Model and Optimizer

In [None]:
def set_parameter_requires_grad(model, feature_extracting):
    """Freeze all parameters of `model` when `feature_extracting` is truthy.

    When `feature_extracting` is falsy the model is left untouched.
    """
    if not feature_extracting:
        return
    for weight in model.parameters():
        weight.requires_grad = False


In [None]:
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    """Build a torchvision classifier with a fresh `num_classes`-way output layer.

    Args:
        model_name: one of "resnet" (ResNet-18), "vgg" (VGG11-BN) or "alexnet".
        num_classes: number of outputs of the replaced final linear layer.
        feature_extract: passed to ``set_parameter_requires_grad`` before the
            final layer is replaced, so when truthy only the new layer keeps
            ``requires_grad=True``.
        use_pretrained: forwarded as ``pretrained=`` to the torchvision
            constructor of the selected architecture.

    Returns:
        ``(model, input_size)`` where ``input_size`` is 224 for every
        supported architecture.

    Raises:
        ValueError: if `model_name` is not one of the supported names.
    """
    if model_name == "resnet":
        # ResNet-18: the classification head is the `fc` attribute.
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)

    elif model_name == "vgg":
        # VGG11 with batch norm: the head is the last entry of `classifier`.
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)

    elif model_name == "alexnet":
        # AlexNet: honour `use_pretrained` like the other branches (it used to
        # be hard-coded to False, silently ignoring the argument).
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)

    else:
        # Raise instead of calling exit(), which would shut down the notebook kernel.
        raise ValueError(
            "Invalid model name {!r}; expected 'resnet', 'vgg' or 'alexnet'".format(model_name)
        )

    input_size = 224
    return model_ft, input_size

In [None]:
# Notebook-level training configuration.
num_classes = 3   # size of the classifier's output layer
epochs = 100      # number of training epochs
# True = freeze the pretrained backbone and train only the new output layer.
# The model-construction cell builds the network in exactly that mode, so the
# flag is set to match; with False the optimizer cell would be handed the
# frozen parameters as well.
feature_extract = True
# Each training run writes its log, weights and loss plot to a timestamped
# sub-folder of this directory.
TRAINING_RESULTS_PATH = '/content/drive/MyDrive/Coin-classification-project/results/'

In [None]:
# Use the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Pretrained ResNet-18 with a new `num_classes`-way head (the class count comes
# from the config instead of a duplicated literal). The backbone is frozen
# here (feature_extract=True), so only the new head is trainable.
# NOTE(review): keep the notebook-level `feature_extract` flag in agreement with this value.
model_ft, input_size = initialize_model('resnet', num_classes, feature_extract=True, use_pretrained=True)
model_ft = model_ft.to(device)

## Train Model

In [None]:
from numpy.ma.core import size
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
  """Train `model` and evaluate it on the validation loader after every epoch.

  Training batches are expanded with ``augment_batch`` before the forward
  pass. Per-epoch losses/accuracies are printed and appended to ``res.txt``
  inside a timestamped sub-folder of ``TRAINING_RESULTS_PATH``; the weights
  with the best validation accuracy and a loss plot are saved there too.

  Args:
    model: network to train; expected to already be on the global `device`.
    dataloaders: dict with 'train' and 'val' loaders yielding dicts that have
      'image' and 'label' entries.
    criterion: loss called as ``criterion(outputs, labels)`` on the raw model
      outputs (logits), which is what ``nn.CrossEntropyLoss`` expects.
    optimizer: optimizer stepped once per training batch.
    num_epochs: number of epochs to run.
    is_inception: unused; kept for interface compatibility.

  Returns:
    ``(model, val_acc_history)``: the model loaded with the best validation
    weights, and the list of per-epoch validation accuracies (0-dim tensors).
  """
  timezone = pytz.timezone("Israel")
  now_time = datetime.now(timezone).strftime("%d-%m-%Y_%H-%M-%S")
  dir_of_res_path = TRAINING_RESULTS_PATH + now_time
  os.makedirs(dir_of_res_path, exist_ok=True)
  log_path = dir_of_res_path + '/res.txt'

  since = time.time()
  val_acc_history = []

  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0.0

  results_df = pd.DataFrame([], columns=['train_loss', 'val_loss'])
  phases = ('train', 'val')

  for epoch in tq.tqdm(range(num_epochs)):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))

    # Per-phase running totals for this epoch.
    loss_sums = {'train': 0.0, 'val': 0.0}
    correct_counts = {'train': 0, 'val': 0}
    sample_counts = {'train': 0, 'val': 0}

    for phase in phases:
      is_train = phase == 'train'
      if is_train:
        model.train()
      else:
        model.eval()

      for batch in tq.tqdm(dataloaders[phase]):
        inputs = batch['image']
        labels = batch['label']

        if is_train:
          inputs, labels = augment_batch(inputs, labels)
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()

        with torch.set_grad_enabled(is_train):
          # Feed raw logits to the criterion: CrossEntropyLoss applies
          # log-softmax itself, so an extra Softmax here would flatten the
          # gradients. The argmax is the same with or without softmax.
          outputs = model(inputs)
          loss = criterion(outputs, labels)
          _, preds = torch.max(outputs, 1)

          if is_train:
            loss.backward()
            optimizer.step()

        n_in_batch = inputs.size(0)
        loss_sums[phase] += loss.item() * n_in_batch
        correct_counts[phase] = correct_counts[phase] + torch.sum(preds == labels)
        sample_counts[phase] += n_in_batch

    # Average over the samples actually processed (for 'train' this includes
    # the augmented copies) rather than assuming a fixed augmentation factor.
    epoch_loss = {}
    epoch_acc = {}
    for phase in phases:
      n_samples = max(sample_counts[phase], 1)  # guard against an empty loader
      epoch_loss[phase] = loss_sums[phase] / n_samples
      epoch_acc[phase] = torch.as_tensor(correct_counts[phase], dtype=torch.double) / n_samples

    summary_lines = [
      '{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss[phase], epoch_acc[phase])
      for phase in phases
    ]
    with open(log_path, 'a') as f:
      for line in summary_lines:
        print(line, file=f)
    for line in summary_lines:
      print(line)

    # Keep the weights of the best epoch on the validation split.
    if epoch_acc['val'] > best_acc:
      best_acc = epoch_acc['val']
      best_model_wts = copy.deepcopy(model.state_dict())
    val_acc_history.append(epoch_acc['val'])

    results_df.loc[len(results_df)] = [
      np.round(epoch_loss['train'], 4),
      np.round(epoch_loss['val'], 4),
    ]

  time_elapsed = time.time() - since
  with open(log_path, 'a') as f:
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60),file=f)
    print('Best val Acc: {:4f}'.format(best_acc),file=f)

  # Restore and persist the best weights, then save the loss curves.
  model.load_state_dict(best_model_wts)
  torch.save(model.state_dict(), dir_of_res_path + f'/model_ft_{best_acc}.pth')
  plt.figure(figsize=(10, 10))
  plt.plot(results_df['train_loss'], color='gold', label='train')
  plt.plot(results_df['val_loss'], color='purple', label='val')
  plt.ylabel('Loss', fontsize=25)
  plt.xlabel('Epoch', fontsize=25)
  plt.legend()
  plt.savefig(dir_of_res_path + '/final_loss_plot.jpeg')

  return model, val_acc_history

In [None]:
# List the parameters that will receive gradients and decide what the
# optimizer gets: only the trainable ones when feature extracting, otherwise
# everything returned by model_ft.parameters().
trainable_named_params = [
    (name, param)
    for name, param in model_ft.named_parameters()
    if param.requires_grad == True
]

print("Params to learn:")
for name, _ in trainable_named_params:
    print("\t",name)

if feature_extract:
    params_to_update = [param for _, param in trainable_named_params]
    print("end")
else:
    params_to_update = model_ft.parameters()

In [None]:
# Loss, optimizer and loaders for the run. The test loader is used as the
# 'val' split inside train_model.
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(params_to_update, lr=0.01, momentum=0.9)
dataloaders_dict = dict(train=dataloader_train, val=dataloader_test)

model_ft, hist = train_model(
    model=model_ft,
    dataloaders=dataloaders_dict,
    criterion=criterion,
    optimizer=optimizer_ft,
    num_epochs=epochs,
)