
# RealWaste Preprocessing

*Dataset Splitting, Pre-Processing, and Augmentation*

In [None]:
# Install augmentor library for geometric augmentations
!pip install Augmentor
# Import directory navigation libraries
from os import listdir, rmdir
# Import image preprocessing libraries
import numpy as np
import cv2
from shutil import move, copy
from random import randint
from PIL import Image
from tensorflow.image import resize
from tensorflow.io import decode_jpeg, encode_jpeg, write_file
from tensorflow.dtypes import saturate_cast
from random import randint
# Import augmentor for geometric data augmentations
import Augmentor
# Import Colab runtime disconnection
from google.colab import runtime
# Import sleep to allow time for copying to process
from time import sleep

# ------------------------------- Constants -------------------------------- #
# NOTE: every dataset path below is joined to a label with plain string
# concatenation (path + label), so each one is expected to end with a path
# separator and to contain one sub-directory per entry of `labels`.
# Original dataset path
real_waste_original_path = ''
# RealWaste training and validation paths
real_waste_training_path = ''
real_waste_validation_path = ''
# RealWaste temporary training and validation paths for augmentation stages
# (hold the distort-and-flip output while rotate-and-shear runs)
temp_real_waste_training_path = ''
temp_real_waste_validation_path = ''
# Test path
test_path = ''
# Labels (one dataset sub-directory per label)
labels = ['Cardboard', 'Food Organics', 'Glass', 'Metal', 'Paper', 'Plastic', 'Miscellaneous Trash', 'Textile Trash', 'Vegetation']
# Processed image dimensions, in pixels
img_height = 524
img_width = 524

# ------------------------------- Functions -------------------------------- #
# Count images in directory
#   Returns the number of entries listdir reports for `path`; every entry is
#   counted, whether or not it is an image.
def count_images(path):
    return len(listdir(path))

# Split datasets into training, validation and testing
#   input_path : root of the source dataset; joined directly with `label`, so
#                it must end with a path separator
#   train_path, val_path, test_path : destination roots (same separator rule);
#                the <root><label> directories must already exist
#   val_prop, test_prop : fraction of the label's files used for validation
#                and testing; the remainder goes to training
#   label      : class sub-directory to split
# Files are copied (the source directory is left untouched) and renamed to
# <label>_<n>.jpg, with n restarting at 1 inside every destination.
def split_dataset(input_path, train_path, val_path, test_path, val_prop, test_prop, label):
    # Local import: the file only imports randint from random
    from random import sample

    # Read the directory once so the count and the names always agree
    source_dir = input_path + label
    file_names = listdir(source_dir)
    num_files = len(file_names)

    # Number of validation and testing images, clamped so both subsets fit
    num_val = max(0, min(round(num_files * val_prop), num_files))
    num_test = max(0, min(round(num_files * test_prop), num_files - num_val))

    # Draw all held-out indexes in one sample: every file (index 0 included)
    # is eligible, the two subsets cannot overlap, and the draw always ends
    held_out = sample(range(num_files), num_val + num_test)
    val_list = held_out[:num_val]
    test_list = held_out[num_val:]
    held_out_set = set(held_out)
    train_list = [index for index in range(num_files) if index not in held_out_set]

    # Copy each subset into its destination and rename
    for destination, indexes in ((val_path, val_list), (test_path, test_list), (train_path, train_list)):
        for name_incrementer, index in enumerate(indexes, start=1):
            copy(source_dir + '/' + file_names[index],
                 destination + label + '/' + label + '_' + str(name_incrementer) + '.jpg')
            # Short pause to allow time for the copy to process
            sleep(0.05)

# Resize images
#   Re-encodes every file in `path`, in place, as a JPEG of the requested size.
#   path   : directory whose entries are all decoded as JPEG
#   width  : target width in pixels
#   height : target height in pixels
def resize_image(path, width, height):
    for file in listdir(path):
        file_path = path + '/' + file
        # Read image as binary file; the context manager closes the handle
        with open(file_path, 'rb') as image:
            binary_representation = image.read()
        decoded_representation = decode_jpeg(binary_representation)
        # tensorflow.image.resize takes its size as [new_height, new_width]
        resized_image = resize(decoded_representation, [height, width])
        # Clamp to the 0-255 range and cast back to 8-bit before JPEG encoding
        resized_image = saturate_cast(resized_image, 'uint8')
        encoded_image = encode_jpeg(resized_image)
        write_file(file_path, encoded_image)
        # Short pause to allow time for the write to process
        sleep(0.05)

# Check images for RGB mode and convert if required
#   Prints `path`, then rewrites in place every file in it whose PIL mode is
#   not 'RGB'. Files already in RGB mode are left untouched.
def convert_rgb_images(path):
    print(path)
    for file in listdir(path):
        file_path = path + '/' + file
        converted = None
        # Close the source handle before the same file is overwritten
        with Image.open(file_path) as img:
            # Convert to RGB mode if not already in format
            if img.mode != 'RGB':
                converted = img.convert('RGB')
        if converted is not None:
            converted.save(file_path)
            # Short pause to allow time for the write to process
            sleep(0.05)
            
# Geometric augmentation function: distort and flip
#   Runs an Augmentor pipeline over the images found in `path`; this file's
#   callers read the results from '<path>/output' afterwards.
def distort_and_flip(path):
    pipeline = Augmentor.Pipeline(path)
    # Probability 1: every image is flipped horizontally ...
    pipeline.flip_left_right(probability=1)
    # ... and every flipped image is then randomly distorted
    pipeline.random_distortion(probability=1, grid_width=5, grid_height=5, magnitude=4)
    # Process images in pipeline
    pipeline.process()

# Geometric augmentation: rotate and shear
#   Runs an Augmentor pipeline over the images found in `path`; this file's
#   callers read the results from '<path>/output' afterwards.
def rotate_and_shear(path):
    pipeline = Augmentor.Pipeline(path)
    # Probability 1: rotate every image, by at most 25 degrees left or right
    pipeline.rotate(probability=1, max_left_rotation=25, max_right_rotation=25)
    # Probability 1: shear every image, by at most 15 degrees left or right
    pipeline.shear(probability=1, max_shear_left=15, max_shear_right=15)
    # Process images in pipeline
    pipeline.process()

# Move files between directories
#   Moves every entry of `augmented_path` into `dataset_path`, renaming entry
#   number n (counted from 1, in listdir order) to <new_file_name>_<n>.jpg.
def move_files(augmented_path, dataset_path, new_file_name):
    for position, file in enumerate(listdir(augmented_path), start=1):
        target = dataset_path + '/' + new_file_name + '_' + str(position) + '.jpg'
        move(augmented_path + '/' + file, target)
        # Short pause to allow time for the move to process
        sleep(0.05)

# ---------------------------- Split RealWaste ----------------------------- #
print('\n|----------------------------------| RealWaste Splitting |---------------------------------|\n')

# Split every label: 20% validation, 10% testing, remainder training
for label in labels:
    split_dataset(real_waste_original_path, real_waste_training_path, real_waste_validation_path, test_path, 0.2, 0.1, label)

# Report how many files each label now has in every location
for label in labels:
    for count_text, dataset_root in (
        ('Number of files in RealWaste raw ', real_waste_original_path),
        ('Number of files in RealWaste training ', real_waste_training_path),
        ('Number of files in RealWaste validation ', real_waste_validation_path),
        ('Number of files in RealWaste test ', test_path),
    ):
        print(count_text + label + ': ' + str(count_images(dataset_root + label)))
    print()

# ---------------------- Pre-Process Images RealWaste ---------------------- #
print('|--------------------------------| RealWaste Pre-Processing |------------------------------|\n')
# Preprocess images: for each label, convert to RGB and resize the training,
# validation and testing copies, printing the file count before and after
for label in labels:
    for before_text, after_text, dataset_root in (
        ('\nBefore preprocessing RealWaste training ', 'After preprocessing RealWaste training ', real_waste_training_path),
        ('Before preprocessing RealWaste validation ', 'After preprocessing RealWaste validation ', real_waste_validation_path),
        ('Before preprocessing test ', 'After preprocessing test ', test_path),
    ):
        label_dir = dataset_root + label
        print(before_text + label + ': ' + str(count_images(label_dir)))
        convert_rgb_images(label_dir)
        resize_image(label_dir, img_width, img_height)
        print(after_text + label + ': ' + str(count_images(label_dir)))
    print()

# ---------------------- Distort and Flip RealWaste ------------------------ #
print('|----------------------| RealWaste Augmentation - Distort and Flip |---------------------|\n')

# Perform augmentation
for label in labels:
    label_training_dir = real_waste_training_path + label
    label_validation_dir = real_waste_validation_path + label

    # Augment training first, then validation; pause after each pipeline
    for count_text, augmented_text, split_dir in (
        ('\nTraining count RealWaste ', 'Augmented training count RealWaste ', label_training_dir),
        ('\nValidation count RealWaste ', 'Augmented validation count RealWaste ', label_validation_dir),
    ):
        print(count_text + label + ': ' + str(count_images(split_dir)))
        distort_and_flip(split_dir)
        print(augmented_text + label + ': ' + str(count_images(split_dir + '/output')))
        sleep(60)

    # Move to temp directories so the next augmentation stage does not see them
    for split_dir, temp_root in (
        (label_training_dir, temp_real_waste_training_path),
        (label_validation_dir, temp_real_waste_validation_path),
    ):
        move_files(split_dir + '/output', temp_root + label + '/', label + '_Distort_Flip')
        sleep(60)

    # Delete the (now empty) augmented output folders
    for split_dir in (label_training_dir, label_validation_dir):
        rmdir(split_dir + '/output')

# ---------------------- Rotate and Shear RealWaste ------------------------ #
print('|---------------------| RealWaste Augmentation - Rotate and Shear |--------------------|\n')

# Perform augmentation
for label in labels:
    label_training_dir = real_waste_training_path + label
    label_validation_dir = real_waste_validation_path + label

    # Augment training first, then validation; pause after each pipeline
    for count_text, augmented_text, split_dir in (
        ('\nTraining count RealWaste ', 'Augmented training count RealWaste ', label_training_dir),
        ('\nValidation count RealWaste ', 'Augmented validation count RealWaste ', label_validation_dir),
    ):
        print(count_text + label + ': ' + str(count_images(split_dir)))
        rotate_and_shear(split_dir)
        print(augmented_text + label + ': ' + str(count_images(split_dir + '/output')))
        sleep(60)

    # Move the augmented files out of 'output' into the dataset directory itself
    for split_dir in (label_training_dir, label_validation_dir):
        move_files(split_dir + '/output', split_dir + '/', label + '_Rotate_Shear')
        sleep(60)

    # Delete the (now empty) augmented output folders
    for split_dir in (label_training_dir, label_validation_dir):
        rmdir(split_dir + '/output')

# Move first augmentation (distort and flip) back from the temp directories
for label in labels:
    for temp_root, dataset_root in (
        (temp_real_waste_training_path, real_waste_training_path),
        (temp_real_waste_validation_path, real_waste_validation_path),
    ):
        move_files(temp_root + label, dataset_root + label, label + '_Distort_Flip')
        sleep(60)

# Print final image totals, summed over every label directory. (Counting the
# dataset root itself would only return the number of label folders.)
final_training_count = sum(count_images(real_waste_training_path + label) for label in labels)
final_validation_count = sum(count_images(real_waste_validation_path + label) for label in labels)
print('Final training count RealWaste: ' + str(final_training_count))
print('Final validation count RealWaste: ' + str(final_validation_count))
print()

# -------------------------- Print Final Counts ---------------------------- #
# Print training counts for each label in training dataset
print('\n|-----------| Total Training Label Counts |-----------|')
for label in labels:
    print('RealWaste ' + label + ': ' + str(count_images(real_waste_training_path+label)))
print('\n')

# Print validation counts for each label
# ('\n|' like the sibling headers; '\|' is an invalid escape that printed a backslash)
print('\n|----------| Total Validation Label Counts |----------|')
for label in labels:
    print('RealWaste ' + label + ': ' + str(count_images(real_waste_validation_path+label)))
print('\n')

# Print testing counts for each label
print('\n|------------| Total Testing Label Counts |-----------|')
for label in labels:
    print('Test '+ label + ': ' + str(count_images(test_path+label)))

# -------------------------- Disconnect Runtime ---------------------------- #
# Release the Colab runtime now that preprocessing has finished
runtime.unassign()

# DiversionNet Preprocessing

In [None]:
# Install augmentor library for geometric augmentations
!pip install Augmentor
# Import directory navigation libraries
from os import listdir, rmdir
# Import image preprocessing libraries
import numpy as np
import cv2
from shutil import move, copy
from random import randint
from PIL import Image
from tensorflow.image import resize
from tensorflow.io import decode_jpeg, encode_jpeg, write_file
from tensorflow.dtypes import saturate_cast
from random import randint
# Import augmentor for geometric data augmentations
import Augmentor
# Import Colab runtime disconnection
from google.colab import runtime
# Import sleep to allow time for copying to process
from time import sleep

# ------------------------------- Constants -------------------------------- #
# NOTE: every dataset path below is joined to a label with plain string
# concatenation (path + label), so each one is expected to end with a path
# separator and to contain one sub-directory per entry of `labels`.
# Original dataset path (the uncropped images are read from '<path><label> Raw')
diversion_net_original_path = ''
# DiversionNet training and validation paths
diversion_net_training_path = ''
diversion_net_validation_path = ''
# DiversionNet temporary training and validation paths for augmentation stages
# (hold the distort-and-flip output while rotate-and-shear runs)
temp_diversion_net_training_path = ''
temp_diversion_net_validation_path = ''
# Labels (one dataset sub-directory per label)
labels = ['Cardboard', 'Food Organics', 'Glass', 'Metal', 'Paper', 'Plastic', 'Miscellaneous Trash', 'Textile Trash', 'Vegetation']
# Processed image dimensions, in pixels
img_height = 524
img_width = 524

# ------------------------------- Functions -------------------------------- #
# Crop non-square images
#   Saves the centred new_width x new_height region of `image` to <path>/<file>.
#   path   : destination directory
#   image  : opened image exposing crop(box) (a PIL image in this file)
#   width, height         : size of `image` in pixels
#   new_width, new_height : size of the centred crop in pixels
#   file   : destination file name
def crop_square(path, image, width, height, new_width, new_height, file):
    # Integer offsets: the box is exactly new_width x new_height. Halving with
    # '/' produced x.5 edges whenever the margin was odd, leaving the final
    # size to however the imaging library rounds each edge.
    left = (width - new_width) // 2
    top = (height - new_height) // 2
    cropped = image.crop((left, top, left + new_width, top + new_height))
    cropped.save(path + '/' + file)
    # Short pause to allow time for the write to process
    sleep(0.05)

# Copy original images to dataset path and crop if required
#   Reads every file in '<path> Raw'. Non-square images are centre-cropped to
#   a square whose side is the shorter dimension and saved to <path>/<file>;
#   square images are copied there unchanged.
def copy_and_crop(path):
    raw_path = path + ' Raw'
    for file in listdir(raw_path):
        source = raw_path + '/' + file
        # The context manager closes the image handle once the crop is saved
        with Image.open(source) as im:
            width, height = im.size
            if width != height:
                side = min(width, height)
                crop_square(path, im, width, height, side, side, file)
        if width == height:
            copy(source, path + '/' + file)
            # Short pause to allow time for the copy to process
            sleep(0.05)

# Count images
#   Returns the number of entries listdir reports for `path`; every entry is
#   counted, whether or not it is an image.
def count_images(path):
    return len(listdir(path))

# Split training and validation datasets
#   input_path : root of the source dataset; joined directly with `label`, so
#                it must end with a path separator
#   train_path, val_path : destination roots (same separator rule); the
#                <root><label> directories must already exist
#   val_prop   : fraction of the label's files used for validation; the
#                remainder goes to training
#   label      : class sub-directory to split
# Files are copied (the source directory is left untouched) and renamed to
# <label>_<n>.jpg, with n starting at 1 inside each destination.
def split_dataset(input_path, train_path, val_path, val_prop, label):
    # Local import: the file only imports randint from random
    from random import sample

    # Read the directory once so the count and the names always agree
    source_dir = input_path + label
    file_names = listdir(source_dir)
    num_files = len(file_names)

    # Number of validation images, clamped to what is available
    num_val = max(0, min(round(num_files * val_prop), num_files))

    # One sample call: every file (index 0 included) is eligible and the draw
    # always terminates, unlike retrying randint until enough indexes are found
    val_list = sample(range(num_files), num_val)
    val_set = set(val_list)
    train_list = [index for index in range(num_files) if index not in val_set]

    # Copy each subset into its destination and rename
    for destination, indexes in ((val_path, val_list), (train_path, train_list)):
        for name_incrementer, index in enumerate(indexes, start=1):
            copy(source_dir + '/' + file_names[index],
                 destination + label + '/' + label + '_' + str(name_incrementer) + '.jpg')
            # Short pause to allow time for the copy to process
            sleep(0.05)

# Resize images
#   Re-encodes every file in `path`, in place, as a JPEG of the requested size.
#   path   : directory whose entries are all decoded as JPEG
#   width  : target width in pixels
#   height : target height in pixels
def resize_image(path, width, height):
    for file in listdir(path):
        file_path = path + '/' + file
        # Read image as binary file; the context manager closes the handle
        with open(file_path, 'rb') as binary_image:
            binary_representation = binary_image.read()
        decoded_representation = decode_jpeg(binary_representation)
        # tensorflow.image.resize takes its size as [new_height, new_width]
        resized_image = resize(decoded_representation, [height, width])
        # Clamp to the 0-255 range and cast back to 8-bit before JPEG encoding
        resized_image = saturate_cast(resized_image, 'uint8')
        encoded_image = encode_jpeg(resized_image)
        write_file(file_path, encoded_image)
        # Short pause to allow time for the write to process
        sleep(0.05)

# Check images for RGB mode and convert if required
#   Prints `path`, then rewrites in place every file in it whose PIL mode is
#   not 'RGB'. Files already in RGB mode are left untouched.
def convert_rgb_images(path):
    print(path)
    for file in listdir(path):
        file_path = path + '/' + file
        converted = None
        # Close the source handle before the same file is overwritten
        with Image.open(file_path) as img:
            # Convert to RGB mode if not already in format
            if img.mode != 'RGB':
                converted = img.convert('RGB')
        if converted is not None:
            converted.save(file_path)
            # Short pause to allow time for the write to process
            sleep(0.05)
            
# Geometric augmentation function: distort and flip
#   Runs an Augmentor pipeline over the images found in `path`; this file's
#   callers read the results from '<path>/output' afterwards.
def distort_and_flip(path):
    pipeline = Augmentor.Pipeline(path)
    # Probability 1: every image is flipped horizontally ...
    pipeline.flip_left_right(probability=1)
    # ... and every flipped image is then randomly distorted
    pipeline.random_distortion(probability=1, grid_width=5, grid_height=5, magnitude=4)
    # Process images in pipeline
    pipeline.process()

# Geometric augmentation: rotate and shear
#   Runs an Augmentor pipeline over the images found in `path`; this file's
#   callers read the results from '<path>/output' afterwards.
def rotate_and_shear(path):
    pipeline = Augmentor.Pipeline(path)
    # Probability 1: rotate every image, by at most 25 degrees left or right
    pipeline.rotate(probability=1, max_left_rotation=25, max_right_rotation=25)
    # Probability 1: shear every image, by at most 15 degrees left or right
    pipeline.shear(probability=1, max_shear_left=15, max_shear_right=15)
    # Process images in pipeline
    pipeline.process()

# Move files between directories
#   Moves every entry of `augmented_path` into `dataset_path`, renaming entry
#   number n (counted from 1, in listdir order) to <new_file_name>_<n>.jpg.
def move_files(augmented_path, dataset_path, new_file_name):
    for position, file in enumerate(listdir(augmented_path), start=1):
        target = dataset_path + '/' + new_file_name + '_' + str(position) + '.jpg'
        move(augmented_path + '/' + file, target)
        # Short pause to allow time for the move to process
        sleep(0.05)

# -------------------------- Format DiversionNet --------------------------- #
print('\n|----------------------------------| DiversionNet Formatting |--------------------------------|\n')

# Format images: copy each label's raw images, centre-cropping non-square ones
for label in labels:
    copy_and_crop(diversion_net_original_path + label)

# -------------------------- Split DiversionNet ---------------------------- #
print('|----------------------------------| DiversionNet Splitting |---------------------------------|\n')

# Split every label: 30% validation, remainder training
for label in labels:
    split_dataset(diversion_net_original_path, diversion_net_training_path, diversion_net_validation_path, 0.3, label)

# Report how many files each label now has in every location
for label in labels:
    for count_text, dataset_root in (
        ('Number of files in DiversionNet raw ', diversion_net_original_path),
        ('Number of files in DiversionNet training ', diversion_net_training_path),
        ('Number of files in DiversionNet validation ', diversion_net_validation_path),
    ):
        print(count_text + label + ': ' + str(count_images(dataset_root + label)))
    print()

# -------------------- Pre-Process Images DiversionNet --------------------- #
print('|--------------------------------| DiversionNet Pre-Processing |------------------------------|\n')
# Preprocess images: for each label, convert to RGB and resize the training
# and validation copies, printing the file count before and after
for label in labels:
    for before_text, after_text, dataset_root in (
        ('\nBefore preprocessing DiversionNet training ', 'After preprocessing DiversionNet training ', diversion_net_training_path),
        ('Before preprocessing DiversionNet validation ', 'After preprocessing DiversionNet validation ', diversion_net_validation_path),
    ):
        label_dir = dataset_root + label
        print(before_text + label + ': ' + str(count_images(label_dir)))
        convert_rgb_images(label_dir)
        resize_image(label_dir, img_width, img_height)
        print(after_text + label + ': ' + str(count_images(label_dir)))

# --------------------- Distort and Flip DiversionNet ---------------------- #
print('|----------------------| DiversionNet Augmentation - Distort and Flip |---------------------|\n')

# Perform augmentation
for label in labels:
    label_training_dir = diversion_net_training_path + label
    label_validation_dir = diversion_net_validation_path + label

    # Augment training first, then validation; pause after each pipeline
    for count_text, augmented_text, split_dir in (
        ('\nTraining count DiversionNet ', 'Augmented training count DiversionNet ', label_training_dir),
        ('\nValidation count DiversionNet ', 'Augmented validation count DiversionNet ', label_validation_dir),
    ):
        print(count_text + label + ': ' + str(count_images(split_dir)))
        distort_and_flip(split_dir)
        print(augmented_text + label + ': ' + str(count_images(split_dir + '/output')))
        sleep(60)

    # Move to temp directories so the next augmentation stage does not see them
    for split_dir, temp_root in (
        (label_training_dir, temp_diversion_net_training_path),
        (label_validation_dir, temp_diversion_net_validation_path),
    ):
        move_files(split_dir + '/output', temp_root + label + '/', label + '_Distort_Flip')
        sleep(60)

    # Delete the (now empty) augmented output folders
    for split_dir in (label_training_dir, label_validation_dir):
        rmdir(split_dir + '/output')

# --------------------- Rotate and Shear DiversionNet ---------------------- #
print('|---------------------| DiversionNet Augmentation - Rotate and Shear |--------------------|\n')

# Perform augmentation
for label in labels:
    label_training_dir = diversion_net_training_path + label
    label_validation_dir = diversion_net_validation_path + label

    # Augment training first, then validation; pause after each pipeline
    for count_text, augmented_text, split_dir in (
        ('\nTraining count DiversionNet ', 'Augmented training count DiversionNet ', label_training_dir),
        ('\nValidation count DiversionNet ', 'Augmented validation count DiversionNet ', label_validation_dir),
    ):
        print(count_text + label + ': ' + str(count_images(split_dir)))
        rotate_and_shear(split_dir)
        print(augmented_text + label + ': ' + str(count_images(split_dir + '/output')))
        sleep(60)

    # Move the augmented files out of 'output' into the dataset directory itself
    for split_dir in (label_training_dir, label_validation_dir):
        move_files(split_dir + '/output', split_dir + '/', label + '_Rotate_Shear')
        sleep(60)

    # Delete the (now empty) augmented output folders
    for split_dir in (label_training_dir, label_validation_dir):
        rmdir(split_dir + '/output')

# Move first augmentation (distort and flip) back from the temp directories
for label in labels:
    for temp_root, dataset_root in (
        (temp_diversion_net_training_path, diversion_net_training_path),
        (temp_diversion_net_validation_path, diversion_net_validation_path),
    ):
        move_files(temp_root + label, dataset_root + label, label + '_Distort_Flip')
        sleep(60)

# Print final image totals, summed over every label directory. (Counting the
# dataset root itself would only return the number of label folders.)
final_training_count = sum(count_images(diversion_net_training_path + label) for label in labels)
final_validation_count = sum(count_images(diversion_net_validation_path + label) for label in labels)
print('Final training count DiversionNet: ' + str(final_training_count))
print('Final validation count DiversionNet: ' + str(final_validation_count))
print()

# --------------------------- Print Final Counts --------------------------- #
# Print training counts for each label in training dataset
print('\n|-----------| Total Training Label Counts |-----------|')
for label in labels:
    print('DiversionNet ' + label + ': ' + str(count_images(diversion_net_training_path+label)))
print('\n')

# Print validation counts for each label
# ('\n|' like the training header; '\|' is an invalid escape that printed a backslash)
print('\n|----------| Total Validation Label Counts |----------|')
for label in labels:
    print('DiversionNet ' + label + ': ' + str(count_images(diversion_net_validation_path+label)))
print('\n')

# -------------------------- Disconnect Runtime ---------------------------- #
# Release the Colab runtime now that preprocessing has finished
runtime.unassign()

# DenseNet121

In [None]:
# Import os library to navigate directories
from os import listdir
# Import tensorflow functions for model building
from tensorflow.keras.applications.densenet import DenseNet121
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Input
from tensorflow.keras import Model
from tensorflow.keras.models import save_model, load_model
from tensorflow.keras.optimizers import Adam
# Import tensorflow functions for training
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.data import AUTOTUNE as autotune
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import Colab runtime disconnection
from google.colab import runtime

# ------------------------------- Constants -------------------------------- #
# Split basic and advanced feature extraction layers
# (index into the base model's layer list: layers split_layer..end are tuned
# in the first fine-tuning stage, layers 1..split_layer-1 in the second)
split_layer = 143

# Training parameters
batch_size = 16       # Batch size for gradient learning
img_height = 524      # Image pixel height
img_width = 524       # Image pixel width
epochs = 500          # Arbitrarily large epoch size for convergence

# Learning rates
learn_rate_fc = 1e-4  # Fully connected layers
learn_rate_fe1 = 1e-5 # Later feature extraction layers
learn_rate_fe2 = 1e-5 # Early feature extraction layers

# Callback to stop model training when model converges
# (stops once val_loss has not improved for 10 consecutive epochs)
model_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min')

# Dataset paths
train_dir = ''
validation_dir = ''
# Checkpoint directories, one per training stage. Each later stage picks its
# starting model from the previous stage's directory listing, so a directory
# should only contain checkpoints written by the current run.
best_model_path_fc = ''
best_model_path_fe1 = ''
best_model_path_fe2 = ''
# Name embedded in the checkpoint file names
dataset_name = ''

# ------------------------ Training Mode Functions ------------------------- #
# Freeze fully connected layers
#   Marks every top-level layer from index 2 onwards as non-trainable (indexes
#   0 and 1 are left as they are) and returns the same model object.
def freeze_fully_connected(model):
    for layer in model.layers[2:]:
        layer.trainable = False
    return model

# Unfreeze set of feature extraction layers
#   model       : model whose layer at index 1 is the nested feature extractor
#   start_layer : index of the first nested layer to unfreeze
#   end_layer   : index of the last nested layer to unfreeze (inclusive)
#   Every nested layer is frozen first; layers start_layer..end_layer are then
#   made trainable, except batch normalization layers, which stay frozen.
#   Returns the same model object.
def unfreeze_feature_extraction_block(model, start_layer, end_layer):
    feature_extractor = model.layers[1]
    # Keras gates a container's trainable weights on the container's own flag,
    # so enable it before setting the per-layer flags below
    feature_extractor.trainable = True
    feature_layers = feature_extractor.layers
    # Loop through all feature extraction layers and freeze
    for layer in feature_layers:
        layer.trainable = False
    # Loop through specified feature extraction layers and unfreeze
    for k in range(start_layer, end_layer + 1):
        layer = feature_layers[k]
        # Identify batch normalization by class name; parsing str(layer)
        # depends on the repr format of the installed Keras version
        if type(layer).__name__ != 'BatchNormalization':
            layer.trainable = True
    return model

# --------------------------- Build DenseNet121 ---------------------------- #
# Create base DenseNet121 model:
  # 524x524x3 input size
  # Only feature extraction layers - no classification layers
  # Feature extraction layer weights trained on ImageNet dataset
base_model = DenseNet121(input_shape = (img_height, img_width, 3),
                         include_top = False,
                         weights = 'imagenet'
                         )
# Freeze learning on feature extraction layers
base_model.trainable = False

# Create classification layers of DenseNet121
# Global average pooling layer
global_average_pooling = GlobalAveragePooling2D()
# Fully connected layer using softmax activation function for 9 labels
classifier = Dense(units = 9, activation = 'softmax')

# Connect input layer, base model and output classifier
inputs = Input(shape = (img_height, img_width, 3)) # Input layer: 524x524x3 RGB images
model = base_model(inputs)                         # Feature extraction layers of DenseNet121
model = global_average_pooling(model)              # Global average pooling of the extracted features
outputs = classifier(model)                        # Fully connected layer: 9 neuron softmax activation for classification

# Build DenseNet121 model
# (resulting layer order: 0 = input, 1 = base model, 2 = pooling, 3 = classifier)
dense_net_121 = Model(inputs, outputs, name = 'DenseNet121')

# ------------------------- Dataset Preparation ---------------------------- #
# RGB value normalization: divide pixel values by 255 and cast to float32;
# the label is passed through unchanged
def process(image, label):
    scaled_image = image/255.0
    return cast(scaled_image, 'float32'), label

# Loader options shared by the training and validation datasets
dataset_options = dict(
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical',
)

# Create training dataset, normalize its pixel values, then cache and
# prefetch it to improve computational speed
train_ds = image_dataset_from_directory(train_dir, **dataset_options)
train_ds = train_ds.map(process).cache().prefetch(buffer_size = autotune)

# Define validation dataset with the same processing steps
val_ds = image_dataset_from_directory(validation_dir, **dataset_options)
val_ds = val_ds.map(process).cache().prefetch(buffer_size = autotune)

# Compile model:
  # Adam algorithm optimization
  # Categorical crossentropy loss function for multi-class classification
  # Metrics: accuracy, precision, recall
fc_optimizer = Adam(learn_rate_fc)
fc_metrics = ['accuracy', Precision(), Recall()]
dense_net_121.compile(optimizer = fc_optimizer, loss = 'categorical_crossentropy', metrics = fc_metrics)

# ---------------- Train DenseNet121 Fully Connected Layers ---------------- #
print('\n|------------------------------| Training Fully Connected Layers |--------------------------------|\n')

# Define checkpoint: with save_best_only the full model is written only on
# epochs where the monitored value improves
checkpoint_pattern_fc = best_model_path_fc + '/densenet121-' + dataset_name + '-{epoch:02d}-{val_accuracy:.4f}.hdf5'
model_checkpoint_fc = ModelCheckpoint(checkpoint_pattern_fc, save_best_only = True, save_weights_only = False, mode= 'auto')

# Train model until early stopping triggers or the epoch limit is reached
callbacks_fc = [model_checkpoint_fc, model_callback]
dense_net_121.fit(train_ds, validation_data = val_ds, epochs = epochs, verbose = 1, callbacks = callbacks_fc)

print('\n|--------------------------| Fully Connected Layers Training Complete |---------------------------|\n')

# ----------- Train DenseNet121 Later Feature Extraction Layers ------------ #
print('|------------------------| Fine Tuning Later Feature Extraction Layers |--------------------------|\n')

# Load the best model of the previous stage. The checkpoint callback only
# writes a file when the monitored value improves, so the best model is the
# checkpoint with the highest epoch number. listdir returns names in
# arbitrary order, so the epoch is parsed from the file name
# (...-<epoch>-<val_accuracy>.hdf5) instead of taking the last entry listed.
print('Loading model with best accuracy...\n')
directory = [file for file in listdir(best_model_path_fc) if file.endswith('.hdf5')]
best_model_name = max(directory, key = lambda file: int(file.rsplit('-', 2)[1])).replace('.hdf5', '')
dense_net_121_fe1 = load_model(best_model_path_fc + '/' + best_model_name + '.hdf5')

# Swap learning mode
dense_net_121_fe1 = freeze_fully_connected(dense_net_121_fe1)
dense_net_121_fe1 = unfreeze_feature_extraction_block(dense_net_121_fe1, split_layer, len(dense_net_121_fe1.layers[1].layers) - 1)

# Define checkpoint to save best model
best_model_checkpoint_fe1 = ModelCheckpoint(best_model_path_fe1 + '/' + best_model_name + '-fe1-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile with the decreased learning rate: Keras only takes changed
# `trainable` flags into account when compile() is called again
dense_net_121_fe1.compile(
    optimizer = Adam(learn_rate_fe1),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
dense_net_121_fe1.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe1, model_callback]
)

print('\n|--------------------| Fine Tuning Later Feature Extraction Layers Complete |---------------------|\n')

# ----------- Train DenseNet121 Early Feature Extraction Layers ------------ #
print('|------------------------| Fine Tuning Early Feature Extraction Layers |--------------------------|\n')

# Load the best model of the previous stage. The checkpoint callback only
# writes a file when the monitored value improves, so the best model is the
# checkpoint with the highest epoch number. listdir returns names in
# arbitrary order, so the epoch is parsed from the file name
# (...-<epoch>-<val_accuracy>.hdf5) instead of taking the last entry listed.
print('Loading model with best accuracy...\n')
directory = [file for file in listdir(best_model_path_fe1) if file.endswith('.hdf5')]
best_model_name = max(directory, key = lambda file: int(file.rsplit('-', 2)[1])).replace('.hdf5', '')
dense_net_121_fe2 = load_model(best_model_path_fe1 + '/' + best_model_name + '.hdf5')

# Swap learning mode
dense_net_121_fe2 = freeze_fully_connected(dense_net_121_fe2)
dense_net_121_fe2 = unfreeze_feature_extraction_block(dense_net_121_fe2, 1, split_layer - 1)

# Define checkpoint to save best model
best_model_checkpoint_fe2 = ModelCheckpoint(best_model_path_fe2 + '/' + best_model_name + '-fe2-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile with the fine-tuning learning rate: Keras only takes changed
# `trainable` flags into account when compile() is called again
dense_net_121_fe2.compile(
    optimizer = Adam(learn_rate_fe2),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
dense_net_121_fe2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe2, model_callback]
)

print('\n|--------------------| Fine Tuning Early Feature Extraction Layers Complete |---------------------|')

# -------------------------- Disconnect Runtime ---------------------------- #
# Release the Colab runtime now that training has finished
runtime.unassign()

# Inception V3

In [None]:
# Import os library to navigate directories
from os import listdir
# Import tensorflow functions for model building
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.layers import Dense, Input, GlobalAveragePooling2D
from tensorflow.keras import Model
from tensorflow.keras.models import save_model, load_model
from tensorflow.keras.optimizers import Adam
# Import tensorflow functions for training
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.data import AUTOTUNE as autotune
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import Colab runtime disconnection
from google.colab import runtime

# ------------------------------- Constants -------------------------------- #
# Index into the base model's layer list that separates the two fine-tuning
# stages: layers from split_layer onwards are unfrozen in the first stage and
# layers 1 .. split_layer - 2 in the second (the end index passed to
# unfreeze_feature_extraction_block is exclusive)
split_layer = 249

# Training parameters
batch_size = 32       # Batch size for gradient learning
img_height = 524      # Image pixel height
img_width = 524       # Image pixel width
epochs = 500          # Upper bound on epochs; the EarlyStopping callback below is expected to end training sooner

# Learning rates (one per training stage)
learn_rate_fc = 1e-5  # Fully connected layers
learn_rate_fe1 = 1e-5 # Later feature extraction layers
learn_rate_fe2 = 1e-5 # Early feature extraction layers

# Callback to stop training once val_loss has stopped improving (patience of 10 epochs)
model_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min')

# Dataset and checkpoint locations - empty placeholders to be filled in before running
train_dir = ''            # Training images
validation_dir = ''       # Validation images
best_model_path_fc = ''   # Checkpoints of the fully connected stage
best_model_path_fe1 = ''  # Checkpoints of the later feature extraction stage
best_model_path_fe2 = ''  # Checkpoints of the early feature extraction stage
dataset_name = ''         # Tag inserted into checkpoint file names

# ------------------------ Training Mode Functions ------------------------- #
# Freeze the classification head (every layer after the input layer and base model)
def freeze_fully_connected(model):
    """Mark every layer from index 2 onwards as non-trainable and return the model.

    Layers 0 and 1 are left untouched; in this script they are the input
    layer and the pretrained base network.
    """
    for head_layer in model.layers[2:]:
        head_layer.trainable = False
    return model

# Unfreeze a contiguous block of feature extraction layers
def unfreeze_feature_extraction_block(model, start_layer, end_layer):
    """Make base-model layers start_layer .. end_layer - 1 trainable.

    Every layer of the nested base model (model.layers[1]) is frozen first,
    then the requested slice is unfrozen. Batch normalization layers in the
    slice are deliberately left frozen.

    Args:
        model: Model whose second layer (index 1) is the nested base model.
        start_layer: Index of the first base-model layer to unfreeze.
        end_layer: Index one past the last base-model layer to unfreeze.

    Returns:
        The same model instance, modified in place.
    """
    base_model = model.layers[1]
    # Keras reports no trainable weights for a container whose own trainable
    # flag is False, so re-enable the container before freezing selectively
    base_model.trainable = True
    for layer in base_model.layers:
        layer.trainable = False
    for layer in base_model.layers[start_layer:end_layer]:
        # Identify batch normalization by class name; parsing str(layer) depends
        # on the module path of the installed Keras version
        if type(layer).__name__ != 'BatchNormalization':
            layer.trainable = True
    return model

# ---------------------------- Build Inception V3 -------------------------- #
# Create base Inception V3 model:
  # 524x524x3 input size
  # Only feature extraction layers - no classification layers
  # Feature extraction layer weights trained on ImageNet dataset
base_model = InceptionV3(input_shape = (img_height, img_width, 3),
                         include_top = False,
                         weights = 'imagenet'
                         )
# Freeze learning on feature extraction layers
base_model.trainable = False

# Create classification layers of Inception V3
global_average_pool = GlobalAveragePooling2D()        # Global average pooling
fc1 = Dense(units = 2048, activation = 'relu')        # 2048 neuron fully connected layer with ReLU activation
classifier = Dense(units = 9, activation = 'softmax') # 9 neuron softmax classifier

# Connect input layer, base model and output layers
# (`model` is reused as a running name for the intermediate tensor)
inputs = Input(shape = (img_height, img_width, 3)) # Input layer: 524x524 RGB images
model = base_model(inputs)                         # Feature extraction layers of Inception V3
model = global_average_pool(model)                 # Global average pooling layer
model = fc1(model)                                 # Fully connected layer: 2048 neuron ReLU
outputs = classifier(model)                        # Fully connected layer: 9 neuron softmax activation for classification

# Build inception_v3 model
inception_v3 = Model(inputs, outputs, name = 'InceptionV3')

# --------------- Train Inception V3 Fully Connected Layers ---------------- #
# Pixel value normalization applied to every dataset element before training
def process(image, label):
    """Divide the image by 255 and cast it to float32; the label passes through unchanged."""
    scaled = image / 255.0
    return cast(scaled, 'float32'), label

# Create training dataset from train_dir with one-hot labels
# (label_mode = 'categorical'); images are sized to img_height x img_width
train_ds = image_dataset_from_directory(
    train_dir,
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical'
)
# Normalize training dataset pixel values to improve learning performance
train_ds = train_ds.map(process)
# Cache the processed batches and prefetch them to improve computational speed
train_ds = train_ds.cache().prefetch(buffer_size = autotune)

# Define validation dataset (same settings, read from validation_dir)
val_ds = image_dataset_from_directory(
    validation_dir,
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical'
)
# Normalize validation pixel values to improve learning performance
val_ds = val_ds.map(process)
# Cache the processed batches and prefetch them to improve computation speed
val_ds = val_ds.cache().prefetch(buffer_size = autotune)

# Compile model:
  # Adam algorithm optimization
  # Categorical crossentropy loss function for multiclass (single-label) classification
  # Metrics: accuracy, precision, recall
inception_v3.compile(
    optimizer = Adam(learn_rate_fc),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# ---------------- Train Inception V3 Fully Connected Layers --------------- #
print('\n|------------------------------| Training Fully Connected Layers |--------------------------------|\n')

# Define checkpoint: because save_best_only = True the full model is written only
# on epochs where the monitored quantity improves, not after every epoch. The
# epoch number and validation accuracy are embedded in the file name.
model_checkpoint_fc = ModelCheckpoint(best_model_path_fc + '/inception_v3-' + dataset_name + '-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Train model (only the classification layers learn; the base model was frozen above)
inception_v3.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [model_checkpoint_fc, model_callback]
)

print('\n|--------------------------| Fully Connected Layers Training Complete |---------------------------|\n')

# ----------- Train Inception V3 Later Feature Extraction Layers ----------- #
print('|------------------------| Fine Tuning Later Feature Extraction Layers |--------------------------|\n')

# Load model with best accuracy
# Checkpoints are named '<name>-<epoch>-<val_accuracy>.hdf5', so the best one is
# chosen by parsing the file name. listdir() returns entries in arbitrary order,
# which makes "the last file listed" an unreliable choice.
print('Loading model with best accuracy...\n')

# Score a checkpoint file name as (val_accuracy, epoch); later epoch wins ties
def checkpoint_score(file_name):
    _, epoch, val_accuracy = file_name[:-len('.hdf5')].rsplit('-', 2)
    return float(val_accuracy), int(epoch)

checkpoints = [file for file in listdir(best_model_path_fc) if file.endswith('.hdf5')]
if not checkpoints:
    raise FileNotFoundError('No .hdf5 checkpoints found in: ' + best_model_path_fc)
best_model_file = max(checkpoints, key = checkpoint_score)
best_model_name = best_model_file[:-len('.hdf5')]
inception_v3_fe1 = load_model(best_model_path_fc + '/' + best_model_file)

# Swap learning mode: freeze the classification head, then unfreeze the base
# model layers from split_layer to the end
inception_v3_fe1 = freeze_fully_connected(inception_v3_fe1)
inception_v3_fe1 = unfreeze_feature_extraction_block(inception_v3_fe1, split_layer, len(inception_v3_fe1.layers[1].layers))

# Define checkpoint to save best model (written only when the monitored quantity improves)
best_model_checkpoint_fe1 = ModelCheckpoint(best_model_path_fe1 + '/' + best_model_name + '-fe1-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile so the changed trainable flags and this stage's learning rate are
# picked up; loss and metrics match the original compile() call
inception_v3_fe1.compile(
    optimizer = Adam(learn_rate_fe1),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
inception_v3_fe1.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe1, model_callback]
)

print('\n|--------------------| Fine Tuning Later Feature Extraction Layers Complete |---------------------|\n')

# ----------- Train Inception V3 Early Feature Extraction Layers ----------- #
print('|------------------------| Fine Tuning Early Feature Extraction Layers |--------------------------|\n')

# Load model with best accuracy
# Checkpoints are named '<name>-<epoch>-<val_accuracy>.hdf5', so the best one is
# chosen by parsing the file name. listdir() returns entries in arbitrary order,
# which makes "the last file listed" an unreliable choice.
print('Loading model with best accuracy...\n')

# Score a checkpoint file name as (val_accuracy, epoch); later epoch wins ties
def checkpoint_score(file_name):
    _, epoch, val_accuracy = file_name[:-len('.hdf5')].rsplit('-', 2)
    return float(val_accuracy), int(epoch)

checkpoints = [file for file in listdir(best_model_path_fe1) if file.endswith('.hdf5')]
if not checkpoints:
    # Fail loudly instead of silently reusing the previous stage's model name
    raise FileNotFoundError('No .hdf5 checkpoints found in: ' + best_model_path_fe1)
best_model_file = max(checkpoints, key = checkpoint_score)
best_model_name = best_model_file[:-len('.hdf5')]
inception_v3_fe2 = load_model(best_model_path_fe1 + '/' + best_model_file)

# Swap learning mode: freeze the classification head, then unfreeze the early
# base-model layers (the end index is exclusive, so layers 1 .. split_layer - 2)
inception_v3_fe2 = freeze_fully_connected(inception_v3_fe2)
inception_v3_fe2 = unfreeze_feature_extraction_block(inception_v3_fe2, 1, split_layer - 1)

# Define checkpoint to save best model (written only when the monitored quantity improves)
best_model_checkpoint_fe2 = ModelCheckpoint(best_model_path_fe2 + '/' + best_model_name + '-fe2-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile so the changed trainable flags and this stage's learning rate are
# picked up; loss and metrics match the original compile() call
inception_v3_fe2.compile(
    optimizer = Adam(learn_rate_fe2),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
inception_v3_fe2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe2, model_callback]
)

print('\n|--------------------| Fine Tuning Early Feature Extraction Layers Complete |---------------------|')

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()

# InceptionResNet V2

In [None]:
# Import os library to navigate directories
from os import listdir
# Import tensorflow functions for model building
from tensorflow.keras.applications.inception_resnet_v2 import InceptionResNetV2
from tensorflow.keras.layers import Input, Dense, Conv2D, Flatten
from tensorflow.keras import Model
from tensorflow.keras.models import save_model, load_model
from tensorflow.keras.optimizers import Adam
# Import tensorflow functions for training
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.data import AUTOTUNE as autotune
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import Colab runtime disconnection
from google.colab import runtime

# ------------------------------- Constants -------------------------------- #
# Index into the base model's layer list that separates the two fine-tuning
# stages: layers from split_layer onwards are unfrozen in the first stage and
# layers 1 .. split_layer - 2 in the second (the end index passed to
# unfreeze_feature_extraction_block is exclusive)
split_layer = 576

# Training parameters
batch_size = 32       # Batch size for gradient learning
img_height = 524      # Image pixel height
img_width = 524       # Image pixel width
epochs = 500          # Upper bound on epochs; the EarlyStopping callback below is expected to end training sooner

# Learning rates (one per training stage)
learn_rate_fc = 1e-6  # Fully connected layers
learn_rate_fe1 = 1e-6 # Later feature extraction layers
learn_rate_fe2 = 1e-6 # Early feature extraction layers

# Callback to stop training once val_loss has stopped improving (patience of 10 epochs)
model_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min')

# Dataset and checkpoint locations - empty placeholders to be filled in before running
train_dir = ''            # Training images
validation_dir = ''       # Validation images
best_model_path_fc = ''   # Checkpoints of the fully connected stage
best_model_path_fe1 = ''  # Checkpoints of the later feature extraction stage
best_model_path_fe2 = ''  # Checkpoints of the early feature extraction stage
dataset_name = ''         # Tag inserted into checkpoint file names

# ------------------------ Training Mode Functions ------------------------- #
# Freeze the classification head (every layer after the input layer and base model)
def freeze_fully_connected(model):
    """Mark every layer from index 2 onwards as non-trainable and return the model.

    Layers 0 and 1 are left untouched; in this script they are the input
    layer and the pretrained base network.
    """
    for head_layer in model.layers[2:]:
        head_layer.trainable = False
    return model

# Unfreeze a contiguous block of feature extraction layers
def unfreeze_feature_extraction_block(model, start_layer, end_layer):
    """Make base-model layers start_layer .. end_layer - 1 trainable.

    Every layer of the nested base model (model.layers[1]) is frozen first,
    then the requested slice is unfrozen. Batch normalization layers in the
    slice are deliberately left frozen.

    Args:
        model: Model whose second layer (index 1) is the nested base model.
        start_layer: Index of the first base-model layer to unfreeze.
        end_layer: Index one past the last base-model layer to unfreeze.

    Returns:
        The same model instance, modified in place.
    """
    base_model = model.layers[1]
    # Keras reports no trainable weights for a container whose own trainable
    # flag is False, so re-enable the container before freezing selectively
    base_model.trainable = True
    for layer in base_model.layers:
        layer.trainable = False
    for layer in base_model.layers[start_layer:end_layer]:
        # Identify batch normalization by class name; parsing str(layer) depends
        # on the module path of the installed Keras version
        if type(layer).__name__ != 'BatchNormalization':
            layer.trainable = True
    return model

# ---------------------- Build InceptionResNet V2 ------------------------ #
# Create base InceptionResNet V2 model:
  # 524x524x3 input size
  # Only feature extraction layers - no classification layers
  # Feature extraction layer weights trained on ImageNet dataset
base_model = InceptionResNetV2(input_shape = (img_height, img_width, 3),
                         include_top = False,
                         weights = 'imagenet'
                         )
# Freeze learning on feature extraction layers
base_model.trainable = False

# Create classification layers of InceptionResNet V2
# Flatten layer
flatten = Flatten()
# Fully connected layer using softmax activation function for 9 labels
classifier = Dense(units = 9, activation = 'softmax')

# Connect input layer, base model and output classifier
# (`model` is reused as a running name for the intermediate tensor)
inputs = Input(shape = (img_height, img_width, 3)) # Input layer: 524x524x3 RGB images
model = base_model(inputs)                         # Feature extraction layers of InceptionResNet V2
model = flatten(model)                             # Flatten the feature maps for the classifier
outputs = classifier(model)                        # Fully connected layer: 9 neuron softmax activation for classification

# Build InceptionResNetV2 model
inception_resnet_v2 = Model(inputs, outputs, name = 'InceptionResNetV2')

# ----------- Train InceptionResNet V2 Fully Connected Layers ------------- #
# Pixel value normalization applied to every dataset element before training
def process(image, label):
    """Divide the image by 255 and cast it to float32; the label passes through unchanged."""
    scaled = image / 255.0
    return cast(scaled, 'float32'), label

# Create training dataset from train_dir with one-hot labels
# (label_mode = 'categorical'); images are sized to img_height x img_width
train_ds = image_dataset_from_directory(
    train_dir,
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical'
)
# Normalize training dataset pixel values to improve learning performance
train_ds = train_ds.map(process)
# Cache the processed batches and prefetch them to improve computational speed
train_ds = train_ds.cache().prefetch(buffer_size = autotune)

# Define validation dataset (same settings, read from validation_dir)
val_ds = image_dataset_from_directory(
    validation_dir,
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical'
)
# Normalize validation pixel values to improve learning performance
val_ds = val_ds.map(process)
# Cache the processed batches and prefetch them to improve computation speed
val_ds = val_ds.cache().prefetch(buffer_size = autotune)

# Compile model:
  # Adam algorithm optimization
  # Categorical crossentropy loss function for multiclass (single-label) classification
  # Metrics: accuracy, precision, recall
inception_resnet_v2.compile(
    optimizer = Adam(learn_rate_fc),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# ------------ Train InceptionResNet V2 Fully Connected Layers ------------- #
print('\n|------------------------------| Training Fully Connected Layers |--------------------------------|\n')

# Define checkpoint: because save_best_only = True the full model is written only
# on epochs where the monitored quantity improves, not after every epoch. The
# epoch number and validation accuracy are embedded in the file name.
model_checkpoint_fc = ModelCheckpoint(best_model_path_fc + '/inception_resnet_v2-' + dataset_name + '-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Train model (only the classification layers learn; the base model was frozen above)
inception_resnet_v2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [model_checkpoint_fc, model_callback]
)

print('\n|--------------------------| Fully Connected Layers Training Complete |---------------------------|\n')

# ------- Train InceptionResNet V2 Later Feature Extraction Layers --------- #
print('|------------------------| Fine Tuning Later Feature Extraction Layers |--------------------------|\n')

# Load model with best accuracy
# Checkpoints are named '<name>-<epoch>-<val_accuracy>.hdf5', so the best one is
# chosen by parsing the file name. listdir() returns entries in arbitrary order,
# which makes "the last file listed" an unreliable choice.
print('Loading model with best accuracy...\n')

# Score a checkpoint file name as (val_accuracy, epoch); later epoch wins ties
def checkpoint_score(file_name):
    _, epoch, val_accuracy = file_name[:-len('.hdf5')].rsplit('-', 2)
    return float(val_accuracy), int(epoch)

checkpoints = [file for file in listdir(best_model_path_fc) if file.endswith('.hdf5')]
if not checkpoints:
    raise FileNotFoundError('No .hdf5 checkpoints found in: ' + best_model_path_fc)
best_model_file = max(checkpoints, key = checkpoint_score)
best_model_name = best_model_file[:-len('.hdf5')]
inception_resnet_v2_fe1 = load_model(best_model_path_fc + '/' + best_model_file)

# Swap learning mode: freeze the classification head, then unfreeze the base
# model layers from split_layer to the end
inception_resnet_v2_fe1 = freeze_fully_connected(inception_resnet_v2_fe1)
inception_resnet_v2_fe1 = unfreeze_feature_extraction_block(inception_resnet_v2_fe1, split_layer, len(inception_resnet_v2_fe1.layers[1].layers))

# Define checkpoint to save best model (written only when the monitored quantity improves)
best_model_checkpoint_fe1 = ModelCheckpoint(best_model_path_fe1 + '/' + best_model_name + '-fe1-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile so the changed trainable flags and this stage's learning rate are
# picked up; loss and metrics match the original compile() call
inception_resnet_v2_fe1.compile(
    optimizer = Adam(learn_rate_fe1),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
inception_resnet_v2_fe1.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe1, model_callback]
)

print('\n|--------------------| Fine Tuning Later Feature Extraction Layers Complete |---------------------|\n')

# -------- Train InceptionResNet V2 Early Feature Extraction Layers -------- #
print('|------------------------| Fine Tuning Early Feature Extraction Layers |--------------------------|\n')

# Load model with best accuracy
# Checkpoints are named '<name>-<epoch>-<val_accuracy>.hdf5', so the best one is
# chosen by parsing the file name. listdir() returns entries in arbitrary order,
# which makes "the last file listed" an unreliable choice.
print('Loading model with best accuracy...\n')

# Score a checkpoint file name as (val_accuracy, epoch); later epoch wins ties
def checkpoint_score(file_name):
    _, epoch, val_accuracy = file_name[:-len('.hdf5')].rsplit('-', 2)
    return float(val_accuracy), int(epoch)

checkpoints = [file for file in listdir(best_model_path_fe1) if file.endswith('.hdf5')]
if not checkpoints:
    # Fail loudly instead of silently reusing the previous stage's model name
    raise FileNotFoundError('No .hdf5 checkpoints found in: ' + best_model_path_fe1)
best_model_file = max(checkpoints, key = checkpoint_score)
best_model_name = best_model_file[:-len('.hdf5')]
inception_resnet_v2_fe2 = load_model(best_model_path_fe1 + '/' + best_model_file)

# Swap learning mode: freeze the classification head, then unfreeze the early
# base-model layers (the end index is exclusive, so layers 1 .. split_layer - 2)
inception_resnet_v2_fe2 = freeze_fully_connected(inception_resnet_v2_fe2)
inception_resnet_v2_fe2 = unfreeze_feature_extraction_block(inception_resnet_v2_fe2, 1, split_layer - 1)

# Define checkpoint to save best model (written only when the monitored quantity improves)
best_model_checkpoint_fe2 = ModelCheckpoint(best_model_path_fe2 + '/' + best_model_name + '-fe2-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile so the changed trainable flags and this stage's learning rate are
# picked up; loss and metrics match the original compile() call
inception_resnet_v2_fe2.compile(
    optimizer = Adam(learn_rate_fe2),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
inception_resnet_v2_fe2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe2, model_callback]
)

print('\n|--------------------| Fine Tuning Early Feature Extraction Layers Complete |---------------------|\n')

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()

# MobileNetV2

In [None]:
# Import os library to navigate directories
from os import listdir
# Import tensorflow functions for model building
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.layers import Dense, Input, GlobalAveragePooling2D
from tensorflow.keras import Model
from tensorflow.keras.models import save_model, load_model
from tensorflow.keras.optimizers import Adam
# Import tensorflow functions for training
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.data import AUTOTUNE as autotune
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import Colab runtime disconnection
from google.colab import runtime

# ------------------------------- Constants -------------------------------- #
# Index into the base model's layer list that separates the two fine-tuning
# stages: layers from split_layer onwards are unfrozen in the first stage and
# layers 1 .. split_layer - 2 in the second (the end index passed to
# unfreeze_feature_extraction_block is exclusive)
split_layer = 149

# Training parameters
batch_size = 4        # Batch size for gradient learning
img_height = 524      # Image pixel height
img_width = 524       # Image pixel width
epochs = 500          # Upper bound on epochs; the EarlyStopping callback below is expected to end training sooner

# Learning rates (one per training stage)
learn_rate_fc = 1e-5  # Fully connected layers
learn_rate_fe1 = 1e-5 # Later feature extraction layers
learn_rate_fe2 = 1e-5 # Early feature extraction layers

# Callback to stop training once val_loss has stopped improving (patience of 10 epochs)
model_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min')

# Dataset and checkpoint locations - empty placeholders to be filled in before running
train_dir = ''            # Training images
validation_dir = ''       # Validation images
best_model_path_fc = ''   # Checkpoints of the fully connected stage
best_model_path_fe1 = ''  # Checkpoints of the later feature extraction stage
best_model_path_fe2 = ''  # Checkpoints of the early feature extraction stage
dataset_name = ''         # Tag inserted into checkpoint file names

# ------------------------ Training Mode Functions ------------------------- #
# Freeze the classification head (every layer after the input layer and base model)
def freeze_fully_connected(model):
    """Mark every layer from index 2 onwards as non-trainable and return the model.

    Layers 0 and 1 are left untouched; in this script they are the input
    layer and the pretrained base network.
    """
    for head_layer in model.layers[2:]:
        head_layer.trainable = False
    return model

# Unfreeze a contiguous block of feature extraction layers
def unfreeze_feature_extraction_block(model, start_layer, end_layer):
    """Make base-model layers start_layer .. end_layer - 1 trainable.

    Every layer of the nested base model (model.layers[1]) is frozen first,
    then the requested slice is unfrozen. Batch normalization layers in the
    slice are deliberately left frozen.

    Args:
        model: Model whose second layer (index 1) is the nested base model.
        start_layer: Index of the first base-model layer to unfreeze.
        end_layer: Index one past the last base-model layer to unfreeze.

    Returns:
        The same model instance, modified in place.
    """
    base_model = model.layers[1]
    # Keras reports no trainable weights for a container whose own trainable
    # flag is False, so re-enable the container before freezing selectively
    base_model.trainable = True
    for layer in base_model.layers:
        layer.trainable = False
    for layer in base_model.layers[start_layer:end_layer]:
        # Identify batch normalization by class name; parsing str(layer) depends
        # on the module path of the installed Keras version
        if type(layer).__name__ != 'BatchNormalization':
            layer.trainable = True
    return model

# ---------------------------- Build MobileNetV2 --------------------------- #
# Create base MobileNetV2 model:
  # 524x524x3 input size
  # Only feature extraction layers - no classification layers
  # Feature extraction layer weights trained on ImageNet dataset
base_model = MobileNetV2(input_shape = (img_height, img_width, 3),
                         include_top = False,
                         weights = 'imagenet'
                         )
# Freeze learning on feature extraction layers
base_model.trainable = False

# Create classification layers of MobileNetV2
global_average_pool = GlobalAveragePooling2D()        # Global average pooling
classifier = Dense(units = 9, activation = 'softmax') # 9 neuron softmax classifier

# Connect input layer, base model and output layers
# (`model` is reused as a running name for the intermediate tensor)
inputs = Input(shape = (img_height, img_width, 3)) # Input layer: 524x524x3 RGB images
model = base_model(inputs)                         # Feature extraction layers of MobileNetV2
model = global_average_pool(model)                 # Global average pooling layer
outputs = classifier(model)                        # Fully connected layer: 9 neuron softmax activation for classification

# Build MobileNetV2 model
mobile_net_v2 = Model(inputs, outputs, name = 'MobileNetV2')

# --------------- Train MobileNetV2 Fully Connected Layers ---------------- #
# Pixel value normalization applied to every dataset element before training
def process(image, label):
    """Divide the image by 255 and cast it to float32; the label passes through unchanged."""
    scaled = image / 255.0
    return cast(scaled, 'float32'), label

# Create training dataset from train_dir with one-hot labels
# (label_mode = 'categorical'); images are sized to img_height x img_width
train_ds = image_dataset_from_directory(
    train_dir,
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical'
)
# Normalize training dataset pixel values to improve learning performance
train_ds = train_ds.map(process)
# Cache the processed batches and prefetch them to improve computational speed
train_ds = train_ds.cache().prefetch(buffer_size = autotune)

# Define validation dataset (same settings, read from validation_dir)
val_ds = image_dataset_from_directory(
    validation_dir,
    validation_split = 0,
    seed = 123,
    image_size = (img_height, img_width),
    batch_size = batch_size,
    label_mode = 'categorical'
)
# Normalize validation pixel values to improve learning performance
val_ds = val_ds.map(process)
# Cache the processed batches and prefetch them to improve computation speed
val_ds = val_ds.cache().prefetch(buffer_size = autotune)

# Compile model:
  # Adam algorithm optimization
  # Categorical crossentropy loss function for multiclass (single-label) classification
  # Metrics: accuracy, precision, recall
mobile_net_v2.compile(
    optimizer = Adam(learn_rate_fc),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# ---------------- Train MobileNet V2 Fully Connected Layers --------------- #
print('\n|------------------------------| Training Fully Connected Layers |--------------------------------|\n')

# Define checkpoint: because save_best_only = True the full model is written only
# on epochs where the monitored quantity improves, not after every epoch. The
# epoch number and validation accuracy are embedded in the file name.
model_checkpoint_fc = ModelCheckpoint(best_model_path_fc + '/mobilenet_v2-' + dataset_name + '-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Train model (only the classification layer learns; the base model was frozen above)
mobile_net_v2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [model_checkpoint_fc, model_callback]
)

print('\n|--------------------------| Fully Connected Layers Training Complete |---------------------------|\n')

# ----------- Train MobileNet V2 Later Feature Extraction Layers ----------- #
print('|------------------------| Fine Tuning Later Feature Extraction Layers |--------------------------|\n')

# Load model with best accuracy
# Checkpoints are named '<name>-<epoch>-<val_accuracy>.hdf5', so the best one is
# chosen by parsing the file name. listdir() returns entries in arbitrary order,
# which makes "the last file listed" an unreliable choice.
print('Loading model with best accuracy...\n')

# Score a checkpoint file name as (val_accuracy, epoch); later epoch wins ties
def checkpoint_score(file_name):
    _, epoch, val_accuracy = file_name[:-len('.hdf5')].rsplit('-', 2)
    return float(val_accuracy), int(epoch)

checkpoints = [file for file in listdir(best_model_path_fc) if file.endswith('.hdf5')]
if not checkpoints:
    raise FileNotFoundError('No .hdf5 checkpoints found in: ' + best_model_path_fc)
best_model_file = max(checkpoints, key = checkpoint_score)
best_model_name = best_model_file[:-len('.hdf5')]
mobile_net_v2_fe1 = load_model(best_model_path_fc + '/' + best_model_file)

# Swap learning mode: freeze the classification head, then unfreeze the base
# model layers from split_layer to the end
mobile_net_v2_fe1 = freeze_fully_connected(mobile_net_v2_fe1)
mobile_net_v2_fe1 = unfreeze_feature_extraction_block(mobile_net_v2_fe1, split_layer, len(mobile_net_v2_fe1.layers[1].layers))

# Define checkpoint to save best model (written only when the monitored quantity improves)
best_model_checkpoint_fe1 = ModelCheckpoint(best_model_path_fe1 + '/' + best_model_name + '-fe1-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile so the changed trainable flags and this stage's learning rate are
# picked up; loss and metrics match the original compile() call
mobile_net_v2_fe1.compile(
    optimizer = Adam(learn_rate_fe1),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
mobile_net_v2_fe1.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe1, model_callback]
)

print('\n|--------------------| Fine Tuning Later Feature Extraction Layers Complete |---------------------|\n')

# ----------- Train MobileNet V2 Early Feature Extraction Layers ----------- #
print('|------------------------| Fine Tuning Early Feature Extraction Layers |--------------------------|\n')

# Load model with best accuracy
# Checkpoints are named '<name>-<epoch>-<val_accuracy>.hdf5', so the best one is
# chosen by parsing the file name. listdir() returns entries in arbitrary order,
# which makes "the last file listed" an unreliable choice.
print('Loading model with best accuracy...\n')

# Score a checkpoint file name as (val_accuracy, epoch); later epoch wins ties
def checkpoint_score(file_name):
    _, epoch, val_accuracy = file_name[:-len('.hdf5')].rsplit('-', 2)
    return float(val_accuracy), int(epoch)

checkpoints = [file for file in listdir(best_model_path_fe1) if file.endswith('.hdf5')]
if not checkpoints:
    # Fail loudly instead of silently reusing the previous stage's model name
    raise FileNotFoundError('No .hdf5 checkpoints found in: ' + best_model_path_fe1)
best_model_file = max(checkpoints, key = checkpoint_score)
best_model_name = best_model_file[:-len('.hdf5')]
mobile_net_v2_fe2 = load_model(best_model_path_fe1 + '/' + best_model_file)

# Swap learning mode: freeze the classification head, then unfreeze the early
# base-model layers (the end index is exclusive, so layers 1 .. split_layer - 2)
mobile_net_v2_fe2 = freeze_fully_connected(mobile_net_v2_fe2)
mobile_net_v2_fe2 = unfreeze_feature_extraction_block(mobile_net_v2_fe2, 1, split_layer - 1)

# Define checkpoint to save best model (written only when the monitored quantity improves)
best_model_checkpoint_fe2 = ModelCheckpoint(best_model_path_fe2 + '/' + best_model_name + '-fe2-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile so the changed trainable flags and this stage's learning rate are
# picked up; loss and metrics match the original compile() call
mobile_net_v2_fe2.compile(
    optimizer = Adam(learn_rate_fe2),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
mobile_net_v2_fe2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe2, model_callback]
)

print('\n|--------------------| Fine Tuning Early Feature Extraction Layers Complete |---------------------|\n')

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()

# VGG-16

In [None]:
# Import os library to navigate directories
from os import listdir
# Import tensorflow functions for model building
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.layers import Dense, Input, GlobalAveragePooling2D
from tensorflow.keras import Model
from tensorflow.keras.models import save_model, load_model
from tensorflow.keras.optimizers import Adam
# Import tensorflow functions for training
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.data import AUTOTUNE as autotune
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import Colab runtime disconnection
from google.colab import runtime

# ------------------------------- Constants -------------------------------- #
# Split basic and advanced feature extraction layers
# NOTE(review): presumably an index into the base model's layer list used by the
# fine-tuning stages, as in the other model cells - confirm against the code
# that follows this section
split_layer = 11

# Training parameters
batch_size = 4        # Batch size for gradient learning
img_height = 524      # Image pixel height
img_width = 524       # Image pixel width
epochs = 500          # Upper bound on epochs; the EarlyStopping callback below is expected to end training sooner

# Learning rates (one per training stage)
learn_rate_fc = 1e-5  # Fully connected layers
learn_rate_fe1 = 1e-6 # Later feature extraction layers
learn_rate_fe2 = 1e-6 # Early feature extraction layers

# Callback to stop training once val_loss has stopped improving (patience of 10 epochs)
model_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min')

# Dataset and checkpoint locations - empty placeholders to be filled in before running
train_dir = ''            # Training images
validation_dir = ''       # Validation images
best_model_path_fc = ''   # Checkpoints of the fully connected stage
best_model_path_fe1 = ''  # Presumably checkpoints of the later feature extraction stage
best_model_path_fe2 = ''  # Presumably checkpoints of the early feature extraction stage
dataset_name = ''         # Tag inserted into checkpoint file names

# ------------------------ Training Mode Functions ------------------------- #
# Freeze the classification head (every layer after the input layer and base model)
def freeze_fully_connected(model):
    """Mark every layer from index 2 onwards as non-trainable and return the model.

    Layers 0 and 1 are left untouched; in this script they are the input
    layer and the pretrained base network.
    """
    for head_layer in model.layers[2:]:
        head_layer.trainable = False
    return model

# Unfreeze a contiguous block of feature extraction layers
def unfreeze_feature_extraction_block(model, start_layer, end_layer):
    """Make base-model layers start_layer .. end_layer - 1 trainable.

    Every layer of the nested base model (model.layers[1]) is frozen first,
    then the requested slice is unfrozen. Batch normalization layers in the
    slice are deliberately left frozen.

    Args:
        model: Model whose second layer (index 1) is the nested base model.
        start_layer: Index of the first base-model layer to unfreeze.
        end_layer: Index one past the last base-model layer to unfreeze.

    Returns:
        The same model instance, modified in place.
    """
    base_model = model.layers[1]
    # Keras reports no trainable weights for a container whose own trainable
    # flag is False, so re-enable the container before freezing selectively
    base_model.trainable = True
    for layer in base_model.layers:
        layer.trainable = False
    for layer in base_model.layers[start_layer:end_layer]:
        # Identify batch normalization by class name; parsing str(layer) depends
        # on the module path of the installed Keras version
        if type(layer).__name__ != 'BatchNormalization':
            layer.trainable = True
    return model

# ------------------------------ Build VGG-16 ------------------------------ #

# Create base VGG-16 model:
  # 524x524x3 input size
  # Only feature extraction layers - no classification layers
  # Feature extraction layer weights trained on ImageNet dataset
base_model = VGG16(input_shape = (img_height, img_width, 3),
                         include_top = False,
                         weights = 'imagenet'
                         )
# Freeze learning on feature extraction layers
base_model.trainable = False

# Create classification layers of VGG16
# Global average pooling
global_average_pooling = GlobalAveragePooling2D()
# Fully connected layer 1: 4096 neurons ReLU activation function
fc1 = Dense(units = 4096, activation = 'relu')
# Fully connected layer 2: 4096 neurons ReLU activation function
fc2 = Dense(units = 4096, activation = 'relu')
# Classification layer: softmax layer with 9 neurons for class labels
classifier = Dense(units = 9, activation = 'softmax')

# Connect input layer, base model and output layers
# (`model` is reused as a running name for the intermediate tensor)
inputs = Input(shape = (img_height, img_width, 3)) # Input layer: 524x524x3 RGB images
model = base_model(inputs)                         # Feature extraction layers of VGG-16
model = global_average_pooling(model)              # Global average pooling layer
model = fc1(model)                                 # Fully connected layer: 4096 neuron ReLU
model = fc2(model)                                 # Fully connected layer: 4096 neuron ReLU
outputs = classifier(model)                        # Fully connected layer: 9 neuron softmax activation for classification

# Build VGG-16 model
vgg16 = Model(inputs, outputs, name = 'VGG-16')

# ------------------ Train VGG-16 Fully Connected Layers ------------------- #
# RGB value normalization to improve learning rate - RGB values within activation function range
def process(image, label):
    """Divide the image by 255.0, cast it to float32 and pass the label through.

    Used as the mapping function for the training and validation datasets.

    Args:
        image: Image tensor; presumably holds 0-255 pixel values - TODO confirm.
        label: Label belonging to the image; returned unchanged.

    Returns:
        Tuple of (rescaled float32 image, label).
    """
    rescaled = cast(image / 255.0, 'float32')
    return rescaled, label

# Build one dataset split: read the images of `directory` with one-hot
# labels, rescale the pixels with process(), then cache and prefetch
def _load_split(directory):
    split = image_dataset_from_directory(
        directory,
        label_mode = 'categorical',
        batch_size = batch_size,
        image_size = (img_height, img_width),
        seed = 123,
        validation_split = 0
    )
    split = split.map(process)
    return split.cache().prefetch(buffer_size = autotune)

# Training dataset
train_ds = _load_split(train_dir)

# Validation dataset
val_ds = _load_split(validation_dir)

# Configure training of the assembled model:
#   optimizer - Adam with learning rate learn_rate_fc
#   loss      - categorical cross-entropy (the datasets use one-hot labels)
#   metrics   - accuracy, precision, recall
optimizer_fc = Adam(learn_rate_fc)
metrics_fc = ['accuracy', Precision(), Recall()]
vgg16.compile(optimizer = optimizer_fc, loss = 'categorical_crossentropy', metrics = metrics_fc)

# ------------------ Train VGG-16 Fully Connected Layers ------------------- #
print('\n|------------------------------| Training Fully Connected Layers |--------------------------------|\n')

# File name pattern of this stage's checkpoints; the {epoch} and
# {val_accuracy} fields are filled in by the callback when it saves
checkpoint_file_fc = best_model_path_fc + '/vgg16-' + dataset_name + '-{epoch:02d}-{val_accuracy:.4f}.hdf5'
# Whole-model checkpoint; save_best_only is set, so a file is written only
# when the monitored quantity improves (monitor is left at the Keras default)
model_checkpoint_fc = ModelCheckpoint(
    checkpoint_file_fc,
    mode = 'auto',
    save_weights_only = False,
    save_best_only = True
)

# Train the model against the validation split
callbacks_fc = [model_checkpoint_fc, model_callback]
vgg16.fit(train_ds, epochs = epochs, validation_data = val_ds, callbacks = callbacks_fc, verbose = 1)

print('\n|--------------------------| Fully Connected Layers Training Complete |---------------------------|\n')

# -------------- Train VGG-16 Later Feature Extraction Layers -------------- #
print('|------------------------| Fine Tuning Later Feature Extraction Layers |--------------------------|\n')

# Load the checkpoint written last during fully connected training.
# listdir() gives no ordering guarantee, so the file is chosen explicitly:
# the checkpoint names of a run share one prefix followed by a zero-padded
# epoch number, so the greatest name is the latest epoch that was saved
# (assumes the folder only holds this run's checkpoints and fewer than 100
# epochs). Because the checkpoint uses save_best_only, that file is the best
# one according to the quantity the callback monitors.
print('Loading model with best accuracy...\n')
directory = [name for name in listdir(best_model_path_fc) if name.endswith('.hdf5')]
if not directory:
    # Fail with a clear message instead of a NameError further down
    raise FileNotFoundError('No .hdf5 checkpoint found in ' + best_model_path_fc)
best_model_name = max(directory).replace('.hdf5', '')
vgg16_fe1 = load_model(best_model_path_fc + '/' + best_model_name + '.hdf5')

# Swap learning mode: freeze the classifier, unfreeze the base model layers
# from split_layer to the end
vgg16_fe1 = freeze_fully_connected(vgg16_fe1)
vgg16_fe1 = unfreeze_feature_extraction_block(vgg16_fe1, split_layer, len(vgg16_fe1.layers[1].layers))

# Define checkpoint to save best model
best_model_checkpoint_fe1 = ModelCheckpoint(best_model_path_fe1 + '/' + best_model_name + '-fe1-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile with the fine tuning learning rate. Keras requires compile() to
# be called again after `trainable` flags change for the change to be taken
# into account; assigning model.optimizer directly does not do that. Loss and
# metrics repeat the configuration used for the first training stage.
vgg16_fe1.compile(
    optimizer = Adam(learn_rate_fe1),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
vgg16_fe1.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe1, model_callback]
)

print('\n|--------------------| Fine Tuning Later Feature Extraction Layers Complete |---------------------|\n')

# -------------- Train VGG-16 Early Feature Extraction Layers -------------- #
print('|------------------------| Fine Tuning Early Feature Extraction Layers |--------------------------|\n')

# Load the checkpoint written last during the previous fine tuning stage.
# listdir() gives no ordering guarantee, so the file is chosen explicitly:
# the checkpoint names of a run share one prefix followed by a zero-padded
# epoch number, so the greatest name is the latest epoch that was saved
# (assumes the folder only holds this run's checkpoints and fewer than 100
# epochs). Because the checkpoint uses save_best_only, that file is the best
# one according to the quantity the callback monitors.
print('Loading model with best accuracy...\n')
directory = [name for name in listdir(best_model_path_fe1) if name.endswith('.hdf5')]
if not directory:
    # Without this check a stale best_model_name from the previous stage
    # would be combined with this folder and fail later with a confusing path
    raise FileNotFoundError('No .hdf5 checkpoint found in ' + best_model_path_fe1)
best_model_name = max(directory).replace('.hdf5', '')
vgg16_fe2 = load_model(best_model_path_fe1 + '/' + best_model_name + '.hdf5')

# Swap learning mode: freeze the classifier, unfreeze the base model layers
# from index 1 up to split_layer - 1
vgg16_fe2 = freeze_fully_connected(vgg16_fe2)
vgg16_fe2 = unfreeze_feature_extraction_block(vgg16_fe2, 1, split_layer - 1)

# Define checkpoint to save best model
best_model_checkpoint_fe2 = ModelCheckpoint(best_model_path_fe2 + '/' + best_model_name + '-fe2-{epoch:02d}-{val_accuracy:.4f}.hdf5', save_best_only = True, save_weights_only = False, mode= 'auto')

# Recompile with the fine tuning learning rate. Keras requires compile() to
# be called again after `trainable` flags change for the change to be taken
# into account; assigning model.optimizer directly does not do that. Loss and
# metrics repeat the configuration used for the first training stage.
vgg16_fe2.compile(
    optimizer = Adam(learn_rate_fe2),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy', Precision(), Recall()]
)

# Fine tune model
vgg16_fe2.fit(
    train_ds,
    validation_data = val_ds,
    epochs = epochs,
    verbose = 1,
    callbacks = [best_model_checkpoint_fe2, model_callback]
)

print('\n|--------------------| Fine Tuning Early Feature Extraction Layers Complete |---------------------|\n')

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()

# Model Testing

In [None]:
# Import directory navigation
from os import listdir
# Import tensorflow libraries for loading models and testing dataset
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import tensorflow functions for model building
from tensorflow.keras import Model
# Import Colab runtime disconnection
from google.colab import runtime

# Root folders of the saved models; the testing section reads entry 0 for
# experiment 1 and entry 1 for experiment 2 (left empty here)
base_model_paths = []
# Model folder names, appended to a root folder
models = [
    'DenseNet121',
    'Inception V3',
    'InceptionResNet V2',
    'MobileNetV2',
    'VGG-16',
]
# Training stage folder names inside each model folder
training_cycles = [
    '1 - Fully Connected Training',
    '2 - Advanced Feature Extraction Training',
    '3 - Basic Feature Extraction Training',
]
# Folder holding the testing images (left blank here)
test_dir = ''

# -------------------------- Load Testing Dataset -------------------------- #
# RGB value normalization to improve learning rate - RGB values within activation function range
def process(image, label):
    """Divide the image by 255.0, cast it to float32 and pass the label through.

    Used as the mapping function for the testing dataset.

    Args:
        image: Image tensor; presumably holds 0-255 pixel values - TODO confirm.
        label: Label belonging to the image; returned unchanged.

    Returns:
        Tuple of (rescaled float32 image, label).
    """
    rescaled = cast(image / 255.0, 'float32')
    return rescaled, label

# Read the testing images one per batch, with one-hot labels, resized to 524x524
raw_test_ds = image_dataset_from_directory(
    test_dir,
    label_mode = 'categorical',
    batch_size = 1,
    image_size = (524, 524),
    seed = 123,
    validation_split = 0
)
# Rescale pixel values the same way as for training
test_ds = raw_test_ds.map(process)

# --------------------- Dataset and Testing Functions ---------------------- #
def test_models(model_path, test):
    """Evaluate the saved model of every training cycle found under model_path.

    For each entry of `training_cycles` the folder model_path/<cycle> is
    listed. If it is not empty, one model file is loaded and evaluated on
    `test`; Keras prints the resulting metrics while evaluating. Empty cycle
    folders are skipped.

    The model file is the one with the greatest name. listdir() returns names
    in arbitrary order, so taking "the last listed" file is not reproducible.
    Presumably the names carry a zero-padded epoch number after a common
    prefix, which makes the greatest name the latest checkpoint - verify
    against the folder contents.

    Args:
        model_path: Folder that contains one sub-folder per training cycle.
        test: Dataset passed to model.evaluate().
    """
    # Loop through training cycles
    for cycle in training_cycles:
        print(cycle)

        # Get directory of model
        cycle_path = model_path + '/' + cycle
        directory = listdir(cycle_path)

        if directory:
            # Deterministic choice of the model file (see docstring)
            best_model = max(directory)
            best_model_path = cycle_path + '/' + best_model

            # Report the path that is actually loaded, cycle folder included
            print(best_model_path)

            # Test best model; evaluate() prints loss and metrics itself
            model = load_model(best_model_path)
            model.evaluate(test)
        print()

# ---------------------------- Testing Models ------------------------------ #
# Banners in run order; banner i is tested against base_model_paths[i]
# (0: DiversionNet training, 1: RealWaste training)
experiment_banners = (
    '\n# --------------------------- Experiment 1 - DiversionNet Training ---------------------------- #\n',
    '\n# ---------------------------- Experiment 2 - RealWaste Training ------------------------------ #\n',
)
for experiment_index, banner in enumerate(experiment_banners):
    print(banner)
    # Evaluate every model of this experiment
    for model in models:
        print(model)
        test_models(base_model_paths[experiment_index] + '/' + model, test_ds)

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()

# Confusion Matrix

In [None]:
# Import directory navigation
from os import listdir
# Import tensorflow libraries for loading models and testing dataset
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow import cast
# Import tensorflow functions for metrics
from tensorflow.keras.metrics import Accuracy, Precision, Recall
# Import tensorflow functions for model building
from tensorflow.keras import Model
# Import Colab runtime disconnection
from google.colab import runtime

# Saved model file and root folder of the testing images (left blank here)
model_path = ''
test_dir = ''
# Class labels in alphabetical order; presumably this matches the order of
# the model's output neurons - verify against the training class order
labels = [
    'Cardboard',
    'Food Organics',
    'Glass',
    'Metal',
    'Miscellaneous Trash',
    'Paper',
    'Plastic',
    'Textile Trash',
    'Vegetation',
]
num_labels = len(labels)
# Image dimensions
img_width = 524
img_height = 524
# One row per tested label, one column per predicted label, all counts zero
label_counts = [[0] * num_labels for _ in range(num_labels)]

# RGB value normalization to improve learning rate - RGB values within activation function range
def process(image, label):
    """Divide the image by 255.0, cast it to float32 and pass the label through.

    Used as the mapping function for the per-label testing datasets.

    Args:
        image: Image tensor; presumably holds 0-255 pixel values - TODO confirm.
        label: Label belonging to the image; returned unchanged.

    Returns:
        Tuple of (rescaled float32 image, label).
    """
    rescaled = cast(image / 255.0, 'float32')
    return rescaled, label

# Loop through testing label datasets and return results
def test_label(label):
    """Count, per predicted class, the model's predictions for one label's images.

    Loads the images below test_dir + label, rescales them with process(),
    loads the model from model_path (reloaded on every call) and predicts the
    whole dataset. Each prediction is counted towards the class with the
    highest score among the first num_labels outputs; ties go to the lowest
    index.

    Args:
        label: Label name, appended to test_dir to locate the images.

    Returns:
        List of num_labels integers; entry i is the number of images that
        were predicted as class i.
    """
    print('Testing ' + label + ' images...')

    # Define testing dataset with only a single label populated.
    # image_size is given as (height, width), the order Keras expects.
    test_ds = image_dataset_from_directory(
        test_dir+label,
        validation_split = 0,
        image_size = (img_height, img_width),
        batch_size = 1,
        label_mode = 'categorical'
    )

    # Normalize RGB values of pixels
    test_ds = test_ds.map(process)

    # Load model
    model = load_model(model_path)

    # Run model over testing label dataset
    results = model.predict(test_ds)

    # Tally the winning class of every prediction; sized by num_labels so the
    # counter always agrees with the label constants
    predictions_count = [0] * num_labels
    for result in results:
        # max() returns the first index that holds the highest score
        index = max(range(num_labels), key=lambda j: result[j])
        predictions_count[index] += 1
    print()
    # Return predictions count for each label over label testing dataset
    return predictions_count

# ---------------------------- Run Label Tests ----------------------------- #
# Fill one row of the results matrix per label
for index in range(num_labels):
    current_label = labels[index]
    label_counts[index] = test_label(current_label)
print()

# Report the prediction counts of every label as "<label>: <counts>"
for index in range(num_labels):
    print('{}: {}'.format(labels[index], label_counts[index]))

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()

# Mislabel Testing

In [None]:
# Import directory navigation
from os import listdir
# Import tensorflow libraries for loading models and testing dataset
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow import cast
# Import tensorflow functions for model building
from tensorflow.keras import Model
# Import argmax to get index of maximum value
from numpy import argmax
# Import Colab runtime disconnection
from google.colab import runtime

# -------------------------------- Constant -------------------------------- #
# Saved model file and root folder of the per-label test folders (left blank here)
model_path = ''
test_dir_base = ''
# Class labels in alphabetical order; a label's position is compared with the
# index of the model's highest output, so the order must match the model's
labels = [
    'Cardboard',
    'Food Organics',
    'Glass',
    'Metal',
    'Miscellaneous Trash',
    'Paper',
    'Plastic',
    'Textile Trash',
    'Vegetation',
]
num_labels = len(labels)
# Image dimensions
img_width = 524
img_height = 524
# One (initially empty) list of mislabelled paths per label
mislabel_paths = [[] for _ in range(num_labels)]

# -------------------------------- Functions ------------------------------- #
# RGB value normalization to improve learning rate - RGB values within activation function range
def process(image, label):
    """Divide the image by 255.0, cast it to float32 and pass the label through.

    Used as the mapping function for the single-sample testing datasets.

    Args:
        image: Image tensor; presumably holds 0-255 pixel values - TODO confirm.
        label: Label belonging to the image; returned unchanged.

    Returns:
        Tuple of (rescaled float32 image, label).
    """
    rescaled = cast(image / 255.0, 'float32')
    return rescaled, label

# Test images in path and return confusion and incorrect labelling
def test_label(label, label_index):
    """Predict every sample folder of one label and record the mislabelled ones.

    The folder test_dir_base + label is expected to hold sub-folders named
    1..N, where N is the number of entries in it. Each sub-folder is loaded
    as its own dataset and predicted with the model from model_path.
    Presumably each numbered folder holds one class folder named after the
    label with a single image in it - TODO confirm. argmax() is taken over
    the whole prediction array, so more than one image per folder would give
    a wrong class index.

    Args:
        label: Label name; also the folder name below test_dir_base.
        label_index: Position of `label` in `labels`; the expected argmax.

    Side effects:
        Prints every mislabelled sample with its predicted label and stores
        the list of mislabelled sample paths in mislabel_paths[label_index].
    """
    print('Testing ' + label + ' images...\n')

    # Empty array for mislabels
    mislabels = []

    # Load model and test label directory
    model = load_model(model_path)
    img_count = len(listdir(test_dir_base+label))

    # Test each image and store results
    for temp_count in range(1, img_count + 1):
        # Define testing dataset; image_size is (height, width) as Keras expects
        test_dir = test_dir_base + label + '/' + str(temp_count)
        test_ds = image_dataset_from_directory(
            test_dir,
            validation_split = 0,
            image_size = (img_height, img_width),
            batch_size = 1,
            label_mode = 'categorical'
        )
        test_ds = test_ds.map(process)

        result = model.predict(test_ds)
        predicted_index = argmax(result)

        if predicted_index != label_index:
            # test_dir already ends in <label>/<count>, so only the class
            # folder is appended; the same path is stored and printed so each
            # recorded mislabel identifies its sample
            sample_path = test_dir + '/' + label
            mislabels.append(sample_path)
            print('\n' + sample_path + ' predicted as: ' + labels[predicted_index])

    # Set values in global arrays for mislabels
    mislabel_paths[label_index] = mislabels

# ------------ Calculate Confusion Matrix and Print Mislabels -------------- #
# Test every label together with its position in `labels`
for label_index, label in enumerate(labels):
    test_label(label, label_index)
    print()

# Print the recorded mislabel paths, one section per label
banner_left = '|--------------------------| '
banner_right = ' |--------------------------|\n'
for index in range(num_labels):
    print(banner_left + labels[index] + banner_right)
    for mislabel in mislabel_paths[index]:
        print(mislabel)
    print()

# -------------------------- Disconnect Runtime ---------------------------- #
runtime.unassign()