In [4]:
import numpy as np
from copy import deepcopy
from math import exp, sqrt
import matplotlib.pyplot as plt
import matplotlib.animation as animation

In [27]:
def gaussian_mixture_field(size, n_components=None):
    """Build a random ``size`` x ``size`` field from a sum of axis-aligned Gaussians.

    The Gaussians are evaluated on the unit square, summed, min-max
    normalised, inverted and scaled, so the largest raw value maps to 0 and
    the smallest maps to 2.

    Parameters
    ----------
    size : int
        Number of grid points along each axis.
    n_components : int, optional
        Number of Gaussian bumps. When ``None`` the count is drawn from
        ``Poisson(20)``. An explicit ``0`` is honoured and yields a flat field.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(size, size)`` with values in ``[0, 2]``. A field
        with no variation (no components, a single grid point, ...) is
        returned as all ones, the midpoint of that range, instead of the NaNs
        a 0/0 normalisation would produce.
    """
    x, y = np.meshgrid(np.linspace(0, 1, size), np.linspace(0, 1, size))
    field = np.zeros((size, size))

    # `is None` rather than truthiness, so that a caller asking for exactly
    # zero components does not silently get a random count instead.
    if n_components is None:
        n_components = np.random.poisson(20)

    for _ in range(n_components):
        cx, cy = np.random.uniform(0, 1, 2)       # random centre
        sx, sy = np.random.uniform(0.01, 0.2, 2)  # random per-axis standard deviation
        w = np.random.exponential(10)             # random weight

        field += w * np.exp(-(((x - cx) ** 2) / (2 * sx**2) +
                              ((y - cy) ** 2) / (2 * sy**2)))

    if field.size == 0:
        return field

    lowest = field.min()
    span = field.max() - lowest
    # `not span > 0` also catches a NaN span, not just an exactly flat field.
    if not span > 0:
        return np.ones((size, size))

    return (1 - (field - lowest) / span) * 2

In [None]:
class Simulator():
    """Stochastic cellular-automaton fire simulation on a square grid.

    Each cell of ``self.map`` holds a fire intensity (0 = not burning; growth
    is capped at 5). ``self.terrain`` is a per-cell multiplier produced by
    ``gaussian_mixture_field``. Every call to :meth:`step` stores the
    pre-step grid in ``self.maps`` keyed by the time index.
    """

    def __init__(self, size=256, wind_speed=0, wind_direction=[0,0], response_rate=0.1, response_start=20, base_spread_rate=0.3, n_components=None, decay_rate=1e-3):
        """Set up an empty grid and a random terrain.

        Parameters
        ----------
        size : int
            Side length of the square grid.
        wind_speed : float
            Strength of the wind boost; wind is ignored when this is 0.
        wind_direction : sequence of two numbers
            Wind vector in (row, column) grid offsets. It is only read.
        response_rate : float
            Rate constant of the firefighting and edge extinction probabilities.
        response_start : int
            Time step from which firefighting can reduce intensity.
        base_spread_rate : float
            Base probability scale for intensifying and spreading.
        n_components : int, optional
            Forwarded to ``gaussian_mixture_field``.
        decay_rate : float
            Exponential decay rate applied to spread probabilities over time.
        """
        self.size = size
        self.map = np.zeros((size, size))
        self.wind_speed = wind_speed
        self.wind_direction = wind_direction
        self.response_rate = response_rate
        self.response_start = response_start
        self.spread_rate = base_spread_rate
        self.time = 0
        self.decay_rate = decay_rate
        self.maps = {}

        self.terrain = gaussian_mixture_field(size, n_components=n_components)

    def step(self):
        """Advance the simulation by one time step.

        Decisions read the previous grid (``self.map``) while updates are
        accumulated in a copy, which then replaces ``self.map``. The previous
        grid is stored in ``self.maps[self.time]`` before ``self.time`` is
        incremented.
        """
        new_map = self.map.copy()

        # Loop invariants: both depend only on the current time / wind vector.
        decay = np.exp(-self.decay_rate * self.time)
        wind_norm = np.linalg.norm(self.wind_direction) + 1e-6

        for i in range(self.size):
            for j in range(self.size):
                if self.map[i, j] >= 1:
                    self._update_burning_cell(i, j, new_map, decay, wind_norm)
                elif 0 < i < self.size - 1 and 0 < j < self.size - 1:
                    # Interior, non-burning cell: it ignites when at least 6
                    # of its 8 neighbours burn. The bound is `0 <`, not `1 <`,
                    # because the 3x3 window is already fully inside the grid
                    # for row/column 1. The centre cell is not burning in
                    # this branch, so it never contributes to the count.
                    neighbors_on_fire = np.sum(self.map[i-1:i+2, j-1:j+2] >= 1)
                    if neighbors_on_fire >= 6 and new_map[i, j] == 0:
                        new_map[i, j] += 1

        self.maps[self.time] = self.map.copy()
        self.map = new_map
        self.time += 1

    def _update_burning_cell(self, i, j, new_map, decay, wind_norm):
        """Apply intensification, spreading and extinction for burning cell (i, j)."""
        # Self-intensification, capped at intensity 5.
        if np.random.rand() < self.spread_rate * self.terrain[i, j] * decay and new_map[i, j] < 5:
            new_map[i, j] += 1

        for di in (-1, 0, 1):
            for dj in (-1, 0, 1):
                if di == 0 and dj == 0:
                    continue

                ni, nj = i + di, j + dj

                if 0 <= ni < self.size and 0 <= nj < self.size:
                    spread_chance = self.spread_rate * self.map[i, j]

                    if self.wind_speed > 0:
                        # Projection of the neighbour offset onto the wind
                        # direction, perturbed by multiplicative noise.
                        wind_influence = (di * self.wind_direction[0] + dj * self.wind_direction[1]) / wind_norm
                        wind_influence *= np.random.normal(1, 0.5)

                        if wind_influence > 0:
                            spread_chance *= (1 + self.wind_speed * wind_influence)
                        # NOTE(review): terrain, decay and clipping are only
                        # applied when wind_speed > 0. This looks like an
                        # indentation slip, but it is kept as-is because the
                        # intent cannot be confirmed from this file.
                        spread_chance *= self.terrain[ni, nj]
                        spread_chance *= decay
                        spread_chance = np.clip(spread_chance, 0, 1)

                    # Spread only to neighbours that are not hotter than this
                    # cell, with an extra time-dependent damping draw.
                    if np.random.rand() < spread_chance and new_map[ni, nj] <= new_map[i, j]:
                        if new_map[ni, nj] < 5:
                            if np.random.rand() <= exp(-self.time/1000):
                                new_map[ni, nj] += 1

                    # Firefighting: each non-burning neighbour gives a chance
                    # to reduce this cell's intensity once the response started.
                    if self.time >= self.response_start and new_map[ni, nj] == 0:
                        if np.random.rand() < 1 - exp(-(self.response_rate*(0.5+self.terrain[i,j]) * (self.time - self.response_start))):
                            if new_map[i, j] > 0:
                                new_map[i, j] -= 1 # Firefighting effort
                elif decay < 0.5:
                    # Edge effect: once the decay factor has halved, each
                    # out-of-grid neighbour gives a chance to reduce intensity.
                    if np.random.rand() < 1 - exp(-(self.response_rate) * (self.time - self.response_start)):
                        if new_map[i, j] > 0:
                            new_map[i, j] -= 1 # Edge effect

    def simulate(self, max_steps=None):
        """Ignite random cells and step until no cell is burning.

        A ``Poisson(3)`` number of ignition nodes is drawn. The first is
        placed uniformly on the grid; each further node is placed at a random
        offset of up to 20 cells per axis from the first and is skipped if
        that cell is already burning. Ignition intensities are ``Poisson(3)``
        draws and may be 0, in which case that node starts no fire.

        Parameters
        ----------
        max_steps : int, optional
            Upper bound on the number of steps. ``None`` (the default) runs
            until the grid is fully extinguished, which never happens when
            nothing can reduce intensity (e.g. ``response_rate=0``).
        """
        nodes = np.random.poisson(3)

        x_init, y_init = np.random.randint(0, self.size, size=2)
        self.map[x_init, y_init] = np.random.poisson(3)

        for _ in range(nodes - 1):
            # Resample until the offset lands inside the grid. The lower bound
            # matters: a negative index would silently wrap to the other side.
            while True:
                dx, dy = np.random.randint(-20, 21, size=2)
                x, y = x_init + dx, y_init + dy
                if 0 <= x < self.size and 0 <= y < self.size:
                    break

            # No `break` here: every remaining node gets its own attempt.
            if self.map[x, y] == 0:
                self.map[x, y] = np.random.poisson(3)

        steps_taken = 0
        while np.any(self.map > 0):
            if max_steps is not None and steps_taken >= max_steps:
                break
            self.step()
            steps_taken += 1

In [29]:
# Run one wind-driven scenario on a 256x256 grid; the frames end up in simulator.maps.
scenario = dict(
    size=256,
    wind_speed=1.5,
    wind_direction=[1, 2],
    response_rate=0.05,
    response_start=100,
    base_spread_rate=0.05,
)
simulator = Simulator(**scenario)
simulator.simulate()

In [30]:
def make_heatmap_gif(simulator, filename="simulation.gif", cmap="plasma", interval=60):
    """Render the recorded fire maps of ``simulator`` as an animated GIF.

    Parameters
    ----------
    simulator : object
        Must expose ``maps`` (dict mapping time index -> 2-D array) and
        ``terrain`` (2-D array drawn as a translucent overlay).
    filename : str
        Output path of the GIF.
    cmap : str
        Matplotlib colormap name used for the fire intensity.
    interval : int
        Delay between frames in milliseconds.

    Raises
    ------
    ValueError
        If ``simulator.maps`` is empty, i.e. there is nothing to animate.
    """
    frames = [simulator.maps[t] for t in sorted(simulator.maps)]
    if not frames:
        raise ValueError("simulator.maps is empty; run the simulation before rendering a GIF")

    # Per-frame extrema: avoids stacking every frame into one large
    # (T, size, size) array just to find the colour range.
    vmin = min(float(np.min(frame)) for frame in frames)
    vmax = max(float(np.max(frame)) for frame in frames)

    fig, ax = plt.subplots()
    try:
        # --- fire as background ---
        fire_img = ax.imshow(frames[0], cmap=cmap, vmin=vmin, vmax=vmax)

        # --- terrain overlay ---
        terrain_img = ax.imshow(simulator.terrain, cmap="Greens", alpha=0.2)  # low alpha on top

        cbar = fig.colorbar(fire_img, ax=ax)
        cbar.set_label("Fire Intensity", rotation=270, labelpad=15)

        def update(frame):
            fire_img.set_data(frame)
            return [fire_img, terrain_img]

        ani = animation.FuncAnimation(
            fig, update, frames=frames, interval=interval, blit=True
        )

        ani.save(filename, writer="pillow")
    finally:
        # Always release the figure, even when saving fails.
        plt.close(fig)

# Write the animation of the run above to fire.gif.
make_heatmap_gif(simulator, filename="fire.gif")

In [1]:
# One approach is a graph neural network (GNN), which would scale naturally to different grid sizes.
# Another is a transformer, which should be easier to train.
# I'm going to try a Graph Attention Network (GAT).

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import time
from scipy.sparse import lil_matrix
import os

In [None]:
def adjacency_matrix(length, width):
    """Return the sparse adjacency matrix of a ``length`` x ``width`` grid.

    Cells are numbered row-major (``index = row * width + column``). Each
    cell is connected to itself and to every in-grid cell of its 3x3
    neighbourhood, with weight 1.

    Returns
    -------
    scipy.sparse.coo_matrix
        ``float32`` matrix of shape ``(length * width, length * width)``.
    """
    n_cells = length * width
    grid_adj = lil_matrix((n_cells, n_cells), dtype=np.float32)

    # 3x3 stencil: the eight surrounding cells plus the cell itself.
    stencil = [(d_row, d_col) for d_row in (-1, 0, 1) for d_col in (-1, 0, 1)]

    for r in range(length):
        for c in range(width):
            source = r * width + c
            for d_row, d_col in stencil:
                rr, cc = r + d_row, c + d_col
                if rr < 0 or rr >= length or cc < 0 or cc >= width:
                    continue  # neighbour falls outside the grid
                grid_adj[source, rr * width + cc] = 1

    return grid_adj.tocoo()

In [None]:
class FireGraph(Dataset):
    """Dataset of per-node feature matrices stored as ``.npy`` files.

    Samples live in ``<path>/<length>x<width>/``. The grid adjacency is built
    once and kept as a sparse COO tensor in ``self.adjacency_matrix``.
    """

    def __init__(self, length=256, width=256, path="/simulation_data"):
        """Build the grid adjacency and make sure the sample directory exists.

        Parameters
        ----------
        length, width : int
            Grid dimensions; the graph has ``length * width`` nodes.
        path : str
            Root directory for saved samples.
            NOTE(review): the default is an absolute path at the filesystem
            root, which is usually not writable - confirm this is intended.
        """
        self.path = path
        self.length = length
        self.width = width

        adj = adjacency_matrix(self.length, self.width)
        self.adjacency_matrix = torch.sparse_coo_tensor(
            indices=torch.tensor(np.vstack((adj.row, adj.col)), dtype=torch.long),
            values=torch.tensor(adj.data, dtype=torch.float32),
            size=adj.shape
        )

        self.data = []

        self.save_dir = os.path.join(self.path, f"{self.length}x{self.width}")
        os.makedirs(self.save_dir, exist_ok=True)

    def generate_data(self, topology:np.ndarray=None, past_info:np.ndarray=None, wind_direction:np.ndarray=np.array([0,0]),
                         wind_speed:int=0, time:int=0, label:np.ndarray=None):
        """Flatten one observation into a ``(n_nodes, 8)`` feature matrix.

        Columns, in order: topology, ``past_info[..., 0]``,
        ``past_info[..., 1]``, wind direction component 0, wind direction
        component 1, wind speed, time, label. The scalar inputs are broadcast
        to every node.

        ``topology``, ``past_info`` and ``label`` are required despite their
        ``None`` defaults. ``past_info`` is indexed as ``[:, :, 0]`` and
        ``[:, :, 1]``, so it must have at least two channels on its last axis.
        """
        flat_topo = topology.ravel()
        flat_info = past_info[:, :, 0].ravel()
        flat_info_date = past_info[:, :, 1].ravel()
        flat_label = label.ravel()

        data = np.stack([
            flat_topo,
            flat_info,
            flat_info_date,
            np.full(flat_topo.shape, wind_direction[0], dtype=np.float32),
            np.full(flat_topo.shape, wind_direction[1], dtype=np.float32),
            np.full(flat_topo.shape, wind_speed, dtype=np.float32),
            np.full(flat_topo.shape, time, dtype=np.float32),
            flat_label
        ], axis=1)

        return data

    def save_data(self, data):
        """Write ``data`` to a new, uniquely named ``.npy`` file in ``self.save_dir``."""
        # A whole-second timestamp makes two saves within the same second
        # overwrite each other. Use nanoseconds and bump the stamp until the
        # name is free.
        stamp = time.time_ns()
        target = os.path.join(self.save_dir, f"{stamp}.npy")
        while os.path.exists(target):
            stamp += 1
            target = os.path.join(self.save_dir, f"{stamp}.npy")
        np.save(target, data)

    def generate_dataset(self):
        """(Re)load every ``.npy`` sample from ``self.save_dir`` into ``self.data``.

        The list is rebuilt rather than appended to, so calling this twice
        does not duplicate samples. Files are read in sorted name order, so
        sample indices are reproducible across runs.
        """
        self.data = [
            np.load(os.path.join(self.save_dir, name))
            for name in sorted(os.listdir(self.save_dir))
            if name.endswith(".npy")
        ]

    def __len__(self):
        """Number of samples currently loaded."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a ``float32`` tensor (label column included)."""
        return torch.from_numpy(self.data[idx]).float()

In [None]:
class BelieverModel(nn.Module):
    """Multi-head graph-attention network producing per-node class logits.

    The input features are first mixed over the graph
    (``relu(adj @ x @ W0)``). Then ``num_layers`` attention layers follow,
    each with ``num_heads`` heads of ``num_features_per_head`` features that
    are concatenated, and a final linear map to ``num_output_classes``.

    ``self.layers[k]["a"]`` has shape ``(2*D, num_heads, 1)`` and
    ``self.layers[k]["W"]`` has shape ``(D, num_heads, num_features_per_head)``,
    where ``D = num_heads * num_features_per_head``.
    """

    def __init__(self, nodes=256*256, input_features=8, num_layers=3, num_heads=3, num_features_per_head=4, num_output_classes=5):
        """Create and initialise all learnable parameters.

        NOTE(review): the default ``input_features=8`` matches the 8-column
        rows built by ``FireGraph.generate_data``, whose last column is the
        label - confirm the label is not meant to be fed to the model.
        """
        # Must run before any Parameter/Module attribute is assigned.
        super().__init__()

        self.leakyrelu = nn.LeakyReLU(0.2)
        self.relu = nn.ReLU()
        self.N = nodes

        hidden = num_features_per_head * num_heads

        self.initial_transformation = nn.Parameter(torch.empty(input_features, hidden))
        nn.init.xavier_normal_(self.initial_transformation)

        # ModuleList of ParameterDicts keeps the `layer["a"]` / `layer["W"]`
        # access pattern while registering the tensors with the module, so
        # that optimisers, `.to(device)` and `state_dict()` can see them.
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            attention_vector = nn.Parameter(torch.empty(2 * hidden, num_heads, 1))
            nn.init.xavier_uniform_(attention_vector)

            W = nn.Parameter(torch.empty(hidden, num_heads, num_features_per_head))
            nn.init.xavier_normal_(W)

            self.layers.append(nn.ParameterDict({"a": attention_vector, "W": W}))

        self.final_transformation = nn.Parameter(torch.empty(hidden, num_output_classes))
        nn.init.xavier_normal_(self.final_transformation)

    def forward(self, x, adj):
        """Compute per-node logits.

        Parameters
        ----------
        x : torch.Tensor
            Node features of shape ``(N, input_features)``.
        adj : torch.Tensor
            ``(N, N)`` adjacency matrix, sparse COO or dense. Entry ``(i, j)``
            present means node ``i`` attends to node ``j``.

        Returns
        -------
        torch.Tensor
            Logits of shape ``(N, num_output_classes)``.
        """
        if not adj.is_sparse:
            adj = adj.to_sparse()
        adj = adj.coalesce()  # `.indices()` requires a coalesced tensor
        row, col = adj.indices()  # (E,), (E,): target node i, source node j

        num_nodes = x.shape[0]
        hidden = self.initial_transformation.shape[1]

        # relu(adj @ x @ W0); the dense product comes first so the sparse
        # matmul is applied to the narrower matrix.
        x = self.relu(torch.sparse.mm(adj, x @ self.initial_transformation))  # (N, D)

        for layer in self.layers:
            W = layer["W"]              # (D, H, K)
            a = layer["a"].squeeze(-1)  # (2D, H)
            num_heads, head_dim = W.shape[1], W.shape[2]

            # [x_i ; x_j] @ a  ==  x_i @ a[:D] + x_j @ a[D:]. Computing the
            # two halves per node avoids materialising the (E, 2D) matrix.
            scores = self.leakyrelu((x @ a[:hidden])[row] + (x @ a[hidden:])[col])  # (E, H)

            # Softmax over each node's neighbourhood, with the per-node
            # maximum subtracted for numerical stability.
            index = row.unsqueeze(-1).expand_as(scores)
            row_max = torch.full((num_nodes, num_heads), float("-inf"),
                                 dtype=scores.dtype, device=scores.device)
            row_max = row_max.scatter_reduce(0, index, scores.detach(), reduce="amax")
            weights = torch.exp(scores - row_max[row])                      # (E, H)
            denom = torch.zeros(num_nodes, num_heads,
                                dtype=scores.dtype, device=scores.device).index_add(0, row, weights)
            alpha = weights / denom[row]                                    # (E, H)

            # Per-head linear transformation followed by attention-weighted
            # aggregation of the neighbours' features.
            h = torch.einsum("nd,dhk->nhk", x, W)                           # (N, H, K)
            messages = alpha.unsqueeze(-1) * h[col]                         # (E, H, K)
            aggregated = torch.zeros(num_nodes, num_heads, head_dim,
                                     dtype=h.dtype, device=h.device).index_add(0, row, messages)

            # Concatenate the heads: (N, H*K) == (N, D).
            x = self.relu(aggregated.reshape(num_nodes, num_heads * head_dim))

        return x @ self.final_transformation



                

