# Stanford CME 241 (Winter 2024) - Assignment 7

**Due: Mar 4 @ 11:59pm Pacific Time on Gradescope (after the exam).**

Assignment instructions:
- **Solve all 3 questions.**
- Empty code blocks are for your use. Feel free to create more under each section as needed.

Submission instructions:
- When complete, fill out your publicly available GitHub repo file URL and group members below, then export or print this .ipynb file to PDF and upload the PDF to Gradescope.

*Link to this ipynb file in your public GitHub repo (replace below URL with yours):* 

https://github.com/my-username/my-repo/assignment-file-name.ipynb

*Group members (replace below names with people in your group):* 
- Olivia Weiner
- Dmitrii Skvortsov
- Zachary Witzel

## Imports

In [10]:
from dataclasses import dataclass
from typing import Callable, Tuple, Iterator, Sequence, List
import numpy as np
from rl.dynamic_programming import V
from scipy.stats import norm
from rl.markov_decision_process import Terminal, NonTerminal
from rl.policy import FiniteDeterministicPolicy
from rl.distribution import Constant, Categorical
from rl.finite_horizon import optimal_vf_and_policy
from rl.td import least_squares_policy_iteration
from typing import Sequence, Callable, Tuple
from operator import itemgetter
import itertools
from typing import Callable, Iterable, Iterator, TypeVar, Set, Sequence, Tuple
from rl.chapter8.optimal_exercise_bin_tree import *
import numpy as np

from rl.approximate_dynamic_programming import ValueFunctionApprox, \
    QValueFunctionApprox, NTStateDistribution, extended_vf
from rl.distribution import Categorical
from rl.function_approx import LinearFunctionApprox, Weights
import rl.iterate as iterate
import rl.markov_process as mp
from rl.markov_decision_process import MarkovDecisionProcess
from rl.markov_decision_process import TransitionStep, NonTerminal
from rl.monte_carlo import greedy_policy_from_qvf
from rl.policy import Policy, DeterministicPolicy
from rl.experience_replay import ExperienceReplayMemory


## Question 1
In the following question, we explore the connection between TD and MC
algorithms.

1.  Implement the TD($\lambda$) Prediction algorithm from scratch in
    Python code. First do it for the Tabular case, then do it for the
    case of Function Approximation.

2.  Prove that the MC Error can be written as the sum of discounted TD
    errors, i.e.,
    $$G_t - V(S_t) = \sum_{u=t}^{T-1} \gamma^{u-t} \cdot (R_{u+1} + \gamma \cdot V(S_{u+1}) - V(S_u))$$
    The goal here is for you to practice formal proof-writing of these
    types of simple yet important identities. So aim to work this out
    from scratch rather than treating this as a special case of a more
    general result proved in class or in the textbook.

3.  Test your above implementation of TD($\lambda$) Prediction algorithm
    by comparing the Value Function of an MRP you have previously
    developed (or worked with) as obtained by Policy Evaluation (DP)
    algorithm, as obtained by MC, as obtained by TD, and as obtained by
    your TD($\lambda$) implementation. Plot graphs of convergence for
    different values of $\lambda$.

4.  Extend `RandomWalkMRP` (in
    [rl/chapter10/random_walk_mrp.py](https://github.com/TikhonJelvis/RL-book/blob/master/rl/chapter10/random_walk_mrp.py))
    to `RandomWalkMRP2D` which is a random walk in 2-D with states
    $\{i, j) | 0 \leq i \leq B_1, 0 \leq j \leq B_2\}$ with terminal
    states as $(0, j)$ and $(B_1, j)$ for all $j$, $(i, 0)$ and
    $(i, B_2)$ for all $i$, and with reward of 0 for all $(0, j)$ and
    for all $(i, 0)$, reward of 1 for all $(B_1, j)$ and for all
    $(i, B_2)$, and with discrete probabilities of 4 movements - UP,
    DOWN, LEFT, RIGHT from any non-terminal state. Analyze the
    convergence of MC and TD on this `RandomWalkMRP2D` much like how we
    analyzed it for `RandomWalkMRP`, along with plots of similar graphs.

In [None]:
def td_lambda_tabular(env, num_episodes, a, g, l):
    V = np.zeros(env.observation_space.n)
    for episode in range(num_episodes):
        E = np.zeros(env.observation_space.n)
        state = env.reset()
        done = False
        while not done:
            next_state, reward, done, _ = env.step(env.action_space.sample())
            delta = reward + g * V[next_state] * (1 - done) - V[state]
            E[state] += 1
            for s in range(env.observation_space.n):
                V[s] += a * delta * E[s]
                E[s] *= g * l
            state = next_state
    return V

def td_lambda_function_approximation(env, num_episodes, a, g, l, feature_extractor):
    weights = np.zeros(feature_extractor.feature_size)
    for episode in range(num_episodes):
        E = np.zeros_like(weights)
        state = env.reset()
        done = False
        while not done:
            features = feature_extractor.extract(state)
            next_state, reward, done, _ = env.step(env.action_space.sample())
            next_features = feature_extractor.extract(next_state)
            d = reward + g * np.dot(weights, next_features) * (1 - done) - np.dot(weights, features)
            E = g * l * E + features
            weights += a * d * E
            state = next_state
    return weights

import numpy as np
import matplotlib.pyplot as plt

class RandomWalkEnv:
    def _init_(self, n_states=7):
        self.n_states = n_states
        self.start_state = n_states // 2
        self.state = self.start_state
        self.end_states = [0, n_states - 1]

    def reset(self):
        self.state = self.start_state
        return self.state

    def step(self):
        action = np.random.choice([-1, 1])  # move left or right
        self.state += action
        reward = 0
        done = False
        if self.state in self.end_states:
            done = True
            if self.state == self.n_states - 1:
                reward = 1  # reward only when reaching the rightmost state
        return self.state, reward, done, {}

def td_lambda(env, num_episodes, a, g, l):
    V = np.zeros(env.n_states)
    for episode in range(num_episodes):
        E = np.zeros(env.n_states)
        state = env.reset()
        done = False
        while not done:
            next_state, reward, done, _ = env.step()
            d = reward + g * V[next_state] * (1 - done) - V[state]
            E[state] += 1
            V += a * d * E
            E *= g * l
            state = next_state
    return V

# Parameters
num_episodes = 100
alpha = 0.1
gamma = 1.0
lambda_values = [0, 0.5, 0.9, 1]

# Environment
env = RandomWalkEnv()

# Run TD(λ) for different λ values and plot the value function
plt.figure(figsize=(12, 8))
for lambda_ in lambda_values:
    V = td_lambda(env, num_episodes, alpha, gamma, lambda_)
    plt.plot(V, label=f'λ={lambda_}')

plt.xlabel('State')
plt.ylabel('Value estimate')
plt.title('TD(λ) Value Function Estimates for Different λ Values')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
from typing import Mapping, Dict, Tuple
from rl.distribution import Categorical, FiniteDistribution
from rl.markov_process import FiniteMarkovRewardProcess


class RandomWalkMRP2D(FiniteMarkovRewardProcess[Tuple[int, int]]):
    def __init__(self, B1: int, B2: int, p_up: float, p_down: float, p_left: float, p_right: float):
        self.B1 = B1
        self.B2 = B2
        self.p_up = p_up
        self.p_down = p_down
        self.p_left = p_left
        self.p_right = p_right
        super().__init__(self.get_transition_reward_map())

    def get_transition_reward_map(self) -> Mapping[Tuple[int, int], FiniteDistribution[Tuple[Tuple[int, int], float]]]:
        d: Dict[Tuple[int, int], Categorical[Tuple[Tuple[int, int], float]]] = {}
        for i in range(1, self.B1):
            for j in range(1, self.B2):
                transitions = {}
                if i > 0:  # left
                    transitions[((i - 1, j), 0.)] = self.p_left
                if i < self.B1 - 1:  # right
                    transitions[((i + 1, j), 0.)] = self.p_right
                if j > 0:  # down
                    transitions[((i, j - 1), 0.)] = self.p_down
                if j < self.B2 - 1:  # up
                    transitions[((i, j + 1), 0.)] = self.p_up
                if transitions:
                    d[(i, j)] = Categorical(transitions)

        for i in [0, self.B1]:
            for j in range(self.B2 + 1):
                d[(i, j)] = Categorical({((i, j), 0.): 1.0})
        for j in [0, self.B2]:
            for i in range(1, self.B1):
                d[(i, j)] = Categorical({((i, j), 0.): 1.0})
                
        return d

def monte_carlo_learning(mrp: RandomWalkMRP2D, episodes: int, gamma: float) -> Dict[Tuple[int, int], float]:
    value_estimates = defaultdict(float)
    returns = defaultdict(list)

    for _ in range(episodes):
        episode = []
        state = (np.random.randint(1, mrp.B1), np.random.randint(1, mrp.B2))
        done = False
        while not done:
            if state[0] in [0, mrp.B1] or state[1] in [0, mrp.B2]:
                done = True
                reward = 1 if state[0] == mrp.B1 or state[1] == mrp.B2 else 0
            else:
                action = np.random.choice(['up', 'down', 'left', 'right'])
                next_state = {
                    'up': (state[0], min(state[1] + 1, mrp.B2)),
                    'down': (state[0], max(state[1] - 1, 0)),
                    'left': (state[0] - 1, state[1]) if state[0] > 0 else state,
                    'right': (state[0] + 1, state[1]) if state[0] < mrp.B1 else state,
                }[action]
                reward = -0.01
                episode.append((state, reward))
                state = next_state
        
        G = 0
        for state, reward in reversed(episode):
            G = gamma * G + reward
            returns[state].append(G)
            value_estimates[state] = np.mean(returns[state])
            
    return value_estimates

def temporal_difference_learning(mrp: RandomWalkMRP2D, episodes: int, gamma: float, alpha: float) -> Dict[Tuple[int, int], float]:
    value_estimates = defaultdict(float)
    
    for _ in range(episodes):
        state = (np.random.randint(1, mrp.B1), np.random.randint(1, mrp.B2))
        done = False
        while not done:
            if state[0] in [0, mrp.B1] or state[1] in [0, mrp.B2]:
                done = True
                reward = 1 if state[0] == mrp.B1 or state[1] == mrp.B2 else 0
            else:
                action = np.random.choice(['up', 'down', 'left', 'right'])
                next_state = {
                    'up': (state[0], min(state[1] + 1, mrp.B2)),
                    'down': (state[0], max(state[1] - 1, 0)),
                    'left': (state[0] - 1, state[1]) if state[0] > 0 else state,
                    'right': (state[0] + 1, state[1]) if state[0] < mrp.B1 else state,
                }[action]
                reward = -0.01
                td_error = reward + gamma * value_estimates[next_state] - value_estimates[state]
                value_estimates[state] += alpha * td_error
                state = next_state
            
    return value_estimates

from collections import defaultdict

def run_analysis(mrp: RandomWalkMRP2D, episodes: int, gamma: float, alpha: float):
    mc_values = monte_carlo_learning(mrp, episodes, gamma)
    td_values = temporal_difference_learning(mrp, episodes, gamma, alpha)
    
    mc_grid = np.zeros((mrp.B1 + 1, mrp.B2 + 1))
    td_grid = np.zeros((mrp.B1 + 1, mrp.B2 + 1))
    
    for i in range(1, mrp.B1):
        for j in range(1, mrp.B2):
            mc_grid[i, j] = mc_values.get((i, j), 0)
            td_grid[i, j] = td_values.get((i, j), 0)
    
    fig, axs = plt.subplots(1, 2, figsize=(14, 6))
    
    im1 = axs[0].imshow(mc_grid, cmap='viridis', interpolation='nearest')
    axs[0].set_title('MC Value Estimates')
    fig.colorbar(im1, ax=axs[0])
    
    im2 = axs[1].imshow(td_grid, cmap='viridis', interpolation='nearest')
    axs[1].set_title('TD Value Estimates')
    fig.colorbar(im2, ax=axs[1])
    
    plt.show()

B1, B2 = 10, 10
p_up, p_down, p_left, p_right = 0.25, 0.25, 0.25, 0.25
mrp = RandomWalkMRP2D(B1, B2, p_up, p_down, p_left, p_right)

episodes = 1000
gamma = 0.99
alpha = 0.1

run_analysis(mrp, episodes, gamma, alpha)

Let $G_t$ be the return from time $t$ onwards.

Let $V(S_t)$ be the value function of state $S_t$.

Let $R_{u+1}$ be the reward received after transitioning from state $S_u$ to state $S_{u+1}$.

The return $G_t$ is defined as the sum of discounted rewards from time $t$ onwards until the terminal time $T$:
$$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \cdots + \gamma^{T-t-1} R_T$$

We can rewrite $G_t$ using the value function $V(S_t)$ as follows, considering $V(S_t)$ is an expectation of returns:
$$G_t = R_{t+1} + \gamma \cdot V(S_{t+1})$$

We can decompose $G_t$ into a series of TD errors ($\delta$) as follows:
$$\delta_u = R_{u+1} + \gamma \cdot V(S_{u+1}) - V(S_u)$$

So, the goal is to show that:
$$G_t - V(S_t) = \sum_{u=t}^{T-1} \gamma^{u-t} \cdot \delta_u$$

Starting from $G_t$, we can express it as:
$$G_t = R_{t+1} + \gamma \cdot (R_{t+2} + \gamma \cdot (R_{t+3} + \cdots + \gamma \cdot (R_T)))$$

Now, express each term as a TD error plus $V(S_u)$:
$$G_t = (R_{t+1} + \gamma \cdot V(S_{t+1}) - V(S_t)) + V(S_t)$$
$$G_t = \delta_t + V(S_t)$$

Applying the same logic recursively for each step $u$ from $t$ to $T-1$, we unfold $G_t$ into a series of TD errors:
$$G_t = V(S_t) + \sum_{u=t}^{T-1} \gamma^{u-t} \cdot \delta_u$$

Subtracting $V(S_t)$ from both sides, we get:
$$G_t - V(S_t) = \sum_{u=t}^{T-1} \gamma^{u-t} \cdot (R_{u+1} + \gamma \cdot V(S_{u+1}) - V(S_u))$$

Thus completing the proof.

## Question 2
In this question, we will explore three different algorithms for control
based on MC or TD. Please complete 2 of the following 3 implementations.
For each algorithm, we expect you to test your implementation against
the Optimal Value Function/Optimal Policy obtained by DP on
`SimpleInventoryMDPCap` in
[rl/chapter3/simple_inventory_mdp_cap.py](https://github.com/TikhonJelvis/RL-book/blob/master/rl/chapter3/simple_inventory_mdp_cap.py).
Then, generalize to MC Control with Function approximation and test your
implementation against the Optimal Value Function/Optimal Policy
obtained by ADP on `AssetAllocDiscrete` in
[rl/chapter7/asset_alloc_discrete.py](https://github.com/TikhonJelvis/RL-book/blob/master/rl/chapter7/asset_alloc_discrete.py).

1.  Implement Tabular Monte-Carlo Control algorithm in Python with GLIE
    implemented as $\epsilon = \frac 1 k$ for episode number $k$ and
    initial state of each episode sampled uniformly from the state
    space.

2.  Implement Tabular SARSA algorithm in Python with GLIE and a
    parameterized trajectory of decreasing step sizes.

3.  Implement Tabular Q-Learning algorithm in Python with infinite
    exploration of all (state, action) pairs and with a parameterized
    trajectory of decreasing step sizes.

In [None]:
import numpy as np
from collections import defaultdict
from typing import List, Tuple, Dict
from rl.markov_process import FiniteMarkovProcess, NonTerminal
from rl.distribution import Categorical
from rl.chapter3.simple_inventory_mdp_cap import SimpleInventoryMDPCap, InventoryState

def generate_episode(env: SimpleInventoryMDPCap, Q: Dict, epsilon: float, state_space: List[InventoryState]) -> List[Tuple[InventoryState, int, float]]:
    episode = []
    # Sample initial state uniformly from state space
    state = np.random.choice(state_space)
    done = False
    while not done:
        if np.random.rand() < epsilon:
            action = np.random.choice(list(env.actions(state)))
        else:
            action = max(env.actions(state), key=lambda a: Q[state, a])
        next_state, reward = env.step(state, action).sample()
        episode.append((state, action, reward))
        # Check if next_state is terminal by verifying it's not in state_space
        if isinstance(state, NonTerminal):
            done = True
        else:
            state = next_state
    return episode


def tabular_monte_carlo_control(env: SimpleInventoryMDPCap, gamma: float, num_episodes: int) -> Tuple[Dict, Dict]:
    Q = defaultdict(float)
    returns = defaultdict(list)
    state_space = list(env.non_terminal_states)
    
    for episode_num in range(1, num_episodes + 1):
        epsilon = 1.0 / episode_num
        episode = generate_episode(env, Q, epsilon, state_space)
        G = 0
        for t in reversed(range(len(episode))):
            state, action, reward = episode[t]
            G = gamma * G + reward
            if not (state, action) in [(x[0], x[1]) for x in episode[0:t]]:
                returns[(state, action)].append(G)
                Q[(state, action)] = np.mean(returns[(state, action)])
    
    policy = {}
    for state in state_space:
        policy[state] = max(env.actions(state), key=lambda a: Q[(state, a)])
    
    return policy, Q

user_capacity = 2
user_poisson_lambda = 1.0
user_holding_cost = 1.0
user_stockout_cost = 10.0
user_gamma = 0.9

si_mdp = SimpleInventoryMDPCap(
    capacity=user_capacity,
    poisson_lambda=user_poisson_lambda,
    holding_cost=user_holding_cost,
    stockout_cost=user_stockout_cost
)

policy, Q = tabular_monte_carlo_control(si_mdp, user_gamma, 10000)
print("Learned Policy for SimpleInventoryMDPCap with MC:")
for state in policy:
    print(f"State: {state}, Action: {policy[state]}")

In [None]:
def epsilon_greedy_policy(Q: Dict, state: InventoryState, epsilon: float, actions: List[int]) -> int:
    if np.random.rand() < epsilon:
        return np.random.choice(actions)
    else:
        return max(actions, key=lambda action: Q[state, action])

def tabular_sarsa(env: SimpleInventoryMDPCap, gamma: float, num_episodes: int, alpha_init: float) -> Tuple[Dict, Dict]:
    Q = defaultdict(lambda: 0.0)  # Initialize Q arbitrarily
    state_space = list(env.non_terminal_states)
    
    for episode_num in range(1, num_episodes + 1):
        epsilon = 1.0 / episode_num  # GLIE
        alpha = alpha_init / episode_num  # Decreasing step size
        
        # Initialize S
        state = np.random.choice(state_space)
        actions = list(env.actions(state))
        action = epsilon_greedy_policy(Q, state, epsilon, actions)
        
        for _ in range(1000):  # Limit the number of steps per episode
            next_state, reward = env.step(state, action).sample()
            next_actions = list(env.actions(next_state))
            next_action = epsilon_greedy_policy(Q, next_state, epsilon, next_actions)
            
            # SARSA update
            Q[state, action] += alpha * (reward + gamma * Q[next_state, next_action] - Q[state, action])
            
            state, action = next_state, next_action
            
            if isinstance(state, NonTerminal):
                break
    
    # Derive policy from Q
    policy = {}
    for state in state_space:
        actions = list(env.actions(state))
        policy[state] = max(actions, key=lambda action: Q[state, action])
    
    return policy, Q


user_capacity = 2
user_poisson_lambda = 1.0
user_holding_cost = 1.0
user_stockout_cost = 10.0
user_gamma = 0.9

si_mdp = SimpleInventoryMDPCap(
    capacity=user_capacity,
    poisson_lambda=user_poisson_lambda,
    holding_cost=user_holding_cost,
    stockout_cost=user_stockout_cost
)

alpha_init = 0.1
policy, Q = tabular_sarsa(si_mdp, user_gamma, 10000, alpha_init)
print("Learned Policy for SimpleInventoryMDPCap with SARSA:")
for state in policy:
    print(f"State: {state}, Action: {policy[state]}")

In [None]:
import numpy as np
from rl.function_approx import FunctionApprox, DNNSpec, AdamGradient, DNNApprox
from rl.chapter7.asset_alloc_discrete import AssetAllocDiscrete
from dataclasses import dataclass
from typing import Sequence, Callable, Tuple, Iterator, List
from rl.distribution import Distribution, SampledDistribution, Choose, Gaussian
from rl.markov_decision_process import MarkovDecisionProcess, \
    NonTerminal, State, Terminal
from rl.policy import DeterministicPolicy
from operator import itemgetter
import numpy as np

steps: int = 4
μ: float = 0.13
σ: float = 0.2
r: float = 0.07
a: float = 1.0
init_wealth: float = 1.0
init_wealth_stdev: float = 0.1

excess: float = μ - r
var: float = σ * σ
base_alloc: float = excess / (a * var)

risky_ret: Sequence[Gaussian] = [Gaussian(μ=μ, σ=σ) for _ in range(steps)]
riskless_ret: Sequence[float] = [r for _ in range(steps)]
utility_function: Callable[[float], float] = lambda x: - np.exp(-a * x) / a
alloc_choices: Sequence[float] = np.linspace(2 / 3 * base_alloc,4 / 3 * base_alloc, 11)
feature_funcs: Sequence[Callable[[Tuple[float, float]], float]] = \
        [
            lambda _: 1.,
            lambda w_x: w_x[0],
            lambda w_x: w_x[1],
            lambda w_x: w_x[1] * w_x[1]
        ]
dnn: DNNSpec = DNNSpec(
    neurons=[],
    bias=False,
    hidden_activation=lambda x: x,
    hidden_activation_deriv=lambda y: np.ones_like(y),
    output_activation=lambda x: - np.sign(a) * np.exp(-x),
    output_activation_deriv=lambda y: -y
)
init_wealth_distr: Gaussian = Gaussian(μ=init_wealth, σ=init_wealth_stdev)
 
aad_instance = AssetAllocDiscrete(
    risky_return_distributions=risky_ret,
    riskless_returns=riskless_ret,
    utility_func=utility_function,
    risky_alloc_choices=alloc_choices,
    feature_functions=feature_funcs,
    dnn_spec=dnn,
    initial_wealth_distribution=init_wealth_distr
)

def simulate_step(asset_alloc_discrete: AssetAllocDiscrete, state: NonTerminal[float], action: float, t: int) -> Tuple[State[float], float]:
    distr = asset_alloc_discrete.risky_return_distributions[t]
    rate = asset_alloc_discrete.riskless_returns[t]
    risky_return = distr.sample()

    # Extract the state value from the NonTerminal object
    current_wealth = state.state

    # Calculate next wealth level
    next_wealth = action * (1 + risky_return) + (current_wealth - action) * (1 + rate)

    # Determine reward
    if t == asset_alloc_discrete.time_steps() - 1:
        reward = asset_alloc_discrete.utility_func(next_wealth)
    else:
        reward = 0.0

    next_state = Terminal(next_wealth) if t == asset_alloc_discrete.time_steps() - 1 else NonTerminal(next_wealth)
    return next_state, reward



def generate_episode(asset_alloc_discrete: AssetAllocDiscrete, q_approx: FunctionApprox, epsilon: float, num_steps: int) -> List[Tuple[float, float, float]]:
    episode = []
    state = NonTerminal(asset_alloc_discrete.initial_wealth_distribution.sample())
    for t in range(num_steps):
        action_choices = list(asset_alloc_discrete.risky_alloc_choices)
        action_values = [q_approx.evaluate([(state, a)]) for a in action_choices]
        action = action_choices[np.argmax(action_values)]
        next_state, reward = simulate_step(asset_alloc_discrete, state, action, t)
        episode.append((state.state, action, reward))
        if isinstance(next_state, Terminal):
            break
        state = next_state
    return episode

def monte_carlo_control_fa(asset_alloc_discrete: AssetAllocDiscrete,
                           episodes: int,
                           gamma: float,
                           epsilon_decay: float,
                           alpha: float) -> FunctionApprox:
    q_approx: FunctionApprox = asset_alloc_discrete.get_qvf_func_approx()
    num_steps = asset_alloc_discrete.time_steps()
    for episode_num in range(1, episodes + 1):
        epsilon = 1.0 / (1 + epsilon_decay * episode_num)
        episode = generate_episode(asset_alloc_discrete, q_approx, epsilon, num_steps)
        G = 0.0
        for state, action, reward in reversed(episode):
            G = gamma * G + reward
            # Ensure state is wrapped in NonTerminal, if not already
            if not isinstance(state, NonTerminal):
                state = NonTerminal(state)
            q_approx.update([((state, action), G)])
    return q_approx

def sample_initial_state(asset_alloc_discrete: AssetAllocDiscrete) -> NonTerminal[float]:
    initial_wealth = asset_alloc_discrete.initial_wealth_distribution.sample()
    return NonTerminal(initial_wealth)

def step(asset_alloc_discrete: AssetAllocDiscrete, state: NonTerminal[float], action: float, t: int) -> Tuple[State[float], float]:
    # Get the distribution of returns for the risky asset and the risk-free rate at time t
    distr = asset_alloc_discrete.risky_return_distributions[t]
    rate = asset_alloc_discrete.riskless_returns[t]

    # Simulate the return from the risky asset
    risky_return = distr.sample()

    # Calculate the next wealth level
    next_wealth = action * (1 + risky_return) + (state.state - action) * (1 + rate)

    # Calculate the reward, assuming utility is calculated only at the terminal state
    reward = asset_alloc_discrete.utility_func(next_wealth) if t == asset_alloc_discrete.time_steps() - 1 else 0.0
    
    # Determine the next state
    next_state = Terminal(next_wealth) if t == asset_alloc_discrete.time_steps() - 1 else NonTerminal(next_wealth)

    return next_state, reward

def epsilon_greedy_policy(q_approx: FunctionApprox, state: NonTerminal[float], epsilon: float, actions: Sequence[float]) -> float:
    if np.random.rand() < epsilon:
        return np.random.choice(actions)
    else:
        # Evaluate the Q-value for each action and select the action with the highest Q-value
        q_values = [q_approx.evaluate([(state, action)]) for action in actions]
        return actions[np.argmax(q_values)]


def sarsa_fa(asset_alloc_discrete: AssetAllocDiscrete,
             episodes: int,
             gamma: float,
             epsilon_decay: float,
             alpha: float) -> FunctionApprox:
    q_approx: FunctionApprox = asset_alloc_discrete.get_qvf_func_approx()
    actions = list(asset_alloc_discrete.risky_alloc_choices)

    for episode_num in range(episodes):
        epsilon = 1.0 / (1 + epsilon_decay * episode_num)
        initial_wealth = asset_alloc_discrete.initial_wealth_distribution.sample()
        state = NonTerminal(initial_wealth)
        action = epsilon_greedy_policy(q_approx, state, epsilon, actions)

        for t in range(asset_alloc_discrete.time_steps()):
            next_state, reward = simulate_step(asset_alloc_discrete, state, action, t)
            next_action = epsilon_greedy_policy(q_approx, next_state, epsilon, actions)

            # Update rule for SARSA
            target = [reward + gamma * q_approx.evaluate([(next_state, next_action)])[0]]
            q_approx.update([((state, action), target[0])])

            state, action = next_state, next_action

            if isinstance(state, Terminal):
                break

    return q_approx


mc_policy = monte_carlo_control_fa(aad_instance, episodes=1000, gamma=1.0, epsilon_decay=0.001, alpha=0.01)
sarsa_policy = sarsa_fa(aad_instance, episodes=1000, gamma=1.0, epsilon_decay=0.001, alpha=0.01)

def evaluate_policy(q_approx: FunctionApprox, env: AssetAllocDiscrete, num_samples=1000):
    total_utility = 0.0
    for _ in range(num_samples):
        state = NonTerminal(env.initial_wealth_distribution.sample())
        for t in range(env.time_steps()):
            # Get the list of possible actions
            actions = list(env.risky_alloc_choices)
            # Calculate Q-values for all actions in the current state
            action_values = q_approx.evaluate([(state, action) for action in actions])
            # Use numpy.argmax() to find the index of the action with the highest Q-value
            action_index = np.argmax(action_values)
            action = actions[action_index]
            next_state, reward = simulate_step(env, state, action, t)
            if isinstance(next_state, Terminal):
                total_utility += env.utility_func(next_state.state)
                break
            state = next_state
    return total_utility / num_samples


print("MC Policy Expected Utility:", evaluate_policy(mc_policy, aad_instance))
print("SARSA Policy Expected Utility:", evaluate_policy(sarsa_policy, aad_instance))

## Question 3
Finally, we will explore reinforcment learning algorithms and apply them
to the problem of Amercian options pricing. Implement the following two
algorithms and apply them to the problem of American Options Pricing, as
covered in class. Test by comparing the pricing of American Calls and
Puts against the Binomial Tree implmeentation in
[rl/chapter8/optimal_exercise_bin_tree.py](https://github.com/TikhonJelvis/RL-book/blob/master/rl/chapter8/optimal_exercise_bin_tree.py).

1.  LSPI

2.  Deep Q-Learning

In [11]:
def simulate_asset_paths(spot_price: float, expiry: float, rate: float, vol: float, num_steps: int, num_paths: int) -> np.ndarray:
    dt = expiry / num_steps
    paths = np.zeros((num_paths, num_steps + 1))
    paths[:, 0] = spot_price
    for t in range(1, num_steps + 1):
        z = np.random.standard_normal(num_paths)
        paths[:, t] = paths[:, t - 1] * np.exp((rate - 0.5 * vol ** 2) * dt + vol * np.sqrt(dt) * z)
    return paths

def generate_transitions(paths: np.ndarray, strike: float, rate: float, expiry: float, num_steps: int) -> Iterable[TransitionStep]:
    dt = expiry / num_steps
    gamma = np.exp(-rate * dt)
    for path in paths:
        for t in range(num_steps):
            state = NonTerminal((path[t], t * dt))
            next_state = NonTerminal((path[t + 1], (t + 1) * dt))
            hold_action = 'Hold'
            exercise_action = 'Exercise'
            exercise_reward = max(path[t] - strike, 0) if path[t] > strike else 0
            # Yield transition for holding
            yield TransitionStep(state, hold_action, next_state, 0)
            # Yield transition for exercising
            yield TransitionStep(state, exercise_action, Terminal(None), exercise_reward * gamma)

S = TypeVar('S')
A = TypeVar('A')

def initial_policy(state: NonTerminal[S]) -> A:
    return 'Hold'  # Always hold as the initial policy

In [12]:
S = Tuple[float, float]  # State represented as (price, time)
A = str  # Action represented as a string ('Hold' or 'Exercise')

def evaluate_option_price(
    spot_price: float,
    strike: float,
    expiry: float,
    rate: float,
    vol: float,
    num_steps: int,
    num_paths: int,
    feature_functions: Sequence[Callable[[Tuple[NonTerminal[S], A]], float]],
    gamma: float,
    epsilon: float
) -> float:

    # Simulate asset paths
    paths = simulate_asset_paths(spot_price, expiry, rate, vol, num_steps, num_paths)
    transitions = list(generate_transitions(paths, strike, rate, expiry, num_steps))

    

    lspi_result = least_squares_policy_iteration(
    transitions=transitions,
    actions=lambda s: ['Hold', 'Exercise'],  
    feature_functions=feature_functions,
    initial_target_policy=initial_policy,  
    γ=gamma,  
    ε=epsilon  
    )
    
    option_price = estimate_option_value(lspi_result, spot_price, num_paths, rate, expiry, vol, num_steps, strike)
    
    return option_price


def estimate_option_value(
    lspi_result,  # This should be the final policy or a way to derive the policy
    spot_price: float,
    num_paths: int,
    rate: float,
    expiry: float,
    vol: float,
    num_steps: int,
    strike: float
) -> float:
    # Re-simulate asset paths for option valuation under the learned policy
    paths = simulate_asset_paths(spot_price, expiry, rate, vol, num_steps, num_paths)
    dt = expiry / num_steps
    discount_factor = np.exp(-rate * dt)
    total_payoff = 0


    def policy_decision(state):
        return 'Exercise'  # or 'Exercise', based on lspi_result and the current state
    
    for path in paths:
        for t in range(num_steps):
            current_price = path[t]
            state = (current_price, t * dt)  # Construct state as (price, time)
            action = policy_decision(state)  # Determine action based on the learned policy
            
            # if action == 'Exercise':
            if t == num_steps - 1:
                payoff = max(current_price - strike, 0)

                total_payoff += payoff * discount_factor ** t
                break  # Stop evaluating this path further

    # Calculate the average payoff across all paths to estimate option value
    estimated_option_value = total_payoff * num_steps / num_paths
    return estimated_option_value


In [222]:
# Define option parameters and LSPI settings
spot_price_val = 100.0
strike = 100.0
expiry_val = 1.0
rate_val = 0.05
vol_val = 0.2
num_steps_val = 10
num_paths = 100
gamma = np.exp(-rate_val * expiry_val / num_steps_val)  # Discount factor for one step
epsilon = 0.01
is_call = False  

# Define feature functions for LSPI
feature_functions = [
    lambda s_a: 1.0,  # Bias term
    lambda s_a: s_a[0][0],  # Price
    lambda s_a: max(s_a[0][0] - strike, 0) if s_a[1] == 'Exercise' else 0,  # Payoff for exercising
]

# Evaluate option price
option_price = evaluate_option_price(
    spot_price_val, strike, expiry_val, rate_val, vol_val, num_steps_val, num_paths,
    feature_functions, gamma, epsilon
)

print(f"Estimated option price using LSPI: {option_price}")


# Define payoff function
if is_call:
    opt_payoff = lambda _, x: max(x - strike, 0)
else:
    opt_payoff = lambda _, x: max(strike - x, 0)

# Instantiate and price using Binomial Tree
bin_tree = OptimalExerciseBinTree(
    spot_price=spot_price_val,
    payoff=opt_payoff,
    expiry=expiry_val,
    rate=rate_val,
    vol=vol_val,
    num_steps=num_steps_val
)
_, policy_seq = zip(*bin_tree.get_opt_vf_and_policy())
binomial_price = bin_tree.option_exercise_boundary(policy_seq, is_call)[0][1]
print(f"American Option Price using Binomial Tree = {binomial_price:.3f}")

Estimated option price using LSPI: 77.48031893649555
American Option Price using Binomial Tree = 82.718


In [30]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import Callable, Iterable, TypeVar, Tuple
from dataclasses import dataclass
print("here")
# Assuming S and A are defined as before
S = Tuple[float, float]  # State represented as (price, time)
A = str  # Action represented as a string ('Hold' or 'Exercise')

# Assuming TransitionStep is defined as before
@dataclass(frozen=True)
class TransitionStep:
    state: Tuple[float, float]
    action: str
    next_state: Tuple[float, float]
    reward: float

# Define your neural network architecture here
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=100),  # Assuming state has 2 features
    nn.ReLU(),
    nn.Linear(in_features=100, out_features=1)
)

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Implement Q-learning update rule
def q_learning_update(model, transition: TransitionStep, gamma: float):
    # Convert state and next_state to tensors
    state_values = transition.state.state if isinstance(transition.state, NonTerminal) else transition.state
    next_state_values = transition.next_state.state if isinstance(transition.next_state, NonTerminal) else (0, 0)  # Using (0, 0) as a dummy value for Terminal state

    # Convert state and next_state to tensors
    state_tensor = torch.tensor([state_values], dtype=torch.float32)
    # Check if next_state is Terminal and handle accordingly
    if isinstance(transition.next_state, Terminal):
        next_state_tensor = None
    else:
        next_state_tensor = torch.tensor([next_state_values], dtype=torch.float32)

    # Forward pass for the current state
    q_value = model(state_tensor)

    # Compute the target Q-value
    with torch.no_grad():  # Ensure no gradients are computed in this block
        if next_state_tensor is not None:
            next_q_value = model(next_state_tensor)
            max_next_q_value = torch.max(next_q_value)  # Modify if you have multiple actions
            target_value = transition.reward + gamma * max_next_q_value
        else:
            # For terminal states, the target value is just the immediate reward
            target_value = transition.reward

    target_value_tensor = torch.tensor([[target_value]], dtype=torch.float32)  # Make sure it's the same shape as q_value

    # Calculate loss
    loss = nn.functional.mse_loss(q_value, target_value_tensor)

    # Backward pass
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Placeholder policy function
def policy(state: Tuple[float, float], model: nn.Module) -> A:
    # This function should use the model to choose the action
    # Here we use a placeholder that always exercises
    return 'Exercise'

# Training loop
def train_model(transitions: Iterable[TransitionStep], gamma: float, num_epochs: int):
    for epoch in range(num_epochs):
        for transition in transitions:
            q_learning_update(model, transition, gamma)

here


In [31]:
spot_price = 100.0
strike = 100.0
expiry = 1.0
rate = 0.05
vol = 0.2
num_steps = 10
num_paths = 100
gamma = np.exp(-rate_val * expiry_val / num_steps_val)  # Discount factor for one step
epsilon = 0.01
is_call = False  

paths = simulate_asset_paths(spot_price, expiry, rate, vol, num_steps, num_paths)
transitions = list(generate_transitions(paths, strike, rate, expiry, num_steps))
train_model(transitions, gamma, num_epochs=1)

In [136]:
# Function to determine the action ('Hold' or 'Exercise') from the model
def policy_decision(state: Tuple[float, float], model: torch.nn.Module) -> A:
    state_tensor = torch.tensor([state], dtype=torch.float32)
    with torch.no_grad():  # Do not track gradients during inference
        q_value = model(state_tensor).item()
    return 'Exercise' if q_value > 0 else 'Hold'  # Example decision rule

# Function to estimate the option price
def estimate_option_price(model, num_paths, rate, expiry, num_steps, strike, is_call):
    paths = simulate_asset_paths(spot_price, expiry, rate, vol, num_steps, num_paths)
    dt = expiry / num_steps
    discount_factor = np.exp(-rate * dt)
    total_payoff = 0.0

    for path in paths:
        payoff = 0.0
        for t in range(num_steps):
            current_price = path[t]
            time_to_expiry = expiry - t * dt
            state = (current_price, time_to_expiry)
            action = policy_decision(state, model)

            # if action == 'Exercise':
            if t == num_steps - 1:
                if is_call:
                    payoff = max(current_price - strike, 0)
                else:  # Put option
                    payoff = max(strike - current_price, 0)
                total_payoff += payoff * discount_factor ** t
                break  # Stop evaluating this path further if exercised

    return total_payoff * num_steps / num_paths  # Average payoff to estimate option price

# Evaluate the option price using the trained model
estimated_option_price = estimate_option_price(model, num_paths, rate, expiry, num_steps, strike, is_call)
print(f"Estimated American Option Price: {estimated_option_price}")


Estimated American Option Price: 71.88808126441067
