In [1]:
import os
import os.path as op
import json
from pathlib import Path
import shutil
import logging
import numpy as np
from tqdm import tqdm
from skimage import io
import warnings
warnings.filterwarnings('ignore')
import cv2
import matplotlib.pyplot as plt


  "class": algorithms.Blowfish,


In [2]:
# Logging configuration
logging.basicConfig(level=logging.INFO,
                    datefmt='%H:%M:%S',
                    format='%(asctime)s | %(levelname)-5s | %(module)-15s | %(message)s')

IMAGE_SIZE = (299, 299)  # All images contained in this dataset are 299x299 (originally, to match Inception v3 input size)
SEED = 17

# Head directory containing all image subframes. Update with the relative path of your data directory
data_head_dir = Path('data/data/')

# Find all subframe directories.
# Use `.name` (the full final path component) rather than `.stem`: `.stem` drops everything
# after the last dot, so a directory name containing a dot would be truncated and
# `data_head_dir / subdir` would no longer point at an existing directory.
# Sorted because Path.iterdir() yields entries in arbitrary order.
subdirs = sorted(Path(subdir.name) for subdir in data_head_dir.iterdir() if subdir.is_dir())
# Source-image ID = the first three underscore-separated fields of the directory name
src_image_ids = ['_'.join(a_path.name.split('_')[:3]) for a_path in subdirs]

# Load train/val/test subframe IDs
def load_text_ids(file_path):
    """Read a text file and return its lines with surrounding whitespace removed.

    Args:
        file_path: Path of the text file to read.

    Returns:
        list[str]: One entry per line of the file, in file order. Blank lines
        are kept and appear as empty strings.
    """
    stripped_lines = []
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            stripped_lines.append(raw_line.strip())
    return stripped_lines

# Load the subframe names for the three data subsets
train_ids = load_text_ids('train_source_images.txt')
validate_ids = load_text_ids('val_source_images.txt')
test_ids = load_text_ids('test_source_images.txt')

# Split names paired with their ID lists, in lookup priority order:
# an ID listed in more than one file is assigned to the first match.
_split_lookup = (('train', train_ids), ('validate', validate_ids), ('test', test_ids))

# Generate a list containing the dataset split for the matching subdirectory names
# (one entry per source-image ID, None when the ID is in none of the lists)
subdir_splits = []
for src_id in src_image_ids:
    matched_split = next(
        (split_name for split_name, id_list in _split_lookup if src_id in id_list),
        None,
    )
    if matched_split is None:
        logging.warning(f'{src_id}: Did not find designated split in train/validate/test list.')
    subdir_splits.append(matched_split)


In [4]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import EfficientNetB0, ResNet50, VGG16
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import img_to_array, load_img
import tensorflow as tf
import os
import random
import numpy as np
from PIL import Image
from pathlib import Path
from tensorflow.keras.applications.efficientnet import preprocess_input

def load_and_preprocess_transfer_learning(img_loc, label):
    """Load one image file and encode its label, for use with ``Dataset.map``.

    The actual loading runs as eager Python inside ``tf.py_function`` because it
    calls ``.numpy()`` on its inputs and uses PIL.

    Args:
        img_loc: Scalar tensor holding the UTF-8 encoded path of the image file.
        label: Scalar tensor holding the UTF-8 encoded class name.

    Returns:
        tuple: ``(X, y)`` where ``X`` is a float32 tensor with static shape
        ``[224, 224, 3]`` and ``y`` is an int64 scalar that is 1 when the class
        name is ``'frost'`` and 0 otherwise.
    """
    def _load_one(path_tensor, label_tensor):
        image_path = path_tensor.numpy().decode('utf-8')
        pixels = np.array(Image.open(image_path).convert('RGB'))
        resized = tf.image.resize(pixels, [224, 224])
        # NOTE(review): `preprocess_input` is imported from
        # tensorflow.keras.applications.efficientnet in this cell, but the models
        # trained on this data in the notebook are ResNet50 and VGG16, which ship
        # their own preprocess_input functions -- confirm which preprocessing is intended.
        prepared = preprocess_input(resized)
        is_frost = label_tensor.numpy().decode('utf-8') == 'frost'
        return prepared, (1 if is_frost else 0)

    X, y = tf.py_function(_load_one, [img_loc, label], [tf.float32, tf.int64])
    # py_function outputs carry no static shape; declare them explicitly
    X.set_shape([224, 224, 3])
    y.set_shape([])

    return X, y

def load_subdir_data_transfer_learning(dir_path, image_size, seed=None):
    """Collect ``(image path, folder name)`` pairs from one subframe directory.

    Scans ``dir_path / 'tiles'``: for every immediate sub-folder, each file
    directly inside it whose name ends with ``.png`` is paired with that
    sub-folder's name. Files lying directly in ``tiles`` and non-``.png``
    files are ignored.

    Args:
        dir_path: ``pathlib.Path`` of the subframe directory.
        image_size: Unused; accepted so existing callers keep working.
        seed: Unused; accepted so existing callers keep working.

    Returns:
        list[tuple[str, str]]: ``(file path, folder name)`` pairs, ordered by
        folder name and then file name.

    Raises:
        OSError: If ``dir_path / 'tiles'`` cannot be listed (e.g. it does not exist).
    """
    tile_dir = dir_path / Path('tiles')

    loc_list = []

    # os.listdir returns entries in arbitrary order; sorting makes the result
    # identical from run to run and from machine to machine.
    for folder in sorted(os.listdir(tile_dir)):
        folder_path = os.path.join(tile_dir, folder)
        if not os.path.isdir(folder_path):
            continue
        for file in sorted(os.listdir(folder_path)):
            if file.endswith(".png"):
                loc_list.append((os.path.join(folder_path, file), folder))

    return loc_list


# Adapt the original data preprocessing for transfer learning
tf_data_train_transfer, tf_data_test_transfer, tf_data_val_transfer = [], [], []

# Gather the (image path, class folder) pairs of every subframe into the list of its split.
# Subframes whose split is None (not found in any ID list) are skipped.
_samples_by_split = {
    'train': tf_data_train_transfer,
    'validate': tf_data_val_transfer,
    'test': tf_data_test_transfer,
}
for subdir, split in zip(subdirs, subdir_splits):
    if split in _samples_by_split:
        full_path = data_head_dir / subdir
        _samples_by_split[split].extend(load_subdir_data_transfer_learning(full_path, IMAGE_SIZE, SEED))

buffer_size = 64
batch_size = 32


def _build_transfer_dataset(samples, split_name, shuffle_elements):
    """Turn a list of (image path, class folder) pairs into a batched tf.data.Dataset.

    Args:
        samples: List of (path, label) string pairs; shuffled in place with a
            generator seeded by SEED so the order is the same on every run.
        split_name: Name of the split, used only in the error message.
        shuffle_elements: When True, add a tf.data shuffle stage before batching.
            Only the training set needs it; a shuffled evaluation set yields its
            elements in a different order on every pass, which makes predictions
            from one pass impossible to line up with labels from another.

    Returns:
        tf.data.Dataset yielding batches of (image, label) produced by
        load_and_preprocess_transfer_learning.

    Raises:
        ValueError: If `samples` is empty.
    """
    if not samples:
        raise ValueError(f"No image tiles found for the '{split_name}' split; cannot build its dataset.")

    # Dedicated generator: reproducible without touching the global `random` state
    random.Random(SEED).shuffle(samples)
    paths, labels = zip(*samples)
    dataset = tf.data.Dataset.from_tensor_slices(
        (tf.convert_to_tensor(paths), tf.convert_to_tensor(labels))
    )
    dataset = dataset.map(load_and_preprocess_transfer_learning,
                          num_parallel_calls=tf.data.experimental.AUTOTUNE)
    if shuffle_elements:
        dataset = dataset.shuffle(buffer_size=buffer_size, seed=SEED)
    return dataset.batch(batch_size)


tf_dataset_train_transfer = _build_transfer_dataset(tf_data_train_transfer, 'train', shuffle_elements=True)
tf_dataset_val_transfer = _build_transfer_dataset(tf_data_val_transfer, 'validate', shuffle_elements=False)
tf_dataset_test_transfer = _build_transfer_dataset(tf_data_test_transfer, 'test', shuffle_elements=False)

# Scratch names the original cell left bound to the test split; kept so that any
# other cell reading them continues to work.
img_list_transfer, label_list_transfer = zip(*tf_data_test_transfer)
img_list_t_transfer = tf.convert_to_tensor(img_list_transfer)
lb_list_t_transfer = tf.convert_to_tensor(label_list_transfer)


# Function to build transfer learning model
def build_transfer_learning_model(base_model):
    """Stack a binary-classification head on top of ``base_model`` and freeze the base.

    The head is: global average pooling -> Dense(256, relu) -> batch
    normalization -> Dropout(0.3) -> Dense(1, sigmoid).

    Args:
        base_model: Model to use as the first layer; every entry of its
            ``layers`` attribute gets ``trainable = False``.

    Returns:
        The assembled (uncompiled) ``Sequential`` model.
    """
    model = models.Sequential()
    model.add(base_model)

    # Classification head, added in order on top of the base
    head_layers = (
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid'),
    )
    for head_layer in head_layers:
        model.add(head_layer)

    # Freeze the pre-trained layers
    for pretrained_layer in base_model.layers:
        pretrained_layer.trainable = False

    return model

# Define input size
input_size = (224, 224, 3)

# Instantiate the ImageNet-pretrained ResNet50 and VGG16 backbones without their top (classification) layers
_backbone_kwargs = dict(weights='imagenet', include_top=False, input_shape=input_size)
resnet50_model = ResNet50(**_backbone_kwargs)
vgg16_model = VGG16(**_backbone_kwargs)

# Build transfer learning models (ResNet50 first, then VGG16)
resnet50_transfer_model, vgg16_transfer_model = (
    build_transfer_learning_model(backbone) for backbone in (resnet50_model, vgg16_model)
)

# Compile models
def compile_model(model):
    """Compile ``model`` for binary classification and return it.

    Uses the ``'adam'`` optimizer, ``'binary_crossentropy'`` loss and tracks
    ``'accuracy'``.

    Args:
        model: Object exposing a Keras-style ``compile`` method.

    Returns:
        The same ``model`` object, after ``compile`` has been called on it.
    """
    compile_options = {
        'optimizer': 'adam',
        'loss': 'binary_crossentropy',
        'metrics': ['accuracy'],
    }
    model.compile(**compile_options)
    return model

# Compile both transfer-learning models; compile_model returns the model it was given,
# so each name is rebound to the same (now compiled) object.
resnet50_transfer_model = compile_model(resnet50_transfer_model)
vgg16_transfer_model = compile_model(vgg16_transfer_model)

# Function to train transfer learning model
def train_transfer_learning_model(model, train_data, val_data, epochs=20):
    """Fit ``model`` with early stopping on the validation loss.

    Args:
        model: Compiled model exposing a Keras-style ``fit`` method.
        train_data: Training data, passed to ``fit`` as its first argument.
        val_data: Validation data, passed to ``fit`` as ``validation_data``.
        epochs: Maximum number of epochs to train for.

    Returns:
        tuple: ``(model, history)`` where ``history`` is whatever ``fit`` returned.
    """
    # Stop once val_loss has not improved for 7 epochs and roll back to the best weights
    stop_on_plateau = EarlyStopping(monitor='val_loss', patience=7, restore_best_weights=True)

    fit_options = {
        'epochs': epochs,
        'validation_data': val_data,
        'callbacks': [stop_on_plateau],
    }
    return model, model.fit(train_data, **fit_options)

# Maximum number of epochs per model; the EarlyStopping callback created inside
# train_transfer_learning_model may end a run sooner.
epochs = 20

# Train both models on the same training/validation datasets. Each call returns the
# model together with the object returned by fit(), which is plotted further down.
resnet50_transfer_model, resnet50_history = train_transfer_learning_model(
    resnet50_transfer_model, tf_dataset_train_transfer, tf_dataset_val_transfer, epochs=epochs
)
vgg16_transfer_model, vgg16_history = train_transfer_learning_model(
    vgg16_transfer_model, tf_dataset_train_transfer, tf_dataset_val_transfer, epochs=epochs
)

# Evaluate and report metrics
def evaluate_and_report(model, test_data):
    """Print a classification report and a confusion matrix for a binary classifier.

    Predictions and ground-truth labels are collected in a single pass over
    ``test_data``. Iterating the data once for predictions and a second time
    for labels pairs each prediction with the wrong label whenever the data
    yields its elements in a different order on each pass (for example a
    ``tf.data`` pipeline containing a ``shuffle`` stage).

    Args:
        model: Object exposing ``predict_on_batch(images)`` that returns one
            probability per sample (Keras models do).
        test_data: Iterable of ``(images, labels)`` batches.

    Raises:
        ValueError: If ``test_data`` yields no batches.
    """
    probability_batches, label_batches = [], []
    for images, labels in test_data:
        probability_batches.append(np.asarray(model.predict_on_batch(images)))
        label_batches.append(np.asarray(labels))

    if not probability_batches:
        raise ValueError("test_data yielded no batches; nothing to evaluate.")

    # Flatten both to 1-D so the (N, 1) sigmoid output lines up with the (N,) labels
    y_true = np.concatenate(label_batches, axis=0).ravel()
    y_pred_binary = (np.concatenate(probability_batches, axis=0) > 0.5).astype(int).ravel()

    print("Classification Report:")
    print(classification_report(y_true, y_pred_binary))

    print("Confusion Matrix:")
    print(confusion_matrix(y_true, y_pred_binary))

# Evaluate transfer learning models on the test dataset, ResNet50 first and then VGG16
for model_label, fitted_model in (
    ("ResNet50", resnet50_transfer_model),
    ("VGG16", vgg16_transfer_model),
):
    print(f"\nEvaluation for {model_label} Transfer Learning Model:")
    evaluate_and_report(fitted_model, tf_dataset_test_transfer)

# Plot training and validation errors vs. epochs
def plot_training_history(history):
    """Plot the training and validation loss curves of one training run.

    Args:
        history: Object whose ``history`` attribute is a mapping containing the
            per-epoch values under the keys ``'loss'`` and ``'val_loss'``.
    """
    curves = (
        ('loss', 'Training Loss'),
        ('val_loss', 'Validation Loss'),
    )
    for metric_key, curve_label in curves:
        plt.plot(history.history[metric_key], label=curve_label)

    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Plot training history for transfer learning models, ResNet50 first and then VGG16
for model_label, fit_history in (
    ("ResNet50", resnet50_history),
    ("VGG16", vgg16_history),
):
    print(f"\nTraining History for {model_label} Transfer Learning Model:")
    plot_training_history(fit_history)







Epoch 1/20






Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20