In [None]:
"""
Purpose: Training Blood Cell Dataset with CNN
Course: CS 171 - Machine Learning
Group Members: Galit Bolotin, Jasmine Lao
Date: May 17th, 2024
"""

# Import necessary libraries
import os
import cv2
from google.colab import files
import numpy as np
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torchvision.models as models
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import keras
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.callbacks import EarlyStopping
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import Adam
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score


In [None]:
# Load blood cell dataset from Kaggle to Google Colab
!pip install kaggle
# Upload Kaggle API key (kaggle.json)
uploaded = files.upload()

!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!cd ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d paultimothymooney/blood-cells --force
!unzip -q -o blood-cells.zip -d dataset_folder



Saving kaggle.json to kaggle.json
Dataset URL: https://www.kaggle.com/datasets/paultimothymooney/blood-cells
License(s): other
Downloading blood-cells.zip to /content
100% 108M/108M [00:04<00:00, 31.9MB/s]
100% 108M/108M [00:04<00:00, 25.4MB/s]


In [None]:
# Detect CUDA-compatible device. If available, use GPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyperparameters shared by the cells below
num_classes = 4        # size of the final classification layer of both models
learning_rate = 0.01
num_epochs = 15
batch_size = 64

# Small 64x64 transform. The loaders below do NOT use it (they resize to 224x224);
# it is kept only so that any code referring to `transform` keeps working.
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
])

# Root of the extracted image folders (contains the TRAIN and TEST sub-folders used below)
dataset_root = '/content/dataset_folder/dataset2-master/dataset2-master/images'

# One preprocessing pipeline shared by the training and testing sets, so the two
# can never drift apart.
# Normalization: first list is the per-channel mean, second is the per-channel
# standard deviation. These are the standard ImageNet statistics commonly used
# with PyTorch models, not values computed from this dataset.
imagenet_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Load the training and test datasets; ImageFolder derives labels from sub-folder names
training_set = torchvision.datasets.ImageFolder(
    root=os.path.join(dataset_root, 'TRAIN'),
    transform=imagenet_transform,
)

testing_set = torchvision.datasets.ImageFolder(
    root=os.path.join(dataset_root, 'TEST'),
    transform=imagenet_transform,
)

# Shuffle only the training batches; keep the test order fixed
training_loader = torch.utils.data.DataLoader(training_set, batch_size=batch_size, shuffle=True)
testing_loader = torch.utils.data.DataLoader(testing_set, batch_size=batch_size, shuffle=False)

In [None]:
############### Our Custom Image Classification Model ################
# DEFINE MODEL
class BloodModel(nn.Module):
    """Small four-stage CNN classifier for RGB images.

    Each stage is a 3x3 convolution with padding=1 (which keeps height and
    width unchanged), a ReLU, a 2x2 max-pool (which halves height and width)
    and dropout. The classifier head is sized for a 14x14 final feature map,
    i.e. 224x224 inputs (224 -> 112 -> 56 -> 28 -> 14).

    Args:
        num_classes: number of output logits produced by the last layer.
    """

    def __init__(self, num_classes):
        super(BloodModel, self).__init__()
        # Start with 3 input channels since images are in RGB.
        # Increase number of channels to learn more complex/subtle features.
        # Max pooling reduces the size of the image representation while
        # still retaining feature info.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.dropout1 = nn.Dropout(0.25)  # dropout after every stage to reduce overfitting

        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.dropout2 = nn.Dropout(0.25)

        self.conv3 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.dropout3 = nn.Dropout(0.25)

        self.conv4 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool4 = nn.MaxPool2d(2, 2)
        self.dropout4 = nn.Dropout(0.25)

        # Fully connected layers
        self.fc1 = nn.Linear(64 * 14 * 14, 256)  # final feature map: 64 channels of 14x14
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, out):
        """Run a batch through the network.

        Args:
            out: image batch of shape (batch, 3, H, W); H and W must be 224
                for the flattened features to match ``fc1``.

        Returns:
            Raw class scores (logits) of shape (batch, num_classes).

        Raises:
            RuntimeError: if the spatial size of the input does not produce
                the 64*14*14 features that ``fc1`` expects.
        """
        # Convolutional stages: conv -> ReLU -> max-pool -> dropout
        out = self.dropout1(self.pool1(F.relu(self.conv1(out))))
        out = self.dropout2(self.pool2(F.relu(self.conv2(out))))
        out = self.dropout3(self.pool3(F.relu(self.conv3(out))))
        out = self.dropout4(self.pool4(F.relu(self.conv4(out))))

        # (batch, channels, H, W) -> (batch, features). Flattening from dim 1
        # keeps one row per sample; a view(-1, 64*14*14) would instead silently
        # spread a wrongly-sized image over several rows and change the batch size.
        out = torch.flatten(out, 1)

        # Fully connected layers
        out = F.relu(self.fc1(out))
        out = self.fc2(out)
        return out

# Send model to device, display which device is being used (CPU or GPU)
model = BloodModel(num_classes)
model.to(device)
print("Device In Use: ", end="", flush=True)
print(device)

# Set loss function (default 'mean' reduction: one averaged value per batch)
loss_func = nn.CrossEntropyLoss()

# Set optimizer (Adam, seems to outperform stochastic gradient descent)
# NOTE(review): the `learning_rate` defined in the config cell is not passed here,
# so Adam runs with its library default step size - confirm this is intended.
optimizer = torch.optim.Adam(model.parameters())

# TRAINING
losses = []  # mean training loss of each epoch
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0   # sum of per-sample losses seen this epoch
    samples_seen = 0
    for inputs, labels in training_loader:
        # Send data to GPU or CPU
        inputs = inputs.to(device)
        labels = labels.to(device)
        # Forward pass
        predicts = model(inputs)
        # Calculate loss
        loss = loss_func(predicts, labels)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight the batch mean by the batch size so a smaller final batch
        # does not distort the epoch average.
        batch_len = labels.size(0)
        running_loss += loss.item() * batch_len
        samples_seen += batch_len

    # Record the average over the whole epoch rather than only the last
    # mini-batch, which is a much noisier signal.
    epoch_loss = running_loss / samples_seen if samples_seen else float('nan')
    losses.append(epoch_loss)

    # Print current loss
    print(f'Epoch [{epoch+1}/{num_epochs}]: {epoch_loss:.4f}')

# Plot the loss values
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()


# EVALUATION
def _collect_predictions(model, loader, device):
    """Run `model` over every batch of `loader` without tracking gradients.

    Returns:
        (labels, probs): numpy arrays holding the true label of every sample
        and the softmax class probabilities predicted for it, in loader order.

    Raises:
        ValueError: if the loader yields no batches.
    """
    model.eval()  # evaluation mode (disables dropout)
    label_batches = []
    prob_batches = []
    with torch.no_grad():
        for images, labels in loader:
            outputs = model(images.to(device))
            probs = torch.softmax(outputs, dim=1)  # logits -> probabilities
            label_batches.append(labels.cpu().numpy())
            prob_batches.append(probs.cpu().numpy())
    if not label_batches:
        raise ValueError("loader produced no batches to evaluate")
    return np.concatenate(label_batches), np.concatenate(prob_batches)


def _report_scores(split_name, labels, probs):
    """Print and return (accuracy, macro F1) for one data split."""
    predicted = np.argmax(probs, axis=1)  # most probable class per sample
    accuracy = accuracy_score(labels, predicted)
    macro_f1 = f1_score(labels, predicted, average='macro')
    print(f'{split_name} accuracy: {accuracy:.4f}')
    print(f'{split_name} F1 score: {macro_f1:.4f}')
    return accuracy, macro_f1


# Scores on the training set
true_labelsT, pred_probsT = _collect_predictions(model, training_loader, device)
acc, f1 = _report_scores('Training', true_labelsT, pred_probsT)

# Scores on the held-out test set (acc / f1 end up holding the test values)
true_labels, pred_probs = _collect_predictions(model, testing_loader, device)
acc, f1 = _report_scores('Test', true_labels, pred_probs)





In [None]:
################ Our pre-trained VGG16 CNN Model ##################
# Relative dataset sub-directories. Not referenced by the code in this cell
# (the generators use absolute paths); previously both assignments targeted
# `train_path`, so the TRAIN value was immediately overwritten.
train_path = "images/TRAIN"
test_path = "images/TEST"

# Load pre-trained VGG16 convolutional base (ImageNet weights, no classifier head)
vgg = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze every layer except the last four so only those are fine-tuned
for layer in vgg.layers[:-4]:
    layer.trainable = False
for layer in vgg.layers:
    print(layer, layer.trainable)

vggModel = Sequential()
vggModel.add(vgg)

# Add new custom layers on top of the pre-trained base.
# No `input_shape` here: these layers receive the VGG feature maps, not the
# 224x224x3 image, and Keras only honours `input_shape` on a model's first layer.
vggModel.add(Conv2D(filters=64, kernel_size=(3, 3), padding="same", activation="relu"))
vggModel.add(MaxPooling2D((2, 2)))
vggModel.add(Conv2D(filters=128, kernel_size=(3, 3), padding="same", activation="relu"))
vggModel.add(MaxPooling2D((2, 2)))
vggModel.add(Flatten())                     # crucial step for transitioning from conv layer to fully connected layer
vggModel.add(BatchNormalization())          # used to normalize activations of previous layer
vggModel.add(Dense(16, activation='relu'))  # start with small units, then increase
vggModel.add(BatchNormalization())
vggModel.add(Dense(64, activation='relu'))
vggModel.add(Dense(units=num_classes, activation='softmax'))  # for the final output layer, use softmax

vggModel.summary()

# Prepare training set
training_datagen = ImageDataGenerator(rescale=1./255)

# Ordered (unshuffled) iterator: its batches line up with `training_generator.classes`,
# which is what per-sample predictions are compared against after training.
training_generator = training_datagen.flow_from_directory(
    '/content/dataset_folder/dataset2-master/dataset2-master/images/TRAIN',
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False
)

# Shuffled iterator used only for fitting. Without shuffling, the directory
# iterator serves files in sorted order, so each batch would come from a single
# class folder - a poor signal for gradient descent and batch normalization.
training_fit_generator = training_datagen.flow_from_directory(
    '/content/dataset_folder/dataset2-master/dataset2-master/images/TRAIN',
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=True
)

# Prepare validation set
validation_datagenerator = ImageDataGenerator(rescale=1./255)
validation_generator = validation_datagenerator.flow_from_directory(
    '/content/dataset_folder/dataset2-master/dataset2-master/images/TEST_SIMPLE',
    target_size=(224,224),
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False
)

# Train network on TRAINING set
optimizer = Adam(learning_rate=learning_rate)
vggModel.compile(optimizer=optimizer, loss='categorical_crossentropy',
                 metrics=['accuracy'])

# Stop once validation loss has not improved for 3 epochs and roll back to the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# `batch_size` is deliberately not passed to fit(): the generators already
# yield batches, and Keras documents that it must not be given in that case.
trainedVGG = vggModel.fit(
    training_fit_generator,
    epochs=num_epochs,
    validation_data=validation_generator,
    steps_per_epoch=len(training_fit_generator),
    validation_steps=len(validation_generator),
    callbacks=[early_stopping]
)

# Plot training and validation curves: one figure per tracked metric.
# Each row: (training key, training legend, validation key, validation legend, y-axis label)
vgg_history = trainedVGG.history
curve_specs = (
    ('loss', 'Training Loss', 'val_loss', 'Validation Loss', 'Loss'),
    ('accuracy', 'Training Accuracy', 'val_accuracy', 'Validation Accuracy', 'Accuracy'),
)
for train_key, train_legend, val_key, val_legend, y_label in curve_specs:
    plt.plot(vgg_history[train_key], label=train_legend)
    plt.plot(vgg_history[val_key], label=val_legend)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.show()

# Weighted F1 on the training and validation generators.
# `.classes` holds the true label index of every file in generator order.
f1_reports = (
    ("F1-score for Training:", training_generator),
    ("F1-score for Validation:", validation_generator),
)
for report_label, split_generator in f1_reports:
    predicted = vggModel.predict(split_generator)
    actual = split_generator.classes
    f1Score = f1_score(actual, predicted.argmax(axis=1), average="weighted")
    print(report_label, f1Score)


In [None]:
# Test network on TESTING set and plot testing metric
testing_datagen = ImageDataGenerator(rescale=1./255)
# Build the iterator from the test data generator created above (it was previously
# built from `training_datagen`, leaving `testing_datagen` unused). Both apply the
# same 1/255 rescale today, but this keeps test preprocessing independent of any
# change made to the training generator.
testing_generator = testing_datagen.flow_from_directory(
    '/content/dataset_folder/dataset2-master/dataset2-master/images/TEST',
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False  # keep file order so predictions align with `.classes` below
)

# Loss and accuracy over the whole test set
test_loss, test_accuracy = vggModel.evaluate(testing_generator)
print("Test Accuracy:", test_accuracy)
plt.bar(["Test"], [test_accuracy], color=['blue'])
plt.title('Testing Accuracy')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
plt.show()

plt.bar(["Test"], [test_loss], color=['red'])
plt.title('Testing Loss')
plt.ylabel('Loss')
plt.show()

# Weighted F1: compare the most probable predicted class with the true class indices
predicted = vggModel.predict(testing_generator)
actual = testing_generator.classes
f1Score = f1_score(actual, predicted.argmax(axis=1), average="weighted")
print("F1-score for Testing:", f1Score)