In [1]:
import numpy as np

# 1. THE ENVIRONMENT (4x4 Grid)
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        # Terminal states: Top-Left (0) and Bottom-Right (15)
        self.terminal_states = [0, 15]
        self.actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        # Move logic
        row, col = divmod(state, self.grid_size)
        if action == 'UP':    row = max(row - 1, 0)
        elif action == 'DOWN':  row = min(row + 1, self.grid_size - 1)
        elif action == 'LEFT':  col = max(col - 1, 0)
        elif action == 'RIGHT': col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1  # Standard penalty for each step
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        # Start anywhere except the terminal states
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# 2. THE POLICY (Random)
def generate_episode(env):
    episode = []
    state = env.reset()
    done = False
    while not done:
        # Random Policy: 25% chance for any direction
        action = np.random.choice(env.actions)
        next_state, reward, done = env.step(state, action)
        episode.append((state, action, reward))
        state = next_state
    return episode

# 3. THE ALGORITHM (First-Visit Monte Carlo Policy Evaluation)
def mc_policy_evaluation(env, num_episodes=5000):
    # Initialize Values to 0
    V = np.zeros(env.grid_size * env.grid_size)

    # Store all returns for every state
    returns = {s: [] for s in range(env.grid_size * env.grid_size)}

    for _ in range(num_episodes):
        episode = generate_episode(env)
        G = 0

        # Work backwards from the end of the episode
        for idx in range(len(episode) - 1, -1, -1):
            state, action, reward = episode[idx]
            G = G + reward # Gamma is 1.0, so G = G + R

            # "First-Visit" Check:
            # Only count the return if this was the first time
            # we visited this state in this specific episode.
            previous_states = [x[0] for x in episode[:idx]]
            if state not in previous_states:
                returns[state].append(G)
                V[state] = np.mean(returns[state]) # Average the returns
    return V

# --- Main Driver ---
if __name__ == "__main__":
    env = GridWorld()
    print("Running Monte Carlo Policy Evaluation (5000 Episodes)...\n")

    print("wait wait wait ....")

    # Run the algorithm
    values = mc_policy_evaluation(env)

    print("\nResulting Value Function (V):")
    # Reshape to 4x4 for easy reading
    print(np.round(values.reshape(4, 4), 1))

Running Monte Carlo Policy Evaluation (5000 Episodes)...

wait wait wait ....

Resulting Value Function (V):
[[  0.  -13.6 -19.7 -22.1]
 [-13.9 -17.6 -19.6 -20. ]
 [-19.7 -19.3 -17.9 -14.4]
 [-21.9 -19.7 -13.5   0. ]]


In [2]:
import numpy as np

def value_iteration_grid_world():
    # --- 1. SETUP THE ENVIRONMENT ---
    grid_size = 4
    gamma = 1.0  # Discount factor (1.0 for shortest path)
    theta = 1e-4 # Convergence threshold

    # Terminal states (Top-Left and Bottom-Right)
    terminal_states = [0, 15]
    actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']

    # Initialize Value Function V(s) to zeros
    V = np.zeros(grid_size * grid_size)

    print("Starting Value Iteration...")
    iteration = 0

    # --- 2. THE ALGORITHM (Value Iteration) ---
    while True:
        delta = 0
        # Create a copy for synchronous updates
        V_new = np.copy(V)

        # Loop over every state in the world
        for s in range(grid_size * grid_size):
            if s in terminal_states:
                continue # Value of terminal state is always 0

            # Calculate coordinates
            row, col = divmod(s, grid_size)

            # Find the max value among all possible actions
            action_values = []

            for action in actions:
                # --- MODEL LOGIC (Simulating the move) ---
                next_r, next_c = row, col

                if action == 'UP':    next_r = max(row - 1, 0)
                elif action == 'DOWN':  next_r = min(row + 1, grid_size - 1)
                elif action == 'LEFT':  next_c = max(col - 1, 0)
                elif action == 'RIGHT': next_c = min(col + 1, grid_size - 1)

                next_state = next_r * grid_size + next_c

                # Standard Reward is -1 per step
                reward = -1

                # Bellman Optimality Equation: R + gamma * V(s')
                val = reward + gamma * V[next_state]
                action_values.append(val)

            # Update V(s) with the BEST possible action (Greedy)
            best_value = max(action_values)
            V_new[s] = best_value

            # Check how much the value changed
            delta = max(delta, abs(best_value - V[s]))

        V = V_new
        iteration += 1

        # Stop if converged
        if delta < theta:
            print(f"Converged after {iteration} iterations.")
            break

    # --- 3. DISPLAY RESULTS ---
    print("\nOptimal Value Function (V*):")
    print(np.round(V.reshape(4, 4), 1))

    # Optional: Display the Optimal Policy (Arrows)
    print("\nOptimal Policy (Planning Result):")
    arrows = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    policy_grid = []

    for s in range(grid_size * grid_size):
        if s in terminal_states:
            policy_grid.append(" T ")
            continue

        row, col = divmod(s, grid_size)
        best_action_idx = -1
        best_val = -float('inf')

        # Check neighbors again to find which one gave that best value
        for i, action in enumerate(actions):
            next_r, next_c = row, col
            if action == 'UP':    next_r = max(row - 1, 0)
            elif action == 'DOWN':  next_r = min(row + 1, grid_size - 1)
            elif action == 'LEFT':  next_c = max(col - 1, 0)
            elif action == 'RIGHT': next_c = min(col + 1, grid_size - 1)

            val = -1 + gamma * V[next_r * grid_size + next_c]
            if val > best_val:
                best_val = val
                best_action_idx = i

        policy_grid.append(f" {arrows[best_action_idx]} ")

    # Print Policy Grid
    print("-" * 17)
    for i in range(0, 16, 4):
        print("|".join(policy_grid[i:i+4]))
        print("-" * 17)

if __name__ == "__main__":
    value_iteration_grid_world()

Starting Value Iteration...
Converged after 4 iterations.

Optimal Value Function (V*):
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]

Optimal Policy (Planning Result):
-----------------
 T | ← | ← | ↓ 
-----------------
 ↑ | ↑ | ↑ | ↓ 
-----------------
 ↑ | ↑ | ↓ | ↓ 
-----------------
 ↑ | → | → | T 
-----------------


In [3]:
import numpy as np

class MDPGridWorld:
    def __init__(self):
        self.grid_size = 4
        # For "Rollout from (0,0)", we treat (0,0) as Start
        # and (3,3) [index 15] as the ONLY Goal/Terminal state.
        self.terminal_states = [15]
        self.actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']
        self.gamma = 1.0 # Discount factor

    def get_next_state_reward(self, state, action):
        if state in self.terminal_states:
            return state, 0

        row, col = divmod(state, self.grid_size)

        # Move Logic
        if action == 'UP':    row = max(row - 1, 0)
        elif action == 'DOWN':  row = min(row + 1, self.grid_size - 1)
        elif action == 'LEFT':  col = max(col - 1, 0)
        elif action == 'RIGHT': col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1 # Cost per step
        return next_state, reward

    def value_iteration(self):
        """1. PLAN: Calculate V* to find the best policy."""
        V = np.zeros(self.grid_size * self.grid_size)
        theta = 1e-4

        while True:
            delta = 0
            V_new = np.copy(V)
            for s in range(self.grid_size * self.grid_size):
                if s in self.terminal_states: continue

                action_values = []
                for a in self.actions:
                    ns, r = self.get_next_state_reward(s, a)
                    action_values.append(r + self.gamma * V[ns])

                new_val = max(action_values)
                V_new[s] = new_val
                delta = max(delta, abs(new_val - V[s]))
            V = V_new
            if delta < theta: break
        return V

    def rollout(self, start_state, V):
        """2. ACT: Execute the policy from the start state."""
        print(f"\n--- Rolling out Optimal Policy from State {start_state} (0,0) ---")

        curr_state = start_state
        steps = 0
        path = [curr_state]

        while curr_state not in self.terminal_states:
            row, col = divmod(curr_state, self.grid_size)

            # Find Best Action using V
            best_action = None
            best_val = -float('inf')

            for action in self.actions:
                ns, r = self.get_next_state_reward(curr_state, action)
                val = r + self.gamma * V[ns]

                if val > best_val:
                    best_val = val
                    best_action = action

            # Execute the Move
            next_state, _ = self.get_next_state_reward(curr_state, best_action)

            print(f"Step {steps+1}: At {curr_state} ({row},{col}) -> Action: {best_action} -> New State: {next_state}")

            curr_state = next_state
            path.append(curr_state)
            steps += 1

            if steps > 20: # Safety break
                print("Stuck in loop!")
                break

        print(f"\nGoal Reached at State {curr_state}!")
        print(f"Total Path: {path}")

# --- Main Execution ---
if __name__ == "__main__":
    world = MDPGridWorld()

    # 1. First, we need the plan (Value Iteration)
    print("Computing Optimal Policy...")
    optimal_values = world.value_iteration()

    # 2. Now, we Roll Out (Simulate) the plan from (0,0)
    world.rollout(start_state=0, V=optimal_values)

Computing Optimal Policy...

--- Rolling out Optimal Policy from State 0 (0,0) ---
Step 1: At 0 (0,0) -> Action: DOWN -> New State: 4
Step 2: At 4 (1,0) -> Action: DOWN -> New State: 8
Step 3: At 8 (2,0) -> Action: DOWN -> New State: 12
Step 4: At 12 (3,0) -> Action: RIGHT -> New State: 13
Step 5: At 13 (3,1) -> Action: RIGHT -> New State: 14
Step 6: At 14 (3,2) -> Action: RIGHT -> New State: 15

Goal Reached at State 15!
Total Path: [0, 4, 8, 12, 13, 14, 15]


In [4]:
import numpy as np

# --- 1. THE ENVIRONMENT (Unknown to the agent initially) ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15] # Top-Left, Bottom-Right
        self.actions = [0, 1, 2, 3] # 0:UP, 1:DOWN, 2:LEFT, 3:RIGHT

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)

        # Action Logic
        if action == 0:   row = max(row - 1, 0) # UP
        elif action == 1: row = min(row + 1, self.grid_size - 1) # DOWN
        elif action == 2: col = max(col - 1, 0) # LEFT
        elif action == 3: col = min(col + 1, self.grid_size - 1) # RIGHT

        next_state = row * self.grid_size + col
        reward = -1 # Penalty for each step
        done = next_state in self.terminal_states

        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. THE ALGORITHM (Q-Learning) ---
def q_learning():
    env = GridWorld()

    # Parameters
    num_episodes = 5000
    alpha = 0.1   # Learning Rate (How fast we accept new info)
    gamma = 0.99  # Discount Factor (Importance of future rewards)
    epsilon = 0.1 # Exploration Rate (Chance to try random move)

    # Initialize Q-Table: 16 States x 4 Actions
    # Q[s, a] stores the value of taking action 'a' in state 's'
    Q = np.zeros((16, 4))

    print("Training with Q-Learning (5000 Episodes)...")

    for episode in range(num_episodes):
        state = env.reset()
        done = False

        while not done:
            # A. Choose Action (Epsilon-Greedy)
            if np.random.rand() < epsilon:
                action = np.random.choice(env.actions) # Explore (Random)
            else:
                action = np.argmax(Q[state]) # Exploit (Best known action)

            # B. Take Action & Observe
            next_state, reward, done = env.step(state, action)

            # C. Update Q-Value (The Bellman Update Rule)
            # Old Value
            old_value = Q[state, action]
            # Best possible value from next state
            next_max = np.max(Q[next_state])

            # Formula: Q(s,a) = Q(s,a) + alpha * [Reward + gamma * max(Q(s')) - Q(s,a)]
            new_value = old_value + alpha * (reward + gamma * next_max - old_value)

            Q[state, action] = new_value

            state = next_state

    return Q

# --- 3. DISPLAY RESULTS ---
def print_results(Q):
    actions_map = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    print("\nLearned Policy (from Q-Table):")
    print("-" * 17)

    grid_output = []
    for s in range(16):
        if s in [0, 15]: # Terminal
            grid_output.append(" T ")
            continue

        # The best action is the one with the highest Q-value for this state
        best_action_idx = np.argmax(Q[s])
        grid_output.append(f" {actions_map[best_action_idx]} ")

    # Print nicely
    for i in range(0, 16, 4):
        print("|".join(grid_output[i:i+4]))
        print("-" * 17)

    print("\nExample Q-Values for State 1 (Next to Top-Left Goal):")
    print(f"UP: {Q[1,0]:.2f}, DOWN: {Q[1,1]:.2f}, LEFT: {Q[1,2]:.2f}, RIGHT: {Q[1,3]:.2f}")
    print("(Notice 'LEFT' should have the highest value because it leads to the goal)")

if __name__ == "__main__":
    final_Q = q_learning()
    print_results(final_Q)

Training with Q-Learning (5000 Episodes)...

Learned Policy (from Q-Table):
-----------------
 T | ← | ← | ↓ 
-----------------
 ↑ | ↑ | ↑ | ↓ 
-----------------
 ↑ | ↓ | → | ↓ 
-----------------
 → | → | → | T 
-----------------

Example Q-Values for State 1 (Next to Top-Left Goal):
UP: -1.95, DOWN: -2.89, LEFT: -1.00, RIGHT: -2.90
(Notice 'LEFT' should have the highest value because it leads to the goal)


In [5]:
import numpy as np

# --- 1. ENVIRONMENT (4x4 Grid World) ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15] # Top-Left, Bottom-Right
        self.actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)

        if action == 'UP':    row = max(row - 1, 0)
        elif action == 'DOWN':  row = min(row + 1, self.grid_size - 1)
        elif action == 'LEFT':  col = max(col - 1, 0)
        elif action == 'RIGHT': col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. THE ALGORITHM: TD(0) Prediction ---
def td_zero_learning(env, num_episodes=5000, alpha=0.1, gamma=1.0):
    """
    TD(0) evaluates the value of states V(s) under a random policy.
    It updates V(s) after every single step.
    """
    # Initialize V(s) to 0
    V = np.zeros(env.grid_size * env.grid_size)

    print(f"Running TD(0) Learning for {num_episodes} episodes...")

    for _ in range(num_episodes):
        state = env.reset()
        done = False

        while not done:
            # Policy: Random Walk (Standard for evaluation)
            action = np.random.choice(env.actions)

            # Take step
            next_state, reward, done = env.step(state, action)

            # --- THE TD(0) UPDATE RULE ---
            # 1. Prediction: What we thought V(s) was
            current_value = V[state]

            # 2. Target: Reward + Discounted Value of Next State
            # (If next state is terminal, its value is 0)
            next_state_value = 0 if done else V[next_state]
            td_target = reward + gamma * next_state_value

            # 3. Update: Nudge current value towards the target
            # V(s) = V(s) + alpha * [ R + gamma*V(s') - V(s) ]
            V[state] = current_value + alpha * (td_target - current_value)

            state = next_state

    return V

# --- 3. EXECUTION ---
if __name__ == "__main__":
    env = GridWorld()

    # Run TD(0)
    v_values = td_zero_learning(env)

    print("\nState-Value Function V(s) learned via TD(0):")
    print("-" * 30)

    # Reshape and print
    print(np.round(v_values.reshape(4, 4), 2))

Running TD(0) Learning for 5000 episodes...

State-Value Function V(s) learned via TD(0):
------------------------------
[[  0.   -12.72 -19.53 -23.33]
 [-11.67 -18.15 -21.02 -21.98]
 [-19.48 -20.58 -18.86 -14.97]
 [-22.69 -21.02 -10.74   0.  ]]


In [6]:
import numpy as np

# --- 1. ENVIRONMENT ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15]
        self.actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)

        if action == 'UP':    row = max(row - 1, 0)
        elif action == 'DOWN':  row = min(row + 1, self.grid_size - 1)
        elif action == 'LEFT':  col = max(col - 1, 0)
        elif action == 'RIGHT': col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. THE ALGORITHM: TD(Lambda) ---
def td_lambda_learning(env, lam=0.5, num_episodes=5000, alpha=0.1, gamma=1.0):
    """
    TD(Lambda) uses 'Eligibility Traces' (E) to update past states.
    lam (lambda): 0 = TD(0), 1 = Monte Carlo.
    """
    # Initialize V(s)
    V = np.zeros(env.grid_size * env.grid_size)

    print(f"Running TD({lam}) Learning...")

    for episode in range(num_episodes):
        # Reset Eligibility Traces at start of every episode
        E = np.zeros(env.grid_size * env.grid_size)

        state = env.reset()
        done = False

        while not done:
            # Policy: Random Walk
            action = np.random.choice(env.actions)
            next_state, reward, done = env.step(state, action)

            # 1. Calculate TD Error (delta)
            # The difference between what we expect and what we got
            target = reward + (0 if done else gamma * V[next_state])
            delta = target - V[state]

            # 2. Increment Trace for CURRENT state
            # "I was just here, so I deserve credit/blame for this reward"
            E[state] += 1

            # 3. Update V for ALL states based on their Trace
            # V(s) = V(s) + alpha * delta * E(s)
            # (Vectorized update for efficiency)
            V += alpha * delta * E

            # 4. Decay Traces for ALL states
            # Memories fade over time: E = gamma * lambda * E
            E *= gamma * lam

            state = next_state

    return V

# --- 3. EXECUTION ---
if __name__ == "__main__":
    env = GridWorld()

    # Run TD(Lambda) with lambda = 0.5
    v_values = td_lambda_learning(env, lam=0.5)

    print(f"\nState-Value Function V(s) via TD(0.5):")
    print("-" * 30)
    print(np.round(v_values.reshape(4, 4), 2))

Running TD(0.5) Learning...

State-Value Function V(s) via TD(0.5):
------------------------------
[[  0.   -11.75 -16.5  -18.17]
 [-13.   -15.96 -17.9  -16.02]
 [-17.77 -17.79 -15.79  -9.58]
 [-20.24 -18.68 -14.54   0.  ]]


In [7]:
import numpy as np

# --- 1. ENVIRONMENT ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15]
        self.actions = [0, 1, 2, 3] # UP, DOWN, LEFT, RIGHT

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)

        if action == 0:   row = max(row - 1, 0) # UP
        elif action == 1: row = min(row + 1, self.grid_size - 1) # DOWN
        elif action == 2: col = max(col - 1, 0) # LEFT
        elif action == 3: col = min(col + 1, self.grid_size - 1) # RIGHT

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. THE ALGORITHM: SARSA ---
def sarsa_learning():
    env = GridWorld()

    # Parameters
    num_episodes = 5000
    alpha = 0.1   # Learning Rate
    gamma = 1.0   # Discount Factor
    epsilon = 0.1 # Exploration Rate

    # Initialize Q-Table (16 States x 4 Actions)
    Q = np.zeros((16, 4))

    print("Training with SARSA (5000 Episodes)...")

    for _ in range(num_episodes):
        state = env.reset()

        # SARSA Step 1: Choose Action A (Epsilon-Greedy) BEFORE the loop
        if np.random.rand() < epsilon:
            action = np.random.choice(env.actions)
        else:
            action = np.argmax(Q[state])

        done = False

        while not done:
            # SARSA Step 2: Take Action A, observe R, S'
            next_state, reward, done = env.step(state, action)

            # SARSA Step 3: Choose Next Action A' (Epsilon-Greedy) based on S'
            # Note: We pick the next action NOW, before updating
            if np.random.rand() < epsilon:
                next_action = np.random.choice(env.actions)
            else:
                next_action = np.argmax(Q[next_state])

            # SARSA Step 4: Update Q(S, A) using Q(S', A')
            # Formula: Q(s,a) = Q(s,a) + alpha * [ R + gamma * Q(s',a') - Q(s,a) ]

            # Value of next state (0 if terminal)
            q_next = 0 if done else Q[next_state, next_action]

            target = reward + gamma * q_next
            Q[state, action] += alpha * (target - Q[state, action])

            # SARSA Step 5: Move to next state pair
            state = next_state
            action = next_action

    return Q

# --- 3. EXECUTION & RESULTS ---
def print_policy(Q):
    actions_map = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    print("\nFinal Policy (SARSA):")
    print("-" * 17)

    grid_output = []
    for s in range(16):
        if s in [0, 15]:
            grid_output.append(" T ")
            continue
        best_action = np.argmax(Q[s])
        grid_output.append(f" {actions_map[best_action]} ")

    for i in range(0, 16, 4):
        print("|".join(grid_output[i:i+4]))
        print("-" * 17)

if __name__ == "__main__":
    q_table = sarsa_learning()
    print_policy(q_table)

Training with SARSA (5000 Episodes)...

Final Policy (SARSA):
-----------------
 T | ← | ← | ← 
-----------------
 ↑ | ↑ | ← | ↓ 
-----------------
 ↑ | ↑ | → | ↓ 
-----------------
 → | → | → | T 
-----------------


In [8]:
import numpy as np

# --- 1. ENVIRONMENT (4x4 Grid World) ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15] # Top-Left, Bottom-Right
        self.actions = [0, 1, 2, 3] # UP, DOWN, LEFT, RIGHT

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)

        # Move Logic
        if action == 0:   row = max(row - 1, 0) # UP
        elif action == 1: row = min(row + 1, self.grid_size - 1) # DOWN
        elif action == 2: col = max(col - 1, 0) # LEFT
        elif action == 3: col = min(col + 1, self.grid_size - 1) # RIGHT

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. THE ALGORITHM: Q-Learning ---
def q_learning():
    env = GridWorld()

    # Parameters
    num_episodes = 5000
    alpha = 0.1   # Learning Rate
    gamma = 1.0   # Discount Factor (1.0 because we want shortest path)
    epsilon = 0.1 # Exploration Rate

    # Initialize Q-Table (16 States x 4 Actions)
    Q = np.zeros((16, 4))

    print("Training with Q-Learning (5000 Episodes)...")

    for _ in range(num_episodes):
        state = env.reset()
        done = False

        while not done:
            # 1. Choose Action (Epsilon-Greedy)
            if np.random.rand() < epsilon:
                action = np.random.choice(env.actions) # Explore
            else:
                action = np.argmax(Q[state]) # Exploit

            # 2. Take Action
            next_state, reward, done = env.step(state, action)

            # 3. Update Q-Value (Off-Policy)
            # We use max(Q[next_state]) regardless of what action we actually take next
            best_next_action_val = 0 if done else np.max(Q[next_state])

            # Update Rule: Q(S,A) = Q(S,A) + alpha * [ R + gamma * max_a Q(S',a) - Q(S,A) ]
            td_target = reward + gamma * best_next_action_val
            Q[state, action] += alpha * (td_target - Q[state, action])

            state = next_state

    return Q

# --- 3. EXECUTION & RESULTS ---
def print_policy(Q):
    actions_map = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    print("\nFinal Optimal Policy (Q-Learning):")
    print("-" * 17)

    grid_output = []
    for s in range(16):
        if s in [0, 15]:
            grid_output.append(" T ")
            continue
        # Simply pick the best action from the learned table
        best_action = np.argmax(Q[s])
        grid_output.append(f" {actions_map[best_action]} ")

    for i in range(0, 16, 4):
        print("|".join(grid_output[i:i+4]))
        print("-" * 17)

if __name__ == "__main__":
    q_table = q_learning()
    print_policy(q_table)

Training with Q-Learning (5000 Episodes)...

Final Optimal Policy (Q-Learning):
-----------------
 T | ← | ← | ← 
-----------------
 ↑ | ↑ | ↑ | ↓ 
-----------------
 ↑ | ← | ↓ | ↓ 
-----------------
 → | → | → | T 
-----------------


In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# --- 1. ENVIRONMENT ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15]
        self.actions = [0, 1, 2, 3] # UP, DOWN, LEFT, RIGHT

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)

        if action == 0:   row = max(row - 1, 0)
        elif action == 1: row = min(row + 1, self.grid_size - 1)
        elif action == 2: col = max(col - 1, 0)
        elif action == 3: col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. THE NEURAL NETWORK ---
class QNetwork(nn.Module):
    def __init__(self):
        super(QNetwork, self).__init__()
        # Input: 16 (One-hot encoding of the state)
        # Hidden: 128 neurons
        # Output: 4 (Q-values for UP, DOWN, LEFT, RIGHT)
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 4)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# --- 3. HELPER: ONE-HOT ENCODING ---
def state_to_tensor(state):
    # Converts state integer (e.g., 5) to one-hot vector [0,0,0,0,0,1,0...]
    v = torch.zeros(16)
    v[state] = 1.0
    return v.unsqueeze(0) # Add batch dimension

# --- 4. THE ALGORITHM: DQN Training ---
def train_dqn():
    env = GridWorld()

    # Hyperparameters
    episodes = 1000
    gamma = 0.99
    epsilon = 1.0
    epsilon_decay = 0.995
    epsilon_min = 0.1
    learning_rate = 0.001
    batch_size = 32

    # Initialize Networks
    policy_net = QNetwork()
    optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    # Replay Buffer (Memory)
    memory = deque(maxlen=2000)

    print("Training DQN (this may take a moment)...")

    for episode in range(episodes):
        state = env.reset()
        done = False

        while not done:
            state_tensor = state_to_tensor(state)

            # A. Select Action (Epsilon-Greedy)
            if random.random() < epsilon:
                action = random.choice(env.actions)
            else:
                with torch.no_grad():
                    q_values = policy_net(state_tensor)
                    action = torch.argmax(q_values).item()

            # B. Step
            next_state, reward, done = env.step(state, action)

            # C. Store in Memory
            memory.append((state, action, reward, next_state, done))
            state = next_state

            # D. Train (Experience Replay)
            if len(memory) > batch_size:
                minibatch = random.sample(memory, batch_size)

                # Prepare batch data
                states_b = torch.cat([state_to_tensor(x[0]) for x in minibatch])
                next_states_b = torch.cat([state_to_tensor(x[3]) for x in minibatch])

                # Get current Q values
                q_preds = policy_net(states_b)

                # Calculate Target Q values
                with torch.no_grad():
                    q_next = policy_net(next_states_b)

                target_q_values = q_preds.clone()

                for i, (s, a, r, ns, d) in enumerate(minibatch):
                    # Bellman Update: R + gamma * max(Q(s'))
                    target = r
                    if not d:
                        target += gamma * torch.max(q_next[i]).item()
                    target_q_values[i][a] = target

                # Gradient Descent
                loss = criterion(q_preds, target_q_values)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Decay Epsilon
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay

        if (episode+1) % 200 == 0:
            print(f"Episode {episode+1}/{episodes} completed.")

    return policy_net

# --- 5. TEST THE TRAINED MODEL ---
if __name__ == "__main__":
    trained_model = train_dqn()

    print("\nVisualizing DQN Policy:")
    print("-" * 17)
    actions_map = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    env = GridWorld()

    output_grid = []
    for s in range(16):
        if s in [0, 15]:
            output_grid.append(" T ")
            continue

        st = state_to_tensor(s)
        with torch.no_grad():
            q = trained_model(st)
            best_a = torch.argmax(q).item()
        output_grid.append(f" {actions_map[best_a]} ")

    for i in range(0, 16, 4):
        print("|".join(output_grid[i:i+4]))
        print("-" * 17)

Training DQN (this may take a moment)...
Episode 200/1000 completed.
Episode 400/1000 completed.
Episode 600/1000 completed.
Episode 800/1000 completed.
Episode 1000/1000 completed.

Visualizing DQN Policy:
-----------------
 T | ← | ← | ← 
-----------------
 ↑ | ↑ | ← | ↓ 
-----------------
 ↑ | ↑ | ↓ | ↓ 
-----------------
 ↑ | → | → | T 
-----------------


In [10]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# --- 1. ENVIRONMENT (4x4 Grid World) ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15]
        self.actions = [0, 1, 2, 3] # UP, DOWN, LEFT, RIGHT

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)
        if action == 0:   row = max(row - 1, 0)
        elif action == 1: row = min(row + 1, self.grid_size - 1)
        elif action == 2: col = max(col - 1, 0)
        elif action == 3: col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. NEURAL NETWORKS ---

class PolicyNetwork(nn.Module):
    def __init__(self):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, 4)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        # Numerical stability fix: prevents NaN in softmax
        return F.softmax(x, dim=-1)

class ValueNetwork(nn.Module):
    def __init__(self):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

def state_to_tensor(state):
    v = torch.zeros(16)
    v[state] = 1.0
    return v.unsqueeze(0)

# --- 3. REINFORCE ALGORITHM ---
def train_reinforce_baseline():
    env = GridWorld()

    policy_net = PolicyNetwork()
    value_net = ValueNetwork()

    # Reduced learning rate slightly for stability
    policy_optimizer = optim.Adam(policy_net.parameters(), lr=0.0005)
    value_optimizer = optim.Adam(value_net.parameters(), lr=0.0005)

    num_episodes = 2000
    gamma = 0.99

    print("Training REINFORCE with Baseline")

    for episode in range(num_episodes):
        state = env.reset()
        done = False

        log_probs = []
        values = []
        rewards = []

        # A. Generate Episode
        while not done:
            state_t = state_to_tensor(state)

            probs = policy_net(state_t)
            value = value_net(state_t)

            # Create distribution
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()

            # Step
            next_state, reward, done = env.step(state, action.item())

            log_probs.append(dist.log_prob(action))
            values.append(value)
            rewards.append(reward)

            state = next_state

            # Safety break if agent gets stuck in a loop
            if len(rewards) > 100:
                break

        # B. Calculate Returns
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns, dtype=torch.float32)

        # C. Normalize Returns (Safe Mode)
        # Only normalize if we have more than 1 step, otherwise std is NaN
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-9)
        else:
            returns = returns - returns.mean()

        # D. Calculate Losses
        policy_loss = []
        value_loss = []

        for log_prob, value, G_t in zip(log_probs, values, returns):
            advantage = G_t - value.item()

            # Policy Loss
            policy_loss.append(-log_prob * advantage)

            # Value Loss (Fixing the warning by using detach/clone logic if needed)
            # We target the actual scalar G_t
            target = torch.tensor([G_t], dtype=torch.float32)
            value_loss.append(F.mse_loss(value.view(-1), target))

        policy_optimizer.zero_grad()
        value_optimizer.zero_grad()

        # Check if lists are not empty (in case of immediate termination)
        if policy_loss:
            loss_p = torch.stack(policy_loss).sum()
            loss_v = torch.stack(value_loss).sum()

            loss_p.backward()
            loss_v.backward()

            # GRADIENT CLIPPING (The Fix for Exploding Gradients)
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
            torch.nn.utils.clip_grad_norm_(value_net.parameters(), 1.0)

            policy_optimizer.step()
            value_optimizer.step()

        if (episode + 1) % 500 == 0:
            print(f"Episode {episode + 1}/{num_episodes} completed.")

    return policy_net

# --- 4. VISUALIZE ---
if __name__ == "__main__":
    trained_policy = train_reinforce_baseline()

    print("\nFinal Policy (REINFORCE):")
    actions_map = {0: '↑', 1: '↓', 2: '←', 3: '→'}

    output_grid = []
    for s in range(16):
        if s in [0, 15]:
            output_grid.append(" T ")
            continue
        st = state_to_tensor(s)
        with torch.no_grad():
            probs = trained_policy(st)
            best_a = torch.argmax(probs).item()
        output_grid.append(f" {actions_map[best_a]} ")

    print("-" * 17)
    for i in range(0, 16, 4):
        print("|".join(output_grid[i:i+4]))
        print("-" * 17)

Training REINFORCE with Baseline
Episode 500/2000 completed.
Episode 1000/2000 completed.
Episode 1500/2000 completed.
Episode 2000/2000 completed.

Final Policy (REINFORCE):
-----------------
 T | ← | ← | ← 
-----------------
 ↑ | ← | ↑ | ↓ 
-----------------
 ↑ | ← | ↓ | ↓ 
-----------------
 ↑ | → | → | T 
-----------------


In [11]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# --- 1. ENVIRONMENT (4x4 Grid World) ---
class GridWorld:
    def __init__(self):
        self.grid_size = 4
        self.terminal_states = [0, 15]
        self.actions = [0, 1, 2, 3] # UP, DOWN, LEFT, RIGHT

    def step(self, state, action):
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.grid_size)
        if action == 0:   row = max(row - 1, 0)
        elif action == 1: row = min(row + 1, self.grid_size - 1)
        elif action == 2: col = max(col - 1, 0)
        elif action == 3: col = min(col + 1, self.grid_size - 1)

        next_state = row * self.grid_size + col
        reward = -1
        done = next_state in self.terminal_states
        return next_state, reward, done

    def reset(self):
        start_state = np.random.randint(0, 16)
        while start_state in self.terminal_states:
            start_state = np.random.randint(0, 16)
        return start_state

# --- 2. NEURAL NETWORKS ---
class PolicyNetwork(nn.Module):
    def __init__(self):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, 4)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=-1)

class ValueNetwork(nn.Module):
    def __init__(self):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

def state_to_tensor(state):
    v = torch.zeros(16)
    v[state] = 1.0
    return v.unsqueeze(0)

# --- 3. REINFORCE WITH ADVANTAGE ALGORITHM ---
def train_reinforce_advantage():
    env = GridWorld()

    # Initialize Actor (Policy) and Critic (Value)
    policy_net = PolicyNetwork()
    value_net = ValueNetwork()

    # Use small learning rate and gradient clipping for stability
    policy_optimizer = optim.Adam(policy_net.parameters(), lr=0.0005)
    value_optimizer = optim.Adam(value_net.parameters(), lr=0.0005)

    num_episodes = 2000
    gamma = 0.99

    print("Training REINFORCE using Advantage Function...")

    for episode in range(num_episodes):
        state = env.reset()
        done = False

        log_probs = []
        values = []
        rewards = []

        # --- A. Collect Trajectory (Monte Carlo) ---
        while not done:
            state_t = state_to_tensor(state)

            # 1. Get Policy prob and Value estimate
            probs = policy_net(state_t)
            value = value_net(state_t)

            # 2. Sample Action
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()

            # 3. Take Step
            next_state, reward, done = env.step(state, action.item())

            log_probs.append(dist.log_prob(action))
            values.append(value)
            rewards.append(reward)

            state = next_state
            if len(rewards) > 100: break # Safety break

        # --- B. Calculate Returns (G_t) ---
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns, dtype=torch.float32)

        # Normalize returns for numerical stability
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-9)
        else:
            returns = returns - returns.mean()

        # --- C. Calculate Advantage & Update ---
        policy_loss = []
        value_loss = []

        for log_prob, value, G_t in zip(log_probs, values, returns):
            # THE ADVANTAGE FUNCTION: A(s,a) = G_t - V(s)
            # We detach() value because we don't want to update the ValueNet based on Policy loss
            advantage = G_t - value.item()

            # Policy Update: Increase prob of actions with positive Advantage
            policy_loss.append(-log_prob * advantage)

            # Value Update: Make V(s) closer to actual G_t
            target = torch.tensor([G_t], dtype=torch.float32)
            value_loss.append(F.mse_loss(value.view(-1), target))

        # Backpropagation
        policy_optimizer.zero_grad()
        value_optimizer.zero_grad()

        if policy_loss:
            loss_p = torch.stack(policy_loss).sum()
            loss_v = torch.stack(value_loss).sum()

            loss_p.backward()
            loss_v.backward()

            # Gradient Clipping (Prevents Exploding Gradients/NaNs)
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
            torch.nn.utils.clip_grad_norm_(value_net.parameters(), 1.0)

            policy_optimizer.step()
            value_optimizer.step()

        if (episode + 1) % 500 == 0:
            print(f"Episode {episode + 1}/{num_episodes} completed.")

    return policy_net

# --- 4. VISUALIZE ---
if __name__ == "__main__":
    trained_policy = train_reinforce_advantage()

    print("\nFinal Policy (Advantage Method):")
    actions_map = {0: '↑', 1: '↓', 2: '←', 3: '→'}

    output_grid = []
    for s in range(16):
        if s in [0, 15]:
            output_grid.append(" T ")
            continue
        st = state_to_tensor(s)
        with torch.no_grad():
            probs = trained_policy(st)
            best_a = torch.argmax(probs).item()
        output_grid.append(f" {actions_map[best_a]} ")

    print("-" * 17)
    for i in range(0, 16, 4):
        print("|".join(output_grid[i:i+4]))
        print("-" * 17)

Training REINFORCE using Advantage Function...
Episode 500/2000 completed.
Episode 1000/2000 completed.
Episode 1500/2000 completed.
Episode 2000/2000 completed.

Final Policy (Advantage Method):
-----------------
 T | ← | ← | ← 
-----------------
 ↑ | ← | ↑ | ↓ 
-----------------
 ↑ | → | ↓ | ↓ 
-----------------
 ↑ | → | → | T 
-----------------


In [12]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal

# --- 1. CONTINUOUS ENVIRONMENT (Simple 1D Target Seeking) ---
class ContinuousTargetEnv:
    def __init__(self):
        # State: Position on a line (Start at -2.0)
        # Goal: Reach 0.0
        self.state = np.array([-2.0], dtype=np.float32)
        self.max_steps = 200
        self.current_step = 0

    def reset(self):
        # Start at random position between -2 and -1
        self.state = np.array([np.random.uniform(-2, -1)], dtype=np.float32)
        self.current_step = 0
        return self.state

    def step(self, action):
        # Action is a continuous force [-1, 1]
        force = np.clip(action, -1.0, 1.0)

        # Dynamics: Position += Force * speed
        self.state[0] += force * 0.1

        # Calculate Reward (Negative distance to goal 0.0)
        dist = abs(self.state[0] - 0.0)
        reward = -dist

        # Check Done
        self.current_step += 1
        done = dist < 0.1 or self.current_step >= self.max_steps

        # Bonus reward for finishing
        if dist < 0.1:
            reward += 10.0

        return self.state, reward, done

# --- 2. ACTOR-CRITIC NETWORK ---
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        # Common layer
        self.fc1 = nn.Linear(1, 128)

        # ACTOR HEAD (Outputs Mean `mu` and Std Dev `sigma`)
        # Used to create a Normal Distribution (Gaussian)
        self.mu_head = nn.Linear(128, 1)
        self.sigma_head = nn.Linear(128, 1)

        # CRITIC HEAD (Outputs Value V(s))
        self.value_head = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))

        # Actor outputs
        mu = torch.tanh(self.mu_head(x)) # Output between -1 and 1
        sigma = F.softplus(self.sigma_head(x)) + 1e-5 # Always positive

        # Critic output
        value = self.value_head(x)

        return mu, sigma, value

# --- 3. A2C ALGORITHM (Continuous) ---
def train_a2c_continuous():
    env = ContinuousTargetEnv()
    model = ActorCritic()

    # We update both heads with one optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    episodes = 1000
    gamma = 0.99

    print("Training A2C for Continuous Control...")

    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            state_t = torch.FloatTensor(state)

            # 1. Forward pass
            mu, sigma, value = model(state_t)

            # 2. Sample Continuous Action from Normal Distribution
            dist = Normal(mu, sigma)
            action = dist.sample()

            # Clip action to valid range for environment
            action_numpy = action.detach().numpy()[0]

            # 3. Take Step
            next_state, reward, done = env.step(action_numpy)
            total_reward += reward

            # 4. Calculate Target (TD Target)
            next_state_t = torch.FloatTensor(next_state)

            with torch.no_grad():
                _, _, next_value = model(next_state_t)
                # If done, next value is 0
                target_value = reward + (0 if done else gamma * next_value.item())

            # 5. Calculate Advantage
            # Advantage = Target - Current_Prediction
            advantage = target_value - value

            # 6. Calculate Losses

            # Critic Loss: MSE(Target, Predicted)
            critic_loss = advantage.pow(2)

            # Actor Loss: -log_prob * advantage
            # (We detach advantage so we don't backprop through critic here)
            log_prob = dist.log_prob(action)
            actor_loss = -log_prob * advantage.detach()

            # Total Loss
            loss = actor_loss + critic_loss

            # 7. Update
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state

        if (episode + 1) % 100 == 0:
            print(f"Episode {episode + 1}/episodes: Total Reward = {total_reward:.2f}")

    return model

# --- 4. TEST ---
if __name__ == "__main__":
    trained_model = train_a2c_continuous()

    print("\nTesting Trained Policy (Moving from -2.0 to 0.0):")
    env = ContinuousTargetEnv()
    state = env.reset()

    for i in range(10):
        state_t = torch.FloatTensor(state)
        with torch.no_grad():
            mu, sigma, _ = trained_model(state_t)
            # In testing, we just use the Mean (mu) - no randomness
            action = mu.item()

        next_state, _, done = env.step(action)
        print(f"Step {i+1}: Pos {state[0]:.2f} -> Action {action:.2f} -> New Pos {next_state[0]:.2f}")
        state = next_state
        if done: break

Training A2C for Continuous Control...
Episode 100/episodes: Total Reward = -4.85
Episode 200/episodes: Total Reward = -1.69
Episode 300/episodes: Total Reward = 0.61
Episode 400/episodes: Total Reward = -0.43
Episode 500/episodes: Total Reward = 4.89
Episode 600/episodes: Total Reward = 1.66
Episode 700/episodes: Total Reward = 0.76
Episode 800/episodes: Total Reward = 2.81
Episode 900/episodes: Total Reward = -4.71
Episode 1000/episodes: Total Reward = -3.22

Testing Trained Policy (Moving from -2.0 to 0.0):
Step 1: Pos -1.17 -> Action 1.00 -> New Pos -1.17
Step 2: Pos -1.07 -> Action 1.00 -> New Pos -1.07
Step 3: Pos -0.97 -> Action 1.00 -> New Pos -0.97
Step 4: Pos -0.87 -> Action 1.00 -> New Pos -0.87
Step 5: Pos -0.77 -> Action 1.00 -> New Pos -0.77
Step 6: Pos -0.67 -> Action 1.00 -> New Pos -0.67
Step 7: Pos -0.57 -> Action 1.00 -> New Pos -0.57
Step 8: Pos -0.47 -> Action 0.99 -> New Pos -0.47
Step 9: Pos -0.37 -> Action 0.99 -> New Pos -0.37
Step 10: Pos -0.27 -> Action 0.98 

In [13]:
import numpy as np

# --- 1. MULTI-AGENT ENVIRONMENT ---
class MultiAgentGridWorld:
    def __init__(self):
        self.grid_size = 4
        self.actions = [0, 1, 2, 3] # UP, DOWN, LEFT, RIGHT

        # Goals
        self.goal_A = (3, 3) # Bottom-Right
        self.goal_B = (0, 3) # Top-Right

    def reset(self):
        # Start positions
        self.pos_A = (0, 0)
        self.pos_B = (3, 0)

        # Return combined state: (rowA, colA, rowB, colB)
        return self.pos_A + self.pos_B

    def step(self, action_A, action_B):
        # 1. Calculate Proposed New Positions
        new_pos_A = self._move(self.pos_A, action_A)
        new_pos_B = self._move(self.pos_B, action_B)

        reward_A = -1
        reward_B = -1
        done_A = False
        done_B = False

        # 2. Check for Collisions (Agents hitting each other)
        if new_pos_A == new_pos_B:
            # Crash! Both stay in place and get big penalty
            reward_A = -10
            reward_B = -10
            new_pos_A = self.pos_A
            new_pos_B = self.pos_B
        else:
            # 3. Check for Goals
            if new_pos_A == self.goal_A:
                reward_A = 100
                done_A = True

            if new_pos_B == self.goal_B:
                reward_B = 100
                done_B = True

        # Update positions (if not done)
        if not done_A: self.pos_A = new_pos_A
        if not done_B: self.pos_B = new_pos_B

        next_state = self.pos_A + self.pos_B

        # Global Done: When BOTH finished
        # (For simplicity in this simulation, we reset if ONE finishes to keep them training together,
        # or we could wait. Here we'll treat episode as done if EITHER finishes for faster training cycles)
        done = done_A or done_B

        return next_state, reward_A, reward_B, done

    def _move(self, pos, action):
        r, c = pos
        if action == 0:   r = max(r - 1, 0) # UP
        elif action == 1: r = min(r + 1, self.grid_size - 1) # DOWN
        elif action == 2: c = max(c - 1, 0) # LEFT
        elif action == 3: c = min(c + 1, self.grid_size - 1) # RIGHT
        return (r, c)

# --- 2. INDEPENDENT Q-LEARNING ---
def train_marl():
    env = MultiAgentGridWorld()

    # State Space: 4x4 for Agent A * 4x4 for Agent B = 256 states
    # We map state tuple (r1, c1, r2, c2) to an index 0-255
    def get_state_idx(state_tuple):
        r1, c1, r2, c2 = state_tuple
        # Flattening 4D coordinate to 1D index
        return r1*64 + c1*16 + r2*4 + c2

    # Two Independent Q-Tables
    Q_A = np.zeros((256, 4))
    Q_B = np.zeros((256, 4))

    # Hyperparameters
    episodes = 5000
    alpha = 0.1
    gamma = 0.95
    epsilon = 0.1

    print("Training Multi-Agent System (Agents A & B)...")

    for episode in range(episodes):
        state = env.reset()
        state_idx = get_state_idx(state)
        done = False

        while not done:
            # --- ACTION SELECTION (Epsilon-Greedy) ---
            if np.random.rand() < epsilon:
                act_A = np.random.choice(env.actions)
            else:
                act_A = np.argmax(Q_A[state_idx])

            if np.random.rand() < epsilon:
                act_B = np.random.choice(env.actions)
            else:
                act_B = np.argmax(Q_B[state_idx])

            # --- STEP ---
            next_state, rA, rB, done = env.step(act_A, act_B)
            next_state_idx = get_state_idx(next_state)

            # --- UPDATE Q-TABLES SEPARATELY ---

            # Update Agent A
            old_val_A = Q_A[state_idx, act_A]
            next_max_A = np.max(Q_A[next_state_idx])
            Q_A[state_idx, act_A] = old_val_A + alpha * (rA + gamma * next_max_A - old_val_A)

            # Update Agent B
            old_val_B = Q_B[state_idx, act_B]
            next_max_B = np.max(Q_B[next_state_idx])
            Q_B[state_idx, act_B] = old_val_B + alpha * (rB + gamma * next_max_B - old_val_B)

            state_idx = next_state_idx

            # Safety break
            if rA == 100 or rB == 100:
                break

    return Q_A, Q_B, env

# --- 3. ANALYSE / TEST ---
if __name__ == "__main__":
    qa, qb, env = train_marl()

    print("\n--- Testing MARL Interaction ---")
    print("Agent A: (0,0) -> (3,3)")
    print("Agent B: (3,0) -> (0,3)")

    state = env.reset()
    state_idx = 0 # Calculated manually for (0,0,3,0)

    # Helper to print grid
    def print_grid(pos_a, pos_b):
        grid = [[' . ' for _ in range(4)] for _ in range(4)]
        grid[pos_a[0]][pos_a[1]] = ' A '
        grid[pos_b[0]][pos_b[1]] = ' B '
        if pos_a == pos_b: grid[pos_a[0]][pos_a[1]] = ' X ' # Collision
        for row in grid:
            print("".join(row))
        print("-" * 12)

    pos_A = (0,0)
    pos_B = (3,0)

    print_grid(pos_A, pos_B)

    for step in range(8):
        # Calculate state index from current positions
        idx = pos_A[0]*64 + pos_A[1]*16 + pos_B[0]*4 + pos_B[1]

        # Choose best actions
        act_A = np.argmax(qa[idx])
        act_B = np.argmax(qb[idx])

        move_map = {0:'UP', 1:'DOWN', 2:'LEFT', 3:'RIGHT'}
        print(f"Step {step+1}: Agent A goes {move_map[act_A]}, Agent B goes {move_map[act_B]}")

        # Execute (using internal env logic manually to show steps)
        # Note: In test, we assume they learned to avoid collision
        new_state, _, _, _ = env.step(act_A, act_B)

        # Extract positions from tuple (rA, cA, rB, cB)
        pos_A = (new_state[0], new_state[1])
        pos_B = (new_state[2], new_state[3])

        print_grid(pos_A, pos_B)

        if pos_A == (3,3) and pos_B == (0,3):
            print("Both Agents Reached Goals!")
            break

Training Multi-Agent System (Agents A & B)...

--- Testing MARL Interaction ---
Agent A: (0,0) -> (3,3)
Agent B: (3,0) -> (0,3)
 A  .  .  . 
 .  .  .  . 
 .  .  .  . 
 B  .  .  . 
------------
Step 1: Agent A goes DOWN, Agent B goes RIGHT
 .  .  .  . 
 A  .  .  . 
 .  .  .  . 
 .  B  .  . 
------------
Step 2: Agent A goes RIGHT, Agent B goes UP
 .  .  .  . 
 .  A  .  . 
 .  B  .  . 
 .  .  .  . 
------------
Step 3: Agent A goes DOWN, Agent B goes RIGHT
 .  .  .  . 
 .  .  .  . 
 .  A  B  . 
 .  .  .  . 
------------
Step 4: Agent A goes RIGHT, Agent B goes RIGHT
 .  .  .  . 
 .  .  .  . 
 .  .  A  B 
 .  .  .  . 
------------
Step 5: Agent A goes RIGHT, Agent B goes UP
 .  .  .  . 
 .  .  .  B 
 .  .  .  A 
 .  .  .  . 
------------
Step 6: Agent A goes DOWN, Agent B goes UP
 .  .  .  . 
 .  .  .  B 
 .  .  .  A 
 .  .  .  . 
------------
Step 7: Agent A goes DOWN, Agent B goes UP
 .  .  .  . 
 .  .  .  B 
 .  .  .  A 
 .  .  .  . 
------------
Step 8: Agent A goes DOWN, Agent B goes