In [11]:
import os
import os.path as op
import json
from pathlib import Path
import shutil
import logging
import numpy as np
from tqdm import tqdm
from skimage import io
from sklearn.metrics import classification_report
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import warnings
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score, recall_score, f1_score
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import EfficientNetB0, ResNet50, VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Input

warnings.filterwarnings('ignore')
warnings.filterwarnings("ignore", category=DeprecationWarning, module="tensorflow")
warnings.simplefilter(action='ignore', category=DeprecationWarning)

In [2]:
# Logging configuration
logging.basicConfig(level=logging.INFO,
                    datefmt='%H:%M:%S',
                    format='%(asctime)s | %(levelname)-5s | %(module)-15s | %(message)s')

IMAGE_SIZE = (299, 299)  # All images contained in this dataset are 299x299 (originally, to match Inception v3 input size)
SEED = 17

# Head directory containing all image subframes. Update with the relative path of your data directory
data_head_dir = Path('../Data/data')

# Find all subframe directories
subdirs = [Path(subdir.stem) for subdir in data_head_dir.iterdir() if subdir.is_dir()]
src_image_ids = ['_'.join(a_path.name.split('_')[:3]) for a_path in subdirs]

In [3]:
# Load train/val/test subframe IDs
def load_text_ids(file_path):
    """Simple helper to load all lines from a text file"""
    with open(file_path, 'r') as f:
        lines = [line.strip() for line in f.readlines()]
    return lines

# Load the subframe names for the three data subsets
train_ids = load_text_ids('../train_source_images.txt')
validate_ids = load_text_ids('../val_source_images.txt')
test_ids = load_text_ids('../test_source_images.txt')

# Generate a list containing the dataset split for the matching subdirectory names
subdir_splits = []
for src_id in src_image_ids:
    if src_id in train_ids:
        subdir_splits.append('train')
    elif src_id in validate_ids:
        subdir_splits.append('validate')
    elif(src_id in test_ids):
        subdir_splits.append('test')
    else:
        logging.warning(f'{src_id}: Did not find designated split in train/validate/test list.')
        subdir_splits.append(None)

In [4]:
import random
import tensorflow as tf
from PIL import Image 

def load_and_preprocess(img_loc, label):
    
    def _inner_function(img_loc, label):
        
        # Convert tensor to native type
        img_loc_str = img_loc.numpy().decode('utf-8')
        label_str = label.numpy().decode('utf-8')
        
        img = Image.open(img_loc_str).convert('RGB')
        
        
        return img, 1 if label_str=='frost' else 0

    # Wrap the Python function
    X, y = tf.py_function(_inner_function, [img_loc, label], [tf.float32, tf.int64])
    
    return X, y

def load_subdir_data(dir_path, image_size, seed=None):
    
    """Helper to create a TF dataset from each image subdirectory"""
    
    # Grab only the classes that (1) we want to keep and (2) exist in this directory
    tile_dir = dir_path / Path('tiles')
    label_dir = dir_path /Path('labels')
    
    loc_list = []
    
    for folder in os.listdir(tile_dir):
        if os.path.isdir(os.path.join(tile_dir, folder)):
            for file in os.listdir(os.path.join(tile_dir, folder)):
                if file.endswith(".png"):
                    loc_list.append((os.path.join(os.path.join(tile_dir, folder), file), folder))

    return loc_list

# Loop over all subframes, loading each into a list
tf_data_train, tf_data_test, tf_data_val = [], [], []
tf_dataset_train, tf_dataset_test, tf_dataset_val = [], [], []

# Update the batch and buffer size as per your model requirements
buffer_size = 64
batch_size = 32

for subdir, split in zip(subdirs, subdir_splits):
    full_path = data_head_dir / subdir
    if split=='validate':
        tf_data_val.extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))
    elif split=='train':
        tf_data_train.extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))
    elif split=='test':
        tf_data_test.extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))
        
random.shuffle(tf_data_train)
img_list, label_list = zip(*tf_data_train)
img_list_t = tf.convert_to_tensor(img_list)
lb_list_t = tf.convert_to_tensor(label_list)

tf_dataset_train = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
tf_dataset_train = tf_dataset_train.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_train = tf_dataset_train.shuffle(buffer_size=buffer_size).batch(batch_size) 

random.shuffle(tf_data_val)
img_list, label_list = zip(*tf_data_val)
img_list_t = tf.convert_to_tensor(img_list)
lb_list_t = tf.convert_to_tensor(label_list)

tf_dataset_val = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
tf_dataset_val = tf_dataset_val.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_val = tf_dataset_val.shuffle(buffer_size=buffer_size).batch(batch_size) 

random.shuffle(tf_data_test)
img_list, label_list = zip(*tf_data_test)
img_list_t = tf.convert_to_tensor(img_list)
lb_list_t = tf.convert_to_tensor(label_list)

tf_dataset_test = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
tf_dataset_test = tf_dataset_test.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_test = tf_dataset_test.shuffle(buffer_size=buffer_size).batch(batch_size) 

In [5]:
train_dataset_size = tf.data.experimental.cardinality(tf_dataset_train).numpy()
print("Train dataset", train_dataset_size)
val_dataset_size = tf.data.experimental.cardinality(tf_dataset_val).numpy()
print("Valid dataset", val_dataset_size)
test_dataset_size = tf.data.experimental.cardinality(tf_dataset_test).numpy()
print("Test dataset", test_dataset_size)

total_train_images = train_dataset_size * 32
total_val_images = val_dataset_size * 32
total_test_images = test_dataset_size * 32

print("Total training images:", total_train_images)
print("Total validation images:", total_val_images)
print("Total test images:", total_test_images)

for batch in tf_dataset_train.take(1):
    images, labels = batch

# Print the shape of the images and labels
print("Shape of images:", images.shape)
print("Shape of labels:", labels.shape)
for batch in tf_dataset_train.take(1):
    images, labels = batch

# Print the values of the labels
print("Labels:", labels.numpy())
# print("image:", images.numpy())

Train dataset 928
Valid dataset 353
Test dataset 401
Total training images: 29696
Total validation images: 11296
Total test images: 12832
Shape of images: (32, 299, 299, 3)
Shape of labels: (32,)
Labels: [1 1 0 1 1 0 0 1 1 1 1 0 0 1 1 1 1 1 0 0 0 1 0 1 1 0 1 0 1 0 1 1]


In [6]:
# https://www.tensorflow.org/api_docs/python/tf/image

def random_augmentation(img, label):
    img = tf.image.random_flip_left_right(img, seed=17)
    img = tf.image.random_flip_up_down(img, seed=17)
    img = tf.image.rot90(img, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32, seed=17))
    if img.shape[-1] == 3:
        img = tf.keras.preprocessing.image.random_zoom(img, (0.75, 1.25))
    img = tf.image.random_contrast(img, lower=0.5, upper=1.5, seed=17)
    
    return img, label

# comibining augmented dataset with new one
tf_dataset_train_augmented = tf_dataset_train.map(random_augmentation)
tf_dataset_train_aug = tf_dataset_train.concatenate(tf_dataset_train_augmented)

In [7]:
train_dataset_size = tf.data.experimental.cardinality(tf_dataset_train_aug).numpy()
print("Train dataset augmented", train_dataset_size)

Train dataset augmented 1856


In [8]:
tf_dataset_train = tf_dataset_train.map(lambda img, label: (img, tf.one_hot(label, depth=2)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_train_aug = tf_dataset_train_aug.map(lambda img, label: (img, tf.one_hot(label, depth=2)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_val = tf_dataset_val.map(lambda img, label: (img, tf.one_hot(label, depth=2)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_test = tf_dataset_test.map(lambda img, label: (img, tf.one_hot(label, depth=2)), num_parallel_calls=tf.data.experimental.AUTOTUNE)

In [9]:
def evaluate_model(model, test_dataset, model_name):
    all_predictions = []
    all_labels = []

    # predictions
    for images, labels in test_dataset:
        predictions = model.predict(images)
        predicted_labels = np.argmax(predictions, axis=1)

        all_predictions.extend(predicted_labels)
        all_labels.extend(np.argmax(labels.numpy(), axis=1))
        
    predicted_labels = np.array(all_predictions)
    true_labels = np.array(all_labels)

    # metrics
    precision = precision_score(true_labels, predicted_labels)
    recall = recall_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels)

    # df for comparison
    results_df = pd.DataFrame({'Model': [model_name],'Precision': [precision],'Recall': [recall], 'F1 Score': [f1]})

    return results_df

In [12]:
warnings.filterwarnings("ignore", category=DeprecationWarning, module="tensorflow")
warnings.simplefilter(action='ignore', category=DeprecationWarning)
model = models.Sequential()
warnings.filterwarnings("ignore", category=DeprecationWarning, module="tensorflow")
warnings.simplefilter(action='ignore', category=DeprecationWarning)
# 1st layer
model.add(layers.Conv2D(32, (3, 3), input_shape=(299, 299, 3)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Dropout(0.3))

# 2nd layer
model.add(layers.Conv2D(64, (3, 3)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Dropout(0.3))

# 3rd layer
model.add(layers.Conv2D(128, (3, 3)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Dropout(0.3))

model.add(layers.Flatten())
model.add(layers.Dense(256, kernel_regularizer=regularizers.l2(0.01)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))
model.add(layers.Dropout(0.3))

# https://datascience.stackexchange.com/questions/45246/can-i-use-the-softmax-function-with-a-binary-classification-in-deep-learning
# add softmax
model.add(layers.Dense(2, activation='softmax'))
model.compile(optimizer='adam', loss='binary_crossentropy')

# early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# train
history = model.fit(tf_dataset_train_aug, epochs=20, validation_data=tf_dataset_val, callbacks=[early_stopping])

# plot training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
 341/1856 [====>.........................] - ETA: 57:09 - loss: 1.0802

KeyboardInterrupt: 

In [None]:
print('Assessment of CNN + MLP model')
print('Evaluation metrics for training data')
evaluate_model(model, tf_dataset_train, model_name='CNN + MLP')
print('Evaluation metrics for validation data')
evaluate_model(model, tf_dataset_val, model_name='CNN + MLP')
print('Evaluation metrics for testing data')
evaluate_model(model, tf_dataset_test, model_name='CNN + MLP')