### Import

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
import os.path
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import keras

### Connect to drive and unzip

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

# import zipfile
# zip_file_path = '/content/drive/MyDrive/Wildfirev2.zip'
# extract_to_dir = '/content/'

# # Create a ZipFile object
# with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
#     zip_ref.extractall(extract_to_dir)

# print(f"Files extracted to: {extract_to_dir}")

### Hyperparameters

In [None]:
# Input resolution fed to every network (height, width).
IMG_HEIGHT = 224
IMG_WIDTH = 224
IMAGE_SIZE = (IMG_HEIGHT, IMG_WIDTH)
BATCH_SIZE = 64
NUM_OF_CLASSES = 1
EPOCHS = 1
# Raw string: the Windows path contains backslashes that would otherwise be
# parsed as (invalid) escape sequences and trigger a SyntaxWarning.
ROOT_DIR = r'D:\Omkar\COEP BTech\BTech Sem VIII\Research AI\wildfire susceptibility'

# Settings for the dense head of the models
ACTIVATION = 'relu'
PADDING = 'same'
DROPOUT_RATE = 0.1
EPSILON = 0.001  # epsilon passed to BatchNormalization layers

MODEL_NAME = 'vgg16' #'vgg16' or 'resnet50' or 'inceptionv3' or 'efficientnetb3' or 'proposed_model'

# only for vgg16
ADDITIONAL_TYPE = 'vgg16' #'vgg16' or 'vgg16_DO' or 'vgg16_DO_L2' or 'vgg16_BN_DO_L2'

### Dataloader

In [None]:
def prepare_image_data(image_dir, batch_size, target_size=(224, 224), shuffle=None):
    """
    Prepares image data for training, validation, or testing.

    Every ``.jpg`` file below ``image_dir`` is collected recursively and
    labelled with the name of the directory that directly contains it.

    Args:
        image_dir (Path): Path to the directory containing image files.
        batch_size (int): Number of images to include in each batch.
        target_size (tuple): Target size for resizing images (height, width).
        shuffle (bool, optional): Whether the iterator shuffles the samples.
            When left as None the historical rule applies: shuffle unless
            ``image_dir`` equals the module-level ``image_dir_test``.

    Returns:
        keras.preprocessing.image.DataFrameIterator: An iterator over the image data.

    Raises:
        FileNotFoundError: If no ``.jpg`` file exists below ``image_dir``.
    """
    # Get file paths for all images in the directory
    filepaths = list(image_dir.glob(r'**/*.jpg'))
    if not filepaths:
        # Fail early with a clear message instead of handing Keras an empty dataframe
        raise FileNotFoundError(f"No .jpg images found under {image_dir}")

    # The class label is the name of the folder that holds the image
    labels = [path.parent.name for path in filepaths]

    # Create a dataframe with file paths and corresponding labels
    image_df = pd.DataFrame({
        'Filepath': [str(path) for path in filepaths],
        'Label': labels,
    })

    if shuffle is None:
        # Default rule: keep the test set in file order so predictions can be
        # compared against the iterator's labels; shuffle everything else
        shuffle = image_dir != image_dir_test

    # Initialize an ImageDataGenerator that scales pixel values to [0, 1]
    generator = keras.preprocessing.image.ImageDataGenerator(rescale=1./255)

    # Create an iterator for the image data
    image_data = generator.flow_from_dataframe(
        dataframe=image_df,
        x_col='Filepath',
        y_col='Label',
        target_size=target_size,
        color_mode='rgb',
        class_mode='binary',
        batch_size=batch_size,
        shuffle=shuffle
    )
    return image_data

# Define paths
# Locations of the three dataset splits below ROOT_DIR
image_dir_train = Path(f'{ROOT_DIR}/Dataset/Wildfirev2/train')
image_dir_test = Path(f'{ROOT_DIR}/Dataset/Wildfirev2/test')
image_dir_val = Path(f'{ROOT_DIR}/Dataset/Wildfirev2/valid')

# Build one iterator per split, in the order train -> validation -> test
train_images, val_images, test_images = (
    prepare_image_data(split_dir, BATCH_SIZE, target_size=IMAGE_SIZE)
    for split_dir in (image_dir_train, image_dir_val, image_dir_test)
)

Found 20000 validated image filenames belonging to 2 classes.
Found 5640 validated image filenames belonging to 2 classes.
Found 5640 validated image filenames belonging to 2 classes.


In [None]:
def plot_sample_images(image_data, title, num_images=5, figsize=(15, 5)):
    """
    Plots a single row of sample images taken from the first batch of the given image data.

    Args:
        image_data (keras.preprocessing.image.DataFrameIterator): The image data iterator containing images and labels.
        title (str): The title of the plot.
        num_images (int, optional): The number of images to display. Defaults to 5.
            If the first batch holds fewer images, only those are shown.
        figsize (tuple, optional): The size of the figure. Defaults to (15, 5).

    Returns:
        None: Displays the plot of sample images.
    """
    plt.figure(figsize=figsize)
    for images, labels in image_data:
        # Never index past the end of the batch, and size the grid to the
        # number of images actually drawn (it was hard-coded to 5 columns)
        count = min(num_images, len(images))
        for i in range(count):
            plt.subplot(1, count, i + 1)
            plt.imshow(images[i])
            plt.title(int(labels[i]))  # numeric class label of the image
            plt.axis("off")
        break  # only the first batch is needed
    plt.suptitle(title)
    plt.show()

# Example usage
# Show a few samples from each split: test first, then train, then validation
for split_iterator, split_title in (
    (test_images, "Test Images"),
    (train_images, "Train Images"),
    (val_images, "Validation Images"),
):
    plot_sample_images(split_iterator, split_title)

### Model definition

In [None]:
from keras.models import Sequential
from tensorflow import keras
from keras.layers import BatchNormalization, Flatten, Dropout, Dense
from keras.regularizers import l2
from keras.applications import VGG16, ResNet50, InceptionV3, EfficientNetB3

class VGG16Transfer:
    """
    A class to define and manage a transfer learning model using VGG16 as the base.

    The VGG16 convolutional base is frozen; a small head selected by
    ``additional_type`` is placed between the flattened features and a
    single sigmoid output unit.

    Attributes:
        input_shape (tuple): The shape of the input images (height, width, channels).
        dropout_rate (float): The dropout rate for regularization.
        epsilon (float): The epsilon value for BatchNormalization layers.
        additional_type (str): Which head variant is built (see ``_build_model``).
        model (keras.models.Sequential): The (uncompiled) Keras Sequential model.
    """

    def __init__(self, input_shape=(224, 224, 3), dropout_rate=0.1, epsilon=0.001, additional_type='vgg16'):
        """
        Initializes the VGG16Transfer class with the given parameters and builds the model.

        Args:
            input_shape (tuple, optional): The shape of the input images. Defaults to (224, 224, 3).
            dropout_rate (float, optional): The dropout rate for Dropout layers. Defaults to 0.1.
            epsilon (float, optional): The epsilon value for BatchNormalization layers. Defaults to 0.001.
            additional_type (str, optional): One of 'vgg16', 'vgg16_DO', 'vgg16_DO_L2'
                or 'vgg16_BN_DO_L2'. Defaults to 'vgg16'.

        Raises:
            ValueError: If ``additional_type`` is not one of the supported variants.
        """
        self.input_shape = input_shape
        self.dropout_rate = dropout_rate
        self.epsilon = epsilon
        self.additional_type = additional_type
        self.model = self._build_model()

    def _build_model(self):
        """
        Builds the transfer learning model using VGG16 as the base.

        Head variants (all end in one sigmoid unit):
            'vgg16'          -> no extra layers
            'vgg16_DO'       -> Dropout
            'vgg16_DO_L2'    -> Dense(256, L2) -> Dropout
            'vgg16_BN_DO_L2' -> BatchNormalization -> Dropout -> Dense(256, L2) -> Dropout

        Returns:
            keras.models.Sequential: The Keras Sequential model (not compiled here).

        Raises:
            ValueError: If ``self.additional_type`` is not a supported variant.
        """
        supported_types = ('vgg16', 'vgg16_DO', 'vgg16_DO_L2', 'vgg16_BN_DO_L2')
        if self.additional_type not in supported_types:
            # Reject typos up front instead of silently training the plain variant
            raise ValueError(
                f"Unknown additional_type {self.additional_type!r}; "
                f"expected one of {supported_types}"
            )

        # Load the VGG16 model with pre-trained weights
        vgg16 = VGG16(input_shape=self.input_shape, weights='imagenet', include_top=False)

        # Freeze layers of the VGG16 model
        for layer in vgg16.layers:
            layer.trainable = False

        # Create a Sequential model
        model = Sequential()
        model.add(vgg16)
        model.add(Flatten())

        # Add additional layers based on the specified type. The output layer
        # is added exactly once after this chain; the branches previously
        # added their own sigmoid unit too, stacking two output layers.
        if self.additional_type == 'vgg16_DO':
            model.add(Dropout(self.dropout_rate))
        elif self.additional_type == 'vgg16_DO_L2':
            # NOTE(review): this Dense layer has no activation (linear) - confirm that is intended
            model.add(Dense(256, kernel_regularizer=l2(0.01)))
            model.add(Dropout(self.dropout_rate))
        elif self.additional_type == 'vgg16_BN_DO_L2':
            model.add(BatchNormalization(epsilon=self.epsilon))
            model.add(Dropout(self.dropout_rate))
            model.add(Dense(256, kernel_regularizer=l2(0.01)))
            model.add(Dropout(self.dropout_rate))

        # Single sigmoid unit for binary classification
        model.add(Dense(1, activation='sigmoid'))

        return model

    def summary(self):
        """
        Prints the summary of the model.
        """
        self.model.summary()

class ResNet50Model:
    """
    Binary classifier that stacks a sigmoid output on a frozen ResNet50 base.

    Attributes:
        input_shape (tuple): Shape of the input images (height, width, channels).
        dropout_rate (float): Stored dropout rate; not used when building the layers.
        epsilon (float): Stored BatchNormalization epsilon; not used when building the layers.
        activation (str): Stored activation name; not used when building the layers.
        model (keras.models.Sequential): The Keras Sequential model (not compiled here).
    """

    def __init__(self, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), dropout_rate=DROPOUT_RATE, epsilon=EPSILON, activation=ACTIVATION):
        """
        Stores the configuration and immediately builds the network.

        Args:
            input_shape (tuple, optional): Shape of the input images. Defaults to (IMG_HEIGHT, IMG_WIDTH, 3).
            dropout_rate (float, optional): Dropout rate to store. Defaults to DROPOUT_RATE.
            epsilon (float, optional): BatchNormalization epsilon to store. Defaults to EPSILON.
            activation (str, optional): Activation name to store. Defaults to ACTIVATION.
        """
        self.input_shape = input_shape
        self.dropout_rate = dropout_rate
        self.epsilon = epsilon
        self.activation = activation
        self.model = self._build_model()

    def _build_model(self):
        """
        Assembles the frozen ResNet50 base, a Flatten layer and the sigmoid output.

        Returns:
            keras.models.Sequential: The assembled model (not compiled here).
        """
        # ImageNet-pretrained convolutional base without its classification top
        backbone = ResNet50(input_shape=self.input_shape, weights='imagenet', include_top=False)

        # Keep the pretrained weights fixed during training
        for frozen_layer in backbone.layers:
            frozen_layer.trainable = False

        # Base -> flattened features -> single sigmoid unit
        return Sequential([
            backbone,
            Flatten(),
            Dense(1, activation='sigmoid'),
        ])

    def summary(self):
        """
        Prints the Keras summary of the wrapped model.
        """
        self.model.summary()

class InceptionV3Model:
    """
    A class to define and manage a transfer learning model using InceptionV3 as the base.

    Attributes:
        input_shape (tuple): The shape of the input images (height, width, channels).
        dropout_rate (float): The dropout rate (stored; not used by the current layers).
        epsilon (float): The BatchNormalization epsilon (stored; not used by the current layers).
        activation (str): The activation name (stored; not used by the current layers).
        model (keras.models.Sequential): The Keras Sequential model (not compiled here).
    """

    def __init__(self, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), dropout_rate=DROPOUT_RATE, epsilon=EPSILON, activation=ACTIVATION):
        """
        Initializes the InceptionV3Model class with the given parameters and builds the model.

        Args:
            input_shape (tuple, optional): The shape of the input images. Defaults to (IMG_HEIGHT, IMG_WIDTH, 3).
            dropout_rate (float, optional): The dropout rate for Dropout layers. Defaults to DROPOUT_RATE.
            epsilon (float, optional): The epsilon value for BatchNormalization layers. Defaults to EPSILON.
            activation (str, optional): The activation function to use in Dense layers. Defaults to ACTIVATION.
        """
        self.input_shape = input_shape
        self.dropout_rate = dropout_rate
        self.epsilon = epsilon
        self.activation = activation
        self.model = self._build_model()

    def _build_model(self):
        """
        Builds the transfer learning model using InceptionV3 as the base.

        Returns:
            keras.models.Sequential: The Keras Sequential model (not compiled here).
        """
        # Load the InceptionV3 model with pre-trained weights.
        # The base is built for self.input_shape so it matches the images the
        # caller feeds in; it was previously hard-coded to (299, 299, 3), which
        # ignored the constructor argument. Per the Keras documentation, with
        # include_top=False other sizes are accepted (3 channels, sides >= 75).
        inceptionv3 = InceptionV3(input_shape=self.input_shape, weights='imagenet', include_top=False)
        # Freeze layers
        for layer in inceptionv3.layers:
            layer.trainable = False

        # Create a Sequential model
        model = Sequential()
        model.add(inceptionv3)
        model.add(Flatten())
        model.add(Dense(1, activation='sigmoid'))

        return model

    def summary(self):
        """
        Prints the summary of the model.
        """
        self.model.summary()

class EfficientNetB3Model:
    """
    A class to define and manage a transfer learning model using EfficientNetB3 as the base.

    Attributes:
        input_shape (tuple): The shape of the input images (height, width, channels).
        dropout_rate (float): The dropout rate (stored; not used by the current layers).
        epsilon (float): The BatchNormalization epsilon (stored; not used by the current layers).
        activation (str): The activation name (stored; not used by the current layers).
        model (keras.models.Sequential): The Keras Sequential model (not compiled here).
    """

    def __init__(self, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), dropout_rate=DROPOUT_RATE, epsilon=EPSILON, activation=ACTIVATION):
        """
        Initializes the EfficientNetB3Model class with the given parameters and builds the model.

        Args:
            input_shape (tuple, optional): The shape of the input images. Defaults to (IMG_HEIGHT, IMG_WIDTH, 3).
            dropout_rate (float, optional): The dropout rate for Dropout layers. Defaults to DROPOUT_RATE.
            epsilon (float, optional): The epsilon value for BatchNormalization layers. Defaults to EPSILON.
            activation (str, optional): The activation function to use in Dense layers. Defaults to ACTIVATION.
        """
        self.input_shape = input_shape
        self.dropout_rate = dropout_rate
        self.epsilon = epsilon
        self.activation = activation
        self.model = self._build_model()

    def _build_model(self):
        """
        Builds the transfer learning model using EfficientNetB3 as the base.

        Returns:
            keras.models.Sequential: The Keras Sequential model (not compiled here).
        """
        # Load the EfficientNetB3 model with pre-trained weights, honouring the
        # input_shape given to the constructor instead of the module constants.
        # NOTE(review): Keras documents that EfficientNet expects raw pixel
        # values in [0, 255]; prepare_image_data rescales by 1/255 - confirm
        # the inputs are in the range this base expects.
        efficientNetB3 = EfficientNetB3(input_shape=self.input_shape, weights='imagenet', include_top=False)
        # Freeze layers
        for layer in efficientNetB3.layers:
            layer.trainable = False

        # Create a Sequential model
        model = Sequential()
        model.add(efficientNetB3)
        model.add(Flatten())
        model.add(Dense(1, activation='sigmoid'))

        return model

    def summary(self):
        """
        Prints the summary of the model.
        """
        self.model.summary()

class ProposedModel:
    """
    Custom classifier: a frozen VGG16 base followed by three
    Dense -> BatchNormalization -> Dropout stages and a sigmoid output.

    Attributes:
        input_shape (tuple): Shape of the input images (height, width, channels).
        dropout_rate (float): Rate used by every Dropout layer of the head.
        epsilon (float): Epsilon used by every BatchNormalization layer of the head.
        activation (str): Activation used by the hidden Dense layers.
        model (keras.models.Sequential): The Keras Sequential model (not compiled here).
    """

    def __init__(self, input_shape=(224, 224, 3), dropout_rate=0.1, epsilon=0.001, activation='relu'):
        """
        Stores the configuration and immediately builds the network.

        Args:
            input_shape (tuple, optional): Shape of the input images. Defaults to (224, 224, 3).
            dropout_rate (float, optional): Rate for the Dropout layers. Defaults to 0.1.
            epsilon (float, optional): Epsilon for the BatchNormalization layers. Defaults to 0.001.
            activation (str, optional): Activation of the hidden Dense layers. Defaults to 'relu'.
        """
        self.input_shape = input_shape
        self.dropout_rate = dropout_rate
        self.epsilon = epsilon
        self.activation = activation
        self.model = self._build_model()

    def _build_model(self):
        """
        Assembles the frozen VGG16 base and the three-stage dense head.

        Returns:
            keras.models.Sequential: The assembled model (not compiled here).
        """
        # ImageNet-pretrained convolutional base without its classification top
        backbone = VGG16(input_shape=self.input_shape, weights='imagenet', include_top=False)

        # Keep the pretrained weights fixed during training
        for frozen_layer in backbone.layers:
            frozen_layer.trainable = False

        stack = [backbone, Flatten()]
        # Three hidden stages of decreasing width: 256 -> 128 -> 64 units
        for units in (256, 128, 64):
            stack.append(Dense(units, activation=self.activation))
            stack.append(BatchNormalization(epsilon=self.epsilon))
            stack.append(Dropout(self.dropout_rate))
        # Single sigmoid unit for the binary decision
        stack.append(Dense(1, activation='sigmoid'))

        return Sequential(stack)

    def summary(self):
        """
        Prints the Keras summary of the wrapped model.
        """
        self.model.summary()


### Utils

In [None]:
import json
def save_history_to_json(history, filename):
    """
    Saves the training history to a JSON file.

    Values that the json module cannot encode natively but that expose a
    ``tolist()`` method (NumPy scalars and arrays, e.g. a float32 learning
    rate recorded by a callback) are converted to plain Python values.

    Args:
        history (keras.callbacks.History): The history object returned by model.fit().
        filename (str): The name of the JSON file to save the history.

    Raises:
        TypeError: If the history holds a value that is neither JSON-encodable
            nor convertible through ``tolist()``.
    """
    def _to_builtin(value):
        # Called by json only for objects it cannot serialize itself
        if hasattr(value, 'tolist'):
            return value.tolist()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    with open(filename, 'w') as f:
        json.dump(history.history, f, default=_to_builtin)

### Model Run

In [None]:
# Example usage
# Input shape matches the size the data iterators resize images to, plus 3 RGB channels
input_shape = (*IMAGE_SIZE, 3)

# Build the wrapper selected by MODEL_NAME; the Keras model lives in `model.model`
if MODEL_NAME == 'vgg16':
    model = VGG16Transfer(input_shape=input_shape, dropout_rate=DROPOUT_RATE,
                          epsilon=EPSILON, additional_type=ADDITIONAL_TYPE)
elif MODEL_NAME == 'resnet50':
    model = ResNet50Model(input_shape=input_shape, dropout_rate=DROPOUT_RATE,
                          epsilon=EPSILON, activation=ACTIVATION)
elif MODEL_NAME == 'inceptionv3':
    model = InceptionV3Model(input_shape=input_shape, dropout_rate=DROPOUT_RATE,
                             epsilon=EPSILON, activation=ACTIVATION)
elif MODEL_NAME == 'efficientnetb3':
    model = EfficientNetB3Model(input_shape=input_shape, dropout_rate=DROPOUT_RATE,
                                epsilon=EPSILON, activation=ACTIVATION)
elif MODEL_NAME == 'proposed_model':
    model = ProposedModel(input_shape=input_shape, dropout_rate=DROPOUT_RATE,
                          epsilon=EPSILON, activation=ACTIVATION)
else:
    # Without this, an unknown name only surfaced later as a NameError on `model`
    raise ValueError(f"Unknown MODEL_NAME: {MODEL_NAME!r}")

model.model.compile(
    loss='binary_crossentropy',
    optimizer=keras.optimizers.Adam(0.001),
    metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)

# Train the model and save the history
history = model.model.fit(
    train_images,
    validation_data=val_images,
    epochs=EPOCHS,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=4,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            patience=3
        ),
        # Write the model to disk whenever val_loss improves. This replaces a
        # hand-written LambdaCallback that called save() on the wrapper object
        # (which has no such method), rebound `best_loss` as a lambda-local
        # name, and used a condition that could never be true.
        tf.keras.callbacks.ModelCheckpoint(
            filepath=f'{MODEL_NAME}_{EPOCHS}_model.h5',
            monitor='val_loss',
            save_best_only=True
        )
    ]
)
# Save the history to a JSON file
save_history_to_json(history, f'{MODEL_NAME}_{EPOCHS}_train_history.json')

### Testing

In [None]:
from keras.models import load_model
import json

def evaluate_model(model_path, test_images):
    """
    Loads a model and evaluates it on the test dataset.

    Prints the evaluation metrics, displays the confusion matrix and returns
    the metric values so the caller can store them.

    Args:
        model_path (str): Path to the saved model file.
        test_images (keras.preprocessing.image.DataFrameIterator): Test dataset iterator.
            Its ``labels`` attribute is compared with the predictions, so it
            must yield samples in a fixed (unshuffled) order.

    Returns:
        list: The values returned by ``model.evaluate``; indexed below as
        loss, accuracy, precision, recall.
    """
    # Load the model
    model = load_model(model_path)

    # Evaluate the model once; the iterator already defines the batch size, so
    # no batch_size argument is passed (Keras advises against it for generators)
    results = model.evaluate(test_images)
    print("Test Loss: {:.5f}".format(results[0]))
    print("Test Accuracy: {:.2f}%".format(results[1] * 100))

    # Threshold the sigmoid outputs at 0.5 and flatten to a 1-D label vector
    predictions = (model.predict(test_images) >= 0.5).astype(int).ravel()

    # Compute confusion matrix
    cm = confusion_matrix(test_images.labels, predictions, labels=[0, 1])
    plt.figure(figsize=(6, 6))
    sns.heatmap(cm, annot=True, fmt='g', vmin=0, cmap='Blues', cbar=False)
    plt.xticks(ticks=[0.5, 1.5], labels=["No Wildfire", "Wildfire"])
    plt.yticks(ticks=[0.5, 1.5], labels=["No Wildfire", "Wildfire"])
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

    # Report every metric from the evaluation above (no second evaluation pass)
    print("Loss:", results[0])
    print("Accuracy:", results[1])
    print("Precision:", results[2])
    print("Recall:", results[3])

    # Previously nothing was returned, so callers indexing the result crashed
    return results

def save_test_results(results, model_name):
    """
    Writes the four test metrics to ``<model_name>_test_results.json``.

    Args:
        results (sequence): Metric values indexed as 0 = loss, 1 = accuracy,
            2 = precision, 3 = recall.
        model_name (str): Prefix of the output file name.
    """
    metric_labels = ("Test Loss", "Test Accuracy", "Precision", "Recall")
    # Pair each label with the value at the same position in `results`
    payload = {label: results[position] for position, label in enumerate(metric_labels)}

    output_path = f'{model_name}_test_results.json'
    with open(output_path, 'w') as handle:
        handle.write(json.dumps(payload))

# Example usage
# Model file to evaluate, named from MODEL_NAME and EPOCHS
checkpoint_path = f'{MODEL_NAME}_{EPOCHS}_model.h5'

# NOTE(review): save_test_results indexes results[0..3], so evaluate_model
# must return the list of metric values - confirm that it does.
results = evaluate_model(checkpoint_path, test_images)
# Persist the metrics to '<MODEL_NAME>_<EPOCHS>_test_results.json'
save_test_results(results, f'{MODEL_NAME}_{EPOCHS}')

In [None]:
def plot_training_history(history):
    """
    Draws loss and accuracy curves (training vs. validation) on two stacked subplots.

    Args:
        history (keras.callbacks.History): The history object returned by model.fit();
            its ``history`` dict must hold 'loss', 'accuracy', 'val_loss' and 'val_accuracy'.
    """
    # Read all four series up front so a missing key fails before any figure is created
    records = history.history
    train_loss = records['loss']
    train_acc = records['accuracy']
    valid_loss = records['val_loss']
    valid_acc = records['val_accuracy']

    # One x value per recorded epoch, starting at 1
    epochs = range(1, len(train_loss) + 1)

    # (row, training series, training label, validation series, validation label, title, y label)
    panels = (
        (1, train_loss, 'Training Loss', valid_loss, 'Validation Loss',
         'Training and Validation Loss', 'Loss'),
        (2, train_acc, 'Training Accuracy', valid_acc, 'Validation Accuracy',
         'Training and Validation Accuracy', 'Accuracy'),
    )

    plt.figure(figsize=(15, 10))
    for row, train_series, train_label, valid_series, valid_label, heading, y_label in panels:
        plt.subplot(2, 1, row)
        plt.plot(epochs, train_series, label=train_label)
        plt.plot(epochs, valid_series, label=valid_label)
        plt.title(heading)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Example usage: plot the curves recorded in the `history` object returned by fit()
plot_training_history(history)

### Generate Model Diagram

In [None]:
from keras.utils import plot_model
# `model` is the wrapper object built in the Model Run cell; plot_model needs
# the Keras model it holds in its `.model` attribute
plot_model(model.model, show_shapes=True, show_layer_names=True)

In [None]:
import visualkeras
# Render the layered diagram of the Keras model held by the wrapper object
# (`model.model`); the wrapper itself is not a Keras model. The view is
# rendered once - the two previous calls were identical.
# To use a custom font, uncomment the two lines below and pass `font=font`:
#from PIL import ImageFont
#font = ImageFont.truetype("arial.ttf", 12)
visualkeras.layered_view(model.model, legend=True)