In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import os
import cv2
import matplotlib.pyplot as plt

from sklearn.metrics import classification_report, confusion_matrix

# deep learning libraries
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
import tensorflow as tf
import keras
from keras.preprocessing.image import ImageDataGenerator
from keras import applications
from keras.models import Sequential, load_model
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Activation, Flatten, Dense, Dropout, BatchNormalization
from keras.preprocessing import image
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.applications import VGG19
from tensorflow.keras.models import Model
from keras.optimizers import SGD
from keras.callbacks import ReduceLROnPlateau

from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import load_model
from PIL import Image

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Colab-specific: mount Google Drive at /content/drive.
# NOTE(review): none of the visible cells read from or write to /content/drive
# (data lives under /content/*_images and the checkpoint is saved to the
# working directory) — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
from getpass import getpass

# Kaggle API credentials are read by the `kaggle` CLI from these environment
# variables. They must never be written into the notebook itself: anything
# committed here is visible to everyone who can read the file.
# NOTE(security): an API key was previously hard-coded in this cell; treat it
# as compromised and regenerate it from the Kaggle account settings page.
if not os.environ.get('KAGGLE_USERNAME'):
    os.environ['KAGGLE_USERNAME'] = input('Kaggle username: ').strip()
if not os.environ.get('KAGGLE_KEY'):
    # getpass keeps the key out of the rendered cell output
    os.environ['KAGGLE_KEY'] = getpass('Kaggle API key: ').strip()

In [None]:
# Download the competition archive with the Kaggle CLI; authentication relies on the
# KAGGLE_USERNAME / KAGGLE_KEY environment variables set in the previous cell.
!kaggle competitions download -c detect-pneumonia-spring-2023

In [None]:
!unzip detect-pneumonia-spring-2023.zip

In [None]:
# Locations of the extracted image folders (train first, test second)
train_dir, test_dir = (
    "/content/train_images/train_images",
    "/content/test_images/test_images",
)

In [None]:
# Read the training annotations (relative path, resolved against the working directory)
labels = pd.read_csv(filepath_or_buffer='labels_train.csv')

# Show the first five rows as a quick sanity check of the parsed columns
print(labels.head(n=5))

In [None]:
#Function for loading images

def load_image(file_name, image_size=(224, 224),
               images_dir="/content/train_images/train_images", verbose=False):
    """
    Load one image as a 3-channel array with pixel values scaled by 1/255.

    The file is read as grayscale, replicated to three channels, resized to
    `image_size` and divided by 255.0.

    Parameters
    ----------
    file_name : str
        Name of the image file inside `images_dir`.
    image_size : tuple of int
        Target size passed to cv2.resize as (width, height).
    images_dir : str
        Directory containing the image. Defaults to the training image folder,
        which is what this function always used before the parameter existed.
    verbose : bool
        When True, print the path of every image before it is read.

    Returns
    -------
    numpy.ndarray or None
        The processed image, or None when the file could not be read.
    """
    # Build the path to the image file
    image_path = os.path.join(images_dir, file_name)

    if verbose:
        print(f"Loading image from {image_path}")

    # cv2.imread signals a missing or undecodable file by returning None
    # instead of raising, so the check has to happen before any further cv2
    # call touches the result (cvtColor raises on a None input).
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        print(f"Failed to load image from {image_path}")
        return None

    # Replicate the single gray channel into three channels
    rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

    # Resize the image
    rgb = cv2.resize(rgb, image_size)

    # Normalize pixel values to [0, 1]
    return rgb / 255.0

In [None]:
#Function for creating the dataset

def create_dataset(file_names, labels, image_size=(224, 224)):
    """
    Create a dataset of images and labels.

    Parameters
    ----------
    file_names : iterable of str
        Image file names to load, in the order they should appear in the output.
    labels : pandas.DataFrame
        Table with a 'file_name' column and a 'class_id' column.
    image_size : tuple of int
        Target size forwarded to load_image.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The loaded images and their class ids, aligned by position. Images that
        load_image could not read (it returned None) are left out of both arrays.

    Raises
    ------
    KeyError
        If a successfully loaded file has no row in `labels`.
    """
    # Build the file_name -> class_id lookup once instead of scanning the whole
    # DataFrame for every image. keep='first' matches the previous behaviour of
    # taking the first matching row when a file name is listed more than once.
    label_by_file = (
        labels.drop_duplicates(subset='file_name', keep='first')
        .set_index('file_name')['class_id']
        .to_dict()
    )

    # Initialize lists to hold images and labels
    images = []
    label_values = []

    # Iterate through the image file names and load each image
    for file_name in file_names:
        image = load_image(file_name, image_size)

        # Skip to the next image if current image failed to load
        if image is None:
            continue

        if file_name not in label_by_file:
            raise KeyError(f"No class_id found for file {file_name!r}")

        images.append(image)
        label_values.append(label_by_file[file_name])

    # Convert images and labels to numpy arrays
    return np.array(images), np.array(label_values)

In [None]:
# Create the dataset
# `labels` is rebound below from the annotations DataFrame to the array of class
# ids that the following cells use. Keep a handle on the DataFrame under its own
# name so that re-running this cell does not try to index the array by column.
if isinstance(labels, pd.DataFrame):
    labels_df = labels
images, labels = create_dataset(labels_df['file_name'], labels_df)

In [None]:
# Preview the first ten training images on a 2x5 grid, each titled with its label
fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(10, 5))

for i in range(axes.size):
    ax = axes.flat[i]
    ax.imshow(images[i])
    ax.set_title(f'Label: {labels[i]}')
    ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Split data into train and validation sets
X_train, X_val, y_train, y_val = train_test_split(images, labels, test_size=0.2, random_state=42)

# Per-class weights for the unbalanced dataset.
# compute_class_weight returns one weight per entry of `classes`, so zipping the
# two gives the {class_id: weight} mapping that Keras expects for the
# `class_weight` argument of model.fit. (compute_sample_weight, used here
# before, returns one weight per *sample*; enumerating it produced a
# sample-index -> weight dict, which is not a class-weight mapping.)
class_labels = np.unique(y_train)
class_weights = class_weight.compute_class_weight(class_weight='balanced', classes=class_labels, y=y_train)
class_weights_dict = {int(class_id): float(weight) for class_id, weight in zip(class_labels, class_weights)}

# Dimensions of our images.
img_width, img_height = 224, 224

# Define a callback to save the model weights only when validation accuracy is maximized
# and another callback for earlystopping
checkpoint = ModelCheckpoint('best_model_test.h5', monitor='val_accuracy', mode='max', save_best_only=True, verbose=1)
callbacks = EarlyStopping(monitor='val_loss', patience=40, verbose=1, mode='auto')

In [None]:
# This is the augmentation configuration we will use for training

# Function for adjusting contrast
def custom_augmentation(np_tensor):
    """
    Apply tf.image.random_contrast with bounds 0.5 and 2 to `np_tensor`
    and return the result converted to a NumPy array.

    NOTE(review): no generator in the visible cells references this function.
    """
    contrasted = tf.image.random_contrast(np_tensor, lower=0.5, upper=2)
    return np.array(contrasted)


# Real-time augmentation for the training batches, produced by ImageDataGenerator
train_datagen = ImageDataGenerator(
    width_shift_range=0.1,    # random horizontal shift (fraction of total width)
    height_shift_range=0.1,   # random vertical shift (fraction of total height)
    shear_range=0.2,          # random shearing transformations
    zoom_range=0.2,           # random zoom inside pictures
    horizontal_flip=False,    # horizontal flipping is switched off
)

# Batch iterators over the in-memory arrays; the validation iterator uses a
# default ImageDataGenerator, i.e. none of the augmentations configured above
batch_size = 32
train_generator = train_datagen.flow(X_train, y_train, batch_size=batch_size)
val_generator = ImageDataGenerator().flow(X_val, y_val, batch_size=batch_size)

In [None]:
# Pull one augmented batch from the training generator and display part of it
image_batch, label_batch = next(iter(train_generator))

def show_batch(image_batch, label_batch):
    """
    Draw the first 15 images of a batch on a 5x5 subplot grid.

    Each image is titled from its label: 1 and 2 have dedicated titles,
    any other value is titled "NORMAL".
    """
    titles_by_label = {1: "BACT PNEUMONIA", 2: "VIR PNEUMONIA"}

    plt.figure(figsize=(10, 10))
    for n in range(15):
        plt.subplot(5, 5, n + 1)
        plt.imshow(image_batch[n])
        plt.title(titles_by_label.get(label_batch[n], "NORMAL"))
        plt.axis("off")

show_batch(image_batch, label_batch)

In [None]:
# VGG19 convolutional base initialised with ImageNet weights; the original
# top (classifier) layers are left out and the input shape is fixed to 224x224x3
base_model = VGG19(include_top=False, weights='imagenet', input_shape=(224, 224, 3))


def _dense_block(tensor, units=256, drop_rate=0.7):
    """Dense(relu) -> BatchNormalization -> Dropout, applied to `tensor`."""
    tensor = Dense(units, activation='relu')(tensor)
    tensor = BatchNormalization()(tensor)
    return Dropout(drop_rate)(tensor)


# Classification head: flatten the base output, then two identical dense blocks
x = Flatten()(base_model.output)
x = _dense_block(x)
x = _dense_block(x)

# Three-way softmax output
output = Dense(3, activation='softmax')(x)

# Full model from the VGG19 input to the new softmax output
model = Model(inputs=base_model.input, outputs=output)

# Keep every layer of the pre-trained base fixed for the first training stage,
# so only the newly added head is updated
for layer in base_model.layers:
    layer.trainable = False

# Adam optimizer, sparse categorical cross-entropy loss, accuracy metric
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# First training stage: up to 50 epochs on the augmented generator.
# Step counts use floor division, so only whole batches are counted per epoch.
history = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // batch_size,
    epochs=50,
    validation_data=val_generator,
    validation_steps=len(X_val) // batch_size,
    callbacks=[callbacks, checkpoint],
)

In [None]:
# Adjustments, recompile and refit of the model

# Unfreeze the layers
for layer in base_model.layers:
    layer.trainable = True

# Use SGD with momentum as the optimizer, and add a learning rate scheduler.
# `learning_rate` is the supported argument name; `lr` is a deprecated alias
# that newer Keras releases reject.
optimizer = SGD(learning_rate=0.0001, momentum=0.9)
lr_scheduler = ReduceLROnPlateau(factor=0.1, patience=10)

# Recompile so the changed `trainable` flags and the new optimizer take effect
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# NOTE: a loop that called `base_model.layers.insert(i + 1, BatchNormalization())`
# used to live here. Inserting into the list returned by `.layers` does not
# rewire the model graph, so it never added any layer to the network; it has
# been removed rather than left to suggest otherwise. Adding normalization
# between convolutions would require rebuilding the model.

# NOTE: `train_datagen` was also re-created here with the same settings as
# before, but `train_generator` was never rebuilt from it, so it had no effect
# on training. The existing `train_generator` is reused as-is.

# Fit again the model for another 50 epochs with same batch size and number of steps adding a learning rate scheduler
history = model.fit(train_generator,
                    epochs=50,
                    validation_steps=len(X_val) // batch_size,
                    validation_data=val_generator,
                    steps_per_epoch=len(X_train) // batch_size,
                    callbacks = [callbacks, checkpoint, lr_scheduler]  # Add the learning rate scheduler to the callbacks
                    )

In [None]:
# Loading the best weights, making predictions on the test images and generating the final .csv file

# Load the best model (the file written by the ModelCheckpoint callback)
model = load_model('best_model_test.h5')

# Define the image dimensions (must be the same as what the model expects)
img_width, img_height = 224, 224

# Directory containing test images
test_dir = "/content/test_images/test_images"

# Number of images sent through the network per predict() call
predict_batch_size = 32


def _load_test_image(path):
    """
    Read one test image with the same steps load_image applies to the training
    data (grayscale read, gray->RGB, cv2.resize, divide by 255), so the model
    sees test inputs prepared exactly like the ones it was trained on.

    Returns None when cv2 cannot read the file.
    """
    gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return None
    rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
    rgb = cv2.resize(rgb, (img_width, img_height))
    return rgb.astype('float32') / 255.0


# Sorted so the rows of the output file come out in a reproducible order
# (os.listdir makes no ordering guarantee); sub-directories are ignored
test_files = sorted(
    entry for entry in os.listdir(test_dir)
    if os.path.isfile(os.path.join(test_dir, entry))
)

# Rows of the submission file
results = []

# Predict in batches instead of calling model.predict once per image
for start in range(0, len(test_files), predict_batch_size):
    chunk_names = []
    chunk_images = []
    for file in test_files[start:start + predict_batch_size]:
        img = _load_test_image(os.path.join(test_dir, file))
        if img is None:
            # Report the file instead of failing the whole run
            print(f"Skipping unreadable file: {file}")
            continue
        chunk_names.append(file)
        chunk_images.append(img)

    if not chunk_images:
        continue

    probabilities = model.predict(np.stack(chunk_images), verbose=0)
    predicted_classes = np.argmax(probabilities, axis=1)

    # Add the filename and prediction to the results
    for file, predicted_class in zip(chunk_names, predicted_classes):
        results.append({'file_name': file, 'class_id': int(predicted_class)})

# Convert results to a DataFrame and save as a CSV file
# (explicit columns keep the header even if no image could be processed)
results_df = pd.DataFrame(results, columns=['file_name', 'class_id'])
results_df.to_csv('test_predictionsvgg19.csv', index=False)

In [None]:
# Plots

# One figure per metric, training curve first and validation curve second.
# `history` is whatever the most recent model.fit call returned.
for metric_key, metric_label in (('loss', 'Loss'), ('accuracy', 'Accuracy')):
    plt.figure(figsize=(10, 6))
    plt.plot(history.history[metric_key], label=f'Training {metric_label}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_label}')
    plt.title(f'Training and Validation {metric_label}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_label)
    plt.legend()
    plt.show()