In [1]:
#

**1.Policy evaluation using Monte Carlo Method (using any method for 4×4 grid world)**


import numpy as np
from collections import defaultdict
np.random.seed(0)

# -------- GRID SIZE --------
rows = 4
cols = 4
n = rows * cols

# terminal states
terminal = [0, n-1]

# actions = L, R, U, D
actions = [(0,-1),(0,1),(-1,0),(1,0)]

def step(s, a):
    r, c = divmod(s, cols)
    dr, dc = a
    nr, nc = r + dr, c + dc

    # invalid move → stay in same state
    if nr < 0 or nr >= rows or nc < 0 or nc >= cols:
        nr, nc = r, c

    ns = nr * cols + nc
    return ns, -1      # reward = -1


# -------- RANDOM POLICY (π(a|s) = 1/4) --------
def policy(s):
    return actions[np.random.randint(4)]


# -------- MONTE CARLO POLICY EVALUATION --------
V = defaultdict(float)
returns = defaultdict(list)

episodes = 5000

for _ in range(episodes):

    # start from a random NON-terminal state
    s = np.random.randint(1, n-1)
    while s in terminal:
        s = np.random.randint(1, n-1)

    episode = []

    # generate full episode
    while True:
        a = policy(s)
        ns, r = step(s, a)
        episode.append((s, r))

        if ns in terminal:
            break
        s = ns

    # FIRST-VISIT MC UPDATE
    G = 0
    visited = set()

    for s, r in reversed(episode):
        G += r
        if s not in visited:
            visited.add(s)
            returns[s].append(G)
            V[s] = np.mean(returns[s])


# -------- PRINT VALUE FUNCTION --------
value_grid = np.zeros((rows, cols))
for s, v in V.items():
    value_grid[s // cols][s % cols] = round(v, 2)

print("Monte Carlo Value Function (4x4 Grid):\n")
print(value_grid)


Monte Carlo Value Function (4x4 Grid):

[[  0.    -6.88 -10.08 -12.16]
 [ -7.72  -8.55  -9.21  -9.92]
 [-10.24  -9.13  -8.32  -7.02]
 [-11.97  -9.94  -7.31   0.  ]]


In [2]:
#Q2: Simple MDP simulation in a 4×4 grid world for Markov Decision Process (MDP) for planning with Value Iteration
import numpy as np

GRID_SIZE = 4
STATE_COUNT = GRID_SIZE * GRID_SIZE
TERMINAL_STATES = [0, STATE_COUNT - 1]
ACTIONS = ["U", "D", "L", "R"]
GAMMA = 1.0
THRESHOLD = 1e-4

V = np.zeros(STATE_COUNT)

def state_to_pos(state):
    return state // GRID_SIZE, state % GRID_SIZE

def pos_to_state(row, col):
    return row * GRID_SIZE + col

def step(state, action):
    if state in TERMINAL_STATES:
        return state, 0

    r, c = state_to_pos(state)

    if action == "U": r = max(r - 1, 0)
    elif action == "D": r = min(r + 1, GRID_SIZE - 1)
    elif action == "L": c = max(c - 1, 0)
    elif action == "R": c = min(c + 1, GRID_SIZE - 1)

    next_state = pos_to_state(r, c)
    reward = -1
    return next_state, reward

iteration = 0
while True:
    delta = 0
    new_V = np.copy(V)

    for s in range(STATE_COUNT):
        if s in TERMINAL_STATES:
            continue

        values = []
        for action in ACTIONS:
            next_state, reward = step(s, action)
            values.append(reward + GAMMA * V[next_state])

        new_V[s] = max(values)
        delta = max(delta, abs(V[s] - new_V[s]))

    V = new_V
    iteration += 1

    if delta < THRESHOLD:
        break

print("Converged in", iteration, "iterations")
print("\nOptimal Value Function:")
print(np.round(V.reshape(GRID_SIZE, GRID_SIZE), 2))

policy = {}
for s in range(STATE_COUNT):
    if s in TERMINAL_STATES:
        policy[s] = None
    else:
        q_values = []
        for action in ACTIONS:
            next_state, reward = step(s, action)
            q_values.append(reward + GAMMA * V[next_state])
        best_action = ACTIONS[np.argmax(q_values)]
        policy[s] = best_action

print("\nOptimal Policy (as arrows):")
policy_symbols = {"U": "↑", "D": "↓", "L": "←", "R": "→", None: "T"}

output = []
for s in range(STATE_COUNT):
    output.append(policy_symbols[policy[s]])
output = np.array(output).reshape(GRID_SIZE, GRID_SIZE)
print(output)


Converged in 4 iterations

Optimal Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]

Optimal Policy (as arrows):
[['T' '←' '←' '↓']
 ['↑' '↑' '↑' '↓']
 ['↑' '↑' '↓' '↓']
 ['↑' '→' '→' 'T']]


In [4]:
# Simple MDP simulation in a 4x4 grid world for markov decision process (MDP) for Rollout the optimal policy from (0,0)

import numpy as np

# Gridworld parameters
N_ROWS = 4
N_COLS = 4
N_STATES = N_ROWS * N_COLS
ACTIONS = ['U', 'D', 'L', 'R']
ACTION_DELTAS = {
    'U': (-1, 0),
    'D': (1, 0),
    'L': (0, -1),
    'R': (0, 1),
}
TERMINAL_STATES = [0, 15]

ARROWS = {
    'U': '↑',
    'D': '↓',
    'L': '←',
    'R': '→',
    'T': 'T'
}

def state_to_pos(s):
    return divmod(s, N_COLS)

def pos_to_state(r, c):
    return r * N_COLS + c

def step(state, action):
    """Returns next_state, reward."""
    if state in TERMINAL_STATES:
        return state, 0

    r, c = state_to_pos(state)
    dr, dc = ACTION_DELTAS[action]
    nr = min(max(r + dr, 0), N_ROWS - 1)
    nc = min(max(c + dc, 0), N_COLS - 1)
    next_state = pos_to_state(nr, nc)

    reward = -1
    return next_state, reward

def value_iteration(gamma=1.0, theta=1e-4):
    """Runs Value Iteration and returns optimal V and optimal policy."""
    V = np.zeros(N_STATES)

    while True:
        delta = 0
        new_V = np.copy(V)

        for s in range(N_STATES):
            if s in TERMINAL_STATES:
                continue

            q_values = []
            for a in ACTIONS:
                ns, r = step(s, a)
                q_values.append(r + gamma * V[ns])

            new_V[s] = max(q_values)
            delta = max(delta, abs(new_V[s] - V[s]))

        V = new_V
        if delta < theta:
            break

    # Extract optimal policy
    policy = np.empty(N_STATES, dtype=str)
    for s in range(N_STATES):
        if s in TERMINAL_STATES:
            policy[s] = 'T'
            continue

        q_values = []
        for a in ACTIONS:
            ns, r = step(s, a)
            q_values.append((r + gamma * V[ns], a))

        policy[s] = max(q_values)[1]

    return V, policy

def rollout_policy(policy, start_state=0):
    """Simulates following the optimal policy from start_state."""
    state = start_state
    trajectory = []

    while True:
        trajectory.append(state)
        if state in TERMINAL_STATES:
            break
        action = policy[state]
        next_state, reward = step(state, action)
        state = next_state

    return trajectory

def print_grid(values):
    for r in range(N_ROWS):
        row = values[r*N_COLS:(r+1)*N_COLS]
        print(" ".join(f"{x:6.2f}" if isinstance(x, (int,float)) else f"  {x}  " for x in row))
    print()

def print_policy_grid(policy):
    arrow_policy = [ARROWS[p] for p in policy]
    for r in range(N_ROWS):
        row = arrow_policy[r*N_COLS:(r+1)*N_COLS]
        print(" ".join(f"  {x}  " for x in row))
    print()

if __name__ == "__main__":
    # Step 1: Compute optimal V and policy
    V, policy = value_iteration()

    print("\nOptimal Value Function:")
    print_grid(V)

    print("Optimal Policy (arrows):")
    print_policy_grid(policy)

    # Step 2: Rollout from (0,0)
    print("Rollout from start (0,0):")
    trajectory = rollout_policy(policy, start_state=0)
    print(" → ".join(str(s) for s in trajectory))



Optimal Value Function:
  0.00  -1.00  -2.00  -3.00
 -1.00  -2.00  -3.00  -2.00
 -2.00  -3.00  -2.00  -1.00
 -3.00  -2.00  -1.00   0.00

Optimal Policy (arrows):
  T     ←     ←     ←  
  ↑     ↑     ↑     ↓  
  ↑     ↑     →     ↓  
  ↑     →     →     T  

Rollout from start (0,0):
0


In [5]:
# Simple MDP simulation in a 4x4 grid world for Markov Decision Process (MDP)
# Learn from interaction with Q-learning (unknown model)

import numpy as np
import random

# Gridworld parameters
N_ROWS = 4
N_COLS = 4
N_STATES = N_ROWS * N_COLS
ACTIONS = ['U', 'D', 'L', 'R']
ACTION_DELTAS = {
    'U': (-1, 0),
    'D': (1, 0),
    'L': (0, -1),
    'R': (0, 1),
}
TERMINAL_STATES = [0, 15]

# Arrow symbols
ARROWS = {
    'U': '↑',
    'D': '↓',
    'L': '←',
    'R': '→',
    'T': 'T'
}

# --- Helpers ---
def pos_to_state(r, c):
    return r * N_COLS + c

def state_to_pos(s):
    return divmod(s, N_COLS)

def step(state, action):
    """Environment transition: returns next_state, reward, done."""
    if state in TERMINAL_STATES:
        return state, 0, True

    r, c = state_to_pos(state)
    dr, dc = ACTION_DELTAS[action]

    nr = min(max(r + dr, 0), N_ROWS - 1)
    nc = min(max(c + dc, 0), N_COLS - 1)
    next_state = pos_to_state(nr, nc)

    reward = -1
    done = next_state in TERMINAL_STATES
    return next_state, reward, done

# --- Q-learning ---
def q_learning(alpha=0.1, gamma=0.99, epsilon=0.1, episodes=5000, max_steps=100):
    Q = np.zeros((N_STATES, len(ACTIONS)))

    for ep in range(episodes):
        state = random.choice([s for s in range(N_STATES) if s not in TERMINAL_STATES])

        for _ in range(max_steps):
            # ε-greedy selection
            if random.random() < epsilon:
                action_idx = random.randint(0, len(ACTIONS) - 1)
            else:
                action_idx = np.argmax(Q[state])

            action = ACTIONS[action_idx]
            next_state, reward, done = step(state, action)

            # Q-learning update
            td_target = reward + gamma * np.max(Q[next_state])
            Q[state, action_idx] += alpha * (td_target - Q[state, action_idx])

            state = next_state
            if done:
                break

    return Q

def extract_policy(Q):
    """Greedy policy using arrow symbols."""
    policy = []
    for s in range(N_STATES):
        if s in TERMINAL_STATES:
            policy.append(ARROWS['T'])
        else:
            best_a = ACTIONS[np.argmax(Q[s])]
            policy.append(ARROWS[best_a])
    return policy

def rollout(policy, start_state):
    """Rollout the learned policy."""
    state = start_state
    traj = [state]

    while state not in TERMINAL_STATES:
        # Convert arrow back to action letter
        arrow_to_action = {v: k for k, v in ARROWS.items()}
        action = arrow_to_action[policy[state]]

        next_state, reward, done = step(state, action)
        traj.append(next_state)
        state = next_state
        if done:
            break
    return traj

def print_grid(values):
    for r in range(N_ROWS):
        row = values[r*N_COLS:(r+1)*N_COLS]
        print(" ".join(f"  {v}  " for v in row))
    print()

# --- Run training ---
if __name__ == "__main__":
    Q = q_learning(episodes=8000)
    policy = extract_policy(Q)

    print("\nLearned Q-values:")
    for s in range(N_STATES):
        print(f"State {s}: {Q[s]}")

    print("\nLearned Policy (arrows):")
    print_grid(policy)

    print("Rollout from start state (0,1):")
    print(rollout(policy, start_state=1))



Learned Q-values:
State 0: [0. 0. 0. 0.]
State 1: [-1.98426336 -2.9595528  -1.         -2.95807319]
State 2: [-2.91733892 -3.71013589 -1.99       -3.84070755]
State 3: [-3.5059542 -2.9701    -2.9701    -3.4471014]
State 4: [-1.         -2.95765275 -1.97673318 -2.96591216]
State 5: [-1.99       -3.81556649 -1.99       -3.88200359]
State 6: [-2.97009998 -2.97009998 -2.97009998 -2.97009998]
State 7: [-3.82549926 -1.99       -3.85440009 -2.91770311]
State 8: [-1.99       -3.73895776 -2.93186307 -3.7958451 ]
State 9: [-2.97009995 -2.97009995 -2.97009995 -2.97009995]
State 10: [-3.67820949 -1.99       -3.77207614 -1.99      ]
State 11: [-2.96591764 -1.         -2.95972061 -1.98486613]
State 12: [-2.9701     -3.51451074 -3.70538632 -2.9701    ]
State 13: [-3.84813121 -2.88851547 -3.78080214 -1.99      ]
State 14: [-2.96767107 -1.98937214 -2.96921778 -1.        ]
State 15: [0. 0. 0. 0.]

Learned Policy (arrows):
  T     ←     ←     ←  
  ↑     ↑     ←     ↓  
  ↑     ←     ↓     ↓  
  →     →

In [6]:

#TD(0) Learning

import numpy as np
import random

# -----------------------------
# Simple Random Walk Environment
# -----------------------------
class RandomWalkEnv:
    """
    States: 0 1 2 3 4 5 6
    0 and 6 are terminal states.
    Start state = 3 every episode.
    Reward = +1 only when reaching state 6, else 0.
    """
    def __init__(self):
        self.start_state = 3
        self.terminal_states = [0, 6]
        self.state = self.start_state

    def reset(self):
        """Reset environment to start of episode."""
        self.state = self.start_state
        return self.state

    def step(self, action):
        """
        action: -1 = move left, +1 = move right
        returns: next_state, reward, done
        """
        next_state = self.state + action

        # Reward only if right terminal (state 6)
        if next_state == 6:
            reward = 1.0
        else:
            reward = 0.0

        self.state = next_state
        done = next_state in self.terminal_states
        return next_state, reward, done


# -----------------------------
# TD(0) Algorithm (Prediction)
# -----------------------------
def td0_prediction(num_episodes=100, alpha=0.1, gamma=1.0):
    """
    Temporal Difference (TD(0)) Learning to estimate V(s).

    num_episodes: how many episodes to run
    alpha: learning rate
    gamma: discount factor
    """
    env = RandomWalkEnv()

    # There are 7 states: 0 to 6
    # Initialize value function V(s)
    V = np.zeros(7)

    # Terminal states are fixed (by definition of the problem)
    V[0] = 0.0
    V[6] = 0.0

    for episode in range(num_episodes):
        state = env.reset()

        while True:
            # Policy: choose left or right with equal probability
            action = random.choice([-1, 1])

            next_state, reward, done = env.step(action)

            # TD(0) Update:
            # V(s) ← V(s) + α [ r + γ V(s') - V(s) ]
            td_target = reward + gamma * V[next_state]
            td_error = td_target - V[state]
            V[state] = V[state] + alpha * td_error

            state = next_state

            if done:
                break

    return V


if __name__ == "__main__":
    # Run TD(0)
    num_episodes = 200  # you can increase for better estimates
    alpha = 0.1
    gamma = 1.0

    V = td0_prediction(num_episodes=num_episodes, alpha=alpha, gamma=gamma)

    print("Estimated State Values after TD(0):")
    for s in range(7):
        print(f"V({s}) = {V[s]:.3f}")


Estimated State Values after TD(0):
V(0) = 0.000
V(1) = 0.099
V(2) = 0.288
V(3) = 0.410
V(4) = 0.714
V(5) = 0.888
V(6) = 0.000


In [7]:

#TD(lambda)Learning
import numpy as np
import random

# -----------------------------
# Simple Random Walk Environment
# -----------------------------
class RandomWalkEnv:
    """
    States: 0 1 2 3 4 5 6
    0 and 6 are terminal states.
    Start state = 3 every episode.
    Reward = +1 only when reaching state 6, else 0.
    """
    def __init__(self):
        self.start_state = 3
        self.terminal_states = [0, 6]
        self.state = self.start_state

    def reset(self):
        """Reset environment to start of episode."""
        self.state = self.start_state
        return self.state

    def step(self, action):
        """
        action: -1 = move left, +1 = move right
        returns: next_state, reward, done
        """
        next_state = self.state + action

        # Reward only if right terminal (state 6)
        if next_state == 6:
            reward = 1.0
        else:
            reward = 0.0

        self.state = next_state
        done = next_state in self.terminal_states
        return next_state, reward, done


# -----------------------------
# TD(λ) Algorithm (Prediction)
# -----------------------------
def td_lambda_prediction(num_episodes=100, alpha=0.1, gamma=1.0, lam=0.8):
    """
    Temporal Difference TD(λ) Learning to estimate V(s) using eligibility traces.

    num_episodes: number of episodes
    alpha       : learning rate
    gamma       : discount factor
    lam         : lambda parameter (0 <= λ <= 1)
    """
    env = RandomWalkEnv()

    # There are 7 states: 0 to 6
    V = np.zeros(7)

    # terminal values
    V[0] = 0.0
    V[6] = 0.0

    for episode in range(num_episodes):
        state = env.reset()

        # Initialize eligibility traces to 0 for all states
        E = np.zeros(7)

        while True:
            # Behaviour policy: random left/right
            action = random.choice([-1, 1])

            next_state, reward, done = env.step(action)

            # TD error δ = r + γ V(s') − V(s)
            td_target = reward + gamma * V[next_state]
            td_error = td_target - V[state]

            # Increase eligibility of current state
            E[state] += 1.0     # accumulating traces

            # Update all states' values using their eligibility
            V += alpha * td_error * E

            # Decay eligibility traces
            E *= gamma * lam

            state = next_state

            if done:
                break

    return V


if __name__ == "__main__":
    num_episodes = 200
    alpha = 0.1
    gamma = 1.0
    lam = 0.8

    V = td_lambda_prediction(num_episodes=num_episodes,
                             alpha=alpha, gamma=gamma, lam=lam)

    print("Estimated State Values after TD(λ):")
    for s in range(7):
        print(f"V({s}) = {V[s]:.3f}")


Estimated State Values after TD(λ):
V(0) = 0.000
V(1) = 0.108
V(2) = 0.362
V(3) = 0.477
V(4) = 0.772
V(5) = 0.929
V(6) = 0.000


In [8]:
#**7.Implement SARSA**
import numpy as np
np.random.seed(3)

terminal=[0,15]
actions=[-1,1,-4,4]

def step(s,a):
    ns=s+a
    if s%4==0 and a==-1: ns=s
    if s%4==3 and a==1:  ns=s
    if s<4 and a==-4:    ns=s
    if s>11 and a==4:    ns=s
    return ns, -1

Q = np.zeros((16,4))
alpha=0.1; gamma=0.9; eps=0.1

def choose_action(s):
    return np.random.randint(4) if np.random.rand()<eps else np.argmax(Q[s])

for ep in range(5000):
    s=np.random.randint(1,15)
    a=choose_action(s)
    while s not in terminal:
        ns,r=step(s, actions[a])
        na=choose_action(ns)
        Q[s,a]+=alpha*(r + gamma*Q[ns,na] - Q[s,a])
        s, a = ns, na

print("Q-values:\n",Q)
print("\nOptimal Policy (0=L,1=R,2=U,3=D):")
print(np.argmax(Q,axis=1).reshape(4,4))


Q-values:
 [[ 0.          0.          0.          0.        ]
 [-1.         -2.75969433 -1.93825794 -2.64678806]
 [-2.05736249 -3.27416015 -2.51254639 -3.21308758]
 [-2.77978564 -3.3441002  -3.26550571 -3.07328088]
 [-2.01571618 -2.74830145 -1.         -2.84609271]
 [-1.90439482 -3.26326488 -2.16444694 -3.31811676]
 [-2.8802241  -2.85954204 -2.89831784 -2.92199652]
 [-3.15908061 -2.63701824 -3.45466874 -1.9265822 ]
 [-2.62791552 -3.1930961  -1.91237278 -3.16110425]
 [-2.91718837 -2.94894018 -2.79835264 -2.9226367 ]
 [-3.05521843 -2.12749049 -3.3840957  -1.91455912]
 [-2.70340466 -1.96902541 -2.90034408 -1.        ]
 [-3.16344124 -2.80676776 -2.92140916 -3.16811076]
 [-3.08968346 -1.92454024 -3.2422792  -2.71800099]
 [-2.79405122 -1.         -2.71554707 -1.84246869]
 [ 0.          0.          0.          0.        ]]

Optimal Policy (0=L,1=R,2=U,3=D):
[[0 0 0 0]
 [2 0 1 3]
 [2 2 3 3]
 [1 1 1 0]]


In [9]:
# **8.Implement Q Learning**
import numpy as np
np.random.seed(5)

terminal=[0,15]
actions=[-1,1,-4,4]

def step(s,a):
    ns=s+a
    if s%4==0 and a==-1: ns=s
    if s%4==3 and a==1:  ns=s
    if s<4 and a==-4:    ns=s
    if s>11 and a==4:    ns=s
    return ns, -1

Q=np.zeros((16,4))
alpha=0.1; gamma=0.9; eps=0.1

def choose(s):
    return np.random.randint(4) if np.random.rand()<eps else np.argmax(Q[s])

for ep in range(5000):
    s=np.random.randint(1,15)
    while s not in terminal:
        a=choose(s)
        ns,r=step(s, actions[a])
        Q[s,a]+=alpha*(r + gamma*np.max(Q[ns]) - Q[s,a])
        s=ns

print("Q-table:\n",Q)
print("\nOptimal Policy:")
print(np.argmax(Q,axis=1).reshape(4,4))


Q-table:
 [[ 0.          0.          0.          0.        ]
 [-1.         -2.65063947 -1.88072721 -2.68025455]
 [-1.9        -2.91640453 -2.48169662 -3.20758561]
 [-2.71       -3.05913846 -3.1965142  -2.71      ]
 [-1.8760517  -2.65954214 -1.         -2.64150682]
 [-1.9        -3.10434146 -1.9        -3.26946587]
 [-2.70998897 -2.70998868 -2.70998868 -2.70998801]
 [-3.17904553 -2.48040655 -3.06180302 -1.9       ]
 [-2.5744077  -3.25902145 -1.9        -3.05404097]
 [-2.70998301 -2.70998334 -2.70998215 -2.70998187]
 [-3.136205   -1.9        -2.88301612 -1.9       ]
 [-2.67518004 -1.88586939 -2.58655912 -1.        ]
 [-3.18966072 -2.71       -2.71       -3.19693897]
 [-3.15297499 -1.9        -3.09542527 -2.55924191]
 [-2.60924688 -1.         -2.68022988 -1.86335926]
 [ 0.          0.          0.          0.        ]]

Optimal Policy:
[[0 0 0 0]
 [2 0 3 3]
 [2 3 1 3]
 [2 1 1 0]]


In [None]:
pip install torch

In [10]:
#DQN
import random, collections, math, numpy as np, gymnasium as gym, torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Simple Q-Network
class QNetwork(nn.Module):
    def __init__(self, obs_dim, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, n_actions)
        )
    def forward(self, x):
        return self.net(x)

# Replay buffer
Transition = collections.namedtuple("Transition", ("s", "a", "r", "s2", "done"))
class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = collections.deque(maxlen=capacity)
    def push(self, *args): self.buffer.append(Transition(*args))
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        return Transition(*zip(*batch))
    def __len__(self): return len(self.buffer)

# DQN training function
def train_dqn():
    env = gym.make("CartPole-v1")
    obs_dim = env.observation_space.shape[0]
    n_actions = env.action_space.n

    policy_net = QNetwork(obs_dim, n_actions).to(device)
    target_net = QNetwork(obs_dim, n_actions).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
    buffer = ReplayBuffer()
    GAMMA, BATCH_SIZE, TARGET_UPDATE, EPS_DECAY = 0.99, 64, 500, 5000
    EPS_START, EPS_END = 1.0, 0.01
    total_steps, rewards = 0, []

    def epsilon(step):
        return EPS_END + (EPS_START - EPS_END) * math.exp(-1. * step / EPS_DECAY)

    for ep in range(300):
        state, _ = env.reset()
        done, ep_reward = False, 0
        while not done:
            eps = epsilon(total_steps)
            total_steps += 1
            if random.random() < eps:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    s = torch.tensor(np.array(state), dtype=torch.float32, device=device).unsqueeze(0)
                    action = int(policy_net(s).argmax(1).item())

            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            buffer.push(state, action, reward, next_state, done)
            state = next_state
            ep_reward += reward

            if len(buffer) >= BATCH_SIZE:
                transitions = buffer.sample(BATCH_SIZE)
                s = torch.tensor(np.array(transitions.s), dtype=torch.float32, device=device)
                a = torch.tensor(transitions.a, dtype=torch.int64, device=device).unsqueeze(1)
                r = torch.tensor(transitions.r, dtype=torch.float32, device=device).unsqueeze(1)
                s2 = torch.tensor(np.array(transitions.s2), dtype=torch.float32, device=device)
                done_mask = torch.tensor(transitions.done, dtype=torch.float32, device=device).unsqueeze(1)

                q_vals = policy_net(s).gather(1, a)
                with torch.no_grad():
                    q_next = target_net(s2).max(1)[0].unsqueeze(1)
                    q_target = r + GAMMA * q_next * (1 - done_mask)

                loss = nn.functional.mse_loss(q_vals, q_target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if total_steps % TARGET_UPDATE == 0:
                target_net.load_state_dict(policy_net.state_dict())

        rewards.append(ep_reward)
        if ep % 10 == 0:
            print(f"Episode {ep} | Avg Reward: {np.mean(rewards[-10:]):.2f}")

    env.close()
    plt.plot(rewards)
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.title("DQN Training (Gymnasium - CartPole)")
    plt.show()

train_dqn()

ModuleNotFoundError: No module named 'torch'

In [1]:
# Implement REINFORCE algorithm with Baseline

import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import numpy as np

# ---------------------------------------------
#  Policy Network (Actor)
# ---------------------------------------------
class PolicyNetwork(nn.Module):
    def _init_(self, state_dim, action_dim):
        super()._init_()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.fc(x)

# ---------------------------------------------
#  Baseline Network (State Value)
# ---------------------------------------------
class ValueNetwork(nn.Module):
    def _init_(self, state_dim):
        super()._init_()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        return self.fc(x)

# ---------------------------------------------
#  Compute discounted returns
# ---------------------------------------------
def compute_returns(rewards, gamma=0.99):
    returns = []
    R = 0
    for r in reversed(rewards):
        R = r + gamma * R
        returns.insert(0, R)
    return torch.tensor(returns, dtype=torch.float32)

# ---------------------------------------------
#  REINFORCE with Baseline – Training Loop
# ---------------------------------------------
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = PolicyNetwork(state_dim, action_dim)
value_net = ValueNetwork(state_dim)

policy_optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
value_optimizer = optim.Adam(value_net.parameters(), lr=1e-3)

num_episodes = 100
gamma = 0.99

for episode in range(num_episodes):
    state, _ = env.reset()
    log_probs, values, rewards = [], [], []
    done = False

    while not done:
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        probs = policy_net(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()

        next_state, reward, terminated, truncated, _ = env.step(action.item())
        done = terminated or truncated

        log_probs.append(dist.log_prob(action))
        values.append(value_net(state_tensor))
        rewards.append(reward)

        state = next_state

    # Compute returns
    returns = compute_returns(rewards, gamma)
    values = torch.cat(values).squeeze()

    # Baseline is value function (detached)
    baseline = values.detach()

    # Advantage = Return - Baseline
    advantages = returns - baseline

    # Policy loss
    policy_loss = -(torch.stack(log_probs) * advantages).mean()

    # Value loss = MSE(Returns, Baseline)
    value_loss = (returns - values).pow(2).mean()

    # Update policy
    policy_optimizer.zero_grad()
    policy_loss.backward()
    policy_optimizer.step()

    # Update value baseline
    value_optimizer.zero_grad()
    value_loss.backward()
    value_optimizer.step()

    print(f"Episode {episode+1}: Reward = {sum(rewards)}")

env.close()
print("REINFORCE with baseline training finished.")

ModuleNotFoundError: No module named 'torch'

In [2]:
# Implement REINFORCE algorithm with Advantage Function.

import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import numpy as np

# ---------------------------------------------
#  Policy Network (Actor)
# ---------------------------------------------
class PolicyNetwork(nn.Module):
    def _init_(self, state_dim, action_dim):
        super()._init_()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.fc(x)

# ---------------------------------------------
#  Value Network (Critic)
# ---------------------------------------------
class ValueNetwork(nn.Module):
    def _init_(self, state_dim):
        super()._init_()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        return self.fc(x)

# ---------------------------------------------
#  Compute discounted returns
# ---------------------------------------------
def compute_returns(rewards, gamma=0.99):
    R = 0
    returns = []
    for r in reversed(rewards):
        R = r + gamma * R
        returns.insert(0, R)
    return torch.tensor(returns, dtype=torch.float32)

# ---------------------------------------------
#  REINFORCE with Advantage – Training Loop
# ---------------------------------------------
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = PolicyNetwork(state_dim, action_dim)
value_net = ValueNetwork(state_dim)

policy_optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
value_optimizer = optim.Adam(value_net.parameters(), lr=1e-3)

num_episodes = 100
gamma = 0.99

for episode in range(num_episodes):
    state, _ = env.reset()
    log_probs, values, rewards = [], [], []
    done = False

    while not done:
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        probs = policy_net(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()

        next_state, reward, terminated, truncated, _ = env.step(action.item())
        done = terminated or truncated

        log_probs.append(dist.log_prob(action))
        values.append(value_net(state_tensor))
        rewards.append(reward)

        state = next_state

    # Compute returns
    returns = compute_returns(rewards, gamma)
    values = torch.cat(values).squeeze()

    # ADVANTAGE = Return − Value Estimate
    advantages = returns - values.detach()

    # POLICY LOSS (Actor)
    policy_loss = -(torch.stack(log_probs) * advantages).mean()

    # VALUE LOSS (Critic)
    value_loss = (returns - values).pow(2).mean()

    # Update policy
    policy_optimizer.zero_grad()
    policy_loss.backward()
    policy_optimizer.step()

    # Update critic
    value_optimizer.zero_grad()
    value_loss.backward()
    value_optimizer.step()

    print(f"Episode {episode+1}: Reward = {sum(rewards)}")

env.close()
print("REINFORCE with advantage training finished.")

ModuleNotFoundError: No module named 'torch'

In [6]:
# **12. Implement the Monte Carlo prediction and control.**
import numpy as np
np.random.seed(0)

actions = [-1, 1, -4, 4]         # L, R, U, D
terminal = [0, 15]

# Arrow mapping
ARROWS = {
    -1: "←",
     1: "→",
    -4: "↑",
     4: "↓",
    "T": "T"
}

def step(s,a):
    ns = s + a
    if s % 4 == 0 and a == -1: ns = s
    if s % 4 == 3 and a == 1:  ns = s
    if s < 4 and a == -4:      ns = s
    if s > 11 and a == 4:      ns = s
    return ns, -1

Q = np.zeros((16,4))
returns = { (s,a):[] for s in range(16) for a in range(4) }
eps = 0.1
gamma = 1.0

def choose(s):   # ε-greedy
    if np.random.rand() < eps:
        return np.random.randint(4)
    return np.argmax(Q[s])

# Monte Carlo Control
for ep in range(5000):
    s = np.random.randint(1,15)
    episode = []

    while s not in terminal:
        a = choose(s)
        ns, r = step(s, actions[a])
        episode.append((s,a,r))
        s = ns

    G = 0
    for i in reversed(range(len(episode))):
        s, a, r = episode[i]
        G += r

        # First-visit check
        if not any(s == x[0] and a == x[1] for x in episode[:i]):
            returns[(s,a)].append(G)
            Q[s,a] = np.mean(returns[(s,a)])

# Greedy policy
policy_idx = np.argmax(Q, axis=1)

# Convert to arrows
arrow_policy = []
for s in range(16):
    if s in terminal:
        arrow_policy.append(ARROWS["T"])
    else:
        arr = ARROWS[ actions[ policy_idx[s] ] ]
        arrow_policy.append(arr)

arrow_policy = np.array(arrow_policy).reshape(4,4)

print("Optimal Policy (arrows):")
print(arrow_policy)

print("\nQ-table:")
print(Q)


Optimal Policy (arrows):
[['T' '←' '←' '↓']
 ['↑' '←' '←' '↓']
 ['↑' '↓' '↑' '↓']
 ['↑' '←' '→' 'T']]

Q-table:
[[  0.           0.           0.           0.        ]
 [ -1.          -8.35714286  -5.72       -14.5       ]
 [ -2.46614583 -13.57142857 -18.36363636 -13.3       ]
 [-16.61904762  -8.93333333  -8.4         -3.64050633]
 [ -2.31460674  -3.47191011  -1.          -3.56521739]
 [ -2.17818182  -7.90625     -7.86206897 -11.60526316]
 [ -3.50831025  -9.21052632  -7.20833333 -10.85714286]
 [ -8.03030303  -9.36       -12.8         -2.37075718]
 [ -3.54054054  -6.64583333  -2.16372283  -9.85365854]
 [ -9.5625     -14.5        -18.33333333  -5.87259615]
 [-23.27272727 -18.85714286  -4.32533333  -7.66666667]
 [ -9.17857143  -2.34210526  -6.55        -1.        ]
 [-13.27272727 -12.13888889  -3.58212996 -13.2       ]
 [ -4.61216216  -9.         -12.39130435  -9.78947368]
 [-20.71428571  -1.         -12.8        -20.9       ]
 [  0.           0.           0.           0.        ]]


In [7]:
#Q13: To implement function approximation using linear model

import numpy as np

# Gridworld
N_ROWS, N_COLS = 4, 4
TERMINAL = [0, 15]

# Step function
def step(s, a):
    r, c = divmod(s, N_COLS)
    dr, dc = {0:(-1,0), 1:(1,0), 2:(0,-1), 3:(0,1)}[a]  # U,D,L,R
    nr, nc = min(max(r + dr, 0), N_ROWS-1), min(max(c + dc, 0), N_COLS-1)
    ns = nr * N_COLS + nc
    reward = -1
    return ns, reward

# --- Features ---
def features(s):
    r, c = divmod(s, N_COLS)
    return np.array([r, c, 1.0])   # simple 3-dimensional features

# --- Linear Value Function ---
def V(s, w):
    return np.dot(w, features(s))

# --- TD(0) with function approximation ---
def td_linear(alpha=0.01, gamma=0.99, episodes=1000):
    w = np.zeros(3)  # weights for the linear model

    for ep in range(episodes):
        s = np.random.randint(1, 15)   # start non-terminal

        while s not in TERMINAL:
            # choose a random action for prediction-only
            a = np.random.choice([0,1,2,3])

            ns, r = step(s, a)

            # TD target
            td_target = r + gamma * (0 if ns in TERMINAL else V(ns, w))
            td_error = td_target - V(s, w)

            # gradient update: w += alpha * error * x(s)
            w += alpha * td_error * features(s)

            s = ns

    return w

# --- Run ---
w = td_linear()
print("Learned weights w:", w)

# Value table from approximator
value_table = np.array([V(s, w) for s in range(16)]).reshape(4,4)
print("\nApproximated Value Function:")
print(value_table)


Learned weights w: [  0.11440153   0.15696317 -14.33689722]

Approximated Value Function:
[[-14.33689722 -14.17993405 -14.02297088 -13.86600771]
 [-14.22249568 -14.06553252 -13.90856935 -13.75160618]
 [-14.10809415 -13.95113098 -13.79416782 -13.63720465]
 [-13.99369262 -13.83672945 -13.67976628 -13.52280312]]
