In [None]:
import tensorflow as tf

# Report how many GPUs TensorFlow can see before any other work is done.
visible_gpus = tf.config.list_physical_devices("GPU")
print("num GPU's Available: ", len(visible_gpus))

In [None]:
# Turn on memory growth for every GPU TensorFlow reports, so memory is
# requested as needed rather than configured with the default behaviour.
physical_devices = tf.config.list_physical_devices('GPU')
for gpu in physical_devices:
    tf.config.experimental.set_memory_growth(gpu, enable=True)


In [None]:
import tensorflow as tf
from tensorflow import keras
import os
import numpy as np
import cv2
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications import VGG19
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.layers import Flatten, Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score, precision_recall_curve, auc


In [None]:
# Training-data directory (presumably the root containing one sub-folder per
# class — confirm against the dataset layout on disk).
train_dir = "D:/Dataset1/Train"

In [None]:
# Load the training images and split them 80/20 into training and validation
# subsets. Labels are inferred from the sub-folder names and emitted as a
# single 0/1 value per image (label_mode="binary"). The fixed seed keeps the
# shuffle, and therefore the train/validation split, reproducible.
train, val = tf.keras.utils.image_dataset_from_directory(
    # Use the configured training directory instead of a second hard-coded
    # path ("D:/Dataset/Train") that disagreed with `train_dir`.
    directory=train_dir,
    labels="inferred",
    label_mode="binary",
    color_mode="rgb",
    image_size=(224, 224),  # matches the input_shape given to the backbone
    shuffle=True,
    seed=1337,
    validation_split=0.2,
    subset="both",  # return (train, val) as a tuple
    interpolation="bilinear"
)

In [None]:
# Random image perturbations used for training-time augmentation.
# The layers run in the order listed.
augmentation_steps = [
    tf.keras.layers.RandomFlip('horizontal'),
    tf.keras.layers.RandomRotation(0.4),
    tf.keras.layers.RandomZoom(0.2),
    tf.keras.layers.RandomContrast(0.2),
    tf.keras.layers.RandomBrightness(0.2),
    tf.keras.layers.RandomTranslation(0.1, 0.1),
]
data_augmentation = tf.keras.Sequential(augmentation_steps)

In [None]:
# Scale pixel values by 1/255 (0-255 inputs become 0-1).
# NOTE(review): Keras ships a model-specific `preprocess_input` for each
# pre-trained backbone; confirm that plain 1/255 scaling is the intended
# preprocessing for the backbone used below.
normalization_layer = tf.keras.layers.Rescaling(1./255)

In [None]:
def _augment_then_rescale(images, labels):
    """Randomly augment a batch of images, then scale the pixels by 1/255."""
    augmented = data_augmentation(images, training=True)
    return normalization_layer(augmented), labels


# Augmentation runs first, then the 1/255 rescale.
# NOTE: this rebinds `train`, so running the cell a second time stacks another
# augmentation + rescale pass on top of the first one.
train = train.map(_augment_then_rescale)

In [None]:
def _rescale_only(images, labels):
    """Scale a batch of images by 1/255 without any augmentation."""
    return normalization_layer(images), labels


# Validation data is only rescaled, never augmented.
# NOTE: this rebinds `val`, so running the cell twice rescales twice.
val = val.map(_rescale_only)

In [None]:
from tensorflow.keras.optimizers import Adam,SGD
# Training hyperparameters — adjust these when tuning.
learning_rate = 0.001  # values tried so far: 0.001 and 0.0001
optimizer = SGD(learning_rate=learning_rate)
# optimizer = Adam(learning_rate=learning_rate)  # alternative optimizer
epochs = 40
batch_size = 32



In [None]:
# VGG19 convolutional backbone with ImageNet weights and without its
# classification head; the input size matches the 224x224 RGB datasets.
backbone_input_shape = (224, 224, 3)
base_model = VGG19(
    weights='imagenet',
    include_top=False,
    input_shape=backbone_input_shape,
)

In [None]:
# Fine-tune only the last `num_unfreeze_layers` layers of the backbone and
# freeze everything before them. (Freezing the whole backbone would be
# `base_model.trainable = False`.)
num_unfreeze_layers = 5

# Compute the cut-off explicitly: the previous `layers[:-num_unfreeze_layers]`
# slice is empty when num_unfreeze_layers == 0, which left every layer
# trainable instead of freezing the whole backbone.
num_frozen_layers = max(len(base_model.layers) - num_unfreeze_layers, 0)
for layer in base_model.layers[:num_frozen_layers]:
    layer.trainable = False

In [None]:
# Stack the classification head on top of the backbone, one layer at a time.
model = Sequential()
model.add(base_model)
model.add(GlobalAveragePooling2D())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
# Single sigmoid unit: one probability per image for binary classification.
model.add(Dense(1, activation='sigmoid'))

In [None]:

# Binary cross-entropy pairs with the single sigmoid output unit.
model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy'],
)


In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [None]:
# Import from tensorflow.keras like the rest of the notebook; mixing the
# standalone `keras` package with `tensorflow.keras` objects can break when
# the two installations differ.
from tensorflow.keras.callbacks import EarlyStopping

# Stop training once the validation loss has not improved for 5 consecutive
# epochs.
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

In [None]:
# Import from tensorflow.keras like the rest of the notebook; mixing the
# standalone `keras` package with `tensorflow.keras` objects can break when
# the two installations differ.
from tensorflow.keras.callbacks import ModelCheckpoint

# Keep on disk only the model from the epoch with the highest validation
# accuracy seen so far.
model_checkpoint_callback = ModelCheckpoint(
    filepath='best_fire_detection_VGG19_model9010.h5',  # where the best model is written
    monitor='val_accuracy',  # metric that decides what "best" means
    mode='max',  # higher validation accuracy is better
    save_best_only=True  # do not overwrite the file with a worse epoch
)

In [None]:
VGG19 = model.fit(
    train,
    epochs=epochs,  # Adjust epochs as needed
    validation_data=val,
     callbacks=[early_stopping, model_checkpoint_callback]  # Pass Early Stopping callback
)

In [None]:
model.save("VGG19_based_fire_detection_40epochs910exp1_model.h5")

In [None]:
import matplotlib.pyplot as plt

plt.plot(VGG19.history['accuracy'])
plt.plot(VGG19.history['val_accuracy'])
plt.title('VGG19 Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='lower right')
plt.show()

In [None]:
plt.plot(VGG19.history['loss'])
plt.plot(VGG19.history['val_loss'])
plt.title('VGG19 Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper right')
plt.ylim(top=1.2, bottom=0)
plt.show()

In [None]:
"""Adjustable parameters"""
# Folders holding the positively and negatively labelled test images.
pos_path = r"D:/Dataset1/Tests/Fire"
neg_path = r"D:/Dataset1/Tests/Non-fire"
# Directory that receives the evaluation artefacts.
result_directory = "D:/Dataset1/Mobilenetv2/result910"
# Decision threshold and the image size expected by the network.
prediction_threshold = .5
image_size = (224, 224)
# Path of the saved model file to evaluate.
# NOTE(review): this rebinds `model`, which until here referred to the trained
# Keras model object; cells that need that object must not run after this one.
model = "VGG19_based_fire_detection_40epochs910exp1_model.h5"
"""End adjustable parameters"""

In [None]:
# Build the held-out test dataset used by the Keras evaluation.
test_dataset = tf.keras.utils.image_dataset_from_directory(
    directory="D:/Dataset1/Tests",
    labels="inferred",
    # Must match training: the model has a single sigmoid output and was
    # trained on 0/1 labels. "categorical" produced one-hot labels of shape
    # (batch, 2), which do not line up with the (batch, 1) predictions and
    # make the reported loss/accuracy meaningless.
    label_mode="binary",
    color_mode="rgb",
    image_size=(224, 224),
    shuffle=False,  # keep file order stable
    interpolation="bilinear"
)
# Apply the same 1/255 rescale that the training and validation data received.
test_dataset = test_dataset.map(lambda x, y: (normalization_layer(x), y))

#test_loss, test_accuracy = model.evaluate(test_dataset)
#print(f'Test accuracy: {test_accuracy}')

In [None]:
# Accumulators filled by the per-image prediction pass.
actual, predicted = [], []      # true labels / predicted labels
FalsePos, FalseNeg = [], []     # file paths of misclassified images
ConfidenceScores = []           # (score, file path) for every prediction

# Reload the saved model from disk and run Keras' built-in evaluation.
reconstructed_model = keras.models.load_model(model)
evaluation_result = reconstructed_model.evaluate(test_dataset)
eval_loss, eval_acc = evaluation_result
print(f"Evaluation accuracy {eval_acc}, Evaluation loss {eval_loss}")

In [None]:
import os
# Create the result directory (including missing parents); an already
# existing directory is not an error.
os.makedirs(result_directory, exist_ok=True) # ensure result_directory exists

In [None]:
import os
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
from sklearn import metrics
import csv
def _score_directory(directory, is_fire):
    """Predict every image in `directory` and record the outcome.

    Appends to the module-level lists `actual`, `predicted`, `FalsePos`,
    `FalseNeg` and `ConfidenceScores`.

    Args:
        directory: folder whose files are all loaded as images.
        is_fire: True when the folder holds fire (positive, label "1") images,
            False when it holds non-fire (negative, label "0") images.
    """
    true_label = "1" if is_fire else "0"
    for file_name in os.listdir(directory):
        image_path = directory + "/" + file_name
        pixels = keras.preprocessing.image.load_img(image_path, target_size=image_size)
        pixels = keras.preprocessing.image.img_to_array(pixels)
        # The network was trained and evaluated on pixels scaled by 1/255;
        # feeding raw 0-255 values here made these predictions inconsistent
        # with training.
        batch = np.expand_dims(pixels, axis=0) / 255.0
        score = reconstructed_model.predict(batch, verbose=0)[0][0]

        # Same convention as before: a score below the threshold means "fire".
        # One rule now serves both folders, so a score exactly at the threshold
        # is no longer classified differently depending on the folder.
        predicted_fire = score < prediction_threshold
        predicted_label = "1" if predicted_fire else "0"

        # Confidence in the true class, as a percentage. The fire branch used
        # to compute `1 - score * 100` (operator-precedence bug).
        confidence = (1 - score) * 100 if is_fire else score * 100

        actual.append(true_label)
        predicted.append(predicted_label)
        ConfidenceScores.append((confidence, image_path))
        if predicted_label != true_label:
            (FalseNeg if is_fire else FalsePos).append(image_path)


# Start from empty accumulators so that re-running this cell does not append
# a second copy of every result.
actual, predicted, FalsePos, FalseNeg, ConfidenceScores = [], [], [], [], []

_score_directory(pos_path, is_fire=True)
_score_directory(neg_path, is_fire=False)


In [None]:
import os
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
from sklearn import metrics
import csv
# Create CSV with missed image paths: false positives first, then false
# negatives, one path per row. The loop variable used to be called `image`,
# which overwrote the `image` module imported from
# tensorflow.keras.preprocessing.
with open(os.path.join(result_directory, "missed.csv"), "w", newline="") as missed_file:
    csv.writer(missed_file).writerows([missed_path] for missed_path in FalsePos + FalseNeg)

# Create CSV with confidence (prediction) scores: one (score, path) row per image.
with open(os.path.join(result_directory, "ConfidenceScores.csv"), "w", newline="") as scores_file:
    csv.writer(scores_file).writerows(ConfidenceScores)

# Both label lists are expected to have one entry per image. If they ever
# differ, keep the common prefix as before, but say so instead of silently
# hiding the mismatch.
if len(actual) != len(predicted):
    shorter_length = min(len(actual), len(predicted))
    print(
        f"Warning: {len(actual)} actual labels vs {len(predicted)} predicted labels; "
        f"truncating both to {shorter_length}"
    )
    actual = actual[:shorter_length]
    predicted = predicted[:shorter_length]


In [None]:
# Build and save the confusion matrix.
# - The result is stored as `cm` so it no longer shadows the `confusion_matrix`
#   function imported from sklearn.metrics.
# - `labels` pins the row/column order to "0" (No-Fire) then "1" (Fire) and
#   keeps the matrix 2x2 even when one class is missing from the results,
#   which would otherwise not match the two display labels.
cm = metrics.confusion_matrix(actual, predicted, labels=["0", "1"])
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["No-Fire", "Fire"])
cm_display.plot(cmap="Blues")
plt.savefig(os.path.join(result_directory, "CM.jpg"))

In [None]:
# Write the headline metrics to metrics.csv, one "name: value" line each.
# The fire class (label '1') is the positive class for F1, precision and recall.
metric_lines = [
    f"Accuracy: {(metrics.accuracy_score(actual,predicted))}\n",
    f"f1-score: {metrics.f1_score(actual,predicted,pos_label='1')}\n",
    f"Precision: {metrics.precision_score(actual, predicted, pos_label='1')}\n",
    f"Recall: {metrics.recall_score(actual,predicted, pos_label='1')}\n",
]
with open(os.path.join(result_directory, "metrics.csv"), "w", newline="") as metrics_file:
    metrics_file.writelines(metric_lines)

In [None]:
# Show the same headline metrics in the notebook output, one per line.
metric_report = "\n".join([
    f"Accuracy: {(metrics.accuracy_score(actual,predicted))}",
    f"f1-score: {metrics.f1_score(actual,predicted,pos_label='1')}",
    f"Precision: {metrics.precision_score(actual, predicted, pos_label='1')}",
    f"Recall: {metrics.recall_score(actual,predicted, pos_label='1')}",
])
print(metric_report)