<a href="https://colab.research.google.com/github/matanaaa14/ai_task2/blob/main/bellman.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
import numpy as np
import random

class GridGameMBRL:
    def __init__(self, rows, cols, rewards, success_probability, costs, walls, gamma=0.9):
        self.rows = rows
        self.cols = cols
        self.rewards = rewards
        self.success_probability = success_probability
        self.costs = costs
        self.walls = walls
        self.gamma = gamma
        self.value_table = np.zeros((rows, cols))
        self.policy = np.zeros((rows, cols), dtype=int)
        self.actions = [(0, 1), (0, -1), (1, 0), (-1, 0)]

    def is_valid_location(self, location):
        return (0 <= location[0] < self.rows and
                0 <= location[1] < self.cols and
                location not in self.walls)

    def get_next_state(self, state, action):
        move = self.actions[action]
        next_state = [state[0] + move[0], state[1] + move[1]]
        if not self.is_valid_location(next_state):
            next_state = state
        return next_state

    def value_iteration(self, theta=0.0001):
        while True:
            delta = 0
            new_value_table = np.copy(self.value_table)
            for row in range(self.rows):
                for col in range(self.cols):
                    if [row, col] in self.walls:
                        continue
                    old_value = self.value_table[row, col]
                    new_value = float('-inf')
                    for action in range(4):
                        action_value = 0
                        main_prob = self.success_probability
                        side_prob = (1 - main_prob) / 2

                        # Define the actions for main, left, and right
                        main_action = action
                        left_action = (action + 1) % 4
                        right_action = (action - 1) % 4

                        # Get the next states
                        next_state_main = self.get_next_state([row, col], main_action)
                        next_state_left = self.get_next_state([row, col], left_action)
                        next_state_right = self.get_next_state([row, col], right_action)

                        # Calculate the action value considering the probabilities
                        reward = self.rewards[row][col] + self.costs[row][col]
                        action_value += main_prob * (reward + self.gamma * self.value_table[next_state_main[0], next_state_main[1]])
                        action_value += side_prob * (reward + self.gamma * self.value_table[next_state_left[0], next_state_left[1]])
                        action_value += side_prob * (reward + self.gamma * self.value_table[next_state_right[0], next_state_right[1]])

                        # Update the new value and policy
                        if action_value > new_value:
                            new_value = action_value
                            self.policy[row, col] = action

                    new_value_table[row, col] = new_value
                    delta = max(delta, abs(old_value - new_value))
            self.value_table = new_value_table
            if delta < theta:
                break

    def evaluate_policy(self, episodes=10000, epsilon=0.1):
        total_return = 0
        for _ in range(episodes):
            state = [0, 0]
            episode_return = 0
            while self.rewards[state[0]][state[1]] == 0:
                if random.uniform(0, 1) < epsilon:
                    action = random.choice(range(4))
                else:
                    action = self.policy[state[0], state[1]]
                next_state = self.get_next_state(state, action)
                reward = self.rewards[state[0]][state[1]] + self.costs[state[0]][state[1]]
                episode_return += reward
                state = next_state
            total_return += episode_return
        return total_return / episodes

    def print_policy(self):
        direction_mapping = ['→', '←', '↓', '↑']
        policy = [[None for _ in range(self.cols)] for _ in range(self.rows)]
        for row in range(self.rows):
            for col in range(self.cols):
                if [row, col] in self.walls:
                    policy[row][col] = 'W'  # Wall
                elif self.rewards[row][col] > 0:
                    policy[row][col] = 'P'  # Positive reward
                elif self.rewards[row][col] < 0:
                    policy[row][col] = 'N'  # Negative reward
                else:
                    best_action = self.policy[row, col]
                    policy[row][col] = direction_mapping[best_action]
        for row in policy:
            print(" ".join(row))

def main():
    # Set up the grid parameters
    w = 4
    h = 3
    L = [(1, 1, 0), (3, 2, 1), (3, 1, -1)]
    p = 0.9
    r = -0.04

    # Initialize the rewards, costs, and walls
    rewards = [[0 for _ in range(w)] for _ in range(h)]
    costs = [[r for _ in range(w)] for _ in range(h)]
    walls = []

    for x, y, value in L:
        if value == 0:
            walls.append([y, x])  # Note the (x, y) to (row, col) conversion
        else:
            rewards[y][x] = value

    # Define success probabilities
    success_probability = p

    # Create the GridGameMBRL instance
    game_mbrl = GridGameMBRL(h, w, rewards, success_probability, costs, walls)

    # Perform value iteration to find the best policy
    import time
    start_time = time.time()
    game_mbrl.value_iteration()
    end_time = time.time()

    # Evaluate the policy
    policy_score = game_mbrl.evaluate_policy(episodes=1000, epsilon=0.1)
    print("Policy Score:", policy_score)
    print(f"Value Iteration Time: {end_time - start_time:.4f} seconds")

    # Print the policy for visual comparison
    print("\nPolicy:")
    game_mbrl.print_policy()

if __name__ == "__main__":
    main()


Policy Score: -0.21535999999999778
Value Iteration Time: 0.0181 seconds

Policy:
→ → ↓ ↓
↓ W ↓ N
→ → → P


In [41]:
import numpy as np
import random

class GridGameMBRL:
    def __init__(self, rows, cols, rewards, success_probability, costs, walls, gamma=0.9):
        self.rows = rows
        self.cols = cols
        self.rewards = rewards
        self.success_probability = success_probability
        self.costs = costs
        self.walls = walls
        self.gamma = gamma
        self.value_table = np.zeros((rows, cols))
        self.policy = np.zeros((rows, cols), dtype=int)
        self.actions = [(0, 1), (0, -1), (1, 0), (-1, 0)]

    def is_valid_location(self, location):
        return (0 <= location[0] < self.rows and
                0 <= location[1] < self.cols and
                location not in self.walls)

    def get_next_state(self, state, action):
        move = self.actions[action]
        next_state = [state[0] + move[0], state[1] + move[1]]
        if not self.is_valid_location(next_state):
            next_state = state
        return next_state

    def value_iteration(self, theta=0.0001):
        while True:
            delta = 0
            new_value_table = np.copy(self.value_table)
            for row in range(self.rows):
                for col in range(self.cols):
                    if [row, col] in self.walls:
                        continue
                    old_value = self.value_table[row, col]
                    new_value = float('-inf')
                    for action in range(4):
                        action_value = 0
                        main_prob = self.success_probability
                        side_prob = (1 - main_prob) / 2

                        main_action = action
                        left_action = (action + 1) % 4
                        right_action = (action - 1) % 4

                        next_state_main = self.get_next_state([row, col], main_action)
                        next_state_left = self.get_next_state([row, col], left_action)
                        next_state_right = self.get_next_state([row, col], right_action)

                        reward = self.rewards[row][col] + self.costs[row][col]

                        main_value = reward + self.gamma * self.value_table[next_state_main[0], next_state_main[1]]
                        left_value = reward + self.gamma * self.value_table[next_state_left[0], next_state_left[1]]
                        right_value = reward + self.gamma * self.value_table[next_state_right[0], next_state_right[1]]

                        if self.rewards[next_state_main[0]][next_state_main[1]] < 0:
                            main_value += self.rewards[next_state_main[0]][next_state_main[1]]
                        if self.rewards[next_state_left[0]][next_state_left[1]] < 0:
                            left_value += self.rewards[next_state_left[0]][next_state_left[1]]
                        if self.rewards[next_state_right[0]][next_state_right[1]] < 0:
                            right_value += self.rewards[next_state_right[0]][next_state_right[1]]

                        action_value += main_prob * main_value
                        action_value += side_prob * left_value
                        action_value += side_prob * right_value

                        if action_value > new_value:
                            new_value = action_value
                            self.policy[row, col] = action

                    new_value_table[row, col] = new_value
                    delta = max(delta, abs(old_value - new_value))
            self.value_table = new_value_table
            if delta < theta:
                break

    def evaluate_policy(self, episodes=10000, epsilon=0.1):
        total_return = 0
        for _ in range(episodes):
            state = [0, 0]
            episode_return = 0
            while self.rewards[state[0]][state[1]] == 0:
                if random.uniform(0, 1) < epsilon:
                    action = random.choice(range(4))
                else:
                    action = self.policy[state[0], state[1]]
                next_state = self.get_next_state(state, action)
                reward = self.rewards[state[0]][state[1]] + self.costs[state[0]][state[1]]
                episode_return += reward
                state = next_state
            total_return += episode_return
        return total_return / episodes

    def print_policy(self):
        direction_mapping = ['→', '←', '↓', '↑']
        policy = [[None for _ in range(self.cols)] for _ in range(self.rows)]
        for row in range(self.rows):
            for col in range(self.cols):
                if [row, col] in self.walls:
                    policy[row][col] = 'W'  # Wall
                elif self.rewards[row][col] > 0:
                    policy[row][col] = 'P'  # Positive reward
                elif self.rewards[row][col] < 0:
                    policy[row][col] = 'N'  # Negative reward
                else:
                    best_action = self.policy[row, col]
                    policy[row][col] = direction_mapping[best_action]
        for row in policy:
            print(" ".join(row))

def main():
    # Set up the grid parameters
    w = 4
    h = 3
    L = [(1,1,0),(3,2,1),(3,1,-1)]
    p = 0.8
    r = -0.1

    # Initialize the rewards, costs, and walls
    rewards = [[0 for _ in range(w)] for _ in range(h)]
    costs = [[r for _ in range(w)] for _ in range(h)]
    walls = []

    for x, y, value in L:
        if value == 0:
            walls.append([h - y - 1, x])  # Convert (x, y) to (row, col) with (0,0) at lower-left
        else:
            rewards[h - y - 1][x] = value

    # Define success probabilities
    success_probability = p

    # Create the GridGameMBRL instance
    game_mbrl = GridGameMBRL(h, w, rewards, success_probability, costs, walls)

    # Perform value iteration to find the best policy
    import time
    start_time = time.time()
    game_mbrl.value_iteration()
    end_time = time.time()

    # Evaluate the policy
    policy_score = game_mbrl.evaluate_policy(episodes=1000, epsilon=0.1)
    print("Policy Score:", policy_score)
    print(f"Value Iteration Time: {end_time - start_time:.4f} seconds")

    # Print the policy for visual comparison
    print("\nPolicy:")
    game_mbrl.print_policy()

if __name__ == "__main__":
    main()


Policy Score: -0.335000000000005
Value Iteration Time: 0.0962 seconds

Policy:
→ → → P
↑ W ↑ N
↑ → ↑ ←
