In [1]:
import numpy as np
import random

In [2]:
grid_size = 10
goal_reward = 10
dead_end_reward = -10
obstacle_reward = np.nan #cannot enter state
step_reward = -0.05

up = 0
down = 1
left = 2
right = 3
actions = [up, down, left, right]

def create_grid_world():
    grid = np.full((grid_size, grid_size), step_reward)
    grid[grid_size-1, grid_size-1] = goal_reward
    grid[grid_size-2, grid_size-1] = dead_end_reward
    
    #obstacles
    num_obstacles = 10
    obstacle_coords = []
    while len(obstacle_coords) < num_obstacles:
        row = random.randint(0, grid_size-1)
        col = random.randint(0, grid_size-1)
        if (row, col) not in (obstacle_coords) and (row, col) != (grid_size-1, grid_size-1) and (row, col) != (grid_size-2, grid_size-1):
            obstacle_coords.append((row, col))
            grid[row, col] = obstacle_reward
    return grid

def is_valid_state(state):
    row, col = state
    if row < grid_size and col < grid_size:
        return 0<=row<grid_size and 0<=col<grid_size

def next_state(state, action):
    row, col = state
    if action == up:
        next_row, next_col = row-1, col
    elif action == down:
        next_row, next_col = row+1, col
    elif action == right:
        next_row, next_col = row, col+1
    elif action == left:
        next_row, next_col = row, col-1
    else:
        raise ValueError("Invalid Action")
        
    if is

In [8]:
import numpy as np
import random

# Grid world dimensions
GRID_SIZE = 10

# Define rewards
GOAL_REWARD = 10
DEAD_END_REWARD = -10
OBSTACLE_REWARD = np.nan  # Represents a state you can't enter
STEP_REWARD = -0.05

# Actions
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
ACTIONS = [UP, DOWN, LEFT, RIGHT]


def create_grid_world():
    """Creates the grid world with rewards and obstacles."""
    grid = np.full((GRID_SIZE, GRID_SIZE), STEP_REWARD)
    grid[GRID_SIZE - 1, GRID_SIZE - 1] = GOAL_REWARD
    grid[GRID_SIZE - 2, GRID_SIZE - 1] = DEAD_END_REWARD

    # Place random obstacles
    num_obstacles = 10
    obstacle_coords = []
    while len(obstacle_coords) < num_obstacles:
        row = random.randint(0, GRID_SIZE - 1)
        col = random.randint(0, GRID_SIZE - 1)
        if (row, col) not in obstacle_coords and (row, col) != (GRID_SIZE - 1, GRID_SIZE - 1) and (row, col) != (GRID_SIZE - 2, GRID_SIZE - 1) :
            obstacle_coords.append((row, col))
            grid[row, col] = OBSTACLE_REWARD

    return grid


def is_valid_state(state):
    """Checks if a state is within the grid bounds."""
    row, col = state
    return 0 <= row < GRID_SIZE and 0 <= col < GRID_SIZE


def next_state(state, action):
    """Calculates the next state based on the current state and action."""
    row, col = state
    if action == UP:
        next_row, next_col = row - 1, col
    elif action == DOWN:
        next_row, next_col = row + 1, col
    elif action == LEFT:
        next_row, next_col = row, col - 1
    elif action == RIGHT:
        next_row, next_col = row, col + 1
    else:
        raise ValueError("Invalid action.")

    if is_valid_state((next_row, next_col)):
        return next_row, next_col
    else:
        return row, col  # Stay in the same state if the next state is invalid


def define_policy():

    #Policy 1: Always move right then down, until goal or dead-end is reached or obstacle is encountered
    policy1 = {}
    for i in range(0,GRID_SIZE):
        for j in range(0,GRID_SIZE):
            policy1[(i,j)] = RIGHT if j < GRID_SIZE -1 else DOWN

    #Policy 2: Move randomly
    policy2 = {}
    for i in range(0, GRID_SIZE):
        for j in range(0, GRID_SIZE):
            policy2[(i, j)] = random.choice(ACTIONS)


    #Policy 3: Always move down if you are in first half of the columns then always right
    policy3 = {}
    for i in range(0,GRID_SIZE):
        for j in range(0,GRID_SIZE):
            policy3[(i,j)] = DOWN if j < GRID_SIZE//2 else RIGHT

    return policy1, policy2, policy3


def value_iteration(grid, policy, discount_factor=0.99, theta=0.001):
    """Performs value iteration to find the optimal value function."""

    V = np.zeros((GRID_SIZE, GRID_SIZE))


    while True:
        delta = 0
        for row in range(GRID_SIZE):
            for col in range(GRID_SIZE):
                if not np.isnan(grid[row, col]):  # Skip obstacles
                    v = V[row, col]
                    state = (row, col)

                    action = policy[state]
                    next_s = next_state(state, action)

                    reward = grid[next_s[0],next_s[1]]
                    
                    #If obstacle is encountered
                    if np.isnan(reward):
                        reward = grid[state[0], state[1]] #Stay in same state with corresponding reward
                        next_s = state

                    V[row, col] = reward + discount_factor * V[next_s[0],next_s[1]]


                    delta = max(delta, abs(v - V[row, col]))

        if delta < theta:
            break

    return V




def policy_evaluation(grid, policy, discount_factor=0.99, theta=0.001):
    V = np.zeros((GRID_SIZE, GRID_SIZE))

    while True:
        delta = 0
        for row in range(GRID_SIZE):  # Iterate over rows
            for col in range(GRID_SIZE):  # Iterate over cols
                s = (row, col)             # Create the state tuple
                if s in policy:  # Check if state is in policy (might not be for obstacles)
                    v = V[s]
                    a = policy[s]
                    next_s = next_state(s, a)
                    reward = grid[next_s] if not np.isnan(grid[next_s]) else grid[s]
                    next_s = next_s if not np.isnan(grid[next_s]) else s
                    V[s] = reward + discount_factor * V[next_s]
                    delta = max(delta, abs(v - V[s]))

        if delta < theta:
            break

    return V



def policy_improvement(grid, V, discount_factor=0.99):
    policy = {}
    for row in range(GRID_SIZE):         # Iterate by row
        for col in range(GRID_SIZE):      # Iterate by column
            s = (row, col)              # Create the state tuple

            if not np.isnan(grid[s]):   # Only update policy for non-obstacle states
                best_action = None
                best_value = -np.inf
                for a in ACTIONS:
                    next_s = next_state(s, a)
                    reward = grid[next_s] if not np.isnan(grid[next_s]) else grid[s]
                    next_s = next_s if not np.isnan(grid[next_s]) else s
                    value = reward + discount_factor * V[next_s]
                    if value > best_value:
                        best_value = value
                        best_action = a
                policy[s] = best_action  # Store best action for the current state
    return policy

def policy_iteration(grid, discount_factor=0.99):
    policy1, policy2, policy3 = define_policy()
    policy = policy1 #Initializing policy

    while True:
        V = policy_evaluation(grid, policy, discount_factor)
        new_policy = policy_improvement(grid, V, discount_factor)


        if new_policy == policy:
            break


        policy = new_policy

    return V, policy





# Example usage:
grid = create_grid_world()

policy1, policy2, policy3 = define_policy()
V_policy1 = value_iteration(grid, policy1)
V_policy2 = value_iteration(grid, policy2)
V_policy3 = value_iteration(grid, policy3)

optimal_V, optimal_policy = policy_iteration(grid)


print("\nOptimal Policy (partial view):") # Added a comment
for row in range(min(GRID_SIZE, 5)):   # Print only a portion if the grid is large
    for col in range(min(GRID_SIZE, 5)):
        s = (row, col)
        if s in optimal_policy:
            action_str = ["UP", "DOWN", "LEFT", "RIGHT"][optimal_policy[s]]
            print(f"({row}, {col}): {action_str}  ", end="")
        else:
            print(f"({row}, {col}): Obstacle/Not in Policy  ", end="")  # Indicate if obstacle
    print()  # Newline after each row


Optimal Policy (partial view):
(0, 0): Obstacle/Not in Policy  (0, 1): DOWN  (0, 2): DOWN  (0, 3): DOWN  (0, 4): DOWN  
(1, 0): DOWN  (1, 1): DOWN  (1, 2): DOWN  (1, 3): DOWN  (1, 4): DOWN  
(2, 0): DOWN  (2, 1): DOWN  (2, 2): DOWN  (2, 3): DOWN  (2, 4): DOWN  
(3, 0): DOWN  (3, 1): DOWN  (3, 2): DOWN  (3, 3): DOWN  (3, 4): DOWN  
(4, 0): DOWN  (4, 1): DOWN  (4, 2): DOWN  (4, 3): DOWN  (4, 4): DOWN  


In [12]:
import numpy as np
import random

class GridWorld:
    def __init__(self, size=(10, 10)):
        self.size = size
        self.grid = np.zeros(self.size)
        self.goal = (size[0]-1, size[1]-1)
        self.dead_end = (size[0]-2, size[1]-1)  # Adjusted dead end
        self.obstacles = self._generate_obstacles(10)  # 10 random obstacles

        self.grid[self.goal] = 10
        self.grid[self.dead_end] = -10

        for obs in self.obstacles:
            self.grid[obs] = np.nan


        self.actions = [(0, 1), (0, -1), (1, 0), (-1, 0)] # Right, Left, Down, Up

    def _generate_obstacles(self, num_obstacles):
        obstacles = []
        while len(obstacles) < num_obstacles:
            obs = (random.randint(0, self.size[0]-1), random.randint(0, self.size[1]-1))
            if obs != self.goal and obs != self.dead_end and obs not in obstacles:
                obstacles.append(obs)
        return obstacles


    def step(self, state, action):
        new_state = (state[0] + action[0], state[1] + action[1])

        if not (0 <= new_state[0] < self.size[0] and 0 <= new_state[1] < self.size[1]):  # Out of bounds
            return state, -0.05  # Small negative reward for hitting a wall
        elif new_state in self.obstacles:
             return state, -0.05  # Penalty for trying to move into obstacles
        elif new_state == self.goal:
            return new_state, 10
        elif new_state == self.dead_end:
            return new_state, -10
        else:
            return new_state, -0.05

    def reset(self):
        # Start at a random non-goal, non-obstacle state
        while True:
          start_state = (random.randint(0, self.size[0]-1), random.randint(0, self.size[1]-1))
          if start_state != self.goal and start_state != self.dead_end and start_state not in self.obstacles:
             return start_state

class QLearningAgent:
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.env = env
        self.q_table = np.zeros((env.size[0], env.size[1], len(env.actions)))
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon # Exploration rate


    def choose_action(self, state):
        if random.uniform(0, 1) < self.epsilon:
            return random.choice(self.env.actions) # Explore
        else:
            return self.env.actions[np.argmax(self.q_table[state])]  # Exploit


    def update_q_table(self, state, action, reward, next_state):
        best_next_action = np.max(self.q_table[next_state])
        self.q_table[state][self.env.actions.index(action)] = (1 - self.alpha) * self.q_table[state][self.env.actions.index(action)] + \
                                                       self.alpha * (reward + self.gamma * best_next_action)





# Example usage
grid_env = GridWorld()
agent = QLearningAgent(grid_env)

num_episodes = 5000

for episode in range(num_episodes):
    state = grid_env.reset()
    done = False

    while not done:
        action = agent.choose_action(state)
        next_state, reward = grid_env.step(state, action)
        agent.update_q_table(state, action, reward, next_state)

        state = next_state

        if state == grid_env.goal or state == grid_env.dead_end:
            done = True


# After training, print the optimal policy:
print("Learned Q-table:")
print(agent.q_table)  

# Example of getting optimal actions for a specific state:
state = (0,0)
optimal_action_index = np.argmax(agent.q_table[state])
optimal_action = grid_env.actions[optimal_action_index]
print(f"\nOptimal action for state {state}: {optimal_action}")

Learned Q-table:
[[[ 1.20164244e+00 -7.08133215e-02 -7.12750914e-02 -7.47543017e-02]
  [ 2.46251412e-01 -9.03943933e-02  1.44542745e+00  4.04319488e-01]
  [-1.08635988e-01 -3.07563118e-02  1.62795095e+00 -1.64863893e-01]
  [ 1.87905968e+00  1.34857476e-01 -1.58579615e-01  1.83670358e-02]
  [ 6.43638682e-02  2.42468991e-01  2.16884335e+00  2.77824214e-01]
  [-7.44865697e-02  5.86419029e-02  2.45277366e+00  5.12335260e-02]
  [ 2.77616078e+00  1.60230408e-01  3.13858505e-01  3.40405303e-01]
  [ 7.23362907e-01  5.62682810e-01  3.16099252e+00  1.21633075e+00]
  [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00]
  [ 6.08860168e-02  9.62943149e-02  2.45081510e+00 -1.30149813e-01]]

 [[ 0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00]
  [ 3.47932516e-01  5.66931150e-01  1.66185617e+00 -5.12245847e-02]
  [ 4.32674276e-01 -1.58234496e-01  1.90191983e+00 -1.57550073e-01]
  [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00]
  [ 5.04321434e-01  2.6971818

In [33]:
state = (9, 8)
optimal_action_index = np.argmax(agent.q_table[state])
optimal_action = grid_env.actions[optimal_action_index]
print(f"\nOptimal action for state {state}: {optimal_action}")


Optimal action for state (9, 8): (0, 1)


In [2]:
import numpy as np
import pandas as pd
import random

# Define the environment
class Environment:
    def __init__(self, csv_file):
        self.grid = pd.read_csv(csv_file, header=None).values
        self.rows, self.cols = self.grid.shape
        self.actions = [(0, 1), (0, -1), (1, 0), (-1, 0)] # Right, Left, Down, Up


    def get_state_reward(self, state):
        row, col = state
        cell = self.grid[row, col]
        if cell == 0:
            return -0.05
        elif cell == 'X':
            return np.nan  # Obstacle
        elif cell == 'D':
            return -10
        elif cell == 'G':
            return 10
        return 0


    def is_valid_state(self, state):
         row, col = state
         return 0 <= row < self.rows and 0 <= col < self.cols and self.grid[row, col] != 'X'


    def next_state(self, current_state, action):
        row, col = current_state
        dr, dc = action
        new_row, new_col = row + dr, col + dc
        if self.is_valid_state((new_row, new_col)):
            return (new_row, new_col)
        else:
            return current_state  # Stay in the same state if the move is invalid


    def update_grid(self, path):
        for row, col in path:
            if self.grid[row, col] == 0:  # Only replace '0' cells 
                 self.grid[row, col] = 'A'
        self.save_grid("updated_grid.csv")

    def save_grid(self, filename):
        pd.DataFrame(self.grid).to_csv(filename, header=False, index=False)


# Define the agent
class Agent:
    def __init__(self, env, policies, gamma = 0.9): # Discount Factor
        self.env = env
        self.policies = policies
        self.gamma = gamma

    def choose_action(self, state, policy):
        if policy == 'random':
            return random.choice(self.env.actions)
        elif policy == 'greedy':
            best_action = None
            best_q = -float('inf')
            for action in self.env.actions:
                next_state = self.env.next_state(state, action)
                reward = self.env.get_state_reward(next_state)
                if not np.isnan(reward) and reward > best_q:  # Handle obstacles
                    best_q = reward
                    best_action = action
            return best_action if best_action is not None else random.choice(self.env.actions) #If no valid action found
        #Add more polices if needed


    def run_episode(self, policy, start_state): #added start state
        path = []
        current_state = start_state
        total_reward = 0

        while self.env.grid[current_state[0],current_state[1]] != 'G' and self.env.grid[current_state[0],current_state[1]] != 'D':
            action = self.choose_action(current_state, policy)
            next_state = self.env.next_state(current_state, action)
            reward = self.env.get_state_reward(next_state)
            
            if not np.isnan(reward): # Skip Obstacles
                total_reward += reward
                path.append(current_state) # adding previous state as we have already moved to next state
                current_state = next_state

        # Check if episode finished or reached an obstacle        
        if self.env.grid[current_state[0],current_state[1]] == 'G' or self.env.grid[current_state[0],current_state[1]] == 'D':
            path.append(current_state)


        return total_reward, path


# Example usage
env = Environment("grid.csv")
policies = ['random', 'greedy']
agent = Agent(env, policies)

start_state = (0,0) # Example start state
total_reward, path = agent.run_episode('greedy', start_state)

print("Total Reward:", total_reward)
print("Path:", path)

env.update_grid(path)

KeyboardInterrupt: 