In [46]:
import os
import shutil
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable
import os
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import Dataset
import cv2
from torch.utils.data import DataLoader
import numpy as np

In [47]:
from sklearn import metrics
def metric(y_test, y_pred):
    matrix = metrics.confusion_matrix(y_test, y_pred)
    accuracy = metrics.accuracy_score(y_test, y_pred)
    precision = metrics.precision_score(y_test, y_pred)
    recall = metrics.recall_score(y_test, y_pred)
    f1 = metrics.f1_score(y_test, y_pred)
    return matrix,accuracy,precision,recall,f1

<h1>Steps to load data from drive to google collab

In [48]:
os.mkdir("dataset")
os.mkdir("dataset/train")
os.mkdir("dataset/test")
os.mkdir("dataset/train/parasite")
os.mkdir("dataset/test/parasite")
os.mkdir("dataset/train/uninfected")
os.mkdir("dataset/test/uninfected")

In [49]:
def move_files(abs_dirname, train_dir, test_dir):
    """Move files into subdirectories."""

    files = [os.path.join(abs_dirname, f) for f in os.listdir(abs_dirname)]

    i = 0
    curr_subdir = None
    files.sort()

    for f in files:
        # create new subdir if necessary
        if i == 0:
            curr_subdir = train_dir
        
        if i == 11025:
            curr_subdir = test_dir

        # move file to current dir
        f_base = os.path.basename(f)
        shutil.copy(f, os.path.join(curr_subdir, f_base))
        i += 1
        if(i%1000 == 0):
            print(i)

In [50]:
abs_dirname = "cell_images/Parasitized/"
train_dir = "dataset/train/parasite"
test_dir = "dataset/test/parasite"
move_files(abs_dirname, train_dir, test_dir)

print(type(abs_dirname))

1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
<class 'str'>


In [51]:
abs_dirname = "cell_images/Uninfected/"
train_dir = "dataset/train/uninfected"
test_dir = "dataset/test/uninfected"
move_files(abs_dirname, train_dir, test_dir)

1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000


In [54]:
import pandas as pd
train_df = pd.DataFrame(columns = ["path", "label"])
test_df = pd.DataFrame(columns = ["path", "label"])

abs_dirname  = "dataset/train/parasite"
files = [os.path.join(abs_dirname, f) for f in os.listdir(abs_dirname)]

for f in files:
    f_base = os.path.basename(f)
    if f_base.endswith('.png'):
        df = pd.DataFrame({'path': [f_base], 'label': [1]})
        train_df = pd.concat([train_df, df], ignore_index=True)

abs_dirname  = "dataset/train/uninfected"
files = [os.path.join(abs_dirname, f) for f in os.listdir(abs_dirname)]
for f in files:
    f_base = os.path.basename(f)
    if f_base.endswith('.png'):
        df = pd.DataFrame({'path': [f_base], 'label': [0]})
        train_df = pd.concat([train_df, df], ignore_index=True)
    
train_df.to_csv("dataset/train.csv", index = False)

abs_dirname  = "dataset/test/parasite"
files = [os.path.join(abs_dirname, f) for f in os.listdir(abs_dirname)]
for f in files:
    f_base = os.path.basename(f)
    if f_base.endswith('.png'):
        df = pd.DataFrame({'path': [f_base], 'label': [1]})
        test_df = pd.concat([test_df, df], ignore_index=True)
    
abs_dirname  = "dataset/test/uninfected"
files = [os.path.join(abs_dirname, f) for f in os.listdir(abs_dirname)]
for f in files:
    f_base = os.path.basename(f)
    if f_base.endswith('.png'):
        df = pd.DataFrame({'path': [f_base], 'label': [0]})
        test_df = pd.concat([test_df, df], ignore_index=True)
    
test_df.to_csv("dataset/test.csv", index = False)

In [55]:
test_df
print(test_df.size)

11016


<h1>Creating dataloader for CNN model

In [56]:
class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        label = self.img_labels['label'][idx]
        class_type = "parasite" if label == 1 else "uninfected"
        img_path = os.path.join(self.img_dir, class_type, self.img_labels.iloc[idx, 0])
        image = cv2.imread(img_path)
        image = cv2.resize(image, (32, 32))
        image = np.moveaxis(image, -1, 0)
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image.astype(np.float32)/255.0, label

In [57]:
train_dir = "dataset/train"
test_dir = "dataset/test"
train_csv_path = "dataset/train.csv"
test_csv_path = "dataset/test.csv"

<h1>CNN

In [59]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [60]:
# Hyper-parameters 
num_epochs = 100
batch_size = 256
learning_rate = 0.005

In [61]:
##train and test dataloader
training_data = CustomImageDataset(annotations_file=train_csv_path, img_dir= train_dir)
train_loader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
testing_data = CustomImageDataset(annotations_file=test_csv_path, img_dir= test_dir)
test_loader = DataLoader(testing_data, batch_size=1024, shuffle=True)

In [73]:
#Dataframe to store accuracy
train_accuracy_df = pd.DataFrame(columns = ['Accuracy', 'Precision', 'Recall','F1'], index = ['VGG11'])
test_accuracy_df = pd.DataFrame(columns = ['Accuracy', 'Precision', 'Recall','F1'], index = ['VGG11'])

In [63]:
# clas for CNN architecture
class ConvNet(nn.Module):
    def __init__(self, cnn_type, in_channels=3, num_classes=1):
        super(ConvNet, self).__init__()
        self.in_channels = in_channels
        self.cnn_type = cnn_type
        self.VGG_types = {'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],}
        self.conv_layers = self.create_conv_layers(self.VGG_types[self.cnn_type])
        
        self.fcs = nn.Sequential(
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(p = 0.5),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(p = 0.5),
            nn.Linear(512, num_classes)
            )
        
    def forward(self, x):
        x = self.conv_layers(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fcs(x)
        return torch.sigmoid(x)

    def create_conv_layers(self, architecture):
        layers = []
        in_channels = self.in_channels
        
        for x in architecture:
            if type(x) == int:
                out_channels = x
                
                layers += [nn.Conv2d(in_channels=in_channels,out_channels=out_channels,
                                     kernel_size=(3,3), stride=(1,1), padding=(1,1)),
                           nn.BatchNorm2d(x),
                           nn.ReLU()]
                in_channels = x
            elif x == 'M':
                layers += [nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))]
                
        return nn.Sequential(*layers)

In [64]:
#Function to train the model
def train(cnn_type, check):
  model = ConvNet(cnn_type = cnn_type).to(device)
  PATH = f'./{model.cnn_type}.pth'
  if(check == 'save'):
    pass
  elif check == 'load':
    print("Model already present")
    model.load_state_dict(torch.load(PATH))

  model.train()
  criterion = nn.BCELoss()
  optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)
  loss_plot = []
  for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
      images = images.to(device)
      labels = labels.unsqueeze(1)
      labels = labels.float()
      labels = labels.to(device)

      # Forward pass
      outputs = model(images)
      loss = criterion(outputs, labels)

     # Backward and optimize
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      if (i+1) % 64 == 0:
        print (f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.6f}')
        loss_plot.append(loss.item())
    print('Model saved after epoch: ', epoch)
    torch.save(model.state_dict(), PATH)
    
  print('Finished Training Completely')
  torch.save(model.state_dict(), PATH)

In [65]:
#Function to evaluate the test dataset
def test(data_loader, model):
  with torch.no_grad():
    y_pred = []
    y_hat = []
    for images, labels in data_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        outputs = outputs.cpu().detach().numpy()
        labels = labels.cpu().detach().numpy()
        y_pred.extend(outputs)
        y_hat.extend(labels)
  y_pred = np.array(y_pred).flatten()
  y_hat = np.array(y_hat).flatten()
  y_pred =  np.where(y_pred>=0.5, 1, 0) 
  return metric(y_hat, y_pred) 

In [66]:
train('VGG11', check = 'save')

Epoch [1/100], Loss: 0.689884
Model saved after epoch:  0
Epoch [2/100], Loss: 0.532565
Model saved after epoch:  1
Epoch [3/100], Loss: 0.232001
Model saved after epoch:  2
Epoch [4/100], Loss: 0.141270
Model saved after epoch:  3
Epoch [5/100], Loss: 0.122403
Model saved after epoch:  4
Epoch [6/100], Loss: 0.109334
Model saved after epoch:  5
Epoch [7/100], Loss: 0.151461
Model saved after epoch:  6
Epoch [8/100], Loss: 0.118564
Model saved after epoch:  7
Epoch [9/100], Loss: 0.060867
Model saved after epoch:  8
Epoch [10/100], Loss: 0.147612
Model saved after epoch:  9
Epoch [11/100], Loss: 0.080366
Model saved after epoch:  10
Epoch [12/100], Loss: 0.160371
Model saved after epoch:  11
Epoch [13/100], Loss: 0.127296
Model saved after epoch:  12
Epoch [14/100], Loss: 0.104681
Model saved after epoch:  13
Epoch [15/100], Loss: 0.052756
Model saved after epoch:  14
Epoch [16/100], Loss: 0.091756
Model saved after epoch:  15
Epoch [17/100], Loss: 0.133449
Model saved after epoch:  16

In [67]:
model = ConvNet(cnn_type = 'VGG11').to(device)
PATH = f'./VGG11.pth' 
model.load_state_dict(torch.load(PATH))

<All keys matched successfully>

In [78]:
matrix, accuracy, precision, recall, f1 = test(test_loader, model)
test_accuracy_df.loc['VGG11'] = [accuracy,precision,recall,f1]
matrix

array([[2533,  221],
       [ 108, 2646]], dtype=int64)

In [79]:
matrix, accuracy, precision, recall, f1 = test(train_loader, model)
train_accuracy_df.loc['VGG11'] = [accuracy,precision,recall,f1]
matrix

array([[10988,    37],
       [    9, 11016]], dtype=int64)

In [80]:
test_accuracy_df

Unnamed: 0,Accuracy,Precision,Recall,F1
VGG11,0.940269,0.922916,0.960784,0.941469


In [81]:
train_accuracy_df

Unnamed: 0,Accuracy,Precision,Recall,F1
VGG11,0.997914,0.996652,0.999184,0.997916
