<a href="https://colab.research.google.com/github/matanaaa14/AI_task2_matan_and_gal/blob/main/bellman_f.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import heapq
import numpy as np
import random
from tabulate import tabulate

class GridGameMBRL:
    def __init__(self, rows, cols, rewards, success_probability, costs, walls, gamma=0.5):
        self.rows = rows
        self.cols = cols
        self.rewards = rewards
        self.success_probability = success_probability
        self.costs = costs
        self.walls = walls
        self.gamma = gamma
        self.value_table = np.zeros((rows, cols))
        self.policy = np.zeros((rows, cols), dtype=int)
        self.actions = [(0, 1), (0, -1), (1, 0), (-1, 0)]
        self.safety_distance = 2  # Safety distance to negative rewards

    def is_valid_location(self, location):
        return (0 <= location[0] < self.rows and
                0 <= location[1] < self.cols and
                location not in self.walls)

    def get_next_state(self, state, action):
        move = self.actions[action]
        next_state = [state[0] + move[0], state[1] + move[1]]

        if not self.is_valid_location(next_state):
            next_state = state

        # Check if next state is within safety distance of any negative reward
        if self.success_probability < 0.5:
            for dr in range(-self.safety_distance, self.safety_distance + 1):
                for dc in range(-self.safety_distance, self.safety_distance + 1):
                    neighbor_row = next_state[0] + dr
                    neighbor_col = next_state[1] + dc
                    if (0 <= neighbor_row < self.rows and
                        0 <= neighbor_col < self.cols and
                        self.rewards[neighbor_row][neighbor_col] < 0):
                        # Avoid moving towards negative reward if success_probability is low
                        if random.uniform(0, 1) > self.success_probability:
                            return state  # Stay in the current state

        return next_state

    def value_iteration(self, theta=0.0001):
        priority_queue = []
        for row in range(self.rows):
            for col in range(self.cols):
                if [row, col] not in self.walls:
                    heapq.heappush(priority_queue, (0, (row, col)))

        while priority_queue:
            delta, (row, col) = heapq.heappop(priority_queue)
            old_value = self.value_table[row, col]
            new_value = float('-inf')
            best_action = None

            for action in range(4):
                action_value = 0
                main_prob = self.success_probability
                side_prob = (1 - main_prob) / 2

                main_action = action
                left_action = (action + 1) % 4
                right_action = (action - 1) % 4

                next_state_main = self.get_next_state([row, col], main_action)
                next_state_left = self.get_next_state([row, col], left_action)
                next_state_right = self.get_next_state([row, col], right_action)

                reward = self.rewards[row][col] + self.costs[row][col]

                main_value = reward + self.gamma * self.value_table[next_state_main[0], next_state_main[1]]
                left_value = reward + self.gamma * self.value_table[next_state_left[0], next_state_left[1]]
                right_value = reward + self.gamma * self.value_table[next_state_right[0], next_state_right[1]]

                action_value += main_prob * main_value
                action_value += side_prob * left_value
                action_value += side_prob * right_value

                if action_value > new_value:
                    new_value = action_value
                    best_action = action

            self.value_table[row, col] = new_value
            self.policy[row, col] = best_action
            diff = abs(old_value - new_value)

            if diff > theta:
                for action in range(4):
                    next_state = self.get_next_state([row, col], action)
                    if self.is_valid_location(next_state):
                        heapq.heappush(priority_queue, (-diff, tuple(next_state)))

    def evaluate_policy(self, episodes=10000, max_steps=100, epsilon=0.1):
        total_return = 0
        for episode in range(episodes):
            state = [0, 0]
            episode_return = 0
            step = 0
            while step < max_steps:
                if random.uniform(0, 1) < epsilon:
                    action = random.choice(range(4))
                else:
                    action = self.policy[state[0], state[1]]
                next_state = self.get_next_state(state, action)
                reward = self.rewards[state[0]][state[1]] + self.costs[state[0]][state[1]]
                episode_return += reward
                if self.rewards[state[0]][state[1]] != 0 and self.costs[state[0]][state[1]] <= 0:
                    break  # Terminate the episode at a terminal state with non-positive cost
                state = next_state
                step += 1
            total_return += episode_return
        return total_return / episodes

    def print_policy(self):
        direction_mapping = ['→', '←', '↓', '↑']
        policy = [[None for _ in range(self.cols)] for _ in range(self.rows)]
        for row in range(self.rows):
            for col in range(self.cols):
                if [row, col] in self.walls:
                    policy[row][col] = 'W'  # Wall
                elif self.rewards[row][col] > 0:
                    policy[row][col] = 'P'  # Positive reward
                elif self.rewards[row][col] < 0:
                    policy[row][col] = 'N'  # Negative reward
                else:
                    best_action = self.policy[row, col]
                    policy[row][col] = direction_mapping[best_action]
        for row in policy:
            print(" ".join(row))

    def print_utilities(self):
        utilities_table = []
        for row in range(self.rows):
            row_values = []
            for col in range(self.cols):
                if [row, col] in self.walls:
                    row_values.append("W")  # Wall
                elif self.rewards[row][col] > 0:
                    row_values.append("{:.2f}".format(self.rewards[row][col]))  # Positive reward value
                elif self.rewards[row][col] < 0:
                    row_values.append("{:.2f}".format(self.rewards[row][col]))  # Negative reward value
                else:
                    row_values.append("{:.2f}".format(self.value_table[row][col]))  # Utility value
            utilities_table.append(row_values)

        print("\nUtilities:")
        print(tabulate(utilities_table, tablefmt="fancy_grid", numalign="center", stralign="center"))

def run_test_case(w, h, L, p, r):
    # Initialize the rewards, costs, and walls
    rewards = [[0 for _ in range(w)] for _ in range(h)]
    costs = [[r for _ in range(w)] for _ in range(h)]
    walls = []

    for x, y, value in L:
        if value == 0:
            walls.append([h - y - 1, x])  # Convert (x, y) to (row, col) with (0,0) at lower-left
        else:
            rewards[h - y - 1][x] = value

    # Define success probabilities
    success_probability = p

    # Create the GridGameMBRL instance
    game_mbrl = GridGameMBRL(h, w, rewards, success_probability, costs, walls)

    # Perform value iteration to find the best policy
    import time
    start_time = time.time()
    game_mbrl.value_iteration()
    end_time = time.time()

    # Evaluate the policy
    policy_score = game_mbrl.evaluate_policy(episodes=1000, max_steps=100, epsilon=0.1)
    print("Policy Score (Value Iteration):", policy_score)
    print(f"Value Iteration Time: {end_time - start_time:.4f} seconds")

    # Print the policy and utilities for visual comparison
    print("\nPolicy:")
    game_mbrl.print_policy()

    # Print utilities
    game_mbrl.print_utilities()

def main():
    test_cases = [
        (4, 3, [(1,1,0),(3,2,1),(3,1,-1)], 0.8, -0.04),
        (4, 3, [(1,1,0),(3,2,1),(3,1,-1)], 0.8, 0.04),
        (4, 3, [(1,1,0),(3,2,1),(3,1,-1)], 0.8, -1),
        (12, 4, [(1,0,-100),(2,0,-100),(3,0,-100),(4,0,-100),(5,0,-100),(6,0,-100),(7,0,-100),(8,0,-100),(9,0,-100),(10,0,-100),(11,0,1)], 1, -1),
        (12, 6, [(1,0,-100),(2,0,-100),(3,0,-100),(4,0,-100),(5,0,-100),(6,0,-100),(7,0,-100),(8,0,-100),(9,0,-100),(10,0,-100),(11,0,1)], 0.9, -1),
        (5, 5, [(4,0,-10),(0,4,-10),(1,1,1),(3,3,2)], 0.9, -0.5),
        (5, 5, [(2,2,-2),(4,4,-1),(1,1,1),(3,3,2)], 0.9, -0.25),
        (7, 7, [(1,1,-4),(1,5,-6),(5,1,1),(5,5,4)], 0.8, -0.5),
        (7, 7, [(1,1,-4),(1,5,-6),(5,1,1),(5,5,4)], 0.8, -0.5),
        (7, 7, [(3,1,0),(3,5,0),(1,1,-4),(1,5,-6),(5,1,1),(5,5,4)], 0.8, -0.25)
    ]

    for i, (w, h, L, p, r) in enumerate(test_cases):
        print(f"\nRunning Test Case {i + 1}")
        run_test_case(w, h, L, p, r)

if __name__ == "__main__":
    main()



Running Test Case 1
Policy Score (Value Iteration): 0.8222000000000033
Value Iteration Time: 0.0041 seconds

Policy:
→ → → P
↑ W ↑ N
↑ → ↑ ←

Utilities:
╒═══════╤═══════╤══════╤═══════╕
│ 0.08  │ 0.27  │ 0.74 │   1   │
├───────┼───────┼──────┼───────┤
│ -0.01 │   W   │ 0.24 │  -1   │
├───────┼───────┼──────┼───────┤
│ -0.05 │ -0.02 │ 0.06 │ -0.02 │
╘═══════╧═══════╧══════╧═══════╛

Running Test Case 2
Policy Score (Value Iteration): 93.21500000000002
Value Iteration Time: 0.0050 seconds

Policy:
→ → → P
↑ W ↑ N
↑ → ↑ ←

Utilities:
╒══════╤══════╤══════╤══════╕
│ 0.24 │ 0.43 │ 0.9  │  1   │
├──────┼──────┼──────┼──────┤
│ 0.15 │  W   │ 0.4  │  -1  │
├──────┼──────┼──────┼──────┤
│ 0.11 │ 0.14 │ 0.22 │ 0.14 │
╘══════╧══════╧══════╧══════╛

Running Test Case 3
Policy Score (Value Iteration): -3.433
Value Iteration Time: 0.0109 seconds

Policy:
→ → → P
↑ W ↑ N
↑ → ↑ ←

Utilities:
╒═══════╤═══════╤═══════╤═══════╕
│ -1.84 │ -1.65 │ -1.18 │   1   │
├───────┼───────┼───────┼───────┤
│ -1.93 