In [1]:
import mesa
import numpy as np
from enum import Enum
import uuid
import random

def new_uuid():
    """Return a freshly generated random (version 4) UUID as a plain Python int."""
    # uuid4().int is the 128-bit integer form of the UUID.
    return int(uuid.uuid4().int)

In [2]:
# class FireAgent(mesa.Agent):
#     def __init__(self, unique_id, model, state: int):
#         super().__init__(unique_id, model)
#         # Agent może być w jednym z 6 stanów:
#         # 0 - brak pożaru
#         # 1 - wczesny ogień
#         # 2 - średni ogień
#         # 3 - pełny ogień
#         # 4 - ekstremalny ogień
#         # 5 - obszar spalony (obszar spalony nie może podpalić się po raz kolejny, gdyż całe paliwo zostało spalone) 
#         self.state = state
#         self.time_step = 0
#         self.type = "Fire"

#     def extinguised(self):
#         if self.time_step <= 0:
#             self.time_step = 0
#             self.state = 0
#             return True
#         else:
#             return False
        
#     def step(self):
#         if 0 < self.state < 5:
#             self.time_step += 0.5
#         cellmates = self.model.grid.get_cell_list_contents([self.pos])
#         firefighting_units = 0
#         for c in cellmates:
#             if c.type == "FireFighter":
#                 firefighting_units += 1
                
#         if self.state == 1:
#             self.time_step -= firefighting_units*2
#             if self.extinguised():
#                 return
#             if self.time_step >= 20:
#                 self.time_step = 0
#                 self.state = 2
                
#         elif self.state == 2:
#             self.time_step -= firefighting_units
#             if self.extinguised():
#                 return
#             if self.time_step >= 10:
#                 self.time_step = 0
#                 self.state = 3
                
#         elif self.state == 3:
#             self.time_step -= firefighting_units*0.5
#             if self.extinguised():
#                 return
#             neighborhood = self.model.grid.get_neighborhood(self.pos, moore=True, include_center=False)
#             for n in neighborhood:
#                 agents_n = self.model.grid.get_cell_list_contents([(n[0], n[1])])
#                 break_search = False
#                 for c in agents_n:
#                     if c.type == "Fire" and c.state == 0:
#                         c.state = 1
#                         c.time_step = 0
#                         break_search = True
#                         break
#                 if break_search:
#                     break
#             if self.time_step >= 10:
#                 self.time_step = 0
#                 self.state = 4
                
#         elif self.state == 4:
#             self.time_step -= firefighting_units*0.25
#             if self.extinguised():
#                 return
#             neighborhood = self.model.grid.get_neighborhood(self.pos, moore=True, include_center=False)
#             for n in neighborhood:
#                 agents_n = self.model.grid.get_cell_list_contents([(n[0], n[1])])
#                 for c in cellmates:
#                     if c.type == "Fire" and c.state < 4:
#                         c.state += 1
#                         c.time_step = 0
#                         break
#             if self.time_step >= 10:
#                 self.time_step = 0
#                 self.state = 5

# class FireFighterAgent(mesa.Agent):
#     def __init__(self, unique_id, model):
#         super().__init__(unique_id, model)
#         self.type = "FireFighter"
        
#     def step(self):
#         pass

In [2]:
class FireAgent(mesa.Agent):
    """One forest cell: tracks the local fire intensity and the fuel left to burn.

    The agent is always in one of 6 states:
        0 - no fire
        1 - early fire
        2 - medium fire
        3 - full fire
        4 - extreme fire
        5 - burned area (a burned area cannot ignite again, all of its fuel is gone)
    """

    # Fuel consumed per step while burning in a given state.
    _FUEL_BURN_PER_STATE = {1: 0.5, 2: 1, 3: 2, 4: 4}
    # How much a single firefighter reduces time_in_state per step, by state.
    # step() adds 1 to time_in_state afterwards, so the fire is pushed back only
    # when the combined reduction exceeds 1: that takes 1 unit in state 1,
    # 2 units in state 2, 3 units in state 3 and 4 units in state 4.
    _FIREFIGHTER_EFFECT_PER_STATE = {1: 1.5, 2: 1, 3: 0.5, 4: 0.3}
    # Number of neighbouring cells a fire tries to affect per step, by state.
    _SPREAD_TARGETS_PER_STATE = {3: 1, 4: 2}

    def __init__(self, unique_id, model, state: int):
        """Create a cell with the given initial fire state and a full fuel load.

        unique_id - identifier handed to mesa.Agent
        model     - owning model (its `grid` is queried for cell contents)
        state     - initial fire state, see the class docstring
        """
        super().__init__(unique_id, model)
        self.state = state
        # Tag used instead of isinstance checks to tell agent kinds apart.
        self.type = "Fire"
        self.fuel = 70
        # Progress within the current state; moved forward by step() and
        # backwards by firefighters.
        self.time_in_state = 0
        # Threshold of time_in_state above which the fire escalates.
        self.max_time_in_step = 10

    def extinguised(self):
        """Put the fire out if its progress counter has dropped to zero or below.

        Returns True when the cell was reset to state 0, False otherwise.
        (The original read `self.time_step`, an attribute this class never
        defines, so calling it raised AttributeError.)
        """
        if self.time_in_state <= 0:
            self.time_in_state = 0
            self.state = 0
            return True
        return False

    def check_if_burned(self):
        """Mark the cell as burned (state 5) once its fuel is exhausted."""
        if self.fuel <= 0:
            self.fuel = 0
            self.state = 5

    def make_damage(self):
        """Consume fuel according to the current fire state (no-op for states 0 and 5)."""
        burn = self._FUEL_BURN_PER_STATE.get(self.state)
        if burn is not None:
            self.fuel -= burn

    def apply_firefighters(self):
        """Reduce time_in_state proportionally to the firefighters standing on this cell."""
        cellmates = self.model.grid.get_cell_list_contents([self.pos])
        firefighting_units = sum(1 for mate in cellmates if mate.type == "FireFighter")

        effect = self._FIREFIGHTER_EFFECT_PER_STATE.get(self.state)
        if effect is not None:
            self.time_in_state -= firefighting_units*effect

    def change_state(self):
        """Escalate or de-escalate the fire depending on time_in_state.

        Above max_time_in_step the state goes up by one (state 4 becomes 5,
        i.e. burned, even if fuel is left); below zero it goes down by one and
        the counter restarts from max_time_in_step.
        """
        if self.time_in_state > self.max_time_in_step:
            if self.state < 5:
                self.state += 1
                self.time_in_state = 0
        elif self.time_in_state < 0:
            if self.state > 0:
                self.state -= 1
                self.time_in_state = self.max_time_in_step

    def get_neighborhood(self):
        """Return the FireAgent instances of the surrounding (Moore) cells, excluding this one."""
        neighborhood = self.model.grid.get_neighborhood(self.pos, moore=True, include_center=False)
        return [
            candidate
            for cell in neighborhood
            for candidate in self.model.grid.get_cell_list_contents([(cell[0], cell[1])])
            if candidate.type == "Fire"
        ]

    def spread_fire(self):
        """Ignite or intensify randomly chosen neighbours when burning in state 3 or 4.

        A full fire (3) picks one neighbour, an extreme fire (4) picks two.
        An unburned neighbour catches fire; a burning one gets closer to
        escalating; a burned one (state 5) is left alone.
        """
        wanted = self._SPREAD_TARGETS_PER_STATE.get(self.state, 0)
        if wanted == 0:
            return
        neighborhood = self.get_neighborhood()
        # random.sample raises ValueError when asked for more items than exist,
        # which can happen on very small grids - never ask for more than available.
        sample_size = min(wanted, len(neighborhood))
        for neighbour in random.sample(neighborhood, sample_size):
            if neighbour.state == 0:
                neighbour.state = 1
                neighbour.time_in_state = 0
            elif neighbour.state < 5:
                neighbour.time_in_state += 1

    def step(self):
        """Advance the cell by one tick; only burning cells (states 1-4) evolve."""
        if 0 < self.state < 5:
            self.make_damage()
            self.apply_firefighters()
            self.change_state()
            self.spread_fire()
            self.time_in_state += 1
        self.check_if_burned()

class FireFighterAgent(mesa.Agent):
    """Passive agent standing for a single firefighting unit placed on a grid cell."""

    def __init__(self, unique_id, model):
        """Register the unit with mesa.Agent and tag it as a firefighter."""
        super().__init__(unique_id, model)
        # Tag used instead of isinstance checks to tell agent kinds apart.
        self.type = "FireFighter"

    def step(self):
        """Do nothing: the unit has no behaviour of its own when the scheduler activates it."""
        return None

In [3]:
class FireControllerAgentModel(mesa.Model):
    """Forest-fire model with a controller that distributes firefighters over burning cells.

    Every grid cell holds one FireAgent. FireFighterAgent instances are
    removed and re-created at the start of every step, according to the
    current `fire_fighters` budget.
    """

    def __init__(self, width, height, fire_fighters, fire_agents_values: list):
        """
        width, height - size of the forest (the simulations in this notebook use 10x10)

        fire_fighters - number of firefighting units the controller has at its disposal

        fire_agents_values - list of values from the set {1,2,3,4} giving the initial fire level of
        randomly chosen cells. It may hold at most width*height entries (100 for a 10x10 grid),
        otherwise random.sample raises ValueError. For example, for the list [1,1,3,2] four randomly
        chosen cells are initialised with exactly these fire levels.
        """
        # Let mesa.Model initialise its own bookkeeping before any agent is created.
        super().__init__()
        self.width = width
        self.height = height
        self.fire_fighters = fire_fighters
        self.grid = mesa.space.MultiGrid(width=width, height=height, torus=False)
        self.schedule = mesa.time.RandomActivation(self)
        # Running total of (firefighters on a cell) * (fire state of that cell).
        self.firefighters_exposure_to_fire = 0

        all_indices = [(i, j) for i in range(width) for j in range(height)]
        fire_cells = random.sample(all_indices, len(fire_agents_values))
        burning = set(fire_cells)  # set membership instead of a quadratic list scan
        no_fire_cells = [index for index in all_indices if index not in burning]
        for index, initial_state in zip(fire_cells, fire_agents_values):
            self._add_fire_agent(index, initial_state)
        for index in no_fire_cells:
            self._add_fire_agent(index, 0)

    def _add_fire_agent(self, pos, state):
        """Create a FireAgent in `state`, place it at `pos` and register it with the scheduler."""
        agent = FireAgent(new_uuid(), self, state)
        self.grid.place_agent(agent, pos)
        self.schedule.add(agent)

    def _cells(self):
        """Yield every (x, y) coordinate of the grid, x-major."""
        for x in range(self.width):
            for y in range(self.height):
                yield x, y

    def step(self):
        """Re-deploy firefighters, account for their exposure, then activate all agents."""
        self.assign_firefighters_to_grid()
        self.firefighters_exposure_to_fire += self.get_firefighters_exposure()
        self.schedule.step()

    def set_fire_fighters(self, fire_fighters):
        """Change the number of firefighters used from the next deployment on."""
        self.fire_fighters = fire_fighters

    def print_grid(self):
        """Print a matrix with fire state + 10 * (number of firefighters) for every cell."""
        values = np.zeros((self.width, self.height))
        for x, y in self._cells():
            fire = self.get_fire_agent(x, y)
            fighters = self.get_firefighter_agent(x, y)
            values[x,y] = fire.state + len(fighters)*10
        print(values)

    def get_total_fire_level(self):
        """Return the sum of fire states over all burning cells (burned cells count as 0)."""
        result = 0
        for x, y in self._cells():
            fire = self.get_fire_agent(x, y)
            if fire.state < 5:
                result += fire.state
        return result

    def get_total_burned_level(self):
        """Return 5 for every burned cell (state 5), summed over the grid."""
        result = 0
        for x, y in self._cells():
            fire = self.get_fire_agent(x, y)
            if fire.state == 5:
                result += fire.state
        return result

    def get_remaining_fuel(self):
        """Return the fuel left in the whole forest."""
        result = 0
        for x, y in self._cells():
            result += self.get_fire_agent(x, y).fuel
        return result

    def get_fire_agent(self, x, y):
        """Return the first FireAgent found in cell (x, y), or None if there is none."""
        agents = self.grid.get_cell_list_contents([(x,y)])
        for a in agents:
            if a.type == "Fire":
                return a
        return None

    def get_firefighter_agent(self, x, y):
        """Return a new list with all FireFighterAgent instances in cell (x, y)."""
        agents = self.grid.get_cell_list_contents([(x,y)])
        return [a for a in agents if a.type == "FireFighter"]

    def get_all_firefighters(self):
        """Return every firefighter currently on the grid."""
        result = []
        for x, y in self._cells():
            result += self.get_firefighter_agent(x, y)
        return result

    def get_firefighters_exposure(self):
        """Return the sum over cells of (number of firefighters) * (fire state of the cell)."""
        result = 0
        for x, y in self._cells():
            all_agents = self.grid.get_cell_list_contents([(x,y)])
            # The number of firefighters in a cell is all agents minus the one fire agent.
            firefighters = len(all_agents)-1
            result += firefighters*self.get_fire_agent(x, y).state
        return result

    def delete_firefighters(self):
        """Remove every firefighter from both the scheduler and the grid."""
        for x, y in self._cells():
            # get_firefighter_agent builds its own list, so removing agents
            # from the grid while looping over it is safe.
            for a in self.get_firefighter_agent(x, y):
                self.schedule.remove(a)
                self.grid.remove_agent(a)

    def assign_firefighters_to_grid(self):
        """Replace the current deployment with `self.fire_fighters` fresh units.

        Burning cells are visited from the highest fire state to the lowest.
        On each visit a cell receives as many units as its state (4, 3, 2 or 1),
        limited by what is left; the round is repeated until the budget is used up.
        Nothing is deployed when no cell is burning.
        """
        self.delete_firefighters()
        values = np.zeros((self.width, self.height))
        for x, y in self._cells():
            fire = self.get_fire_agent(x, y)
            values[x,y] = fire.state if fire.state < 5 else 0

        # Get the indices in descending order of values
        indices = np.argsort(values.ravel())[::-1]
        # Convert the 1D indices to 2D indices
        row_indices, col_indices = np.unravel_index(indices, values.shape)
        # Filter indices where the value is greater than zero
        non_zero_indices = (values[row_indices, col_indices] > 0).nonzero()
        # Get the final indices
        sorted_indices = list(zip(row_indices[non_zero_indices], col_indices[non_zero_indices]))

        # Without a burning cell the loop below would never consume the budget
        # and would spin forever.
        if not sorted_indices:
            return

        available_firefighters = self.fire_fighters
        while available_firefighters > 0:
            for x, y in sorted_indices:
                state = self.get_fire_agent(x, y).state
                wanted = state if state in (2, 3, 4) else 1
                new_firefighters = min(wanted, available_firefighters)
                for _ in range(new_firefighters):
                    a = FireFighterAgent(new_uuid(), self)
                    self.grid.place_agent(a, (x, y))
                    self.schedule.add(a)
                available_firefighters -= new_firefighters
                if available_firefighters == 0:
                    break
        

In [138]:
# Baseline run: a fixed budget of 30 firefighters against 13 initially burning cells.
# Every FireAgent starts with 70 units of fuel, so the 10x10 forest holds 70*100 units up front.
starter_model = FireControllerAgentModel(
    width=10,
    height=10,
    fire_fighters=30,
    fire_agents_values=[1, 1, 3, 4, 4, 4, 2, 2, 2, 1, 1, 1, 3],
)

i = 0
# Keep stepping until no cell is burning any more.
while True:
    if not starter_model.get_total_fire_level() > 0:
        break
    starter_model.step()
    fuel, exposure = starter_model.get_remaining_fuel(), starter_model.firefighters_exposure_to_fire
    print(f"step: {i}, fuel: {fuel}, exposure: {exposure}, fire:{starter_model.get_total_fire_level()}")
    i += 1

# Final score: fuel left in the forest minus the accumulated firefighter exposure.
print(starter_model.get_remaining_fuel()-starter_model.firefighters_exposure_to_fire)

step: 0, fuel: 6978.0, exposure: 87, fire:19
step: 1, fuel: 6966.5, exposure: 159, fire:18
step: 2, fuel: 6956.0, exposure: 232, fire:19
step: 3, fuel: 6944.5, exposure: 304, fire:18
step: 4, fuel: 6932.5, exposure: 377, fire:18
step: 5, fuel: 6920.5, exposure: 447, fire:18
step: 6, fuel: 6910.0, exposure: 517, fire:11
step: 7, fuel: 6904.5, exposure: 565, fire:11
step: 8, fuel: 6899.0, exposure: 613, fire:11
step: 9, fuel: 6893.5, exposure: 661, fire:5
step: 10, fuel: 6891.0, exposure: 691, fire:3
step: 11, fuel: 6889.5, exposure: 721, fire:0
6168.5


In [4]:
"""
def agents_portrayal(agent):
    if agent is None:
        return

    portrayal = {}

    if type(agent) is FireAgent:
        if agent.state == 0:
            portrayal["Color"] = "green"
            portrayal["Shape"] = "rect"
            portrayal["Filled"] = "true"
            portrayal["Layer"] = 0
            portrayal["w"] = 1
            portrayal["h"] = 1
        elif agent.state == 1:
            portrayal["Color"] = "yellow"
            portrayal["Shape"] = "rect"
            portrayal["Filled"] = "true"
            portrayal["Layer"] = 0
            portrayal["w"] = 1
            portrayal["h"] = 1
        elif agent.state == 2:
            portrayal["Color"] = "#FFCE33"
            portrayal["Shape"] = "rect"
            portrayal["Filled"] = "true"
            portrayal["Layer"] = 0
            portrayal["w"] = 1
            portrayal["h"] = 1
        elif agent.state == 3:
            portrayal["Color"] = "#FF9933"
            portrayal["Shape"] = "rect"
            portrayal["Filled"] = "true"
            portrayal["Layer"] = 0
            portrayal["w"] = 1
            portrayal["h"] = 1
        elif agent.state == 4:
            portrayal["Color"] = "red"
            portrayal["Shape"] = "rect"
            portrayal["Filled"] = "true"
            portrayal["Layer"] = 0
            portrayal["w"] = 1
            portrayal["h"] = 1
        elif agent.state == 5:
            portrayal["Color"] = "black"
            portrayal["Shape"] = "rect"
            portrayal["Filled"] = "true"
            portrayal["Layer"] = 0
            portrayal["w"] = 1
            portrayal["h"] = 1
    elif type(agent) is FireFighterAgent:
        portrayal["Color"] = "blue"
        portrayal["Shape"] = "rect"
        portrayal["Filled"] = "true"
        portrayal["Layer"] = 0
        portrayal["w"] = 1
        portrayal["h"] = 1

    return portrayal

model_params = {
    "width": 10,
    "height": 10, 
    "fire_fighters": 1, 
    "fire_agents_values": [2,2,2,2,3,2,3]
}

grid = mesa.visualization.CanvasGrid(agents_portrayal, 10, 10, 500, 500)

server = mesa.visualization.ModularServer(
    FireControllerAgentModel, [grid], "Fire forest - model", model_params)
server.port = 8522
server.launch()
"""

'\ndef agents_portrayal(agent):\n    if agent is None:\n        return\n\n    portrayal = {}\n\n    if type(agent) is FireAgent:\n        if agent.state == 0:\n            portrayal["Color"] = "green"\n            portrayal["Shape"] = "rect"\n            portrayal["Filled"] = "true"\n            portrayal["Layer"] = 0\n            portrayal["w"] = 1\n            portrayal["h"] = 1\n        elif agent.state == 1:\n            portrayal["Color"] = "yellow"\n            portrayal["Shape"] = "rect"\n            portrayal["Filled"] = "true"\n            portrayal["Layer"] = 0\n            portrayal["w"] = 1\n            portrayal["h"] = 1\n        elif agent.state == 2:\n            portrayal["Color"] = "#FFCE33"\n            portrayal["Shape"] = "rect"\n            portrayal["Filled"] = "true"\n            portrayal["Layer"] = 0\n            portrayal["w"] = 1\n            portrayal["h"] = 1\n        elif agent.state == 3:\n            portrayal["Color"] = "#FF9933"\n            portrayal[

In [5]:
class QLearning():
    """Tabular Q-learning of how many firefighters to deploy for a given total fire level.

    State  - index of the bucket the total fire level falls into; bucket i
             covers levels up to self.states[i] (20, 40, ..., 500).
    Action - number of firefighters handed to the model (0 .. 50).
    """

    def __init__(self):
        """Set up a 10x10 forest, 51 actions, 25 fire-level buckets and a zeroed Q-table."""
        self.width = 10
        self.height = 10
        self.fire_fighters = 51
        self.actions = [i for i in range(self.fire_fighters)]
        self.states = [20*(i+1) for i in range(25)]
        self.q_table = np.zeros((len(self.states), len(self.actions)))

    def _choose_action(self, state, epsilon):
        """Epsilon-greedy choice: a random action with probability epsilon, else the best known one."""
        if np.random.rand() < epsilon:
            # np.random.randint excludes the upper bound, so passing
            # len(actions) - 1 would never explore the last action.
            return np.random.randint(0, len(self.actions))
        return np.argmax(self.q_table[state])

    def run(self, alpha=0.1, gamma=0.9, epsilon=0.6, epsilon_decay=0.2, episodes=1000):
        """Train the Q-table.

        alpha         - learning rate
        gamma         - discount factor
        epsilon       - initial exploration probability
        epsilon_decay - factor epsilon is multiplied by after every episode
        episodes      - number of simulated fires
        """
        # Highest reachable fire level: every cell burning in state 4.
        max_fire_level = self.width * self.height * 4
        for episode in range(episodes):
            if episode%100 == 0:
                print(f"Episode {episode}")
            # The generated initial state shouldn't take burned places into
            # account, so buckets above the reachable maximum are skipped.
            state = random.randint(0,len(self.states)-1)
            while self.states[state] > max_fire_level:
                state -=1
            # Positive placeholder so the loop below runs at least once.
            total_fire = max_fire_level
            if state == 0:
                fire_list = self.generate_fire_list(1, self.states[state])
            else:
                fire_list = self.generate_fire_list(self.states[state-1]+1, self.states[state])
            self.model = FireControllerAgentModel(width=self.width, height=self.height,
                                         fire_fighters=self.fire_fighters, fire_agents_values=fire_list)
            while total_fire > 0:
                action = self._choose_action(state, epsilon)

                next_state, reward, total_fire = self.take_action(state, action)
                # NOTE(review): when the fire is out take_action reports next_state 0,
                # so the update still bootstraps from bucket 0 - confirm this is intended.
                self.q_table[state, action] = (1 - alpha) * self.q_table[state, action] + \
                                alpha * (reward + gamma * np.max(self.q_table[next_state]))
                state = next_state
            epsilon *= epsilon_decay

    def fire_level_to_state(self, fire_level):
        """Return the index of the first bucket whose upper bound is >= fire_level.

        Levels above the last bound are clamped to the last bucket (the
        original fell back to bucket 0, the lowest fire level).
        """
        return next(
            (index for index, upper_bound in enumerate(self.states) if fire_level <= upper_bound),
            len(self.states) - 1,
        )

    def take_action(self, state, action):
        """Run the model with `action` firefighters until the fire bucket changes or the fire is out.

        Returns (next_state, reward, total_fire) where reward is the remaining
        fuel minus the firefighter exposure accumulated since the model was created.
        """
        self.model.set_fire_fighters(action)
        while True:
            self.model.step()
            total_fire = self.model.get_total_fire_level()
            if total_fire == 0:
                next_state = 0
                break
            next_state = self.fire_level_to_state(total_fire)
            if next_state != state:
                break
        reward = self.model.get_remaining_fuel()-self.model.firefighters_exposure_to_fire
        return next_state, reward, total_fire

    def generate_fire_list(self, range_start, range_end, max_length=100):
        """Draw random fire levels (1-4) whose sum lies above range_start and below range_end.

        At most max_length values are drawn. If they do not add up to more than
        range_start, values are raised to 4 from the front of the list until
        they do, or until every value is 4.
        """
        digit_list = []
        total = 0

        for _ in range(max_length):
            digit = random.randint(1, 4)
            digit_list.append(digit)
            total += digit
            if total >= range_end:
                # The last draw overshot the bucket - drop it and stop.
                digit_list.pop()
                total -= digit
                break
            if range_start < total <= range_end and random.random()<0.3:
                break

        i = 0
        # The bound on i prevents running past the end of the list when even
        # all-4 values cannot reach range_start.
        while i < len(digit_list) and total < range_start+1:
            total += 4-digit_list[i]
            digit_list[i] = 4
            i += 1
        return digit_list
        

In [6]:
# Seed the two global RNGs the training code draws from (`random` and `np.random`)
# so that repeated runs draw the same random numbers from them.
# NOTE(review): Mesa's scheduler keeps its own RNG - confirm whether the model
# needs an explicit seed as well for fully repeatable runs.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

q = QLearning()
q.run(episodes=1500, epsilon_decay=0.6, epsilon=0.9, alpha=0.55, gamma=0.9)

Episode 0
Episode 100
Episode 200
Episode 300
Episode 400
Episode 500
Episode 600
Episode 700
Episode 800
Episode 900
Episode 1000
Episode 1100
Episode 1200
Episode 1300
Episode 1400


In [7]:
# Dump the learned Q-table, one row (fire-level bucket) per print call.
for i, bucket_values in enumerate(q.q_table):
    print(bucket_values)

[-3380.02463867 -3072.66362411 -3537.14129733 -3622.25862805
 -3478.76394844 -3472.87032658 -3692.03284586 -4018.15407278
 -3475.87913342 -3659.82097886 -3551.1559124  -3340.15147707
 -3767.79498926 -3782.78693375 -4936.9473417  -1081.08129718
 -3186.35084754 -4215.778215   -3436.13727846 -3578.4405345
 -3497.91761    -4923.35898716 -3969.88333815 -3486.12517679
 -3030.31073131 -3813.51868428 -3707.6458925  -4669.62718671
 -4913.20100932 -3154.06943456 -3180.99590054 -3946.36359677
 -4926.70725803 -3154.13824366 -4024.91526604 -4317.54865777
 -3256.24231143 -3480.08599813 -4411.439693   -4677.15268441
 -5544.15389646 -5497.73582184 -5027.98863645 -4495.73895401
 -3571.95457569 -3162.06778999 -4485.40282437 -3417.45263653
 -3401.27827875 -3558.00761719 -3936.23919968]
[-2629.34915222 -2742.81987312 -2991.79266356 -2636.84733363
 -3248.24805097 -3295.99103221 -3271.13270591 -3236.40566622
 -3044.41843706 -3093.98115451 -2614.01180779 -4050.07465892
 -2708.01768237 -3713.78013157 -2677.27

In [8]:
class TrainedModel(FireControllerAgentModel):
    """10x10 fire model whose firefighter budget is read from a learned Q-table each step."""

    def __init__(self, fire_agents_values: list, q_table):
        """Build the forest, then deploy the number of firefighters the Q-table suggests.

        fire_agents_values - initial fire levels of randomly chosen cells
        q_table            - array indexed as [fire-level bucket][number of firefighters]
        """
        super().__init__(width=10, height=10, fire_fighters=0, fire_agents_values=fire_agents_values)
        self.q_table = q_table
        self.actions = list(range(51))
        # Upper bounds of the fire-level buckets: 20, 40, ..., 500.
        self.states = [20 * (bucket + 1) for bucket in range(25)]
        self.fire_fighters = self.calculate_next_firefighters()
        self.assign_firefighters_to_grid()

    def calculate_next_firefighters(self):
        """Return the action with the highest Q-value for the current total fire level."""
        current_level = self.get_total_fire_level()
        # First bucket whose upper bound covers the level; bucket 0 when none does.
        bucket = next(
            (index for index, upper_bound in enumerate(self.states) if current_level <= upper_bound),
            0,
        )
        return np.argmax(self.q_table[bucket])

    def step(self):
        """Run one regular model step, then pick the budget for the next one."""
        super().step()
        self.fire_fighters = self.calculate_next_firefighters()
    

In [155]:
# Evaluation run: the firefighter budget is chosen from the learned Q-table at every step.
initial_fire_levels = [2, 2, 2, 2, 3, 2, 3]
trained_model = TrainedModel(initial_fire_levels, q.q_table)

i = 0
# Keep stepping until no cell is burning any more.
while True:
    if not trained_model.get_total_fire_level() > 0:
        break
    trained_model.step()
    fuel, exposure = trained_model.get_remaining_fuel(), trained_model.firefighters_exposure_to_fire
    print(f"step: {i}, fuel: {fuel}, exposure: {exposure}, fire:{trained_model.get_total_fire_level()}, firefighters: {trained_model.fire_fighters}")
    i += 1

# Final score: fuel left in the forest minus the accumulated firefighter exposure.
print(trained_model.get_remaining_fuel()-trained_model.firefighters_exposure_to_fire)
print(f"Total burned: {trained_model.get_total_burned_level()}")

step: 0, fuel: 6991, exposure: 117, fire:9, firefighters: 49
step: 1, fuel: 6986.5, exposure: 190, fire:7, firefighters: 49
step: 2, fuel: 6983.0, exposure: 239, fire:2, firefighters: 49
step: 3, fuel: 6982.0, exposure: 288, fire:0, firefighters: 49
6694.0
Total burned: 0
