In [1]:
import pandas as pd
import os
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings('ignore')

# Enter path to the data_fusion_guest_lecture file
image_folder_path = "data"  

# Loads labels
df = pd.read_csv(os.path.join(image_folder_path, "seedling_labels.csv"))

# Creates path to top & side view
df["color_cam_path"] = image_folder_path + "/" + df["color_cam_path"]
df["side_cam_path"] = image_folder_path + "/" + df["side_cam_path"]

# Gives average expert label as a starting point
df["average_expert"] = (df["Expert 1"] + df["Expert 2"]  + df["Expert 3"] + df["Expert 4"]) / 4

## Generate a score using Cohen Kappa for each expert

In [2]:
from sklearn.metrics import cohen_kappa_score
import numpy as np
from collections import Counter
from itertools import combinations

def compute_label_01(x, experts_weights):
    labels = np.array([x['Expert 1'], x['Expert 2'], x['Expert 3'], x['Expert 4']]) # possible values [1, 2, 3, 4]
    labels_normalized = ((labels - 1) * 2) - 3 # possible values [-3, -1, +1, +3]

    label = (np.sum(labels_normalized * experts_weights)+3) / 6
    return round(label + 1e-10, 9)
def get_experts_weights(df):
	experts = ["Expert 1", "Expert 2", "Expert 3", "Expert 4"]
	experts_kappas = [[], [], [], []]

	for pair in combinations(range(len(experts)), 2):
		labels_expert_0 = df[experts[pair[0]]].values.tolist()
		labels_expert_1 = df[experts[pair[1]]].values.tolist()
		kappa = cohen_kappa_score(labels_expert_0, labels_expert_1)

		experts_kappas[pair[0]].append(kappa)
		experts_kappas[pair[1]].append(kappa)

	experts_kappas = np.array(experts_kappas).mean(axis=1)
	experts_weights = experts_kappas / np.sum(experts_kappas)

	return experts_weights

In [3]:
experts_weights = get_experts_weights(df)
df['Label'] = df.apply(lambda x: compute_label_01(x, experts_weights), axis=1)

## Convert (1,2 = normal, 3,4 = abnormal) because it will be binary classification

In [4]:
# 0 = normal
# 1 = abnormal

df[['Expert 1', 'Expert 2', 'Expert 3', 'Expert 4', 'average_expert']] = df[['Expert 1', 'Expert 2', 'Expert 3', 'Expert 4', 'average_expert']].replace({1: 0, 2: 0})
df[['Expert 1', 'Expert 2', 'Expert 3', 'Expert 4', 'average_expert']] = df[['Expert 1', 'Expert 2', 'Expert 3', 'Expert 4', 'average_expert']].replace([3,4], 1)
df['Label'] = df['Label'].round(0).astype(np.int64)
df

Unnamed: 0,Expert 1,Expert 2,Expert 3,Expert 4,color_cam_path,side_cam_path,Rfid,Pos,average_expert,Label
0,1,1,1,1,data/A1/00387 Plant 0000 Plant 0000/18-02-2019...,data/A1/00387 Plant 0000 Plant 0000/18-02-2019...,A1,Plant 0000,1.00,1
1,0,0,0,0,data/A1/00388 Plant 0001 Plant 0001/18-02-2019...,data/A1/00388 Plant 0001 Plant 0001/18-02-2019...,A1,Plant 0001,0.00,0
2,0,0,0,0,data/A1/00389 Plant 0002 Plant 0002/18-02-2019...,data/A1/00389 Plant 0002 Plant 0002/18-02-2019...,A1,Plant 0002,0.00,0
3,1,1,1,1,data/A1/00390 Plant 0003 Plant 0003/18-02-2019...,data/A1/00390 Plant 0003 Plant 0003/18-02-2019...,A1,Plant 0003,3.50,1
4,1,0,0,0,data/A1/00391 Plant 0004 Plant 0004/18-02-2019...,data/A1/00391 Plant 0004 Plant 0004/18-02-2019...,A1,Plant 0004,1.50,0
...,...,...,...,...,...,...,...,...,...,...
989,0,0,0,0,data/B4/01019 Plant 0122 Plant 0122/18-02-2019...,data/B4/01019 Plant 0122 Plant 0122/18-02-2019...,B4,Plant 0122,0.00,0
990,0,0,0,0,data/B4/01020 Plant 0123 Plant 0123/18-02-2019...,data/B4/01020 Plant 0123 Plant 0123/18-02-2019...,B4,Plant 0123,0.00,0
991,0,0,0,0,data/B4/01021 Plant 0124 Plant 0124/18-02-2019...,data/B4/01021 Plant 0124 Plant 0124/18-02-2019...,B4,Plant 0124,0.00,0
992,0,1,1,1,data/B4/01022 Plant 0125 Plant 0125/18-02-2019...,data/B4/01022 Plant 0125 Plant 0125/18-02-2019...,B4,Plant 0125,2.75,1


# High-Level Fusion

This notebook follows the following structure:

##### TensorFlow
- Train Model 1: Color cam
- Train Model 2: Side view cam
- Predict both on test set 
- Weighted Voting
- Majority Voting
- Bayesian Consensus

##### PyTorch
- Train & Test Model 1: Color cam
- Train & Test Model 2: Side view cam
- Weighted Voting
- Majority Voting

### Model Training: Once using TensorFlow (pre-trained model MobileNetV2) and once using PyTorch (own)

In [5]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)

## Using TensorFlow
### CNN using transfer learning with MobileNetV2 as the base model
The MobileNetV2 model is used as a feature extractor and then the extracted features are flattened and passed through a few dense layers with dropout before the final classification layer. The model is then trained on the input images using the ```ImageDataGenerator``` to generate batches of augmented images and passed through the model.

###### COLOR CAM, MODEL 1

In [11]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.optimizers import Adam

# Set up a TensorFlow session to use the GPU if available
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_virtual_device_configuration(gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
    except RuntimeError as e:
        print(e)

train_datagen = ImageDataGenerator(
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True)

test_datagen = ImageDataGenerator(rescale=1./255)
val_datagen = ImageDataGenerator(rescale=1./255)

train_generator1 = train_datagen.flow_from_dataframe(
    train_df,
    x_col='color_cam_path',
    y_col='Label',
    target_size=(224, 224),
    batch_size=8,
    class_mode='raw')

test_generator1 = test_datagen.flow_from_dataframe(
    test_df,
    x_col='color_cam_path',
    y_col='Label',
    target_size=(224, 224),
    batch_size=8,
    class_mode='raw')

val_generator1 = val_datagen.flow_from_dataframe(
    val_df,
    x_col='color_cam_path',
    y_col='Label',
    target_size=(224, 224),
    batch_size=8,
    class_mode='raw')

Found 636 validated image filenames.
Found 199 validated image filenames.
Found 159 validated image filenames.


In [30]:
%%time

# load the MobileNetV2 model
mobilenet_model1 = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# unfreeze the last few layers for fine-tuning
for layer in mobilenet_model1.layers[:-4]:
    layer.trainable = False

# build the model
model1 = tf.keras.models.Sequential([
    mobilenet_model1,
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

# compile the model
model1.compile(optimizer=Adam(lr=0.0001), loss='binary_crossentropy', metrics=['accuracy'])

# train the model
model1.fit(train_generator1, epochs=4, validation_data=val_generator1)



Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4
Wall time: 6min 28s


<keras.callbacks.History at 0x265dd0237f0>

###### Save model so it can be used later again without training again

In [33]:
# save the model to a file
model1.save('model1_color_cam.h5') 

###### SIDE CAM, MODEL 2

In [34]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.optimizers import Adam

# Set up a TensorFlow session to use the GPU if available
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_virtual_device_configuration(gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
    except RuntimeError as e:
        print(e)

# Define the data generators
train_datagen = ImageDataGenerator(
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    rotation_range=20,
    width_shift_range=0.1,
    height_shift_range=0.1)

test_datagen = ImageDataGenerator(rescale=1./255)

train_generator2 = train_datagen.flow_from_dataframe(
    train_df,
    x_col='side_cam_path',
    y_col='Label',
    target_size=(224, 224),
    batch_size=10,
    class_mode='raw')

test_generator2 = test_datagen.flow_from_dataframe(
    test_df,
    x_col='side_cam_path',
    y_col='Label',
    target_size=(224, 224),
    batch_size=10,
    class_mode='raw')

val_generator2 = val_datagen.flow_from_dataframe(
    val_df,
    x_col='color_cam_path',
    y_col='Label',
    target_size=(224, 224),
    batch_size=8,
    class_mode='raw')

Found 636 validated image filenames.
Found 199 validated image filenames.
Found 159 validated image filenames.


In [36]:
%%time
# load the MobileNetV2 model
mobilenet_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# unfreeze the last few layers for fine-tuning
for layer in mobilenet_model.layers[:-4]:
    layer.trainable = False

# build the model
model2 = tf.keras.models.Sequential([
    mobilenet_model,
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

# compile the model
model2.compile(optimizer=Adam(lr=0.0001), loss='binary_crossentropy', metrics=['accuracy'])

# train the model
model2.fit(train_generator2, epochs=4, validation_data=val_generator2)



Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4
Wall time: 4min 35s


<keras.callbacks.History at 0x266162c8b80>

###### Save model so it can be used later again without training again

In [82]:
# save the model to a file
model2.save('model2_side_cam.h5') 

### Load already trained models and predict on test set

In [54]:
from tensorflow.keras.models import load_model

model1 = load_model('model1_color_cam.h5')
model2 = load_model('model2_side_cam.h5')

In [83]:
%%time
def predict(model, test_generator):
    # predict the labels for test data
    test_generator.reset()
    pred = model.predict(test_generator)

    # convert the predictions to binary labels
    pred_labels = [1 if p >= 0.5 else 0 for p in pred]
    
    return pred_labels, pred

def accuracy(model, test_generator, model_number):
    loss, accuracy = model.evaluate(test_generator)
    print('Accuracy on test set for model number', model_number, ': ', accuracy)
    
    
pred_labels1, pred1 = predict(model1, test_generator1)
print('Prediction done, Model 1')
pred_labels2, pred2 = predict(model2, test_generator2)
print('Prediction done, Model 2')

accuracy(model1, test_generator1, '1')
accuracy(model2, test_generator2, '2')

Prediction done, Model 1
Prediction done, Model 2
Accuracy on test set for model number 1 :  0.9195979833602905
Accuracy on test set for model number 2 :  0.9246231317520142
Wall time: 49.8 s


#### Weighted Voting

In [84]:
from sklearn.metrics import accuracy_score

weighted_pred = (0.4 * pred1) + (0.6 * pred2)
threshold = 0.5
binary_pred = (weighted_pred > threshold).astype(int)

true_labels = test_generator1.labels
accuracy = accuracy_score(true_labels, binary_pred)

print("Accuracy:", accuracy)

Accuracy: 0.6180904522613065


#### Majority Voting

In [85]:
# combine the predictions using majority voting
combined_preds = np.round((preds1 + preds2) / 2)

# convert to binary labels
binary_preds = (combined_preds > 0.5).astype(int)

# calculate accuracy'
true_labels = test_generator1.labels
accuracy = accuracy_score(true_labels, binary_preds)
print("Accuracy: ", accuracy)

Accuracy:  0.7236180904522613


#### Bayesian Consensus

In [86]:
import numpy as np

def softmax(x):
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=0)

# Compute the probability distributions for each model's predictions
probs_model1 = softmax(preds1)
probs_model2 = softmax(preds2)

# Compute the product of the probabilities
prod_probs = probs_model1 * probs_model2

# Normalize the product of probabilities to obtain the consensus probabilities
consensus_probs = prod_probs / np.sum(prod_probs, axis=1, keepdims=True)

# Compute the final predictions using the consensus probabilities
consensus_preds = np.argmax(consensus_probs, axis=1)

# Compute the accuracy of the consensus predictions
accuracy = accuracy_score(test_generator1.labels, consensus_preds)
accuracy

0.7236180904522613

# Using PyTorch

#### Model 1: Color cam

In [27]:
import pandas as pd
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

# Define a transformation to apply to the images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Split the dataframe into training and testing sets
train_df = df.sample(frac=0.8, random_state=123)
test_df = df.drop(train_df.index)

# Define custom dataset classes to load the images and their labels
class CustomDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        img_path = self.df.iloc[idx]['color_cam_path']
        label = self.df.iloc[idx]['Label']
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, label

# Create custom dataset objects for the training and testing sets
train_dataset = CustomDataset(train_df, transform=transform)
test_dataset = CustomDataset(test_df, transform=transform)

# Define data loaders for the training and testing sets
trainloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
testloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [28]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(16 * 56 * 56, 64)
        self.fc2 = nn.Linear(64, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 56 * 56)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [29]:
%%time
import torch.optim as optim
from PIL import Image

net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.1, verbose=True)

for epoch in range(9):
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 200 == 199:    # print every 200 mini-batches
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 200))
            running_loss = 0.0

    # Evaluate model on validation set
    val_loss = 0.0
    val_total = 0
    val_correct = 0
    net.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            outputs = net(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    # Print validation metrics
    val_accuracy = 100 * val_correct / val_total
    print('Epoch: %d, Training Loss: %.3f, Validation Loss: %.3f, Validation Accuracy: %.2f%%' %
          (epoch + 1, running_loss / len(trainloader), val_loss / len(testloader), val_accuracy))

    # Adjust learning rate based on validation loss
    scheduler.step(val_loss)

    # Save model checkpoint
    if val_loss < 10:
        best_val_loss = val_loss
        torch.save(net.state_dict(), 'model1_color_pytorch.pth')
    
    net.train()

# Load best model checkpoint and evaluate on test set
net.load_state_dict(torch.load('model1_color_pytorch.pth'))
net.eval()

test_correct = 0
test_total = 0
with torch.no_grad():
    for data in testloader:
        images, labels = data
        outputs = net(images)
        _, predicted = torch.max(outputs.data, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

print('Test Accuracy: %.2f%%' % (100 * test_correct / test_total))

Epoch: 1, Training Loss: 0.605, Validation Loss: 0.624, Validation Accuracy: 63.82%
Epoch: 2, Training Loss: 0.416, Validation Loss: 0.429, Validation Accuracy: 84.92%
Epoch: 3, Training Loss: 0.237, Validation Loss: 0.327, Validation Accuracy: 91.46%
Epoch: 4, Training Loss: 0.220, Validation Loss: 0.306, Validation Accuracy: 85.43%
Epoch: 5, Training Loss: 0.154, Validation Loss: 0.404, Validation Accuracy: 88.94%
Epoch: 6, Training Loss: 0.128, Validation Loss: 0.417, Validation Accuracy: 89.45%
Epoch: 7, Training Loss: 0.107, Validation Loss: 0.366, Validation Accuracy: 88.94%
Epoch 00007: reducing learning rate of group 0 to 1.0000e-04.
Epoch: 8, Training Loss: 0.068, Validation Loss: 0.337, Validation Accuracy: 89.95%
Epoch: 9, Training Loss: 0.062, Validation Loss: 0.317, Validation Accuracy: 90.95%
Test Accuracy: 90.95%
Wall time: 19min 2s


In [32]:
# Define function to predict label and class probability
def predict(image):
    with torch.no_grad():
        output = net(image.unsqueeze(0))
        prob = F.softmax(output, dim=1)
        label = torch.argmax(prob, dim=1).item()
        prob = prob[0][1].item()
    return label, prob

# Iterate through test set and predict label and class probability for each image
data = []
net.eval()
with torch.no_grad():
    for inputs, labels in testloader:
        for i in range(len(inputs)):
            true_label = labels[i].item()
            predicted_label, class_prob = predict(inputs[i])
            data.append({
                'true_label': true_label,
                'predicted_label': predicted_label,
                'class_prob': class_prob
            })

# Convert list of dictionaries to dataframe
results_df1 = pd.DataFrame(data)

# Convert predicted label to binary value based on class probability
results_df1['predicted_label'] = np.where(results_df1['class_prob'] > 0.5, 1, 0)
results_df1

Unnamed: 0,true_label,predicted_label,class_prob
0,0,0,0.004411
1,1,1,0.993673
2,0,0,0.000332
3,1,1,0.990157
4,1,1,0.928098
...,...,...,...
194,0,0,0.041295
195,1,1,0.976391
196,1,0,0.015404
197,0,0,0.002822


#### Model 2: Side cam

In [33]:
class Normalize(object):
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        if tensor.size(0) == 1:
            # grayscale image
            mean = torch.tensor(self.mean)[[0], [0], [0]]
            std = torch.tensor(self.std)[[0], [0], [0]]
        else:
            # color image
            mean = self.mean
            std = self.std

        return F.normalize(tensor, mean=mean, std=std)

In [34]:
import pandas as pd
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize, Grayscale
from PIL import Image

# Define a transformation to apply to the images
Normalize_transform = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
transform = transforms.Compose([
    transforms.CenterCrop(224),
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3), # convert to 3 channels
    transforms.ToTensor(),
    Normalize_transform
])
# Split the dataframe into training and testing sets
train_df = df.sample(frac=0.8, random_state=123)
test_df = df.drop(train_df.index)

class CustomDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        img_path = self.df.iloc[idx]['side_cam_path']
        label = self.df.iloc[idx]['Label']
        image = Image.open(img_path)
        if image.mode != 'RGB':
            # convert grayscale to RGB
            image = Image.merge('RGB', [image]*3)
        if self.transform:
            image = self.transform(image)
        label = torch.tensor(label, dtype=torch.long)
        return image, label

# Create custom dataset objects for the training and testing sets
train_dataset = CustomDataset(train_df, transform=transform)
test_dataset = CustomDataset(test_df, transform=transform)

# Define data loaders for the training and testing sets
trainloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
testloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [35]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms

class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 53 * 53, 120)  # adjust the input size of the first fully connected layer
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # adjust the view statement to match the output size of the second convolutional layer
        x = x.view(-1, 16 * 53 * 53)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [40]:
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(16, 32, 5)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(16 * 53 * 53, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        self.fc4 = nn.Linear(10, 10) 
        self.fc5 = nn.Linear(10, 10) 
        self.fc6 = nn.Linear(10, 10) 

    def forward(self, x):
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 53 * 53)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        x = F.relu(self.fc5(x))
        x = self.fc6(x)
        return x

In [41]:
%%time
import torch.optim as optim

net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.005)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.1, verbose=True)

for epoch in range(7):
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 200 == 199:    # print every 200 mini-batches
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 200))
            running_loss = 0.0

    # Reset running_loss to 0 at the end of each epoch
    running_loss = 0.0
    
    # Evaluate model on validation set
    val_loss = 0.0
    val_total = 0
    val_correct = 0
    net.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            outputs = net(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    # Print validation metrics
    val_accuracy = 100 * val_correct / val_total
    print('Epoch: %d, Training Loss: %.3f, Validation Loss: %.3f, Validation Accuracy: %.2f%%' %
          (epoch + 1, running_loss / len(trainloader), val_loss / len(testloader), val_accuracy))

    # Adjust learning rate based on validation loss
    scheduler.step(val_loss)

    # Save model checkpoint
    if val_loss < 10:
        best_val_loss = val_loss
        torch.save(net.state_dict(), 'model2_side_pytorch.pth')
    
    net.train()

# Load best model checkpoint and evaluate on test set
net.load_state_dict(torch.load('model2_side_pytorch.pth'))
net.eval()

test_correct = 0
test_total = 0
with torch.no_grad():
    for data in testloader:
        images, labels = data
        outputs = net(images)
        _, predicted = torch.max(outputs.data, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

print('Test Accuracy: %.2f%%' % (100 * test_correct / test_total))

Epoch: 1, Training Loss: 0.000, Validation Loss: 0.638, Validation Accuracy: 63.82%
Epoch: 2, Training Loss: 0.000, Validation Loss: 0.621, Validation Accuracy: 67.34%
Epoch: 3, Training Loss: 0.000, Validation Loss: 0.529, Validation Accuracy: 79.90%
Epoch: 4, Training Loss: 0.000, Validation Loss: 0.636, Validation Accuracy: 78.89%
Epoch: 5, Training Loss: 0.000, Validation Loss: 0.614, Validation Accuracy: 79.90%
Epoch: 6, Training Loss: 0.000, Validation Loss: 0.641, Validation Accuracy: 82.41%
Epoch 00006: reducing learning rate of group 0 to 5.0000e-04.
Epoch: 7, Training Loss: 0.000, Validation Loss: 0.660, Validation Accuracy: 80.40%
Test Accuracy: 80.40%
Wall time: 5min 37s


In [43]:
# Create an empty list to hold the results
results = []

# Set the model to evaluation mode
net.eval()

# Turn off gradient calculations to save memory and computation
with torch.no_grad():
    for data in testloader:
        images, labels = data
        outputs = net(images)

        # Get the predicted class labels and class probabilities
        _, predicted = torch.max(outputs.data, 1)
        probs = torch.nn.functional.softmax(outputs, dim=1)

        # Convert PyTorch tensors to numpy arrays
        true_labels = labels.numpy()
        predicted_labels = predicted.numpy()
        class_probs = probs.numpy()[:, 1]  # Only include probabilities for class 1

        # Append the results to the list
        for i in range(len(true_labels)):
            results.append([true_labels[i], predicted_labels[i], class_probs[i]])

# Convert the list of results to a pandas DataFrame
results_df2 = pd.DataFrame(results, columns=['True Label', 'Predicted Label', 'Class Probabilities'])
results_df2

Unnamed: 0,True Label,Predicted Label,Class Probabilities
0,0,0,0.002672
1,1,1,0.960924
2,0,0,0.008700
3,1,1,0.961159
4,1,0,0.024407
...,...,...,...
194,0,0,0.004608
195,1,1,0.931021
196,1,0,0.022019
197,0,0,0.295605


### Merge

In [44]:
merged_df = pd.DataFrame()
merged_df['class_probs_model1_color'] = results_df1['class_prob']
merged_df['pred_model1_color'] = results_df1['predicted_label']
merged_df['class_probs_model2_side'] = results_df2['Class Probabilities']
merged_df['pred_model2_side'] = results_df2['Predicted Label']
merged_df['true_labels'] = results_df1['true_label']

merged_df

Unnamed: 0,class_probs_model1_color,pred_model1_color,class_probs_model2_side,pred_model2_side,true_labels
0,0.004411,0,0.002672,0,0
1,0.993673,1,0.960924,1,1
2,0.000332,0,0.008700,0,0
3,0.990157,1,0.961159,1,1
4,0.928098,1,0.024407,0,1
...,...,...,...,...,...
194,0.041295,0,0.004608,0,0
195,0.976391,1,0.931021,1,1
196,0.015404,0,0.022019,0,1
197,0.002822,0,0.295605,0,0


#### Majority Voting

In [64]:
pred_model1_color = merged_df['pred_model1_color'].tolist()
pred_model2_side = merged_df['pred_model2_side'].tolist()

pred_majority = [np.argmax(np.bincount([pred_model1_color[i], pred_model2_side[i]])) for i in range(len(pred_model1_color))]

merged_df['pred_majority'] = pred_majority
acc = accuracy_score(merged_df['true_labels'], merged_df['pred_majority'])
acc

0.8241206030150754

#### Weighted Voting

In [65]:
def weighted_vote(row):
    # higher weight for the model with higher accuracy
    w1 = 0.9
    w2 = 0.1

    vote = w1 * row['class_probs_model1_color'] + w2 * row['class_probs_model2_side']

    return round(vote, 3)

merged_df['weighted_vote'] = merged_df.apply(weighted_vote, axis=1)
merged_df['weighted_vote_binary'] = merged_df['weighted_vote'].apply(lambda x: 1 if x >= 0.5 else 0)
acc = accuracy_score(merged_df['true_labels'], merged_df['weighted_vote_binary'])
acc

0.9045226130653267