In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets
from sklearn.metrics import confusion_matrix
from gradcam.utils import visualize_cam
from gradcam import GradCAM, GradCAMpp
from matplotlib import pyplot as plt
from torch.autograd import Variable
from torchsummary import summary
import torch.utils.data as Data
from torch.utils.data import *
import scipy.stats as stats
import matplotlib as mpl
import seaborn as sns
import torch.nn as nn
import pandas as pd
import torchvision
import numpy as np
import cv2 as cv
import torch
import time
import tqdm
import sys
import os

In [None]:
# Select the compute device: the first CUDA GPU when one is available, otherwise the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
if use_cuda:
    print(device, torch.cuda.get_device_name(0))
    # Enable the cuDNN autotuner (only set on the GPU path, as before).
    torch.backends.cudnn.benchmark = True
else:
    print(device)

# NN Architecture

In [None]:
# Convolution - Batch Normalization - LeakyReLU block
class CBL(nn.Module):
    """Conv2d followed by BatchNorm2d and an in-place LeakyReLU.

    Args:
        channels_in: number of input channels of the convolution.
        channels_out: number of output channels (also the BatchNorm width).
        kernel_size: convolution kernel size.
        stride: convolution stride.
        padding: convolution padding.
    """

    def __init__(self, channels_in, channels_out, kernel_size, stride, padding):
        super().__init__()

        # Constructor arguments are kept as plain attributes.
        self.channels_in = channels_in
        self.channels_out = channels_out
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        # Registered under the name `cbl`, which prefixes the state_dict keys.
        layers = [
            nn.Conv2d(channels_in, channels_out, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(channels_out),
            nn.LeakyReLU(inplace=True),
        ]
        self.cbl = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through convolution, batch normalization and LeakyReLU."""
        return self.cbl(x)

In [None]:
# Convolution - Batch Normalization - Mish block
class CBM(nn.Module):
    """Conv2d followed by BatchNorm2d and an in-place Mish activation.

    Args:
        channels_in: number of input channels of the convolution.
        channels_out: number of output channels (also the BatchNorm width).
        kernel_size: convolution kernel size.
        stride: convolution stride.
        padding: convolution padding.
    """

    def __init__(self, channels_in, channels_out, kernel_size, stride, padding):
        super().__init__()

        # Constructor arguments are kept as plain attributes.
        self.channels_in, self.channels_out = channels_in, channels_out
        self.kernel_size, self.stride, self.padding = kernel_size, stride, padding

        # Registered under the name `cbm`, which prefixes the state_dict keys.
        conv = nn.Conv2d(channels_in, channels_out, kernel_size=kernel_size, stride=stride, padding=padding)
        norm = nn.BatchNorm2d(channels_out)
        self.cbm = nn.Sequential(conv, norm, nn.Mish(inplace=True))

    def forward(self, x):
        """Run ``x`` through convolution, batch normalization and Mish."""
        return self.cbm(x)

In [None]:
# Convolution - Batch Normalization - ReLU block
class CBR(nn.Module):
    """Conv2d followed by BatchNorm2d and an in-place ReLU.

    Args:
        channels_in: number of input channels of the convolution.
        channels_out: number of output channels (also the BatchNorm width).
        kernel_size: convolution kernel size.
        stride: convolution stride.
        padding: convolution padding.
    """

    def __init__(self, channels_in, channels_out, kernel_size, stride, padding):
        super().__init__()

        # Constructor arguments are kept as plain attributes.
        self.channels_in = channels_in
        self.channels_out = channels_out
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        # Registered under the name `cbr`, which prefixes the state_dict keys.
        conv_kwargs = dict(kernel_size=kernel_size, stride=stride, padding=padding)
        self.cbr = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, **conv_kwargs),
            nn.BatchNorm2d(channels_out),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Run ``x`` through convolution, batch normalization and ReLU."""
        return self.cbr(x)

In [None]:
# ResCBR
class ResCBR(nn.Module):
    """Residual pair of CBR layers followed by a projecting CBR layer.

    Two stride-1 CBR layers that keep the channel count are applied to the
    input, the input is added back to their output, and a final CBR layer
    maps the sum to ``channels_out`` using the caller's stride and padding.

    Args:
        channels_in: number of input channels.
        channels_out: number of channels produced by the final CBR layer.
        kernel_size: kernel size shared by all three convolutions.
        stride: stride of the final CBR layer only.
        padding: padding of the final CBR layer only.
    """

    def __init__(self, channels_in, channels_out, kernel_size, stride, padding):
        super().__init__()

        # Constructor arguments are kept as plain attributes.
        self.channels_in = channels_in
        self.channels_out = channels_out
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        # Padding for the residual branch: with stride 1 and an odd kernel
        # this keeps the spatial size, so the branch output can be added to x.
        self.res_padding = (kernel_size - 1) // 2

        # Residual branch: two identical channel-preserving CBR layers.
        residual_layers = [
            CBR(channels_in, channels_in, kernel_size=kernel_size, stride=1, padding=self.res_padding)
            for _ in range(2)
        ]
        self.seq_block = nn.Sequential(*residual_layers)
        # Output layer applied to (branch + input).
        self.cbr = CBR(channels_in, channels_out, kernel_size=kernel_size, stride=stride, padding=padding)

    def forward(self, x):
        """Return ``cbr(seq_block(x) + x)``."""
        return self.cbr(self.seq_block(x) + x)

In [None]:
class ScottCBR(nn.Module):
    """Small CNN classifier built from CBR (Conv-BatchNorm-ReLU) layers.

    Three stages, each made of two unpadded 3x3 CBR layers and a 2x2 max
    pool, feed a fully connected head.  The head's input width is hard-coded
    to 19 * 19 * 3, which is what a 3 x 180 x 180 input produces
    (180 -> 176 -> 88 -> 84 -> 42 -> 38 -> 19).

    Args:
        classes: number of output classes.
    """

    def __init__(self, classes: int = 2):
        super(ScottCBR, self).__init__()

        # Blocks
        self.block1 = nn.Sequential(
            CBR(3, 8, kernel_size=3, stride=1, padding=0),
            CBR(8, 7, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block2 = nn.Sequential(
            CBR(7, 6, kernel_size=3, stride=1, padding=0),
            CBR(6, 5, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block3 = nn.Sequential(
            CBR(5, 4, kernel_size=3, stride=1, padding=0),
            CBR(4, 3, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )

        # Classification head (Linear layers are stacked without activations).
        self.classification = nn.Sequential(
            nn.Linear((19 * 19 * 3), 512),
            nn.Linear(512, 128),
            nn.Linear(128, classes)
        )

    def forward(self, x):
        """Classify a batch of images of shape (batch, 3, 180, 180).

        Returns:
            Tensor of shape (batch, classes).  In training mode these are raw
            logits: ``nn.CrossEntropyLoss`` applies log-softmax itself, so
            feeding it probabilities would apply softmax twice and squash the
            gradients.  In eval mode the logits go through softmax, so
            inference callers still receive class probabilities.
        """
        features = self.block3(self.block2(self.block1(x)))
        logits = self.classification(features.reshape(features.size(0), -1))
        if self.training:
            return logits
        return torch.softmax(logits, dim=1)

In [None]:
class ScottCBL(nn.Module):
    """Small CNN classifier built from CBL (Conv-BatchNorm-LeakyReLU) layers.

    Three stages, each made of two unpadded 3x3 CBL layers and a 2x2 max
    pool, feed a fully connected head.  The head's input width is hard-coded
    to 19 * 19 * 3, which is what a 3 x 180 x 180 input produces
    (180 -> 176 -> 88 -> 84 -> 42 -> 38 -> 19).

    Args:
        classes: number of output classes.
    """

    def __init__(self, classes: int = 2):
        super(ScottCBL, self).__init__()

        # Blocks
        self.block1 = nn.Sequential(
            CBL(3, 8, kernel_size=3, stride=1, padding=0),
            CBL(8, 7, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block2 = nn.Sequential(
            CBL(7, 6, kernel_size=3, stride=1, padding=0),
            CBL(6, 5, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block3 = nn.Sequential(
            CBL(5, 4, kernel_size=3, stride=1, padding=0),
            CBL(4, 3, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )

        # Classification head (Linear layers are stacked without activations).
        self.classification = nn.Sequential(
            nn.Linear((19 * 19 * 3), 512),
            nn.Linear(512, 128),
            nn.Linear(128, classes)
        )

    def forward(self, x):
        """Classify a batch of images of shape (batch, 3, 180, 180).

        Returns:
            Tensor of shape (batch, classes).  In training mode these are raw
            logits: ``nn.CrossEntropyLoss`` applies log-softmax itself, so
            feeding it probabilities would apply softmax twice and squash the
            gradients.  In eval mode the logits go through softmax, so
            inference callers still receive class probabilities.
        """
        features = self.block3(self.block2(self.block1(x)))
        logits = self.classification(features.reshape(features.size(0), -1))
        if self.training:
            return logits
        return torch.softmax(logits, dim=1)

In [None]:
class ScottCBM(nn.Module):
    """Small CNN classifier built from CBM (Conv-BatchNorm-Mish) layers.

    Three stages, each made of two unpadded 3x3 CBM layers and a 2x2 max
    pool, feed a fully connected head.  The head's input width is hard-coded
    to 19 * 19 * 3, which is what a 3 x 180 x 180 input produces
    (180 -> 176 -> 88 -> 84 -> 42 -> 38 -> 19).

    Args:
        classes: number of output classes.
    """

    def __init__(self, classes: int = 2):
        super(ScottCBM, self).__init__()

        # Blocks
        self.block1 = nn.Sequential(
            CBM(3, 8, kernel_size=3, stride=1, padding=0),
            CBM(8, 7, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block2 = nn.Sequential(
            CBM(7, 6, kernel_size=3, stride=1, padding=0),
            CBM(6, 5, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block3 = nn.Sequential(
            CBM(5, 4, kernel_size=3, stride=1, padding=0),
            CBM(4, 3, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )

        # Classification head (Linear layers are stacked without activations).
        self.classification = nn.Sequential(
            nn.Linear((19 * 19 * 3), 512),
            nn.Linear(512, 128),
            nn.Linear(128, classes)
        )

    def forward(self, x):
        """Classify a batch of images of shape (batch, 3, 180, 180).

        Returns:
            Tensor of shape (batch, classes).  In training mode these are raw
            logits: ``nn.CrossEntropyLoss`` applies log-softmax itself, so
            feeding it probabilities would apply softmax twice and squash the
            gradients.  In eval mode the logits go through softmax, so
            inference callers still receive class probabilities.
        """
        features = self.block3(self.block2(self.block1(x)))
        logits = self.classification(features.reshape(features.size(0), -1))
        if self.training:
            return logits
        return torch.softmax(logits, dim=1)

In [None]:
class ScottRes(nn.Module):
    """Small CNN classifier built from residual ResCBR blocks and max pooling.

    Three stages, each made of two ResCBR blocks (unpadded 3x3 output
    convolution) and a 2x2 max pool, feed a fully connected head.  The
    head's input width is hard-coded to 19 * 19 * 3, which is what a
    3 x 180 x 180 input produces (180 -> 176 -> 88 -> 84 -> 42 -> 38 -> 19).

    Args:
        classes: number of output classes.
    """

    def __init__(self, classes: int = 2):
        super(ScottRes, self).__init__()

        # Blocks
        self.block1 = nn.Sequential(
            ResCBR(3, 8, kernel_size=3, stride=1, padding=0),
            ResCBR(8, 7, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block2 = nn.Sequential(
            ResCBR(7, 6, kernel_size=3, stride=1, padding=0),
            ResCBR(6, 5, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )
        self.block3 = nn.Sequential(
            ResCBR(5, 4, kernel_size=3, stride=1, padding=0),
            ResCBR(4, 3, kernel_size=3, stride=1, padding=0),
            nn.MaxPool2d(2, 2)
        )

        # Classification head (Linear layers are stacked without activations).
        self.classification = nn.Sequential(
            nn.Linear((19 * 19 * 3), 512),
            nn.Linear(512, 128),
            nn.Linear(128, classes)
        )

    def forward(self, x):
        """Classify a batch of images of shape (batch, 3, 180, 180).

        Returns:
            Tensor of shape (batch, classes).  In training mode these are raw
            logits: ``nn.CrossEntropyLoss`` applies log-softmax itself, so
            feeding it probabilities would apply softmax twice and squash the
            gradients.  In eval mode the logits go through softmax, so
            inference callers still receive class probabilities.
        """
        features = self.block3(self.block2(self.block1(x)))
        logits = self.classification(features.reshape(features.size(0), -1))
        if self.training:
            return logits
        return torch.softmax(logits, dim=1)

In [None]:
class ScottDonPool(nn.Module):
    """ResCBR classifier that downsamples with strided ResCBR blocks, not pooling.

    Each of the three stages is two ResCBR blocks with an unpadded 3x3
    output convolution followed by a stride-2, padding-1 ResCBR block that
    halves the spatial size.  The head's input width is hard-coded to
    19 * 19 * 3, which is what a 3 x 180 x 180 input produces
    (180 -> 176 -> 88 -> 84 -> 42 -> 38 -> 19).

    Args:
        classes: number of output classes.
    """

    def __init__(self, classes: int = 2):
        super(ScottDonPool, self).__init__()

        # Blocks
        self.block1 = nn.Sequential(
            ResCBR(3, 8, kernel_size=3, stride=1, padding=0),
            ResCBR(8, 7, kernel_size=3, stride=1, padding=0),
            ResCBR(7, 7, kernel_size=3, stride=2, padding=1)
        )
        self.block2 = nn.Sequential(
            ResCBR(7, 6, kernel_size=3, stride=1, padding=0),
            ResCBR(6, 5, kernel_size=3, stride=1, padding=0),
            ResCBR(5, 5, kernel_size=3, stride=2, padding=1)
        )
        self.block3 = nn.Sequential(
            ResCBR(5, 4, kernel_size=3, stride=1, padding=0),
            ResCBR(4, 3, kernel_size=3, stride=1, padding=0),
            ResCBR(3, 3, kernel_size=3, stride=2, padding=1)
        )

        # Classification head (Linear layers are stacked without activations).
        self.classification = nn.Sequential(
            nn.Linear((19 * 19 * 3), 512),
            nn.Linear(512, 128),
            nn.Linear(128, classes)
        )

    def forward(self, x):
        """Classify a batch of images of shape (batch, 3, 180, 180).

        Returns:
            Tensor of shape (batch, classes).  In training mode these are raw
            logits: ``nn.CrossEntropyLoss`` applies log-softmax itself, so
            feeding it probabilities would apply softmax twice and squash the
            gradients.  In eval mode the logits go through softmax, so
            inference callers still receive class probabilities.
        """
        features = self.block3(self.block2(self.block1(x)))
        logits = self.classification(features.reshape(features.size(0), -1))
        if self.training:
            return logits
        return torch.softmax(logits, dim=1)

In [None]:
class Scott(nn.Module):
    """Plain CNN classifier: Conv-ReLU pairs with max pooling, no batch norm.

    Three stages, each made of two unpadded 3x3 convolutions with ReLU and a
    2x2 max pool, feed a fully connected head.  The head's input width is
    hard-coded to 19 * 19 * 3, which is what a 3 x 180 x 180 input produces
    (180 -> 176 -> 88 -> 84 -> 42 -> 38 -> 19).

    Args:
        classes: number of output classes.
    """

    def __init__(self, classes: int = 2):
        super(Scott, self).__init__()

        # Blocks
        self.block1 = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(8, 7, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.block2 = nn.Sequential(
            nn.Conv2d(7, 6, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(6, 5, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.block3 = nn.Sequential(
            nn.Conv2d(5, 4, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(4, 3, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )

        # Classification head (Linear layers are stacked without activations).
        self.classification = nn.Sequential(
            nn.Linear((19 * 19 * 3), 512),
            nn.Linear(512, 128),
            nn.Linear(128, classes)
        )

    def forward(self, x):
        """Classify a batch of images of shape (batch, 3, 180, 180).

        Returns:
            Tensor of shape (batch, classes).  In training mode these are raw
            logits: ``nn.CrossEntropyLoss`` applies log-softmax itself, so
            feeding it probabilities would apply softmax twice and squash the
            gradients.  In eval mode the logits go through softmax, so
            inference callers still receive class probabilities.
        """
        features = self.block3(self.block2(self.block1(x)))
        logits = self.classification(features.reshape(features.size(0), -1))
        if self.training:
            return logits
        return torch.softmax(logits, dim=1)

# Dataset

In [None]:
class CoffeeDataset(Dataset):
    """Image dataset described by a header-less CSV file.

    Column 0 of every CSV row is an image path relative to ``root_dir``; the
    remaining columns are returned as the sample's ``'values'`` array.

    Args:
        csv_file: path of the CSV file (read with ``header=None``).
        root_dir: directory the image paths are joined onto.
        transform: optional callable applied to the sample dict.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.values_frame = pd.read_csv(csv_file, header=None)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Number of rows in the CSV file."""
        return len(self.values_frame)

    def __getitem__(self, idx):
        """Load sample ``idx`` as ``{'image': RGB array, 'values': array}``.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.values_frame.iloc[idx, 0])
        image = cv.imread(img_name)
        # cv.imread signals a missing/unreadable file by returning None rather
        # than raising; fail here with the path instead of inside cvtColor.
        if image is None:
            raise FileNotFoundError('Could not read image: {}'.format(img_name))
        # OpenCV decodes to BGR; the rest of the notebook works in RGB.
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
        values = np.array(self.values_frame.iloc[idx, 1:])
        sample = {'image': image, 'values': values}

        if self.transform:
            sample = self.transform(sample)

        return sample

In [None]:
# Resize image
class Resize(object):
    """Pad a sample's image to a square, then resize it to ``size`` x ``size``.

    The shorter side is padded symmetrically by replicating the border
    pixels, so the content is not stretched by the final resize.

    Args:
        size: side length, in pixels, of the output image.
    """

    def __init__(self, size: int = 180):
        self.size = size

    def __call__(self, sample):
        """Return a new sample dict with the resized image and the same values."""
        image, values = sample['image'], sample['values']
        h, w = image.shape[:2]
        # Split the height/width gap into two halves that add up to it exactly.
        # Padding both sides with a rounded-up half left odd gaps one pixel
        # short of square, so the resize below distorted the image slightly.
        gap = abs(h - w)
        before = gap // 2
        after = gap - before
        if h <= w:
            top, bottom, left, right = before, after, 0, 0
        else:
            top, bottom, left, right = 0, 0, before, after
        image = cv.copyMakeBorder(image, top, bottom, left, right, borderType=cv.BORDER_REPLICATE)
        image = cv.resize(image, (self.size, self.size))
        return {'image': image,
                'values': values}

In [None]:
# Convert to Tensor
class ToTensor(object):
    """Convert a sample's numpy arrays into torch tensors.

    The image becomes a float32 tensor divided by 255 (axis order is left
    unchanged); the values are cast with ``astype('float')`` and wrapped
    without changing their shape.
    """

    def __call__(self, sample):
        scaled_image = torch.FloatTensor(sample['image']) / 255.
        value_tensor = torch.from_numpy(sample['values'].astype('float'))
        return {'image': scaled_image, 'values': value_tensor}

# Load Data

In [None]:
# Split data into train and test
# Fixed seed: the same samples land in the test split on every run.  Without
# it, weights restored from an earlier session could be evaluated on images
# they were trained on.
SPLIT_SEED = 0
train_ratio = 0.8
dataset = CoffeeDataset('Dataset_Beans.csv', root_dir='.', 
                              transform=transforms.Compose([Resize(180), ToTensor()]))
train_size = int(len(dataset) * train_ratio)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED))
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4, pin_memory=True)
print(len(dataset))

In [None]:
# Preview: show the first four images of one training batch with their labels.
for i, sample in enumerate(train_loader):
    inputs = sample['image']
    values = sample['values']
    fig, ax = plt.subplots(1, 4, figsize=(16, 4))
    for j, panel in enumerate(ax):
        panel.imshow(inputs[j].numpy(), cmap='gray')
        # Hide the pixel-coordinate ticks.
        panel.set_xticks([])
        panel.set_yticks([])
        # Title is the first entry of the sample's values, shown as an integer.
        panel.set_title(values[j].numpy().astype(int)[0], fontsize=16)
    break  # only the first batch is needed
print(inputs.shape)

In [None]:
# Classification model: network on the selected device, its loss and optimizer.
model = ScottDonPool()
model = model.to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# Layer-by-layer summary for one 3 x 180 x 180 input.
summary(model, (3, 180, 180))

In [None]:
# Training loop for CNN
epochs = 100
loss_array = []  # mean batch loss of every epoch
pbar = tqdm.tqdm(range(1, epochs + 1))
torch.cuda.empty_cache()
model.train()
for e in pbar:
    running_loss, batch_count = 0., 0
    epoch_loss = 0.  # assigned by the original cell but never read
    for i, sample in enumerate(train_loader):
        # Images come out of the dataset channels-last; move channels to axis 1.
        inputs = sample['image'].type(torch.FloatTensor).permute(0, -1, 1, 2).to(device)
        # Targets are flattened to a 1-D tensor of integer class indices.
        values = sample['values'].type(torch.LongTensor).to(device).flatten()

        predictions = model(inputs)
        loss = loss_func(predictions, values)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Statistics
        batch_count += 1
        running_loss += loss.item()
    batch_loss = running_loss / batch_count
    loss_array.append(batch_loss)
    pbar.set_description('Epoch {}, Loss {:,.3f}'.format(e, batch_loss))

In [None]:
# Training-loss curve: one point per epoch.
loss_fig, loss_ax = plt.subplots(figsize=(14, 6))
loss_ax.plot(loss_array)
loss_ax.set_xlabel('Epoch', fontsize=14)
loss_ax.set_ylabel('Loss', fontsize=14)

In [None]:
# Save weights: write the trained parameters (state_dict only) to disk.
trained_weights = model.state_dict()
torch.save(trained_weights, 'SuperScott.dat')

In [None]:
# Training loop for CNN
# Same procedure as the training cell above, applied to `will_model`.
# NOTE(review): `will_model`, `will_loss_func` and `will_optimizer` are not
# defined in any cell visible in this notebook, so on a fresh kernel this cell
# raises NameError -- confirm where they are created.
epochs = 100
pbar = tqdm.tqdm(range(1, epochs+1))
# Mean batch loss of every epoch.
will_loss_array = []
torch.cuda.empty_cache()
will_model.train()
for e in pbar:
    batch_count = 0
    running_loss = 0.
    # Assigned but never read.
    epoch_loss = 0.
    for i, sample in enumerate(train_loader):
        inputs, values = sample['image'], sample['values']
        # Images come out of the dataset channels-last; move channels to axis 1.
        inputs = inputs.type(torch.FloatTensor).permute(0, -1, 1, 2).to(device)
        # Targets are flattened to a 1-D tensor of integer class indices.
        values = values.type(torch.LongTensor).to(device).flatten()
        predictions = will_model(inputs)
        loss = will_loss_func(predictions, values)
        will_optimizer.zero_grad()
        loss.backward()
        will_optimizer.step()
        
        # Statistics
        running_loss += loss.item()
        batch_count += 1
    # Despite the name, this is the mean loss over all batches of the epoch.
    batch_loss = running_loss / batch_count
    will_loss_array.append(batch_loss)
    pbar.set_description('Epoch {}, Loss {:,.3f}'.format(e, will_loss_array[-1]))

In [None]:
# Training-loss curve of `will_model`: one point per epoch.
will_fig, will_ax = plt.subplots(figsize=(14, 6))
will_ax.plot(will_loss_array)
will_ax.set_xlabel('Epoch', fontsize=14)
will_ax.set_ylabel('Loss', fontsize=14)

In [None]:
# Write the parameters of `will_model` (state_dict only) to disk.
will_weights = will_model.state_dict()
torch.save(will_weights, 'Coffee_Defects_Will_2021_12_02.dat')

# Testing

In [None]:
# Restore trained weights into `model`.
# map_location=device remaps the stored tensors, so a checkpoint written on a
# GPU also loads on a CPU-only machine.
# NOTE(review): this is not the 'SuperScott.dat' file written by the save cell
# above -- confirm which checkpoint is meant to be evaluated.
# NOTE: torch.load unpickles the file; only load checkpoints you trust.
model.load_state_dict(torch.load('Coffee_Defects_2021_11_30-2.dat', map_location=device))

In [None]:
# Loader over the held-out split; batch_size=1 yields one image per iteration.
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)

In [None]:
# Test model: predict every test sample, time each forward pass and plot the
# confusion matrix of raw counts.
model.eval()
reals = []
predictions = []
times = []
with torch.no_grad():
    for i, sample in enumerate(test_loader):
        inputs, values = sample['image'], sample['values']
        # Images come out of the dataset channels-last; move channels to axis 1.
        inputs = inputs.type(torch.FloatTensor).permute(0, -1, 1, 2).to(device)
        values = values.numpy().flatten()
        # perf_counter is monotonic and high-resolution; time.time() is
        # neither, which matters for latencies of a few milliseconds.
        t0 = time.perf_counter()
        # .cpu() waits for the device to finish, so the interval covers the
        # whole forward pass plus the argmax.
        pred = model(inputs).cpu().numpy()
        pred = np.argmax(pred, axis=1).flatten()
        times.append(time.perf_counter() - t0)
        predictions.append(pred)
        reals.append(values)
cf_matrix = confusion_matrix(reals, predictions, normalize=None)
sns.heatmap(cf_matrix, annot=True, fmt='.2f', cmap='YlOrBr', 
            cbar=False, annot_kws={'fontsize': 14})
print('Average time {:,.3f}ms'.format(np.mean(times) * 1000.))

In [None]:
 # Test model
# This cell repeats the evaluation cell above: predict every test sample, time
# each forward pass and plot the confusion matrix of raw counts.
model.eval()
reals = []
predictions = []
times = []
with torch.no_grad():
    for i, sample in enumerate(test_loader):
        inputs, values = sample['image'], sample['values']
        # Images come out of the dataset channels-last; move channels to axis 1.
        inputs = inputs.type(torch.FloatTensor).permute(0, -1, 1, 2).to(device)
        values = values.numpy().flatten()
        # perf_counter is monotonic and high-resolution; time.time() is
        # neither, which matters for latencies of a few milliseconds.
        t0 = time.perf_counter()
        # .cpu() waits for the device to finish, so the interval covers the
        # whole forward pass plus the argmax.
        pred = model(inputs).cpu().numpy()
        pred = np.argmax(pred, axis=1).flatten()
        times.append(time.perf_counter() - t0)
        predictions.append(pred)
        reals.append(values)
cf_matrix = confusion_matrix(reals, predictions, normalize=None)
sns.heatmap(cf_matrix, annot=True, fmt='.2f', cmap='YlOrBr', 
            cbar=False, annot_kws={'fontsize': 14})
print('Average time {:,.3f}ms'.format(np.mean(times) * 1000.))

In [None]:
 # Test model
# Evaluate `will_model`: predict every test sample, time each forward pass and
# plot the confusion matrix of raw counts.
# NOTE(review): `will_model` is not defined in any cell visible in this
# notebook -- confirm where it is created.
will_model.eval()
reals = []
predictions = []
times = []
with torch.no_grad():
    for i, sample in enumerate(test_loader):
        inputs, values = sample['image'], sample['values']
        # Images come out of the dataset channels-last; move channels to axis 1.
        inputs = inputs.type(torch.FloatTensor).permute(0, -1, 1, 2).to(device)
        values = values.numpy().flatten()
        # perf_counter is monotonic and high-resolution; time.time() is
        # neither, which matters for latencies of a few milliseconds.
        t0 = time.perf_counter()
        # .cpu() waits for the device to finish, so the interval covers the
        # whole forward pass plus the argmax.
        pred = will_model(inputs).cpu().numpy()
        pred = np.argmax(pred, axis=1).flatten()
        times.append(time.perf_counter() - t0)
        predictions.append(pred)
        reals.append(values)
cf_matrix = confusion_matrix(reals, predictions, normalize=None)
sns.heatmap(cf_matrix, annot=True, fmt='.0f', cmap='YlOrBr', 
            cbar=False, annot_kws={'fontsize': 14})
print('Average time {:,.3f}ms'.format(np.mean(times) * 1000.))

In [None]:
# Confusion matrix normalised over the true labels (each row sums to 1), shown
# as percentages, for the most recent evaluation run.
cf_matrix = confusion_matrix(reals, predictions, normalize='true')
heatmap_options = dict(annot=True, fmt='.2%', cmap='YlOrBr',
                       cbar=False, annot_kws={'fontsize': 14})
sns.heatmap(cf_matrix, **heatmap_options)
mean_latency_ms = np.mean(times) * 1000.
print('Average time {:,.3f}ms'.format(mean_latency_ms))