In [1]:
#importing libs

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split


from sklearn.metrics import confusion_matrix, f1_score, accuracy_score, classification_report

import glob, os
import numpy as np
from PIL import Image

In [2]:
#device

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('device:',device)

device: cuda


In [3]:
# dataloader

class IMDDataset(Dataset):
    def __init__(self, root_dir, sub_dir, transform=None):
        self.root_dir = root_dir
        self.sub_dir = sub_dir
        self.transform = transform

        self.img_names = []
        for sdir in self.sub_dir:
            self.img_names.extend(sorted(glob.glob(os.path.join(root_dir, sdir, '*'))))

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        self.img_path = self.img_names[idx]
        
        self.img = Image.open(self.img_path)

        self.y = 1 if self.img_path.split('/')[-1].split('_')[0] == 'Au' else 0

        if self.transform:
            (self.img, self.y) = self.transform((self.img, self.y))
        
        return (self.img, self.y)


class ToTensor:
    def __call__(self, sample):
        X, y = sample
        return (torch.from_numpy(X),torch.tensor(y))


class Resize:
    def __init__(self, width, height):
        self.w = width
        self.h = height

    def __call__(self, sample):
        X, y = sample
        X = X.resize((self.w,self.h))
        return (X, y)


class Normalize:
    def __call__(self, sample):
        X, y = sample
        X = np.array(X, dtype=np.float32).transpose(2,0,1)/255.0
        return (X, y)

In [15]:
# auth -> 1 and tp -> 0
class IMDModel(nn.Module):

    def __init__(self):
        super(IMDModel,self).__init__()

        self.maxpool = nn.MaxPool2d(kernel_size=2)
        self.relu = nn.ReLU()
        self.down_conv1 = nn.Sequential(
                        nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3),
                        nn.BatchNorm2d(64),
                        self.maxpool,
                        self.relu
        )
        self.down_conv2 = nn.Sequential(
                        nn.Conv2d(in_channels=64, out_channels=16, kernel_size=3),
                        nn.BatchNorm2d(16),
                        self.maxpool,
                        self.relu
        )
        self.linear = nn.Sequential(
                        nn.Linear(in_features=16*30*30, out_features=1024),
                        nn.BatchNorm1d(1024),
                        self.relu,
                        nn.Linear(in_features=1024, out_features=64),
                        nn.BatchNorm1d(64),
                        self.relu,
                        nn.Linear(in_features=64, out_features=2),
                        nn.Softmax()
        )


    def forward(self, img):

        d1 = self.down_conv1(img)
        d2 = self.down_conv2(d1)

        d2 = d2.view(-1, d2.shape[1]*d2.shape[2]*d2.shape[3])
        out = self.linear(d2)
        
        return out

In [6]:
#hyperparameters

model = IMDModel().to(device)

epochs = 100
batch_size = 128
opt = optim.Adam(model.parameters(), lr=1e-4)
loss_fn1 = nn.CrossEntropyLoss(reduction='mean')


In [8]:
root_dir = '../data/'
sub_dir = ['auth_model', 'sp_model']

composed = transforms.Compose([Resize(128, 128), Normalize(), ToTensor()])
dataset = IMDDataset(root_dir=root_dir, sub_dir=sub_dir, transform=composed)

train_dataset, validation_dataset, test_dataset = random_split(dataset, [1000, 500, 221])
train_data = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
validation_data = DataLoader(dataset=validation_dataset, batch_size=batch_size, shuffle=True)
test_data = DataLoader(dataset=test_dataset, batch_size=221, shuffle=True)

In [7]:
# training

for epoch in range(1, epochs+1):
    
    print('Epoch: {}/{} '.format(epoch,epochs))

    model.train()
    train_loss = []

    for idx, (X, y) in enumerate(train_data):
        X, y = X.to(device), y.to(device)

        opt.zero_grad()
        with torch.set_grad_enabled(True):

            output = model(X)

            loss = loss_fn1(output, y)

            # loss.type(torch.FloatTensor)
            train_loss.append(loss.item())

            loss.backward()
            opt.step()

    print('|--- Train Loss: {}'.format(np.mean(train_loss)))

    if(epoch % 10 == 0):
        val_loss = []
        val_acc = []
        with torch.no_grad():
                model.eval()
                for id, (X_val, y_val) in enumerate(validation_data):
                    X_val, y_val = X_val.to(device), y_val.to(device)

                    output = model(X_val)
                    loss1 = loss_fn1(output, y_val)
                    val_loss.append(loss1.item())
                    # epoch_val_score.append(f1_score(y_true > 0 , y_pred > 0))
                    _, y_pred = torch.max(output, dim=1)
                    val_acc.append(accuracy_score(y_val.cpu().numpy(), y_pred.cpu().numpy()))

                print('|--- Val Loss: {} --- Val Accuracy: {}'.format(np.mean(val_loss), np.mean(val_acc)))
            # print('F1 score for validation: %f'%(np.mean(epoch_val_score)))
    
    print('-'*80)

Epoch: 1/100 
|--- Train Loss: 0.6446183994412422
--------------------------------------------------------------------------------
Epoch: 2/100 
|--- Train Loss: 0.590332143008709
--------------------------------------------------------------------------------
Epoch: 3/100 
|--- Train Loss: 0.5496955290436745
--------------------------------------------------------------------------------
Epoch: 4/100 
|--- Train Loss: 0.5062691532075405
--------------------------------------------------------------------------------
Epoch: 5/100 
|--- Train Loss: 0.4890725761651993
--------------------------------------------------------------------------------
Epoch: 6/100 
|--- Train Loss: 0.460037499666214
--------------------------------------------------------------------------------
Epoch: 7/100 
|--- Train Loss: 0.44341691583395004
--------------------------------------------------------------------------------
Epoch: 8/100 
|--- Train Loss: 0.4307126961648464
----------------------------------

In [16]:
# testing

model = torch.load('model/model_c1_e100_v2.pth')
ttloss = []
ttacc = []
ttf1 = []
with torch.no_grad():
    model.eval()
    for id, (X_val, y_val) in enumerate(test_data):
        X_val, y_val = X_val.to(device), y_val.to(device)

        output = model(X_val)
        loss1 = loss_fn1(output, y_val)
        ttloss.append(loss1.item())
                    # epoch_val_score.append(f1_score(y_true > 0 , y_pred > 0))
        _, y_pred = torch.max(output, dim=1)
        y_val, y_pred = y_val.cpu().numpy(), y_pred.cpu().numpy()
        ttacc.append(accuracy_score(y_val, y_pred))
        ttf1.append(f1_score(y_val, y_pred))
        res = confusion_matrix(y_val, y_pred)
        report = classification_report(y_val, y_pred)

    print('|--- Test Loss: {} --- Test Accuracy: {} --- F1 Score: {}'.format(np.mean(ttloss), np.mean(ttacc), np.mean(ttf1)))
    print('Confusion matrix:\n',res)
    print('Report:\n',report)

|--- Test Loss: 0.18185213208198547 --- Test Accuracy: 0.9366515837104072 --- F1 Score: 0.9339622641509434
Confusion matrix:
 [[108   8]
 [  6  99]]
Report:
               precision    recall  f1-score   support

           0       0.95      0.93      0.94       116
           1       0.93      0.94      0.93       105

    accuracy                           0.94       221
   macro avg       0.94      0.94      0.94       221
weighted avg       0.94      0.94      0.94       221



In [21]:
# saving model

torch.save(model,'model/model_c1.pth')