In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
from PIL import Image
import shutil
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras import regularizers
import cv2
from sklearn.model_selection import train_test_split

import rasterio
from sklearn.utils import shuffle
import openslide

import os
import sys
from shutil import copyfile, move
from tqdm import tqdm
import h5py
import random
from random import randint

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import Input
import requests
from io import BytesIO
from PIL import Image
from tensorflow.keras.layers import InputLayer, Input
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Dropout, Activation
from tensorflow.keras.layers import BatchNormalization, Reshape, MaxPooling2D, GlobalAveragePooling2D
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from keras.losses import mean_squared_error
import keras as K
from sklearn.metrics import cohen_kappa_score
from tensorflow.keras import models
from tensorflow.keras.models import load_model


# Load Image Data into dataframe

In [None]:
def get_image_paths_and_labels(fire_dir, non_fire_dir):
    """Collect image file paths and binary labels from two class directories.

    Both directories are walked recursively. Files are matched by extension
    case-insensitively (so ``.JPG`` is picked up as well as ``.jpg``), and the
    traversal is sorted so the returned order is identical on every run
    regardless of the order in which the filesystem lists entries.

    Parameters:
    - fire_dir: Root directory holding fire images (labelled 1).
    - non_fire_dir: Root directory holding non-fire images (labelled 0).

    Returns:
    - (image_path_list, Labels): two parallel lists; all fire entries come
      first, followed by all non-fire entries.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')  # Add more extensions if needed
    fire_label = 1
    non_fire_label = 0

    def collect(directory):
        """Return the sorted image paths found anywhere below `directory`."""
        found = []
        for root, dirs, files in os.walk(directory):
            dirs.sort()  # in-place sort makes os.walk descend in a fixed order
            for file in sorted(files):
                if file.lower().endswith(image_extensions):
                    found.append(os.path.join(root, file))
        return found

    image_path_list = []
    Labels = []
    for directory, label in ((fire_dir, fire_label), (non_fire_dir, non_fire_label)):
        paths = collect(directory)
        image_path_list.extend(paths)
        Labels.extend([label] * len(paths))

    return image_path_list, Labels

#Create Dataframe
def create_image_dataframe(image_path_list, Labels):
    """Build a DataFrame with one row per image.

    Parameters:
    - image_path_list: Sequence of image file paths.
    - Labels: Sequence of labels parallel to `image_path_list`
      (1 for fire, 0 for non-fire).

    Returns:
    - DataFrame with columns 'image_id' (generated as img_1, img_2, ...),
      'image_path' and 'fire'.
    """
    total = len(image_path_list)
    columns = {
        # IDs are 1-based and follow the order of the incoming paths
        'image_id': ["img_{}".format(number) for number in range(1, total + 1)],
        'image_path': image_path_list,
        'fire': Labels,
    }
    return pd.DataFrame(columns)


# Training split: gather paths/labels from disk, then tabulate them
fire_dir = "/kaggle/input/firedataset-jpg-224/FireDataset-V6-JPG-Reshaped224/train/fire"
non_fire_dir = "/kaggle/input/firedataset-jpg-224/FireDataset-V6-JPG-Reshaped224/train/non_fire"
image_path_list, Labels = get_image_paths_and_labels(fire_dir, non_fire_dir)
df = create_image_dataframe(image_path_list, Labels)
print('Train Data set', df.head(), sep='\n')

# Test split: same steps; the directory/path/label variables are reused and
# therefore hold the test-split values once this cell has run
fire_dir = "/kaggle/input/firedataset-jpg-224/FireDataset-V6-JPG-Reshaped224/test/fire"
non_fire_dir = "/kaggle/input/firedataset-jpg-224/FireDataset-V6-JPG-Reshaped224/test/non_fire"
image_path_list, Labels = get_image_paths_and_labels(fire_dir, non_fire_dir)
test_df = create_image_dataframe(image_path_list, Labels)
print('Test Data set', test_df.head(), sep='\n')


In [None]:
#Shuffle and split generated dataframe into train and validation sets, with validation ratio 0.2

def shuffle_and_split(df, validation_ratio=0.2):
    """Shuffle a DataFrame and split it into training and validation parts.

    Parameters:
    - df: DataFrame to split.
    - validation_ratio: Passed to train_test_split as `test_size`
      (default 0.2).

    Returns:
    - (train_df, val_df), each with its index reset to start at 0.

    Both the shuffle and the split use a fixed random_state of 42.
    """
    shuffled = df.sample(frac=1, random_state=42).reset_index(drop=True)

    parts = train_test_split(shuffled, test_size=validation_ratio, random_state=42)

    # Give both parts a fresh 0-based index
    train_df, val_df = (part.reset_index(drop=True) for part in parts)
    return train_df, val_df

# Split the training DataFrame using the default validation ratio
train_df, val_df = shuffle_and_split(df)

# Preview the first rows of each part
print("Training Set:", train_df.head(), sep="\n")
print("\nValidation Set:", val_df.head(), sep="\n")


In [None]:
# Shared image / batching configuration
IMG_HEIGHT, IMG_WIDTH = 150, 150
BATCH_SIZE = 16
IMAGE_SHAPE = (IMG_HEIGHT, IMG_WIDTH, 3)  # (height, width, channels)

In [None]:
#Applies edge filtering, heatmap, and color masks to each image. 
def apply_masks(image):
    """Produce three differently-masked views of one image.

    Parameters:
    - image: Image array; the helpers convert it with BGR colour codes, so
      it is treated as BGR.

    Returns:
    - (masked_image1, masked_image2, masked_image3):
      1. the image AND-ed with its edge mask,
      2. the image blended 50/50 with a JET-coloured version of the
         heatmap (bright-region) mask,
      3. the image AND-ed with its colour mask.
    """
    # Build the three single-channel masks first
    edges = generate_edge_mask(image)
    bright_regions = generate_heatmap_mask(image)
    fire_colors = generate_color_mask(image)

    # View 1: keep only the pixels that lie on detected edges
    # (the mask is expanded to 3 channels so it can be AND-ed with the image)
    masked_image1 = cv2.bitwise_and(image, cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR))

    # View 2: colour the bright-region mask with the JET colormap and blend
    # it half-and-half with the original image
    bright_regions_bgr = cv2.applyColorMap(bright_regions, cv2.COLORMAP_JET)
    masked_image2 = cv2.addWeighted(image, 0.5, bright_regions_bgr, 0.5, 0)

    # View 3: keep only the pixels selected by the colour mask
    masked_image3 = cv2.bitwise_and(image, cv2.cvtColor(fire_colors, cv2.COLOR_GRAY2BGR))

    return masked_image1, masked_image2, masked_image3

def generate_edge_mask(image, low_threshold=100, high_threshold=200):
    """Return a single-channel edge mask computed with the Canny detector.

    Parameters:
    - image: Image array, converted to grayscale with the BGR colour code.
    - low_threshold: First (lower) Canny threshold. Defaults to 100.
    - high_threshold: Second (upper) Canny threshold. Defaults to 200.

    Returns:
    - The array returned by cv2.Canny for the grayscale image.
    """
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return cv2.Canny(gray_image, low_threshold, high_threshold)

def generate_heatmap_mask(image, threshold=200):
    """Return a binary mask of the bright regions of an image.

    The image is converted to grayscale (BGR colour code) and thresholded
    with cv2.THRESH_BINARY using 255 as the maximum value.

    Parameters:
    - image: Image array.
    - threshold: Grayscale threshold passed to cv2.threshold.
      Defaults to 200.

    Returns:
    - The thresholded single-channel mask.
    """
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # cv2.threshold returns (value_used, mask); only the mask is needed
    _, heatmap_mask = cv2.threshold(gray_image, threshold, 255, cv2.THRESH_BINARY)
    return heatmap_mask

def generate_color_mask(image):
    """Return a single-channel mask of fire-like colours (reds, oranges, yellows).

    The image is converted to HSV (BGR colour code) and pixels are selected
    when saturation and value are both in [100, 255] and the hue falls in one
    of the bands below. Bounds are inclusive.

    Parameters:
    - image: Image array.

    Returns:
    - Mask produced by cv2.inRange / cv2.bitwise_or over the hue bands.
    """
    # Convert image to HSV color space
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    fire_ranges = (
        # Red through orange to yellow. This single band equals the union of
        # the adjacent 0-10, 10-30 and 30-50 hue bands with the same S/V bounds.
        ((0, 100, 100), (50, 255, 255)),
        # Red also sits at the top of OpenCV's 8-bit hue axis (0-179), because
        # hue is circular; without this band deep reds are never selected.
        ((170, 100, 100), (179, 255, 255)),
    )

    color_mask = None
    for lower_bound, upper_bound in fire_ranges:
        band_mask = cv2.inRange(hsv_image, np.array(lower_bound), np.array(upper_bound))
        color_mask = band_mask if color_mask is None else cv2.bitwise_or(color_mask, band_mask)

    return color_mask



Create a custom generator that yields 3 separate masked versions of each image

In [None]:

def custom_generator(image_path_list, Labels, batch_size=16, target_size=(150, 150)):
    """Endlessly yield batches of three masked image views plus their labels.

    Every image is read with OpenCV, resized, passed through `apply_masks`
    and each of the three resulting views is divided by 255. After the last
    (possibly smaller) batch the generator starts again from the first image.

    Parameters:
    - image_path_list: Sequence of image file paths (list, array or Series).
    - Labels: Sequence of labels parallel to `image_path_list`.
    - batch_size: Maximum number of images per batch. Defaults to 16.
    - target_size: Size handed to cv2.resize, i.e. (width, height).
      Defaults to (150, 150).

    Yields:
    - ((X_batch_1, X_batch_2, X_batch_3), Y_batch): three image arrays, one
      per masked view, and a label array reshaped to (batch, 1).

    Raises:
    - ValueError: if `batch_size` is smaller than 1, the inputs are empty or
      of different lengths, or an image file cannot be read.
    """
    # Plain lists give positional access for any sequence type, including a
    # pandas Series whose index does not run from 0 to n-1.
    paths = list(image_path_list)
    labels = list(Labels)

    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    if len(paths) != len(labels):
        raise ValueError(
            f"image_path_list and Labels differ in length: {len(paths)} vs {len(labels)}"
        )
    if not paths:
        # Without this check the endless loop below would spin without ever yielding
        raise ValueError("image_path_list is empty; nothing to generate")

    while True:
        for start in range(0, len(paths), batch_size):
            X_batch_1 = []
            X_batch_2 = []
            X_batch_3 = []
            Y_batch = []

            # Slicing clamps at the end of the list, so the last batch may be smaller
            end = start + batch_size
            for image_path, label in zip(paths[start:end], labels[start:end]):
                # Load image; cv2.imread signals failure by returning None
                image = cv2.imread(image_path)
                if image is None:
                    raise ValueError(f"Could not read image: {image_path}")
                image = cv2.resize(image, target_size)

                # Apply masks
                masked_image1, masked_image2, masked_image3 = apply_masks(image)

                # Normalize the images and append them to their respective batches
                X_batch_1.append(masked_image1 / 255.0)
                X_batch_2.append(masked_image2 / 255.0)
                X_batch_3.append(masked_image3 / 255.0)

                # Append the relevant label
                Y_batch.append(label)

            # Convert to numpy arrays
            X_batch_1 = np.array(X_batch_1)
            X_batch_2 = np.array(X_batch_2)
            X_batch_3 = np.array(X_batch_3)

            Y_batch = np.array(Y_batch).reshape(-1, 1)

            yield (X_batch_1, X_batch_2, X_batch_3), Y_batch


In [None]:
url='https://i0.wp.com/yaleclimateconnections.org/wp-content/uploads/2023/03/323_wildfireroad_1600.png?fit=1200%2C675&ssl=1'

# Fail fast on network problems: a timeout avoids hanging the notebook and
# raise_for_status() stops an HTTP error page from being decoded as an image.
response = requests.get(url, timeout=30)
response.raise_for_status()

image_np = np.asarray(bytearray(response.content), dtype=np.uint8)
image = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
if image is None:
    # cv2.imdecode returns None when the bytes are not a decodable image
    raise ValueError(f"Downloaded content could not be decoded as an image: {url}")

masked_image1, masked_image2, masked_image3 = apply_masks(image)

# Display the images in a 2x2 grid
plt.figure(figsize=(10, 10))

# (title, BGR image) for each panel, in grid order. The third mask isolates
# fire-like colours, so it is titled as a colour mask rather than a smoke mask.
panels = [
    ('Original Image', image),
    ('Edge Mask', masked_image1),
    ('Heatmap Mask', masked_image2),
    ('Color Mask', masked_image3),
]

for position, (panel_title, panel_image) in enumerate(panels, start=1):
    plt.subplot(2, 2, position)
    # OpenCV images are BGR; matplotlib expects RGB
    plt.imshow(cv2.cvtColor(panel_image, cv2.COLOR_BGR2RGB))
    plt.title(panel_title)
    plt.axis('off')

plt.show()


In [None]:
def create_model(IMAGE_SHAPE, learning_rate=0.0001):
    """
    Creates a CNN model with 3 input branches and a binary output.

    Each of the three input images goes through its own convolutional branch
    (the branches do not share layers). The three 128-unit branch outputs are
    concatenated and passed through a dense head ending in one sigmoid unit.

    Parameters:
    - IMAGE_SHAPE: Shape of each input image (height, width, channels)
    - learning_rate: Learning rate for the Adam optimizer. Defaults to
      0.0001, a low value chosen for a more stable learning curve.

    Returns:
    - Compiled Keras model (binary cross-entropy loss, accuracy metric)
    """

    def branch(input_image):
        """Build one feature-extraction branch on top of `input_image`."""
        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_image)
        x = layers.MaxPooling2D(pool_size=(2, 2))(x)
        x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = layers.MaxPooling2D(pool_size=(2, 2))(x)
        x = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
        x = layers.GlobalAveragePooling2D()(x)
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dropout(0.5)(x)
        return x

    # One input layer per masked view of the image (three views in total)
    inputs = [layers.Input(shape=IMAGE_SHAPE) for _ in range(3)]

    # Create a separate branch for each input
    branches = [branch(input_image) for input_image in inputs]

    # Merge all the branches and classify
    merge = layers.Concatenate()(branches)
    dense = layers.Dense(128, activation='relu')(merge)
    dropout = layers.Dropout(0.5)(dense)
    output = layers.Dense(1, activation='sigmoid')(dropout)  # Use sigmoid activation for Binary output

    model = models.Model(inputs=inputs, outputs=output)

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    return model


# Build the compiled three-input model from the shared IMAGE_SHAPE setting
model = create_model(IMAGE_SHAPE)

# Print the model's layer summary
model.summary()


# Train the Model

In [None]:
# Define callbacks
callbacks = [
    # Halve the learning rate after one epoch without val_loss improvement
    ReduceLROnPlateau(monitor='val_loss', patience=1, verbose=1, factor=0.5),
    # Stop after 3 epochs without improvement and roll the in-memory model
    # back to its best weights, so the model that is saved and evaluated
    # afterwards is the best one rather than the last one.
    EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
    ModelCheckpoint(filepath='best_model.keras', monitor='val_loss', save_best_only=True)
]

# Instantiate the generators
train_generator = custom_generator(train_df["image_path"], train_df["fire"], batch_size=BATCH_SIZE)
val_generator = custom_generator(val_df["image_path"], val_df["fire"], batch_size=BATCH_SIZE)

# custom_generator emits a final, smaller batch when the data size is not a
# multiple of BATCH_SIZE. Rounding the step counts up makes every epoch
# exactly one pass over the data, so each validation run scores the same
# images instead of a window that shifts from epoch to epoch.
train_steps = (len(train_df) + BATCH_SIZE - 1) // BATCH_SIZE
val_steps = (len(val_df) + BATCH_SIZE - 1) // BATCH_SIZE

# Train the model
history = model.fit(
    train_generator,
    steps_per_epoch=train_steps,
    validation_data=val_generator,
    validation_steps=val_steps,
    epochs=10,
    callbacks=callbacks
)

In [None]:
# Plot training & validation curves: loss on the left, accuracy on the right
plt.figure(figsize=(12, 5))

# (training key, validation key, title, y-axis label, legend entries) per panel
curve_specs = (
    ('loss', 'val_loss', 'Model Loss', 'Loss',
     ['Training Loss', 'Validation Loss']),
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy',
     ['Training Accuracy', 'Validation Accuracy']),
)

for panel_index, (train_key, val_key, panel_title, y_label, legend_labels) in enumerate(curve_specs, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(panel_title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend(legend_labels)

plt.show()


In [None]:
# Save the trained model; the path is relative, so the file is written to
# the current working directory.
model.save('multi_model.h5')
# To reuse the saved model instead of retraining, load it back from the same path:
# model = load_model('multi_model.h5')

In [None]:

# Image size used by load_image when previewing predictions
img_height, img_width = 150, 150

# get test images
test_data_dir = '/kaggle/input/firedataset-jpg-224/FireDataset-V6-JPG-Reshaped224/test'

# Create test generator
test_gen = custom_generator(test_df['image_path'],test_df['fire'], BATCH_SIZE)

# Round the step count up: with floor division the final partial batch is
# never requested, so up to BATCH_SIZE - 1 test images would be left out of
# the reported metrics.
test_steps = (len(test_df) + BATCH_SIZE - 1) // BATCH_SIZE

loss, accuracy = model.evaluate(test_gen, steps=test_steps, verbose=1)

print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")

In [None]:
# Class folders of the training split, used as the image source for the
# prediction preview
data_dir = '/kaggle/input/firedataset-jpg-224/FireDataset-V6-JPG-Reshaped224/train'
fire_dir, non_fire_dir = (
    os.path.join(data_dir, class_name) for class_name in ('fire', 'non_fire')
)

# How many images to preview for each class
num_images_per_class = 4

def load_image(image_path):
    """Read an image from disk and resize it to img_width x img_height.

    Parameters:
    - image_path: Path of the image file.

    Returns:
    - The resized image as returned by cv2.resize.

    Raises:
    - ValueError: if OpenCV cannot read the file.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals a missing or undecodable file by returning None
        raise ValueError(f"Could not read image: {image_path}")
    # cv2.resize expects the target size as (width, height)
    return cv2.resize(image, (img_width, img_height))

#Preprocess images for evaluation
def preprocess_and_predict(image):
    """Run the global `model` on the three masked views of one image.

    Parameters:
    - image: Image array accepted by `apply_masks`.

    Returns:
    - Whatever `model.predict` returns for the single-image batch.
    """
    # Divide each masked view by 255 and add a leading batch axis of size 1
    model_inputs = [
        np.expand_dims(masked_view / 255.0, axis=0)
        for masked_view in apply_masks(image)
    ]

    # The model takes the three views as a list of inputs
    return model.predict(model_inputs)

# Display image with prediction
def plot_images_with_predictions(directory, num_images):
    """Plot the first `num_images` images of a directory with their predicted class.

    Only files with a .png/.jpg/.jpeg extension (any letter case) are
    considered, in sorted name order, so the selection is the same on every
    run. The images are laid out in a new figure with two rows and as many
    columns as needed.

    Parameters:
    - directory: Folder containing the images.
    - num_images: Maximum number of images to show.

    Returns:
    - None. Prints a message and draws nothing if no image is found.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    file_names = sorted(
        name for name in os.listdir(directory)
        if name.lower().endswith(image_extensions)
    )
    img_paths = [os.path.join(directory, name) for name in file_names][:num_images]

    if not img_paths:
        print(f"No images found in {directory}")
        return

    rows = 2
    # Ceiling division on the number of images actually available, so an odd
    # count (or a short directory) still gets enough grid cells
    cols = -(-len(img_paths) // rows)

    plt.figure(figsize=(12, 8))

    for i, img_path in enumerate(img_paths):
        image = load_image(img_path)
        prediction = preprocess_and_predict(image)
        # The prediction holds one sigmoid output for the single image
        probability = float(prediction[0][0])
        predicted_class = 'fire' if probability > 0.5 else 'non_fire'

        plt.subplot(rows, cols, i + 1)
        plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        plt.title(predicted_class)
        plt.axis('off')

    # Tidy the spacing of this figure before the next one is created
    plt.tight_layout()

# Preview predictions for a few images of each class; every call opens its
# own figure.
plot_images_with_predictions(fire_dir, num_images_per_class)
plot_images_with_predictions(non_fire_dir, num_images_per_class)

# NOTE(review): plt.tight_layout() presumably adjusts only the current (most
# recently created) figure, i.e. the non-fire one — confirm.
plt.tight_layout()
plt.show()
