In [None]:
import os
import cv2
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pickle

from PIL import Image
from matplotlib import cm
import matplotlib.pyplot as plt
from torchsummary import summary

# Pollen-Detection-Network

### Network

In [None]:
class FCNN(nn.Module):
    """
    This is a class for fully convolutional neural networks.
    
    It is a subclass of the Module class from torch.nn.
    See the torch.nn documentation for more information.
    """
    
    def __init__(self):
        """
        The constructor for FCNN class. The internal states of the network are initialized. 
        """
        
        super(FCNN, self).__init__()
        self.conv0 = nn.Conv2d(in_channels=1, out_channels=30, kernel_size=2, stride=2)
        
        self.conv1 = nn.Conv2d(in_channels=30, out_channels=30, kernel_size=5, stride=2)
        
        self.conv2 = nn.Conv2d(in_channels=30, out_channels=60, kernel_size=3, stride=1)
        self.conv2_drop = nn.Dropout2d()
        
        self.conv3 = nn.Conv2d(in_channels=60, out_channels=60, kernel_size=3, stride=2)
        self.conv3_drop = nn.Dropout2d()
        
        self.conv4 = nn.Conv2d(in_channels=60, out_channels=120, kernel_size=3, stride=1)
        
        self.conv5 = nn.Conv2d(in_channels=120, out_channels=120, kernel_size=3, stride=1)
        
        self.conv6 = nn.Conv2d(in_channels=120, out_channels=1, kernel_size=1, stride=1)
        
        
    def forward(self, data):
        """
        Defines the computation performed at every call.
        
        Parameters:
            data (4D-tensor): The input that is evaluated by the network.
            
        Returns:
            x (tensor): The output of the network after evaluating it on the given input.
        """
        
        x = F.relu(self.conv0(data))
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2_drop(self.conv2(x)))
        x = F.relu(self.conv3_drop(self.conv3(x)))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = torch.sigmoid(self.conv6(x))
        return x

### Validation

In [None]:
def validate(network, testloader):
    """
    Calculates the mean validation loss, the F1-Score and the accuracy of the given network on the testloader data.
    
    Parameters:
        network (): The network to validate. 
        testloader (torch.utils.data.DataLoader): Contains the data which is used to validate the network. 
        
    Returns:
        validation_loss (): The mean loss per image with regards to a fixed criterion.
        F1_Score (): The harmonic mean of the precision and the recall of the given network
            on the given validation data.
        correct / total (): The percentage of correctly classified samples of the total number of validation samples.
        
    """
    
    total = 0
    correct = 0
    true_positive = 0
    false_negative = 0
    false_positive = 0
    true_negative = 0

    # We do not need any gradiants here, since we do not train the network.
    # We are only interested in the predictions of the network on the testdata. 
    with torch.no_grad():
        validation_loss = 0
        for i, (inputs, labels) in enumerate(testloader):
            
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = network(torch.transpose(inputs[...,None],1,3)).view(-1)
            #print(labels.size(0))
            loss = criterion(outputs,labels)
            validation_loss += loss.item()
            predicted = (outputs >= threshold) # Predicted is a tensor of booleans 
            total += labels.size(0)
            predicted = predicted.view(predicted.size(0)) 
            #print(labels.size())
            #print((predicted == labels).sum().item())
            labels = labels == 1
            correct += (predicted == labels).sum().item()
            true_positive += (predicted & labels).sum().item()
            false_negative += (torch.logical_not(predicted) & labels).sum().item()
            false_positive += (predicted & torch.logical_not(labels)).sum().item()
            true_negative += (torch.logical_not(predicted) & torch.logical_not(labels)).sum().item()
            
        validation_loss = validation_loss / total 
        
        try: 
            recall = true_positive / (true_positive + false_negative)
        except ZeroDivisionError:
            recall = 0
        try:
            precision = true_positive / (true_positive + false_positive)
        except ZeroDivisionError:
            precision = 0
        try:
            F1_score = 2 * (precision * recall) / (precision + recall)
        except ZeroDivisionError:
            F1_score = 0
            
    
    print('Accuracy of the network on the test images: %d %%' % (
    100 * correct / total))
    print('F1 Score of the network on the test images: %f' % F1_score)
    return validation_loss, F1_score, (correct / total)

### Training

In [None]:
def train(network, trainloader, ep, criterion, optimizer, print_interval):
    """
    Train a neural network.
    
    
    Parameters:
        network ():
        trainloader ():
        ep (int): The number of epochs the network will be trained for.
        criterion ():
        optimizer ():
        print_interval (int): The number of batches after which the function prints the loss for one batch.
    
    Returns:
        train_losses ([float]): The mean of the loss for one train sample after every epoch. 
        validation_losses ([float]): The mean of the loss for one validation sample after every epoch.
        F1 ([float]): The F1-Score of the validation data after every epoch.
        accuracy ([float]): The accuracy of the network on the validation data after every epoch.
    """
    
    losses = [] # Mean of loss/image in every epoch of training 
    validation_losses = []
    F1 = []
    accuracy = []
    validate(network, validloader)
    
    for epoch in range(ep):

        loss_stats = 0.0
        total = 0
        
        running_loss = 0.0
    
        for i, (inputs, lables) in enumerate(trainloader):
            
            inputs, lables = inputs.to(device), lables.to(device)
            
            # zero the parameter gradients
            optimizer.zero_grad()
            #print(lables.shape)
            
            outputs = network(torch.transpose(inputs[...,None],1,3)).view(-1)
            #print(outputs.shape)
            loss = criterion(outputs, lables)
            loss.backward() #propagate the error back through the network
            optimizer.step() #adjust the weights of the network depending on the propagated error
    
            #that's it.
            total += lables.shape[0]
            
            #Some statistics:
            running_loss += loss.item()
            
            loss_stats += loss.item()
            
            if i % print_interval == print_interval - 1:    # print every x mini-batches (loss for one batch)
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / print_interval))
                running_loss = 0.0
        
        losses.append(loss_stats / total)
        #Test the network using the validation set after every epoch of training.
        val_loss_curr, F1_curr, accuracy_curr = validate(network, validloader)
        validation_losses.append(val_loss_curr)
        F1.append(F1_curr)
        accuracy.append(accuracy_curr)
        
    print('Finished Training')
    return losses, validation_losses, F1, accuracy

### Loading Data

In [None]:
def load_pollen_data(dir):
    np_pollen_data = []     # single date will be [img,label]
    for folder in next(os.walk(dir))[1]:
        if folder == 'p':
            label = 1
        else:
            label = 0
        parent_path = os.path.join(dir, folder)
        for file in os.listdir(parent_path):
            if '.png' in file:
                try:
                    path = os.path.join(parent_path, file)
                    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
                    np_pollen_data.append([np.array(img), label])
                except Exception as e:
                    print(folder, file, str(e))

    data = np.random.shuffle(np_pollen_data)
    print("Data created.")
    return data

### Create Train and Testsets

In [None]:
def create_test_val_sets(data, ratio=0.8):
    images = []
    labels = []

    for i in range(len(np_pollen_data)):
        images.append(np_pollen_data[i][0])
        labels.append(np_pollen_data[i][1])
    
    train_x = images[:int(len(images)*0.8)]
    train_y = labels[:int(len(images)*0.8)]
    valid_x = images[int(len(images)*0.8):]
    valid_y = labels[int(len(images)*0.8):]
    
    return train_x, train_y, valid_x, valid_y

# Train Network

#### Load Data

In [None]:
# Load Data
cur_dir = os.getcwd()
pollen_path = cur_dir + '/Datasets/PollenDataSmall/'
pollendata = load_pollen_data(pollen_path)

# Split into sets
train_x, train_y, valid_x, valid_y = create_test_val_sets(pollendata, 0.8)

# Our neural network expects a dataloader. Thus, we 
trainloader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(torch.Tensor(train_x),torch.Tensor(train_y)),batch_size=32)
validloader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(torch.Tensor(valid_x),torch.Tensor(valid_y)),batch_size=100)

# Save dataloader for later use
torch.save(trainloader, 'trainloader.pth')
torch.save(validloader, 'validloader.pth')

#### Initialise the network

In [None]:
# setup wether gpu or cpu should be used for computations
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#Choose optimizer, criterion, number of epochs and threshold for the network to accept or dismiss a computated input
criterion = nn.MSELoss()
optimizer = optim.SGD(pollen_network.parameters(), lr=0.001, momentum=0.9)
ep = 10
threshold = 0.8

# Create a new instance of the FCNN class. This will be our network.      
pollen_network = FCNN().to(device)

#### First we have a look at the performance of the untrained model on our validation set.

In [None]:
validate(network, validloader)

#### Now we can train our network with given data

In [None]:
train_losses, val_losses, F1_scores, accuracies = train(network, trainloader, ep, criterion, optimizer, 32)

#### Save trained Network

In [None]:
# save the networks parameters to have the possibility to reload them later:
cur_dir = os.getcwd()
save_path = cur_dir + '/pollennet.pt'
torch.save(network.state_dict(), save_path)

# save the corresponding losses, F1-Scores and Accuracies using pickle:

with open('train_losses.obj', 'wb') as train_losses_file:
    pickle.dump(train_losses, train_losses_file)

with open('val_losses.obj', 'wb') as val_losses_file:
    pickle.dump(val_losses, val_losses_file)
    
with open('F1.obj', 'wb') as F1_file:
    pickle.dump(F1_scores, F1_file)
    
with open('accuracies.obj', 'wb') as accuracies_file:
    pickle.dump(accuracies, accuracies_file)