In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

import sys
import math
import random
import copy

import pandas as pd
import numpy as np

from tqdm import tqdm
import time

import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib import animation as animation

from PIL import Image
import matplotlib as mpl

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import os

!pip install torchmetrics
from torchmetrics.image import StructuralSimilarityIndexMeasure

from PIL import Image, ImageSequence
from skimage.transform import resize

from skimage.metrics import structural_similarity as ssim

directory = ''  # your directory


In [None]:
# Define the default colormap and norm for the input, target, and prediction
# Six colors, one per interval between consecutive entries of `default_bounds`.
default_cmap = mpl.colors.ListedColormap(['orange', 'yellow', 'green', 'black', 'blue', 'purple'])

# Seven boundaries -> six bins. The alternative set below is kept for the chimney data.
# default_bounds = [0.0, 0.02, 0.27, 0.8, 1.99, 2.8, 13.5] # for chimney
default_bounds = [0.0, 0.2, 0.5, 0.99, 2.45, 2.99, 13.8] # for bear & ferguson

# Maps a value to the index of the bin it falls into, then to the matching color.
default_norm = mpl.colors.BoundaryNorm(default_bounds, default_cmap.N)

In [None]:
# Notebook-wide flag; a later plotting cell sets it to True.
# NOTE(review): save_custom_layout has a parameter of the same name that gates
# plt.savefig - presumably this global is what gets passed in; confirm at the call site.
VERBOSE = False

#### Import training & testing data: Ferguson



In [None]:
# Load the Ferguson canopy raster and turn it into a 128x128 array scaled by its maximum.
# NOTE(review): the Bear and Chimney cells below assign to the same global `forest`,
# so whichever of the three cells ran last determines its value - confirm the intended one
# is active before plotting.
forest = Image.open(directory + 'Data/land_data_CA/Ferguson/canopy_Ferguson_2018.tif')
forest = np.array(forest)
# Values below -999 are zeroed (presumably a no-data sentinel - TODO confirm).
forest[forest<-999.] = 0.
# Scale by the maximum value; an all-zero raster would divide by zero here.
forest = forest/np.max(forest)

forest = resize(forest, (128, 128))

In [None]:
# # # ca simulation data
# forest_fer_train = np.load(directory + 'Data/training_data/forest_fer_rand_ig_rand_wind_rand_tilt_4_ptbar_ml_state.npy')
# forest_fer_val_4pt = np.load(directory + 'Data/training_data/forest_fer_rand_ig_rand_wind_rand_tilt_4_ptbar_val_ml_state.npy')

# # forest_fer_test_nobar = np.load(directory + 'Data/test_data/forest_fer_rand_ig_rand_wind_rand_no_ptbar_test_ml_state.npy')

In [None]:
# Ferguson test arrays, one per scenario file (4_ptbar, 3t, 2p, no_bar, 2p long-time).
# Each is later wrapped in a FireDataset, which reads shape[0], shape[1] and shape[2];
# presumably the layout is (simulation, time step, height, width) - TODO confirm.
forest_fer_test_4pt = np.load(directory + 'Data/test_data/forest_fer_rand_ig_rand_wind_rand_tilt_4_ptbar_test_ml_state.npy')
forest_fer_test_3t = np.load(directory + 'Data/test_data/forest_fer_rand_ig_rand_wind_rand_tilt_3t_test_ml_state.npy')
forest_fer_test_2p = np.load(directory + 'Data/test_data/forest_fer_rand_ig_rand_wind_rand_tilt_2p_test_ml_state.npy')
forest_fer_test_no_bar = np.load(directory + 'Data/test_data/forest_fer_rand_ig_rand_wind_rand_tilt_no_bar_test_ml_state.npy')
forest_fer_test_2p_longtime = np.load(directory + 'Data/test_data/forest_ferguson_rand_ig_rand_wind_rand_tilt_2p_CA_test_long_time_ml_state.npy')

#### Import training & testing data: Bear



In [None]:
# Load the Bear canopy raster and turn it into a 128x128 array scaled by its maximum.
# NOTE(review): this overwrites the `forest` global set by the Ferguson cell above and is
# itself overwritten by the Chimney cell below.
forest = Image.open(directory + 'Data/land_data_CA/Bear/canopy_Bear_2020.tif')
forest = np.array(forest)
# Values below -999 are zeroed (presumably a no-data sentinel - TODO confirm).
forest[forest<-999.] = 0.
# Scale by the maximum value; an all-zero raster would divide by zero here.
forest = forest/np.max(forest)

forest = resize(forest, (128, 128))

In [None]:
# Bear test arrays, one per scenario file (4_ptbar, 3t, 2p, no_bar, 2p long-time).
# Presumably laid out as (simulation, time step, height, width) - TODO confirm.
forest_bear_test_4pt = np.load(directory + 'Data/test_data/forest_bear_rand_ig_rand_wind_rand_tilt_4_ptbar_test_ml_state.npy')
forest_bear_test_3t = np.load(directory + 'Data/test_data/forest_bear_rand_ig_rand_wind_rand_tilt_3t_test_ml_state.npy')
forest_bear_test_2p = np.load(directory + 'Data/test_data/forest_bear_rand_ig_rand_wind_rand_tilt_2p_test_ml_state.npy')
forest_bear_test_no_bar = np.load(directory + 'Data/test_data/forest_bear_rand_ig_rand_wind_rand_tilt_no_bar_test_ml_state.npy')
forest_bear_test_2p_longtime = np.load(directory + 'Data/test_data/forest_bear_rand_ig_rand_wind_rand_tilt_2p_CA_test_long_time_ml_state.npy')

In [None]:
# CA simulation data for Bear: training and validation arrays (4_ptbar scenario).
# Of the three fires, only the Bear training/validation loads are currently active;
# the Ferguson and Chimney equivalents are commented out in their cells.
forest_bear_train = np.load(directory + 'Data/training_data/forest_bear_rand_ig_rand_wind_rand_tilt_4_ptbar_ml_state.npy')
forest_bear_val_4pt = np.load(directory + 'Data/training_data/forest_bear_rand_ig_rand_wind_rand_tilt_4_ptbar_val_ml_state.npy')
# forest_fer_test_nobar = np.load(directory + 'Data/test_data/forest_bear_rand_ig_rand_wind_rand_no_ptbar_test_ml_state.npy')

#### Import training & testing data: Chimney



In [None]:
# Load the Chimney canopy raster and turn it into a 128x128 array scaled by its maximum.
# NOTE(review): this is the last of three cells assigning the `forest` global, so after a
# top-to-bottom run `forest` holds the Chimney canopy even when plotting another fire.
forest = Image.open(directory + 'Data/land_data_CA/Chimney/canopy_Chimney_2016.tif')
forest = np.array(forest)
# Values below -999 are zeroed (presumably a no-data sentinel - TODO confirm).
forest[forest<-999.] = 0.

# Scale by the maximum value; an all-zero raster would divide by zero here.
forest = forest/np.max(forest)
forest = resize(forest, (128, 128))

In [None]:
# # #ca simulation train data
# forest_chimney_train = np.load(directory + 'Data/training_data/forest_chimney_rand_ig_rand_wind_rand_tilt_4_ptbar_ml_state.npy')
# forest_chimney_val_4pt = np.load(directory + 'Data/training_data/forest_chimney_rand_ig_rand_wind_rand_tilt_4_ptbar_val_ml_state.npy')
# # forest_chimney_test_nobar = np.load(directory + 'Data/test_data/forest_chimney_rand_ig_rand_wind_rand_no_ptbar_test_ml_state.npy')

In [None]:
# CA simulation test data for Chimney, one array per scenario file (4_ptbar, 3t, 2p, no_bar).
# Presumably laid out as (simulation, time step, height, width) - TODO confirm.
forest_chimney_test_4pt = np.load(directory + 'Data/test_data/forest_chimney_rand_ig_rand_wind_rand_tilt_4_ptbar_test_ml_state.npy')
forest_chimney_test_3t = np.load(directory + 'Data/test_data/forest_chimney_rand_ig_rand_wind_rand_tilt_3t_test_ml_state.npy')
forest_chimney_test_2p = np.load(directory + 'Data/test_data/forest_chimney_rand_ig_rand_wind_rand_tilt_2p_test_ml_state.npy')
forest_chimney_test_no_bar = np.load(directory + 'Data/test_data/forest_chimney_rand_ig_rand_wind_rand_tilt_no_bar_test_ml_state.npy')

#### model

In [None]:
class FireDataset(Dataset):
    """Sliding-window dataset over fire-state sequences.

    ``fire_data`` is indexed as ``fire_data[sequence][time]``; every sequence
    contributes ``time_step - seq_len + 1`` windows. Each sample consists of
    ``INPUT_LEN`` consecutive frames (duplicated into two channels) and the
    ``TARGET_LEN`` frames that immediately follow them.

    Args:
        fire_data: array-like exposing ``.shape`` with at least three
            dimensions (sequences, time steps, frame size, ...).
        seq_len: stride budget of one window. It controls how many windows
            each sequence yields and must be at least
            ``INPUT_LEN + TARGET_LEN`` so the target slice never runs past
            the end of a sequence.

    Raises:
        ValueError: if ``seq_len`` is shorter than the input + target window.
    """

    # Frames fed to the model / frames it is asked to predict per sample.
    INPUT_LEN = 3
    TARGET_LEN = 3

    def __init__(self, fire_data, seq_len):
        self.fire_data = fire_data
        shape = self.fire_data.shape

        self.number = shape[0]
        self.time_step = shape[1]
        self.frame_len = shape[2]
        self.seq_len = seq_len

        # A shorter seq_len would let the last windows of every sequence
        # return truncated (or empty) targets, which only surfaces later as
        # a collate error inside the DataLoader.
        window = self.INPUT_LEN + self.TARGET_LEN
        if seq_len < window:
            raise ValueError(
                f'seq_len must be >= {window} (got {seq_len}): each sample needs '
                f'{self.INPUT_LEN} input and {self.TARGET_LEN} target frames'
            )

    def _windows_per_sequence(self):
        """Number of window start positions inside one sequence."""
        return self.time_step - self.seq_len + 1

    def __len__(self):
        return self.number * self._windows_per_sequence()

    def __getitem__(self, idx):
        """Return ``(input, target)`` for flat sample index ``idx``.

        ``input`` has shape ``(INPUT_LEN, 2, ...)`` (the same frame repeated
        in both channels) and ``target`` has shape ``(TARGET_LEN, ...)``.
        """
        windows = self._windows_per_sequence()
        seq_idx = idx // windows
        start = idx % windows

        sequence = self.fire_data[seq_idx]
        split = start + self.INPUT_LEN
        frames_in = torch.tensor(sequence[start:split])
        frames_out = torch.tensor(sequence[split:split + self.TARGET_LEN])

        return torch.stack((frames_in, frames_in), dim=1), frames_out


In [None]:
class ConvLSTMCell(nn.Module):
  """Single convolutional LSTM cell.

  One 2-D convolution over the concatenation of the input and the previous
  hidden state produces all four gate pre-activations at once.
  """

  def __init__(self, input_dim, hidden_dim, kernel_size):
    super().__init__()

    self.input_dim, self.hidden_dim, self.kernel_size = input_dim, hidden_dim, kernel_size

    # Padding of kernel_size // 2 keeps the spatial size unchanged for odd kernels.
    self.conv = nn.Conv2d(
      input_dim + hidden_dim,
      4 * hidden_dim,
      kernel_size,
      padding=kernel_size // 2,
      bias=True,
    )

  def forward(self, input_tensor, cur_state):
    """Advance one time step; returns the new ``(hidden, cell)`` pair."""
    h_cur, c_cur = cur_state

    gates = self.conv(torch.cat((input_tensor, h_cur), dim=1))
    # Channel order of the convolution output: input, forget, output, candidate.
    in_gate, forget_gate, out_gate, candidate = gates.split(self.hidden_dim, dim=1)

    c_next = torch.sigmoid(forget_gate) * c_cur + torch.sigmoid(in_gate) * torch.tanh(candidate)
    h_next = torch.sigmoid(out_gate) * torch.tanh(c_next)

    return h_next, c_next

  def init_hidden(self, batch_size, image_size):
    """Zero-filled ``(hidden, cell)`` states on the same device as the conv weights."""
    height, width = image_size
    state_shape = (batch_size, self.hidden_dim, height, width)
    target_device = self.conv.weight.device
    return (torch.zeros(state_shape, device=target_device),
        torch.zeros(state_shape, device=target_device))


In [None]:
class EncoderDecoderConvLSTM(nn.Module):
  """Two-layer ConvLSTM encoder followed by a two-layer ConvLSTM decoder.

  The decoder's hidden states are stacked over time and mapped by a 3-D
  convolution to 16 output channels per predicted frame.
  """

  def __init__(self, input_dim, hidden_dim):
    super().__init__()

    def make_cell(in_channels):
      return ConvLSTMCell(input_dim=in_channels, hidden_dim=hidden_dim, kernel_size=3)

    # Attribute names are part of the saved state_dict keys; keep them stable.
    self.encoder_1_convlstm = make_cell(input_dim)
    self.encoder_2_convlstm = make_cell(hidden_dim)
    self.decoder_1_convlstm = make_cell(hidden_dim)
    self.decoder_2_convlstm = make_cell(hidden_dim)
    self.decoder_CNN = nn.Conv3d(hidden_dim, 16, kernel_size=(1, 3, 3), padding=(0, 1, 1))

  def autoencoder(self, x, seq_len, future_step, h_t, c_t, h_t2, c_t2, h_t3, c_t3, h_t4, c_t4):
    """Encode ``seq_len`` input frames, then roll the decoder ``future_step`` times.

    ``x`` is indexed as ``x[:, t]`` per time step. The ``h_*``/``c_*``
    arguments are the initial states of the four cells. Returns the output
    of ``decoder_CNN`` applied to the stacked decoder states, i.e. a tensor
    laid out as (batch, 16, future_step, height, width).
    """
    # Encoder: feed every input frame through both encoder layers.
    for step in range(seq_len):
      h_t, c_t = self.encoder_1_convlstm(input_tensor=x[:, step, :, :], cur_state=[h_t, c_t])
      h_t2, c_t2 = self.encoder_2_convlstm(input_tensor=h_t, cur_state=[h_t2, c_t2])

    # Decoder: starts from the last encoder state, then consumes its own output.
    decoder_input = h_t2
    decoded = []
    for _ in range(future_step):
      h_t3, c_t3 = self.decoder_1_convlstm(input_tensor=decoder_input, cur_state=[h_t3, c_t3])
      h_t4, c_t4 = self.decoder_2_convlstm(input_tensor=h_t3, cur_state=[h_t4, c_t4])
      decoder_input = h_t4
      decoded.append(h_t4)

    # (batch, time, channel, H, W) -> (batch, channel, time, H, W) for Conv3d.
    stacked = torch.stack(decoded, 1).permute(0, 2, 1, 3, 4)
    return self.decoder_CNN(stacked)

  def forward(self, x, future_seq=3, hidden_state=None):
    """Predict ``future_seq`` frames from ``x`` of shape (batch, time, channel, H, W).

    ``hidden_state`` is accepted but not used; all states start at zero.
    """
    b, seq_len, _, h, w = x.size()

    cells = (
      self.encoder_1_convlstm,
      self.encoder_2_convlstm,
      self.decoder_1_convlstm,
      self.decoder_2_convlstm,
    )
    # Flattened as h_t, c_t, h_t2, c_t2, h_t3, c_t3, h_t4, c_t4.
    initial_states = []
    for cell in cells:
      initial_states.extend(cell.init_hidden(batch_size=b, image_size=(h, w)))

    return self.autoencoder(x, seq_len, future_seq, *initial_states)



In [None]:
def train_convLSTM(train_data, test=False, test_data=None):
    """Train an ``EncoderDecoderConvLSTM`` with cross-entropy loss and Adam.

    Runs 5 passes over ``train_data`` in shuffled batches of 16. When ``test``
    is true, every 200th batch a set of 16 samples is drawn from ``test_data``,
    rolled forward with ``loop_testing`` for three autoregressive steps, and
    scored with MSE, SSIM, RRMSE and a pixel-mismatch ratio (RPE).

    Args:
        train_data: dataset yielding ``(input, target)`` pairs; the target is
            cast to int64 and used as class indices.
        test: enable the periodic evaluation described above.
        test_data: indexable dataset yielding ``(input, target)`` tensors;
            required when ``test`` is true.

    Returns:
        ``model`` when ``test`` is false, otherwise
        ``(model, mse_list, ssim_list, rrmse_list, rpe_list)``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    input_dim = 2
    hidden_dim = 128
    # NOTE(review): kernel_size and seq_len are assigned but never read in this function.
    kernel_size = 3
    seq_len = 3
    model = EncoderDecoderConvLSTM(input_dim, hidden_dim).to(device)

    # criterion drives training; criterion2 is only used for the evaluation MSE.
    criterion = nn.CrossEntropyLoss(weight=None, size_average=None, ignore_index=-100, reduce=None, reduction='mean', label_smoothing=0.0)
    criterion2 = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=3e-4)

    batch_size = 16
    # Number of passes over the training data. NOTE(review): shadows the builtin `iter`.
    iter = 5

    fire_data_loader_train = DataLoader(train_data, batch_size=batch_size, shuffle=True)

    if test:
        # train_loss_list is created but never appended to or returned.
        train_loss_list = []
        mse_list = []
        ssim_list = []
        rrmse_list = []
        rpe_list = []

    for it in range(iter):
        for batch_no, (batch_input, batch_target) in enumerate(fire_data_loader_train):
            # Re-enter training mode every batch because evaluation below switches to eval().
            model.train()

            batch_input, batch_target = batch_input.to(device, dtype=torch.float32), batch_target.to(device, dtype=torch.int64)

            optimizer.zero_grad()

            logits = model(batch_input)
            # if batch_no % 100 == 0:
            #     print(torch.unique(batch_input),torch.unique(batch_target),torch.unique(torch.argmax(logits, dim=1)))
            #print(f'logit shape{logits.shape}')
            train_loss = criterion(logits, batch_target)

            train_loss.backward()
            optimizer.step()

            # if batch_no % 50 == 0:
            #     print('Batch:{:5d}  |  Train loss: {:.4f}'.format(batch_no, train_loss))

            if test:
                if batch_no % 200 == 0:

                    # 16 sample indices of the form 21*k + 14 with k in [0, 20].
                    # NOTE(review): the constants presumably match the number of windows
                    # per validation sequence - confirm against the validation array shape.
                    n_list=[21*random.randint(0,20)+14 for i in range(16)]

                    input1 = []
                    target = []

                    for n in n_list:
                        # Input window at n, target window 6 samples later; with three
                        # autoregressive steps of 3 frames each the last prediction
                        # lines up with that later target (assuming both indices fall
                        # in the same sequence - TODO confirm).
                        input1.append(test_data[n][0].numpy())
                        target.append(test_data[n+6][1].numpy())

                    input1 = torch.tensor(input1)
                    target = torch.tensor(target)
                    input0 = input1.clone()

                    # Validation
                    model.eval()
                    with torch.no_grad():
                        test_input_seq, test_input1_seq, test_target_seq = input0, input1, target
                        np_test_target_seq = test_target_seq.numpy()
                        test_input_seq, test_target_seq, test_input1_seq = test_input_seq.to(device, dtype=torch.float32), test_target_seq.to(device, dtype=torch.float32), test_input1_seq.to(device, dtype=torch.float32)
                        # Class-index prediction after 3 autoregressive model calls.
                        test_predicted_seq = loop_testing(test_input1_seq, 3, model)
                        np_test_predicted_seq = test_predicted_seq.cpu().numpy()

                        # Stored as a tensor on `device`; callers convert it to NumPy later.
                        mse_test_loss = criterion2(test_predicted_seq, test_target_seq)
                        mse_list.append(mse_test_loss)

                        ssim_test_loss = ssim(np_test_target_seq, np_test_predicted_seq, data_range=np_test_predicted_seq.max() - np_test_predicted_seq.min(), channel_axis=1)
                        # test_loss3 = ssim(np_target12[i], np_input1[i], data_range=np_input1[i].max() - np_input1[i].min(), channel_axis=0)
                        ssim_list.append(ssim_test_loss)

                        # sqrt( (sum of squared errors / batch_size) / sum of squared predictions ).
                        rrmse_test_loss = torch.sqrt((torch.sum(torch.tensor(np.square(np_test_target_seq - np_test_predicted_seq))) / batch_size)/torch.sum(torch.square(torch.tensor(np_test_predicted_seq))))
                        rrmse_list.append(rrmse_test_loss)

                        # Mismatching pixels over the whole evaluation batch divided by 128*128.
                        # NOTE(review): not normalised by batch size or frame count, so the
                        # value can exceed 1 - confirm this is intended.
                        rpe_test_loss = np.sum(np_test_target_seq != np_test_predicted_seq)/(128*128)
                        rpe_list.append(rpe_test_loss)

                        # The "Iter" field prints the batch number, not the epoch.
                        print("Iter:{:5d}|Train loss: {:.4f}|Test loss MSE: {:.4f}| SSIM:{:.4f}| RRMSE:{:.4f}| RPE:{:.4f}".format(batch_no, train_loss, mse_test_loss, ssim_test_loss, rrmse_test_loss, rpe_test_loss))


    if test:
        return model, mse_list, ssim_list, rrmse_list, rpe_list
    else:
        return model


### data function

#### part 1

In [None]:
def loop_testing_all_ver1(input, loop, model=None):
    """Roll the model forward ``loop`` times and collect every predicted frame.

    Each iteration feeds the previous prediction (duplicated into two
    channels) back in as the next input.

    Args:
        input: tensor for a single sample without batch dimension; a batch
            dimension of size 1 is added here.
        loop: number of autoregressive model calls.
        model: network to evaluate. Defaults to the notebook-level
            ``convLSTM`` so existing two-argument calls keep working.

    Returns:
        Flat list of predicted frames (class-index tensors), in time order.
    """
    if model is None:
        model = convLSTM

    ans_list = []

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    input = input.to(device, dtype=torch.float32)
    old_input = input.clone().unsqueeze(0)

    # Pure inference: disabling autograd avoids building a graph on every call.
    with torch.no_grad():
        for i in range(loop):
            print(f'i={i}')

            output = model(old_input).clone().detach()
            # NOTE(review): only the first three class channels take part in the
            # argmax, so predictions are restricted to classes 0-2 - confirm intended.
            output = torch.argmax(output[:, :3], dim=1)
            ans_list.extend(output.squeeze(0))

            # Duplicate the predicted frames into two channels for the next call.
            old_input = torch.stack((output, output), dim=2)

    return ans_list

In [None]:
def count_fire(forest):
    """Count the cells equal to 1 in every frame of a 3-D stack.

    Args:
        forest: array-like of shape (frames, rows, cols).

    Returns:
        List with one Python ``int`` per frame.

    Raises:
        ValueError: if ``forest`` is not three-dimensional.
    """
    frames = np.asarray(forest)
    if frames.ndim != 3:
        raise ValueError(f'expected a 3-D array (frames, rows, cols), got {frames.ndim} dimension(s)')

    # Vectorised replacement for the per-pixel Python loops.
    return np.count_nonzero(frames == 1, axis=(1, 2)).tolist()

In [None]:
def loop_testing(input1, loop, model):
    """Roll ``model`` forward autoregressively and return the last prediction.

    The model is called ``loop`` times (at least once, even for ``loop <= 1``).
    After every call except the last, the arg-max class map is duplicated
    into two channels and fed back in as the next input.

    Args:
        input1: batched input tensor of shape (batch, time, 2, H, W).
        loop: number of autoregressive steps.
        model: callable returning logits with the class axis at dim 1.

    Returns:
        Class-index tensor from the final step (arg-max over dim 1).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    input1 = input1.to(device, dtype=torch.float32)

    # Pure inference: disabling autograd avoids building a graph on every call.
    with torch.no_grad():
        for i in range(loop - 1):
            prediction = torch.argmax(model(input1).detach(), dim=1)
            input1 = torch.stack((prediction, prediction), dim=2)

        input1 = torch.argmax(model(input1).detach(), dim=1)

    return input1

In [None]:
# Function to filter out outliers based on IQR
def filter_outliers(data_column):
    """Return the entries of ``data_column`` inside the 1.5 * IQR fences.

    Values below ``Q1 - 1.5 * IQR`` or above ``Q3 + 1.5 * IQR`` are dropped;
    the result is obtained by boolean-mask indexing of ``data_column``.
    """
    q1, q3 = (np.percentile(data_column, q) for q in (25, 75))
    whisker = 1.5 * (q3 - q1)
    within_fences = (data_column >= q1 - whisker) & (data_column <= q3 + whisker)
    return data_column[within_fences]


In [None]:
def loop_testing_all(input11, loop, model=None):
    """Roll the model forward and collect the predicted frames of every step.

    The model is called ``loop`` times (at least once). After each call the
    arg-max class map is duplicated into two channels and fed back in, the
    same feedback shape the other ``loop_testing*`` helpers use.

    Args:
        input11: tensor for a single sample without batch dimension; a batch
            dimension of size 1 is added here.
        loop: number of autoregressive steps.
        model: network to evaluate. Defaults to the notebook-level
            ``convLSTM`` so existing two-argument calls keep working.

    Returns:
        Flat list of predicted frames (class-index tensors), in time order.
    """
    if model is None:
        model = convLSTM

    ans_list = []

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    input11 = input11.to(device, dtype=torch.float32).unsqueeze(0)

    # Pure inference: disabling autograd avoids building a graph on every call.
    with torch.no_grad():
        for i in range(loop - 1):
            prediction = torch.argmax(model(input11).detach(), dim=1)
            ans_list.extend(prediction.squeeze(0))

            # Rebuild a (batch, time, 2, H, W) input. The previous unsqueeze(1)
            # produced (batch, 1, time, H, W), which a two-channel model rejects.
            input11 = torch.stack((prediction, prediction), dim=2)

        prediction = torch.argmax(model(input11).detach(), dim=1)
        ans_list.extend(prediction.squeeze(0))

    return ans_list


In [None]:
def loop_testing_msessimrpe_1to1(test_data, m, model=None):
    """Evaluate five autoregressive prediction steps starting at sample ``m``.

    Step ``k`` (0-4) is compared against the target of ``test_data[m + 3 * k]``.
    For each of the three frames predicted per step the function records the
    MSE, the SSIM and the ratio of mismatching pixels; per step it also
    records the number of cells equal to 1 in prediction and target, and the
    wall-clock time of the model call.

    Args:
        test_data: indexable dataset yielding ``(input, target)`` tensors.
        m: index of the starting sample.
        model: network to evaluate. Defaults to the notebook-level
            ``convLSTM`` so existing two-argument calls keep working.

    Returns:
        Tuple ``(input0, input1, mse, ssim, rpe, pre_fire_count,
        tar_fire_count, mean_time)`` where ``input0`` is a copy of the
        original input, ``mse``/``ssim``/``rpe`` are arrays with 15 entries
        (5 steps x 3 frames) and the fire counts are lists of 5 lists.
    """
    if model is None:
        model = convLSTM

    n_steps = 5
    frames_per_step = 3

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Fetch every sample up front so an out-of-range index fails before any model call.
    input1, first_target = test_data[m]
    targets = [first_target] + [test_data[m + frames_per_step * k][1] for k in range(1, n_steps)]
    input0 = input1.clone()

    input1 = input1.to(device, dtype=torch.float32).unsqueeze(0)
    targets = [t.to(device, dtype=torch.float32).unsqueeze(0) for t in targets]

    mse_list = []
    ssim_list = []
    rpe_list = []
    pre_fire_count = []
    tar_fire_count = []
    time_list = []

    mse = nn.MSELoss()

    # Pure inference: disabling autograd avoids building a graph on every call.
    with torch.no_grad():
        for step, target in enumerate(targets):
            # NOTE(review): on CUDA this times the kernel launch unless the device
            # is synchronised; kept as in the original measurement.
            start = time.time()
            logits = model(input1).detach()
            time_list.append(time.time() - start)

            prediction = torch.argmax(logits, dim=1)
            np_prediction = prediction.cpu().squeeze(0).numpy()
            np_target = target.cpu().squeeze(0).numpy()

            for i in range(frames_per_step):
                mse_list.append(float(mse(prediction[0][i], target[0][i]).cpu()))
                ssim_list.append(ssim(np_target[i], np_prediction[i], data_range=np_prediction[i].max() - np_prediction[i].min(), channel_axis=0))
                # Mismatching pixels divided by a fixed 128 x 128 frame size.
                rpe_list.append(np.sum(np_target[i] != np_prediction[i])/(128*128))

            pre_fire_count.append(count_fire(np_prediction))
            tar_fire_count.append(count_fire(np_target))

            if step < n_steps - 1:
                # Duplicate the predicted frames into two channels for the next call.
                input1 = torch.stack((prediction, prediction), dim=2)
            else:
                input1 = prediction

    # NOTE(review): this arg-max runs over the time axis of an already arg-maxed
    # class map, which looks like a copy-paste leftover. It is kept so the shape of
    # the second return value does not change for existing callers - confirm and remove.
    input1 = torch.argmax(input1, dim=1)

    return input0, input1, np.array(mse_list), np.array(ssim_list), np.array(rpe_list), pre_fire_count, tar_fire_count, np.mean(time_list)


#### part 2 error map

In [None]:
def create_error_map(target, predict):
    """Label where a prediction disagrees with the ground truth on classes 0 and 1.

    The result has the shape of ``target`` and dtype ``uint8``:

    * 1 - false negative: ``target`` is 1 but ``predict`` is 0 (missed fire)
    * 2 - false positive: ``target`` is 0 but ``predict`` is 1 (over-predicted fire)
    * 0 - every other combination
    """
    coded_masks = (
        (1, (target == 1) & (predict == 0)),  # false negatives
        (2, (target == 0) & (predict == 1)),  # false positives
    )

    error_map = np.zeros_like(target, dtype=np.uint8)
    for code, mask in coded_masks:
        error_map[mask] = code

    return error_map

def save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE):
    """Plot target, prediction and error images as a three-row grid.

    Args:
        input_images: accepted for call compatibility; not plotted.
        target_images: images for the first row.
        prediction_images: images for the second row.
        error_images: images for the third row, drawn with ``error_cmap`` /
            ``error_norm``; the other rows use ``default_cmap`` / ``default_norm``.
        image_directory: output path without extension.
        fire: accepted for call compatibility; not used.
        loop_times: accepted for call compatibility; not used.
        VERBOSE: when true, the figure is also written to
            ``image_directory + '.pdf'``.
    """
    total_columns = max(len(target_images), len(prediction_images), len(error_images))
    total_rows = 3  # Rows for Target, Prediction, Error

    # squeeze=False keeps `axes` two-dimensional even with a single column, so the
    # axes[row, col] indexing below works for any number of images.
    fig, axes = plt.subplots(total_rows, total_columns, figsize=(total_columns * 3, total_rows * 3), gridspec_kw={'width_ratios': [1]*total_columns}, dpi=500, squeeze=False)

    # Helper to hide axes
    def hide_axes(ax):
        ax.axis('off')

    # Function to add image titles
    def add_image_titles(ax, category, index):
        ax.set_title(f'{category} {2*(index + 1)}', fontsize=12)

    # Titles for the rows
    row_titles = [ 'Target', 'Prediction', 'Error']

    # Plot images in each row; rows shorter than the widest one leave blank cells.
    image_lists = [target_images, prediction_images, error_images]
    for row_idx, image_list in enumerate(image_lists):
        for col_idx in range(total_columns):
            if col_idx < len(image_list):
                cmap = error_cmap if row_idx == 2 else default_cmap
                norm = error_norm if row_idx == 2 else default_norm
                axes[row_idx, col_idx].imshow(image_list[col_idx], cmap=cmap, norm=norm)
                add_image_titles(axes[row_idx, col_idx], row_titles[row_idx], col_idx)
            hide_axes(axes[row_idx, col_idx])

    plt.tight_layout()

    # Save the figure
    file_format = 'pdf'

    if VERBOSE:
        plt.savefig(image_directory + f'.{file_format}', format=file_format, bbox_inches='tight')

    plt.show()

In [None]:
# Define the error colormap and norm
# Colors line up with the codes written by create_error_map:
# 0 -> lightgrey (no flagged error), 1 -> red (false negative), 2 -> blue (false positive).
error_cmap = mpl.colors.ListedColormap(['lightgrey', 'red', 'blue'])
# Half-integer boundaries put each integer code in the middle of its own bin.
error_bounds = [-0.5, 0.5, 1.5, 2.5]
error_norm = mpl.colors.BoundaryNorm(error_bounds, error_cmap.N)

### train model

In [None]:
# chimney

# NOTE(review): the np.load lines that define forest_chimney_train and
# forest_chimney_val_4pt are commented out in the Chimney data-loading cell;
# re-enable them before running this cell or it fails with a NameError.

# Create the output directory up front so a long training run cannot end in a
# FileNotFoundError from np.save.
os.makedirs(directory + 'Data/training_metric_data/', exist_ok=True)

# chimney train model init
train_data = FireDataset(forest_chimney_train, seq_len=6)
validation_data = FireDataset(forest_chimney_val_4pt, seq_len=6)
convLSTM, mse_4pt_list, ssim_4pt_list, rrmse_4pt_list, rpe_4pt_list  = train_convLSTM(train_data, test=True, test_data=validation_data)

# The MSE values are tensors on the training device; move them to the CPU first.
mse_4pt_arr = np.array([tensor.to('cpu').numpy() for tensor in mse_4pt_list])

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/chimney_mse_4pt_arr.npy', mse_4pt_arr)

ssim_4pt_arr = np.array(ssim_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/chimney_ssim_4pt_arr.npy', ssim_4pt_arr)
rrmse_4pt_arr = np.array(rrmse_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/chimney_rrmse_4pt_arr.npy', rrmse_4pt_arr)

rpe_4pt_arr = np.array(rpe_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/chimney_rpe_4pt_arr.npy', rpe_4pt_arr)

print(rpe_4pt_arr.shape)

# Save the model
os.makedirs(directory + 'Data/trained_models/', exist_ok=True)
model_path = os.path.join(directory + 'Data/trained_models/', 'convlstm_chimney_rand_ig_rand_wind_3t1p.pt')
torch.save(convLSTM.state_dict(), model_path)

In [None]:
# ferguson

# NOTE(review): the np.load lines that define forest_fer_train and
# forest_fer_val_4pt are commented out in the Ferguson data-loading cell;
# re-enable them before running this cell or it fails with a NameError.

# Create the output directory up front so a long training run cannot end in a
# FileNotFoundError from np.save.
os.makedirs(directory + 'Data/training_metric_data/', exist_ok=True)

# ferguson train model init
train_data = FireDataset(forest_fer_train, seq_len=6)
validation_data = FireDataset(forest_fer_val_4pt, seq_len=6)
convLSTM, mse_4pt_list, ssim_4pt_list, rrmse_4pt_list, rpe_4pt_list  = train_convLSTM(train_data, test=True, test_data=validation_data)

# The MSE values are tensors on the training device; move them to the CPU first.
mse_4pt_arr = np.array([tensor.to('cpu').numpy() for tensor in mse_4pt_list])

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/ferguson_mse_4pt_arr.npy', mse_4pt_arr)

ssim_4pt_arr = np.array(ssim_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/ferguson_ssim_4pt_arr.npy', ssim_4pt_arr)
rrmse_4pt_arr = np.array(rrmse_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/ferguson_rrmse_4pt_arr.npy', rrmse_4pt_arr)

rpe_4pt_arr = np.array(rpe_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/ferguson_rpe_4pt_arr.npy', rpe_4pt_arr)

print(rpe_4pt_arr.shape)

# Save the model
os.makedirs(directory + 'Data/trained_models/', exist_ok=True)
model_path = os.path.join(directory + 'Data/trained_models/', 'convlstm_ferguson_rand_ig_rand_wind_3t1p.pt')
torch.save(convLSTM.state_dict(), model_path)

In [None]:
# bear

# Create the output directory up front so a long training run cannot end in a
# FileNotFoundError from np.save.
os.makedirs(directory + 'Data/training_metric_data/', exist_ok=True)

# bear train model init
# Without this training step the cell would re-save the metrics and weights left
# over from whichever training cell ran last under the bear filenames.
train_data = FireDataset(forest_bear_train, seq_len=6)
validation_data = FireDataset(forest_bear_val_4pt, seq_len=6)
convLSTM, mse_4pt_list, ssim_4pt_list, rrmse_4pt_list, rpe_4pt_list  = train_convLSTM(train_data, test=True, test_data=validation_data)

# The MSE values are tensors on the training device; move them to the CPU first.
mse_4pt_arr = np.array([tensor.to('cpu').numpy() for tensor in mse_4pt_list])

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/bear_mse_4pt_arr.npy', mse_4pt_arr)

ssim_4pt_arr = np.array(ssim_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/bear_ssim_4pt_arr.npy', ssim_4pt_arr)
rrmse_4pt_arr = np.array(rrmse_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/bear_rrmse_4pt_arr.npy', rrmse_4pt_arr)

rpe_4pt_arr = np.array(rpe_4pt_list)

# Save the NumPy array to a file
np.save(directory + 'Data/training_metric_data/bear_rpe_4pt_arr.npy', rpe_4pt_arr)

# A bare expression in the middle of a cell displays nothing; print it like the
# chimney and ferguson cells do.
print(rpe_4pt_arr.shape)

# Save the model
os.makedirs(directory + 'Data/trained_models/', exist_ok=True)
model_path = os.path.join(directory + 'Data/trained_models/', 'convlstm_bear_rand_ig_rand_wind_3t1p.pt')
torch.save(convLSTM.state_dict(), model_path)

### test on Ferguson


In [None]:
# Rebuild the network and restore the Ferguson-trained weights from the .pt file
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_dim = 2
hidden_dim = 128
convLSTM = EncoderDecoderConvLSTM(input_dim, hidden_dim).to(device)
model_path = os.path.join(directory + 'Data/trained_models/', 'convlstm_ferguson_rand_ig_rand_wind_3t1p.pt')
# map_location remaps the stored tensors onto the selected device, so a checkpoint
# written on a GPU machine still loads when only the CPU is available.
convLSTM.load_state_dict(torch.load(model_path, map_location=device))

# Set the model to evaluation mode for inference
convLSTM.eval()

In [None]:
# Wrap every Ferguson test array in a FireDataset; all of them share the same seq_len.
dataset_kwargs = dict(seq_len=6)
test_data_tilt_4_ptbar = FireDataset(forest_fer_test_4pt, **dataset_kwargs)
test_data_tilt_no_ptbar = FireDataset(forest_fer_test_no_bar, **dataset_kwargs)
test_data_tilt_3_tbar = FireDataset(forest_fer_test_3t, **dataset_kwargs)
test_data_tilt_2_pbar = FireDataset(forest_fer_test_2p, **dataset_kwargs)
test_data_tilt_2_p_longtime_bar = FireDataset(forest_fer_test_2p_longtime, **dataset_kwargs)

In [None]:
# 2p version

# Main processing
loop_times = 5
good_n = [90]              # sample indices rendered by the loop below
n = list(range(90, 100))   # alternative sweep; not used by the loop in this cell
fire = 'ferguson'
VERBOSE = True

for n1 in good_n:
    print(f'n={n1}')
    m = 76 * n1 + 60  # dataset index of the starting window for sample n1
    # f-string so that fire, loop_times and n1 are actually substituted into the path;
    # the plain literal kept the braces verbatim and every sample shared one location.
    image_directory = directory + f'Plots/test_image_2p/{fire}_loop{loop_times}_{n1}'

    input11, _ = test_data_tilt_2_p_longtime_bar[m]
    # Keep an untouched copy for plotting; presumably loop_testing_all_ver1 may modify
    # its input — TODO confirm.
    input01 = input11.clone()
    predicted_labels_list = loop_testing_all_ver1(input11, loop=loop_times)

    # Prepare images: the first three entries of the input (index 0 on the second axis),
    # each added to the canopy map `forest`.
    input_images = [input01[i, 0].cpu().numpy() + forest for i in range(3)]
    target_images = []
    prediction_images = []
    error_images = []

    for i in range(loop_times * 3):
        np_pre = predicted_labels_list[i].cpu().numpy()
        _, target1 = test_data_tilt_2_p_longtime_bar[m + i]
        target_np = target1[0].cpu().numpy()
        error_map = create_error_map(target_np, np_pre)

        # Only the odd-numbered steps are kept for the figure.
        if i % 2 == 1:
            target_images.append(target_np + forest)
            prediction_images.append(np_pre + forest)
            error_images.append(error_map)

    # Call the custom function to save the layout
    save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE=VERBOSE)


In [None]:
# Main processing
loop_times = 5
good_n = [35]           # alternative single sample; not used by the loop in this cell
n = list(range(0, 11))  # sample indices rendered by the loop below
fire = 'ferguson'
VERBOSE = False

for n1 in n:
    print(f'n={n1}')
    m = 21 * n1 + 6  # dataset index of the starting window for sample n1
    # f-string so that fire, loop_times and n1 are actually substituted into the path;
    # with the plain literal every sample in this sweep was written to the same location.
    image_directory = directory + f'Plots/test_image/{fire}_loop{loop_times}_{n1}'

    input11, _ = test_data_tilt_4_ptbar[m]
    # Keep an untouched copy for plotting; presumably loop_testing_all_ver1 may modify
    # its input — TODO confirm.
    input01 = input11.clone()
    predicted_labels_list = loop_testing_all_ver1(input11, loop=loop_times)

    # Prepare images: the first three entries of the input (index 0 on the second axis),
    # each added to the canopy map `forest`.
    input_images = [input01[i, 0].cpu().numpy() + forest for i in range(3)]
    target_images = []
    prediction_images = []
    error_images = []

    for i in range(loop_times * 3):
        np_pre = predicted_labels_list[i].cpu().numpy()
        _, target1 = test_data_tilt_4_ptbar[m + i]
        target_np = target1[0].cpu().numpy()
        error_map = create_error_map(target_np, np_pre)

        # Only the odd-numbered steps are kept for the figure.
        if i % 2 == 1:
            target_images.append(target_np + forest)
            prediction_images.append(np_pre + forest)
            error_images.append(error_map)

    # Call the custom function to save the layout
    save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE=VERBOSE)


#### Testing: MSE, SSIM and RPE on scenarios of 3T1P & no bar & 3T & 2P

In [None]:
# Accumulators for the 2P scenario: one entry is appended per evaluated sample.
mse_2_p_list, ssim_2_p_list, rpe_2_p_list = [], [], []
pre_count_2_p_list, tar_count_2_p_list = [], []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    result = loop_testing_msessimrpe_1to1(test_data_tilt_2_pbar, m)
    # The first two returned values and mean_time are unpacked but not stored.
    (_, _, mse_arr, ssim_arr, rpe_arr,
     pre_fire_count, tar_fire_count, mean_time) = result
    mse_2_p_list.append(mse_arr)
    ssim_2_p_list.append(ssim_arr)
    rpe_2_p_list.append(rpe_arr)
    pre_count_2_p_list.append(pre_fire_count)
    tar_count_2_p_list.append(tar_fire_count)

In [None]:
# Persist the Ferguson 2P test metrics. All three files belong in Ferguson_data:
# writing SSIM/RPE to Chimney_data would be overwritten by the chimney section's own saves.
out_dir = directory + 'Data/test_metric_data/Ferguson_data/'
os.makedirs(out_dir, exist_ok=True)  # np.save does not create missing directories

mse_2_p_array = np.array(mse_2_p_list)
np.save(out_dir + 'mse_2_p_array.npy', mse_2_p_array)

ssim_2_p_array = np.array(ssim_2_p_list)
np.save(out_dir + 'ssim_2_p_array.npy', ssim_2_p_array)

rpe_2_p_array = np.array(rpe_2_p_list)
np.save(out_dir + 'rpe_2_p_array.npy', rpe_2_p_array)

In [None]:
# Accumulators for the 3T scenario: one entry is appended per evaluated sample.
mse_3_t_list, ssim_3_t_list, rpe_3_t_list = [], [], []
pre_count_3_t_list, tar_count_3_t_list = [], []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    result = loop_testing_msessimrpe_1to1(test_data_tilt_3_tbar, m)
    # The first two returned values and mean_time are unpacked but not stored.
    (_, _, mse_arr, ssim_arr, rpe_arr,
     pre_fire_count, tar_fire_count, mean_time) = result
    mse_3_t_list.append(mse_arr)
    ssim_3_t_list.append(ssim_arr)
    rpe_3_t_list.append(rpe_arr)
    pre_count_3_t_list.append(pre_fire_count)
    tar_count_3_t_list.append(tar_fire_count)

In [None]:
# Persist the Ferguson 3T test metrics under Ferguson_data (the original wrote them to
# Chimney_data, where the chimney section's own saves would overwrite them).
out_dir = directory + 'Data/test_metric_data/Ferguson_data/'
os.makedirs(out_dir, exist_ok=True)  # np.save does not create missing directories

mse_3_t_array = np.array(mse_3_t_list)
np.save(out_dir + 'mse_3_t_array.npy', mse_3_t_array)

ssim_3_t_array = np.array(ssim_3_t_list)
np.save(out_dir + 'ssim_3_t_array.npy', ssim_3_t_array)

rpe_3_t_array = np.array(rpe_3_t_list)
np.save(out_dir + 'rpe_3_t_array.npy', rpe_3_t_array)

In [None]:
# Accumulators for the 4PT scenario: one entry is appended per evaluated sample.
mse_4_pt_list = []
ssim_4_pt_list = []
rpe_4_pt_list = []
pre_count_4_pt_list = []
tar_count_4_pt_list = []
mean_time_list = []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    _, _, mse_arr, ssim_arr, rpe_arr, pre_fire_count, tar_fire_count, mean_time = loop_testing_msessimrpe_1to1(test_data_tilt_4_ptbar, m)
    mse_4_pt_list.append(mse_arr)
    ssim_4_pt_list.append(ssim_arr)
    rpe_4_pt_list.append(rpe_arr)
    pre_count_4_pt_list.append(pre_fire_count)
    tar_count_4_pt_list.append(tar_fire_count)
    # mean_time_list was created above but never filled; record the returned value.
    mean_time_list.append(mean_time)

In [None]:
# Persist the Ferguson 4PT test metrics under Ferguson_data (the original wrote them to
# Chimney_data, where the chimney section's own saves would overwrite them).
out_dir = directory + 'Data/test_metric_data/Ferguson_data/'
os.makedirs(out_dir, exist_ok=True)  # np.save does not create missing directories

mse_4_pt_array = np.array(mse_4_pt_list)
np.save(out_dir + 'mse_4_pt_array.npy', mse_4_pt_array)

ssim_4_pt_array = np.array(ssim_4_pt_list)
np.save(out_dir + 'ssim_4_pt_array.npy', ssim_4_pt_array)

rpe_4_pt_array = np.array(rpe_4_pt_list)
np.save(out_dir + 'rpe_4_pt_array.npy', rpe_4_pt_array)

In [None]:
# Accumulators for the no-bar scenario: one entry is appended per evaluated sample.
mse_no_pt_list = []
ssim_no_pt_list = []
rpe_no_pt_list = []
pre_count_no_pt_list = []
tar_count_no_pt_list = []
mean_time_list = []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    _, _, mse_arr, ssim_arr, rpe_arr, pre_fire_count, tar_fire_count, mean_time = loop_testing_msessimrpe_1to1(test_data_tilt_no_ptbar, m)
    mse_no_pt_list.append(mse_arr)
    ssim_no_pt_list.append(ssim_arr)
    rpe_no_pt_list.append(rpe_arr)
    pre_count_no_pt_list.append(pre_fire_count)
    tar_count_no_pt_list.append(tar_fire_count)
    # mean_time_list was created above but never filled; record the returned value.
    mean_time_list.append(mean_time)

In [None]:
# Persist the Ferguson no-bar test metrics under Ferguson_data (the original wrote them to
# Chimney_data, where the chimney section's own saves would overwrite them).
out_dir = directory + 'Data/test_metric_data/Ferguson_data/'
os.makedirs(out_dir, exist_ok=True)  # np.save does not create missing directories

mse_no_pt_array = np.array(mse_no_pt_list)
np.save(out_dir + 'mse_no_pt_array.npy', mse_no_pt_array)

ssim_no_pt_array = np.array(ssim_no_pt_list)
np.save(out_dir + 'ssim_no_pt_array.npy', ssim_no_pt_array)

rpe_no_pt_array = np.array(rpe_no_pt_list)
np.save(out_dir + 'rpe_no_pt_array.npy', rpe_no_pt_array)

### test on chimney

In [None]:
# Rebuild the network and restore the Chimney-trained weights from the .pt file
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_dim = 2
hidden_dim = 128
convLSTM = EncoderDecoderConvLSTM(input_dim, hidden_dim).to(device)
model_path = os.path.join(directory + 'Data/trained_models/', 'convlstm_chimney_rand_ig_rand_wind_3t1p.pt')
# map_location remaps the stored tensors onto the selected device, so a checkpoint
# written on a GPU machine still loads when only the CPU is available.
convLSTM.load_state_dict(torch.load(model_path, map_location=device))

# Set the model to evaluation mode for inference
convLSTM.eval()

In [None]:
# Wrap every Chimney test array in a FireDataset; all of them share the same seq_len.
# These names replace the datasets of the same name built for the previous fire.
dataset_kwargs = dict(seq_len=6)
test_data_tilt_4_ptbar = FireDataset(forest_chimney_test_4pt, **dataset_kwargs)
test_data_tilt_no_ptbar = FireDataset(forest_chimney_test_no_bar, **dataset_kwargs)
test_data_tilt_3_tbar = FireDataset(forest_chimney_test_3t, **dataset_kwargs)
test_data_tilt_2_pbar = FireDataset(forest_chimney_test_2p, **dataset_kwargs)

In [None]:
# 2p version

# Main processing
loop_times = 5
good_n = [36]             # sample indices rendered by the loop below
n = list(range(30, 60))   # alternative sweep; not used by the loop in this cell
fire = 'chimney'
VERBOSE = True

# NOTE(review): the only visible definition of `forest` loads the Ferguson canopy —
# confirm it has been replaced with the Chimney canopy before running this cell.
for n1 in good_n:
    print(f'n={n1}')
    m = 21 * n1 + 6  # dataset index of the starting window for sample n1
    # f-string so that fire, loop_times and n1 are actually substituted into the path;
    # the plain literal kept the braces verbatim and every sample shared one location.
    image_directory = directory + f'Plots/test_image_2p/{fire}_loop{loop_times}_{n1}'

    input11, _ = test_data_tilt_2_pbar[m]
    # Keep an untouched copy for plotting; presumably loop_testing_all_ver1 may modify
    # its input — TODO confirm.
    input01 = input11.clone()
    predicted_labels_list = loop_testing_all_ver1(input11, loop=loop_times)

    # Prepare images: the first three entries of the input (index 0 on the second axis),
    # each added to `forest`.
    input_images = [input01[i, 0].cpu().numpy() + forest for i in range(3)]
    target_images = []
    prediction_images = []
    error_images = []

    for i in range(loop_times * 3):
        np_pre = predicted_labels_list[i].cpu().numpy()
        _, target1 = test_data_tilt_2_pbar[m + i]
        target_np = target1[0].cpu().numpy()
        error_map = create_error_map(target_np, np_pre)

        # Only the odd-numbered steps are kept for the figure.
        if i % 2 == 1:
            target_images.append(target_np + forest)
            prediction_images.append(np_pre + forest)
            error_images.append(error_map)

    # Call the custom function to save the layout
    save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE=VERBOSE)


In [None]:
# Main processing
loop_times = 5
good_n = [25]                                    # sample indices rendered by the loop below
n = [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]     # alternative sweep; not used in this cell
fire = 'chimney'
VERBOSE = True

# NOTE(review): the only visible definition of `forest` loads the Ferguson canopy —
# confirm it has been replaced with the Chimney canopy before running this cell.
for n1 in good_n:
    print(f'n={n1}')
    m = 21 * n1 + 6  # dataset index of the starting window for sample n1
    # f-string so that fire, loop_times and n1 are actually substituted into the path;
    # the plain literal kept the braces verbatim and every sample shared one location.
    image_directory = directory + f'Plots/test_image/{fire}_loop{loop_times}_{n1}'

    input11, _ = test_data_tilt_4_ptbar[m]
    # Keep an untouched copy for plotting; presumably loop_testing_all_ver1 may modify
    # its input — TODO confirm.
    input01 = input11.clone()
    predicted_labels_list = loop_testing_all_ver1(input11, loop=loop_times)

    # Prepare images: the first three entries of the input (index 0 on the second axis),
    # each added to `forest`.
    input_images = [input01[i, 0].cpu().numpy() + forest for i in range(3)]
    target_images = []
    prediction_images = []
    error_images = []

    for i in range(loop_times * 3):
        np_pre = predicted_labels_list[i].cpu().numpy()
        _, target1 = test_data_tilt_4_ptbar[m + i]
        target_np = target1[0].cpu().numpy()
        error_map = create_error_map(target_np, np_pre)

        # Only the odd-numbered steps are kept for the figure.
        if i % 2 == 1:
            target_images.append(target_np + forest)
            prediction_images.append(np_pre + forest)
            error_images.append(error_map)

    # Call the custom function to save the layout
    save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE=VERBOSE)


#### Testing: MSE, SSIM and RPE on scenarios of 3T1P & no bar & 3T & 2P

In [None]:
# Accumulators for the 2P scenario: one entry is appended per evaluated sample.
mse_2_p_list, ssim_2_p_list, rpe_2_p_list = [], [], []
pre_count_2_p_list, tar_count_2_p_list = [], []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    result = loop_testing_msessimrpe_1to1(test_data_tilt_2_pbar, m)
    # The first two returned values and mean_time are unpacked but not stored.
    (_, _, mse_arr, ssim_arr, rpe_arr,
     pre_fire_count, tar_fire_count, mean_time) = result
    mse_2_p_list.append(mse_arr)
    ssim_2_p_list.append(ssim_arr)
    rpe_2_p_list.append(rpe_arr)
    pre_count_2_p_list.append(pre_fire_count)
    tar_count_2_p_list.append(tar_fire_count)

In [None]:
# Convert each 2P metric list to an array and write it to its own .npy file.
mse_2_p_array = np.array(mse_2_p_list)
mse_2_p_path = directory + 'Data/test_metric_data/Chimney_data/mse_2_p_array.npy'
np.save(mse_2_p_path, mse_2_p_array)

ssim_2_p_array = np.array(ssim_2_p_list)
ssim_2_p_path = directory + 'Data/test_metric_data/Chimney_data/ssim_2_p_array.npy'
np.save(ssim_2_p_path, ssim_2_p_array)

rpe_2_p_array = np.array(rpe_2_p_list)
rpe_2_p_path = directory + 'Data/test_metric_data/Chimney_data/rpe_2_p_array.npy'
np.save(rpe_2_p_path, rpe_2_p_array)

In [None]:
# Accumulators for the 3T scenario: one entry is appended per evaluated sample.
mse_3_t_list, ssim_3_t_list, rpe_3_t_list = [], [], []
pre_count_3_t_list, tar_count_3_t_list = [], []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    result = loop_testing_msessimrpe_1to1(test_data_tilt_3_tbar, m)
    # The first two returned values and mean_time are unpacked but not stored.
    (_, _, mse_arr, ssim_arr, rpe_arr,
     pre_fire_count, tar_fire_count, mean_time) = result
    mse_3_t_list.append(mse_arr)
    ssim_3_t_list.append(ssim_arr)
    rpe_3_t_list.append(rpe_arr)
    pre_count_3_t_list.append(pre_fire_count)
    tar_count_3_t_list.append(tar_fire_count)

In [None]:
# Convert each 3T metric list to an array and write it to its own .npy file.
mse_3_t_array = np.array(mse_3_t_list)
mse_3_t_path = directory + 'Data/test_metric_data/Chimney_data/mse_3_t_array.npy'
np.save(mse_3_t_path, mse_3_t_array)

ssim_3_t_array = np.array(ssim_3_t_list)
ssim_3_t_path = directory + 'Data/test_metric_data/Chimney_data/ssim_3_t_array.npy'
np.save(ssim_3_t_path, ssim_3_t_array)

rpe_3_t_array = np.array(rpe_3_t_list)
rpe_3_t_path = directory + 'Data/test_metric_data/Chimney_data/rpe_3_t_array.npy'
np.save(rpe_3_t_path, rpe_3_t_array)

In [None]:
# Accumulators for the 4PT scenario: one entry is appended per evaluated sample.
mse_4_pt_list = []
ssim_4_pt_list = []
rpe_4_pt_list = []
pre_count_4_pt_list = []
tar_count_4_pt_list = []
mean_time_list = []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    _, _, mse_arr, ssim_arr, rpe_arr, pre_fire_count, tar_fire_count, mean_time = loop_testing_msessimrpe_1to1(test_data_tilt_4_ptbar, m)
    mse_4_pt_list.append(mse_arr)
    ssim_4_pt_list.append(ssim_arr)
    rpe_4_pt_list.append(rpe_arr)
    pre_count_4_pt_list.append(pre_fire_count)
    tar_count_4_pt_list.append(tar_fire_count)
    # mean_time_list was created above but never filled; record the returned value.
    mean_time_list.append(mean_time)

In [None]:
# Convert each 4PT metric list to an array and write it to its own .npy file.
mse_4_pt_array = np.array(mse_4_pt_list)
mse_4_pt_path = directory + 'Data/test_metric_data/Chimney_data/mse_4_pt_array.npy'
np.save(mse_4_pt_path, mse_4_pt_array)

ssim_4_pt_array = np.array(ssim_4_pt_list)
ssim_4_pt_path = directory + 'Data/test_metric_data/Chimney_data/ssim_4_pt_array.npy'
np.save(ssim_4_pt_path, ssim_4_pt_array)

rpe_4_pt_array = np.array(rpe_4_pt_list)
rpe_4_pt_path = directory + 'Data/test_metric_data/Chimney_data/rpe_4_pt_array.npy'
np.save(rpe_4_pt_path, rpe_4_pt_array)

In [None]:
# Accumulators for the no-bar scenario: one entry is appended per evaluated sample.
mse_no_pt_list = []
ssim_no_pt_list = []
rpe_no_pt_list = []
pre_count_no_pt_list = []
tar_count_no_pt_list = []
mean_time_list = []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    _, _, mse_arr, ssim_arr, rpe_arr, pre_fire_count, tar_fire_count, mean_time = loop_testing_msessimrpe_1to1(test_data_tilt_no_ptbar, m)
    mse_no_pt_list.append(mse_arr)
    ssim_no_pt_list.append(ssim_arr)
    rpe_no_pt_list.append(rpe_arr)
    pre_count_no_pt_list.append(pre_fire_count)
    tar_count_no_pt_list.append(tar_fire_count)
    # mean_time_list was created above but never filled; record the returned value.
    mean_time_list.append(mean_time)

In [None]:
# Convert each no-bar metric list to an array and write it to its own .npy file.
mse_no_pt_array = np.array(mse_no_pt_list)
mse_no_pt_path = directory + 'Data/test_metric_data/Chimney_data/mse_no_pt_array.npy'
np.save(mse_no_pt_path, mse_no_pt_array)

ssim_no_pt_array = np.array(ssim_no_pt_list)
ssim_no_pt_path = directory + 'Data/test_metric_data/Chimney_data/ssim_no_pt_array.npy'
np.save(ssim_no_pt_path, ssim_no_pt_array)

rpe_no_pt_array = np.array(rpe_no_pt_list)
rpe_no_pt_path = directory + 'Data/test_metric_data/Chimney_data/rpe_no_pt_array.npy'
np.save(rpe_no_pt_path, rpe_no_pt_array)

### test on bear

In [None]:
# Rebuild the network and restore the Bear-trained weights from the .pt file
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_dim = 2
hidden_dim = 128
convLSTM = EncoderDecoderConvLSTM(input_dim, hidden_dim).to(device)
model_path = os.path.join(directory + 'Data/trained_models/', 'convlstm_bear_rand_ig_rand_wind_3t1p.pt')
# map_location remaps the stored tensors onto the selected device, so a checkpoint
# written on a GPU machine still loads when only the CPU is available.
convLSTM.load_state_dict(torch.load(model_path, map_location=device))

# Set the model to evaluation mode
convLSTM.eval()

In [None]:
# Wrap every Bear test array in a FireDataset; all of them share the same seq_len.
# These names replace the datasets of the same name built for the previous fire.
dataset_kwargs = dict(seq_len=6)
test_data_tilt_4_ptbar = FireDataset(forest_bear_test_4pt, **dataset_kwargs)
test_data_tilt_no_ptbar = FireDataset(forest_bear_test_no_bar, **dataset_kwargs)
test_data_tilt_3_tbar = FireDataset(forest_bear_test_3t, **dataset_kwargs)
test_data_tilt_2_pbar = FireDataset(forest_bear_test_2p, **dataset_kwargs)
test_data_tilt_2_p_longtime_bar = FireDataset(forest_bear_test_2p_longtime, **dataset_kwargs)

In [None]:
# 2p version

# Main processing
loop_times = 5
good_n = [90]            # alternative single sample; not used by the loop in this cell
n = list(range(0, 50))   # sample indices rendered by the loop below
fire = 'bear'
VERBOSE = False

# NOTE(review): the only visible definition of `forest` loads the Ferguson canopy —
# confirm it has been replaced with the Bear canopy before running this cell.
for n1 in n:
    print(f'n={n1}')
    m = 76 * n1 + 60  # dataset index of the starting window for sample n1
    # f-string so that fire, loop_times and n1 are actually substituted into the path;
    # with the plain literal every sample in this sweep was written to the same location.
    image_directory = directory + f'Plots/test_image_2p/{fire}_loop{loop_times}_{n1}'

    input11, _ = test_data_tilt_2_p_longtime_bar[m]
    # Keep an untouched copy for plotting; presumably loop_testing_all_ver1 may modify
    # its input — TODO confirm.
    input01 = input11.clone()
    predicted_labels_list = loop_testing_all_ver1(input11, loop=loop_times)

    # Prepare images: the first three entries of the input (index 0 on the second axis),
    # each added to `forest`.
    input_images = [input01[i, 0].cpu().numpy() + forest for i in range(3)]
    target_images = []
    prediction_images = []
    error_images = []

    for i in range(loop_times * 3):
        np_pre = predicted_labels_list[i].cpu().numpy()
        _, target1 = test_data_tilt_2_p_longtime_bar[m + i]
        target_np = target1[0].cpu().numpy()
        error_map = create_error_map(target_np, np_pre)

        # Only the odd-numbered steps are kept for the figure.
        if i % 2 == 1:
            target_images.append(target_np + forest)
            prediction_images.append(np_pre + forest)
            error_images.append(error_map)

    # Call the custom function to save the layout
    save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE=VERBOSE)


In [None]:
# 4pt version

# Main processing
loop_times = 5
good_n = [28]                                    # sample indices rendered by the loop below
n = [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]     # alternative sweep; not used in this cell
fire = 'bear'
VERBOSE = True

# NOTE(review): the only visible definition of `forest` loads the Ferguson canopy —
# confirm it has been replaced with the Bear canopy before running this cell.
for n1 in good_n:
    print(f'n={n1}')
    m = 21 * n1 + 6  # dataset index of the starting window for sample n1
    # f-string so that fire, loop_times and n1 are actually substituted into the path;
    # the plain literal kept the braces verbatim and every sample shared one location.
    image_directory = directory + f'Plots/test_image/{fire}_loop{loop_times}_{n1}'

    input11, _ = test_data_tilt_4_ptbar[m]
    # Keep an untouched copy for plotting; presumably loop_testing_all_ver1 may modify
    # its input — TODO confirm.
    input01 = input11.clone()
    predicted_labels_list = loop_testing_all_ver1(input11, loop=loop_times)

    # Prepare images: the first three entries of the input (index 0 on the second axis),
    # each added to `forest`.
    input_images = [input01[i, 0].cpu().numpy() + forest for i in range(3)]
    target_images = []
    prediction_images = []
    error_images = []

    for i in range(loop_times * 3):
        np_pre = predicted_labels_list[i].cpu().numpy()
        _, target1 = test_data_tilt_4_ptbar[m + i]
        target_np = target1[0].cpu().numpy()
        error_map = create_error_map(target_np, np_pre)

        # Only the odd-numbered steps are kept for the figure.
        if i % 2 == 1:
            target_images.append(target_np + forest)
            prediction_images.append(np_pre + forest)
            error_images.append(error_map)

    # Call the custom function to save the layout
    save_custom_layout(input_images, target_images, prediction_images, error_images, image_directory, fire, loop_times, VERBOSE=VERBOSE)


#### Testing: MSE, SSIM and RPE on scenarios of 3T1P & no bar & 3T & 2P

In [None]:
# Accumulators for the 2P scenario: one entry is appended per evaluated sample.
mse_2_p_list, ssim_2_p_list, rpe_2_p_list = [], [], []
pre_count_2_p_list, tar_count_2_p_list = [], []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    result = loop_testing_msessimrpe_1to1(test_data_tilt_2_pbar, m)
    # The first two returned values and mean_time are unpacked but not stored.
    (_, _, mse_arr, ssim_arr, rpe_arr,
     pre_fire_count, tar_fire_count, mean_time) = result
    mse_2_p_list.append(mse_arr)
    ssim_2_p_list.append(ssim_arr)
    rpe_2_p_list.append(rpe_arr)
    pre_count_2_p_list.append(pre_fire_count)
    tar_count_2_p_list.append(tar_fire_count)

In [None]:
# Convert each 2P metric list to an array and write it to its own .npy file.
mse_2_p_array = np.array(mse_2_p_list)
mse_2_p_path = directory + 'Data/test_metric_data/Bear_data/mse_2_p_array.npy'
np.save(mse_2_p_path, mse_2_p_array)

ssim_2_p_array = np.array(ssim_2_p_list)
ssim_2_p_path = directory + 'Data/test_metric_data/Bear_data/ssim_2_p_array.npy'
np.save(ssim_2_p_path, ssim_2_p_array)

rpe_2_p_array = np.array(rpe_2_p_list)
rpe_2_p_path = directory + 'Data/test_metric_data/Bear_data/rpe_2_p_array.npy'
np.save(rpe_2_p_path, rpe_2_p_array)

In [None]:
# Accumulators for the 3T scenario: one entry is appended per evaluated sample.
mse_3_t_list, ssim_3_t_list, rpe_3_t_list = [], [], []
pre_count_3_t_list, tar_count_3_t_list = [], []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    result = loop_testing_msessimrpe_1to1(test_data_tilt_3_tbar, m)
    # The first two returned values and mean_time are unpacked but not stored.
    (_, _, mse_arr, ssim_arr, rpe_arr,
     pre_fire_count, tar_fire_count, mean_time) = result
    mse_3_t_list.append(mse_arr)
    ssim_3_t_list.append(ssim_arr)
    rpe_3_t_list.append(rpe_arr)
    pre_count_3_t_list.append(pre_fire_count)
    tar_count_3_t_list.append(tar_fire_count)

In [None]:
# Convert each 3T metric list to an array and write it to its own .npy file.
mse_3_t_array = np.array(mse_3_t_list)
mse_3_t_path = directory + 'Data/test_metric_data/Bear_data/mse_3_t_array.npy'
np.save(mse_3_t_path, mse_3_t_array)

ssim_3_t_array = np.array(ssim_3_t_list)
ssim_3_t_path = directory + 'Data/test_metric_data/Bear_data/ssim_3_t_array.npy'
np.save(ssim_3_t_path, ssim_3_t_array)

rpe_3_t_array = np.array(rpe_3_t_list)
rpe_3_t_path = directory + 'Data/test_metric_data/Bear_data/rpe_3_t_array.npy'
np.save(rpe_3_t_path, rpe_3_t_array)

In [None]:
# Accumulators for the 4PT scenario: one entry is appended per evaluated sample.
mse_4_pt_list = []
ssim_4_pt_list = []
rpe_4_pt_list = []
pre_count_4_pt_list = []
tar_count_4_pt_list = []
mean_time_list = []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    _, _, mse_arr, ssim_arr, rpe_arr, pre_fire_count, tar_fire_count, mean_time = loop_testing_msessimrpe_1to1(test_data_tilt_4_ptbar, m)
    mse_4_pt_list.append(mse_arr)
    ssim_4_pt_list.append(ssim_arr)
    rpe_4_pt_list.append(rpe_arr)
    pre_count_4_pt_list.append(pre_fire_count)
    tar_count_4_pt_list.append(tar_fire_count)
    # mean_time_list was created above but never filled; record the returned value.
    mean_time_list.append(mean_time)

In [None]:
# Convert each 4PT metric list to an array and write it to its own .npy file.
mse_4_pt_array = np.array(mse_4_pt_list)
mse_4_pt_path = directory + 'Data/test_metric_data/Bear_data/mse_4_pt_array.npy'
np.save(mse_4_pt_path, mse_4_pt_array)

ssim_4_pt_array = np.array(ssim_4_pt_list)
ssim_4_pt_path = directory + 'Data/test_metric_data/Bear_data/ssim_4_pt_array.npy'
np.save(ssim_4_pt_path, ssim_4_pt_array)

rpe_4_pt_array = np.array(rpe_4_pt_list)
rpe_4_pt_path = directory + 'Data/test_metric_data/Bear_data/rpe_4_pt_array.npy'
np.save(rpe_4_pt_path, rpe_4_pt_array)

In [None]:
# Accumulators for the no-bar scenario: one entry is appended per evaluated sample.
mse_no_pt_list = []
ssim_no_pt_list = []
rpe_no_pt_list = []
pre_count_no_pt_list = []
tar_count_no_pt_list = []
mean_time_list = []

for n in tqdm(range(64)):
    m = 21*n + 6  # starting dataset index for sample n
    _, _, mse_arr, ssim_arr, rpe_arr, pre_fire_count, tar_fire_count, mean_time = loop_testing_msessimrpe_1to1(test_data_tilt_no_ptbar, m)
    mse_no_pt_list.append(mse_arr)
    ssim_no_pt_list.append(ssim_arr)
    rpe_no_pt_list.append(rpe_arr)
    pre_count_no_pt_list.append(pre_fire_count)
    tar_count_no_pt_list.append(tar_fire_count)
    # mean_time_list was created above but never filled; record the returned value.
    mean_time_list.append(mean_time)

In [None]:
# Convert each no-bar metric list to an array and write it to its own .npy file.
mse_no_pt_array = np.array(mse_no_pt_list)
mse_no_pt_path = directory + 'Data/test_metric_data/Bear_data/mse_no_pt_array.npy'
np.save(mse_no_pt_path, mse_no_pt_array)

ssim_no_pt_array = np.array(ssim_no_pt_list)
ssim_no_pt_path = directory + 'Data/test_metric_data/Bear_data/ssim_no_pt_array.npy'
np.save(ssim_no_pt_path, ssim_no_pt_array)

rpe_no_pt_array = np.array(rpe_no_pt_list)
rpe_no_pt_path = directory + 'Data/test_metric_data/Bear_data/rpe_no_pt_array.npy'
np.save(rpe_no_pt_path, rpe_no_pt_array)