In [None]:
# Niveen Abdul-Mohsen (bvn9ad)
# Reinforcement Learning (CS 4771) - Example 6.2
# Random Walk Markov Reward Problem
# i used numpy for numerical operations and matplotlib for plotting

import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

class RandomWalkEnvironment:
    """
    five-state random walk, modelled as a markov reward process
    (sutton & barto, example 6.2)

    layout, left to right:

        LEFT | A  B  C  D  E | RIGHT

    - A..E are the non-terminal states, encoded as the integers 0..4
    - LEFT (-1) and RIGHT (5) are the terminal positions just off the ends
    - every episode begins in the centre state C
    - every step moves one position left or right, each with probability 0.5
    - the only non-zero reward is 1, paid on the transition into RIGHT
    - the task is treated as undiscounted (γ = 1)
    """

    # non-terminal states, numbered left to right
    STATE_A = 0
    STATE_B = 1
    STATE_C = 2  # centre of the chain
    STATE_D = 3
    STATE_E = 4

    # positions one step beyond either end mark termination
    TERMINAL_LEFT = -1
    TERMINAL_RIGHT = 5

    # every episode begins here
    START_STATE = STATE_C

    # rewards
    REWARD_TERMINATE_RIGHT = 1.0
    REWARD_DEFAULT = 0.0

    def __init__(self, seed=None):
        """
        build the environment with its own random generator

        args:
            seed: seed passed to np.random.default_rng (None = unseeded)
        """
        self.current_state = None
        self.rng = np.random.default_rng(seed)

        # printable labels: the five letters first, then the two terminals
        labels = dict(zip(range(self.STATE_A, self.STATE_E + 1), 'ABCDE'))
        labels[self.TERMINAL_LEFT] = 'LEFT'
        labels[self.TERMINAL_RIGHT] = 'RIGHT'
        self.state_names = labels

    def reset(self):
        """
        begin a fresh episode in the centre state

        returns:
            initial_state: always START_STATE (STATE_C)
        """
        self.current_state = self.START_STATE
        return self.current_state

    def step(self):
        """
        advance the walk by one random move

        a single uniform draw picks the direction: below 0.5 moves right,
        otherwise left. current_state is only advanced when the walk lands
        on a non-terminal state; on termination it keeps the last
        non-terminal state.

        returns:
            next_state: position reached by this move
            reward: 1.0 when next_state is TERMINAL_RIGHT, otherwise 0.0
            is_terminal: True when next_state is either terminal position
        """
        offset = 1 if self.rng.random() < 0.5 else -1
        landed = self.current_state + offset

        # terminal transitions return straight away and leave current_state alone
        if landed == self.TERMINAL_RIGHT:
            return landed, self.REWARD_TERMINATE_RIGHT, True
        if landed == self.TERMINAL_LEFT:
            return landed, self.REWARD_DEFAULT, True

        self.current_state = landed
        return landed, self.REWARD_DEFAULT, False

    def get_true_values(self):
        """
        exact state values of the random walk

        the value of a state equals the probability of finishing on the
        right when starting from it, which gives
        V(A) = 1/6, V(B) = 2/6, V(C) = 3/6, V(D) = 4/6, V(E) = 5/6

        returns:
            true_values: dictionary mapping each non-terminal state to its value
        """
        ordered_states = (
            self.STATE_A,
            self.STATE_B,
            self.STATE_C,
            self.STATE_D,
            self.STATE_E,
        )
        return {
            state: numerator / 6.0
            for numerator, state in enumerate(ordered_states, start=1)
        }


def td_zero_prediction(env, num_episodes, alpha, initial_value=0.5, gamma=1.0):
    """
    temporal difference td(0) algorithm for value prediction

    from sutton & barto section 6.1:
    "whereas monte carlo methods must wait until the end of the episode to
    determine the increment to V(S_t), TD methods need to wait only until
    the next time step."

    algorithm pseudocode:
    initialize V(s) arbitrarily for all s, except V(terminal) = 0

    loop for each episode:
        initialize S
        loop for each step of episode:
            take action, observe R, S'
            V(S) ← V(S) + α[R + γV(S') - V(S)]
            S ← S'
        until S is terminal

    this is called "TD(0)" or "one-step TD" because it updates based on
    one step of experience (the immediate reward + next state value estimate)

    args:
        env: environment instance; must provide STATE_A, STATE_E,
            TERMINAL_LEFT, TERMINAL_RIGHT, reset() and step()
        num_episodes: number of training episodes to run
        alpha: learning rate (step-size parameter)
        initial_value: initial value estimate for all non-terminal states
        gamma: discount factor; the default 1.0 is the undiscounted setting
            used by the random walk example

    returns:
        value_function: defaultdict mapping states (terminal ones included,
            fixed at 0.0) to learned value estimates
    """
    value_function = defaultdict(float)
    for state in range(env.STATE_A, env.STATE_E + 1):
        value_function[state] = initial_value

    # terminal states have value 0 and are never updated
    value_function[env.TERMINAL_LEFT] = 0.0
    value_function[env.TERMINAL_RIGHT] = 0.0

    for _ in range(num_episodes):
        current_state = env.reset()

        episode_finished = False
        while not episode_finished:
            next_state, reward, episode_finished = env.step()

            # td(0) update rule: V(S) ← V(S) + α[R + γV(S') - V(S)]
            # the bootstrap term uses the current estimate of the next state
            td_target = reward + gamma * value_function[next_state]
            td_error = td_target - value_function[current_state]
            value_function[current_state] += alpha * td_error

            current_state = next_state

    return value_function



def monte_carlo_prediction(env, num_episodes, alpha, initial_value=0.5, gamma=1.0):
    """
    constant-alpha monte carlo prediction (every-visit)

    from sutton & barto section 6.1:
    "whereas monte carlo methods must wait until the end of the episode to
    determine the increment to V(S_t) (only then is G_t known), TD methods
    need wait only until the next time step."

    monte carlo update rule:
        V(S_t) ← V(S_t) + α[G_t - V(S_t)]

    where G_t is the actual (discounted) return from time t onward

    key difference from td:
    - mc uses actual complete return G_t (must wait for episode end)
    - td uses estimated return R + γV(S') (can update immediately)

    every visit to a state within an episode triggers its own update, so a
    state visited twice is updated twice.

    args:
        env: environment instance; must provide STATE_A, STATE_E,
            TERMINAL_LEFT, TERMINAL_RIGHT, reset() and step()
        num_episodes: number of training episodes to run
        alpha: learning rate (step-size parameter)
        initial_value: initial value estimate for all non-terminal states
        gamma: discount factor; the default 1.0 is the undiscounted setting
            used by the random walk example

    returns:
        value_function: defaultdict mapping states (terminal ones included,
            fixed at 0.0) to learned value estimates
    """
    value_function = defaultdict(float)
    for state in range(env.STATE_A, env.STATE_E + 1):
        value_function[state] = initial_value

    # terminal states have value 0 and are never updated
    value_function[env.TERMINAL_LEFT] = 0.0
    value_function[env.TERMINAL_RIGHT] = 0.0

    for _ in range(num_episodes):
        # generate one complete episode; episode_rewards[t] is the reward
        # received on leaving episode_states[t], so both lists end up with
        # the same length (the terminal state itself is not recorded)
        episode_states = [env.reset()]
        episode_rewards = []

        episode_finished = False
        while not episode_finished:
            next_state, reward, episode_finished = env.step()
            episode_rewards.append(reward)
            if not episode_finished:
                episode_states.append(next_state)

        # walk the episode backwards so the return can be accumulated
        # incrementally: G_t = R_{t+1} + γ G_{t+1}
        cumulative_return = 0.0
        for state_visited, reward_received in zip(reversed(episode_states),
                                                  reversed(episode_rewards)):
            cumulative_return = gamma * cumulative_return + reward_received
            mc_error = cumulative_return - value_function[state_visited]
            value_function[state_visited] += alpha * mc_error

    return value_function


def compute_rms_error(value_function, true_values):
    """
    compute root mean squared error between estimated and true values

    from textbook section 6.2:
    "to compare the two methods, we measure the root mean-squared error
    between the value function learned and the true values"

    formula: RMSE = sqrt( mean( (V_estimated - V_true)^2 ) )
    averaged over the states listed in true_values (the non-terminal ones);
    any extra entries in value_function, such as terminal states, are ignored

    args:
        value_function: mapping of estimated state values
        true_values: dictionary of true state values

    returns:
        rms_error: root mean squared error as a python float

    raises:
        ValueError: if true_values is empty (the mean would be undefined)
    """
    if not true_values:
        raise ValueError("true_values must contain at least one state")

    squared_errors = [
        (value_function[state] - true_value) ** 2
        for state, true_value in true_values.items()
    ]
    return float(np.sqrt(np.mean(squared_errors)))


def _initial_value_function(env, initial_value=0.5):
    """value table with non-terminal states at initial_value and terminals at 0"""
    value_function = defaultdict(float)
    for state in range(env.STATE_A, env.STATE_E + 1):
        value_function[state] = initial_value
    value_function[env.TERMINAL_LEFT] = 0.0
    value_function[env.TERMINAL_RIGHT] = 0.0
    return value_function


def _run_td_episode(env, value_function, alpha, gamma=1.0):
    """run one episode, applying the td(0) update in place after every step"""
    current_state = env.reset()
    episode_finished = False
    while not episode_finished:
        next_state, reward, episode_finished = env.step()
        # V(S) ← V(S) + α[R + γV(S') - V(S)]
        td_target = reward + gamma * value_function[next_state]
        value_function[current_state] += alpha * (
            td_target - value_function[current_state]
        )
        current_state = next_state


def _run_mc_episode(env, value_function, alpha, gamma=1.0):
    """run one episode, then apply every-visit constant-alpha mc updates in place"""
    episode_states = [env.reset()]
    episode_rewards = []
    episode_finished = False
    while not episode_finished:
        next_state, reward, episode_finished = env.step()
        episode_rewards.append(reward)
        if not episode_finished:
            episode_states.append(next_state)

    # accumulate the return backwards: G_t = R_{t+1} + γ G_{t+1}
    cumulative_return = 0.0
    for state_visited, reward_received in zip(reversed(episode_states),
                                              reversed(episode_rewards)):
        cumulative_return = gamma * cumulative_return + reward_received
        # V(S_t) ← V(S_t) + α[G_t - V(S_t)]
        value_function[state_visited] += alpha * (
            cumulative_return - value_function[state_visited]
        )


def run_single_training_sequence(env, method, num_episodes, alpha):
    """
    run one training sequence and track value function after each episode

    this is used to generate the left plot showing how values evolve

    all non-terminal states start at 0.5. the initial (pre-training) values
    are NOT part of the returned history: entry k holds the estimates after
    k + 1 episodes.

    args:
        env: environment instance
        method: 'td' or 'mc'
        num_episodes: number of episodes to train
        alpha: learning rate

    returns:
        value_history: list of value function snapshots (one plain dict per
            episode, each an independent copy)

    raises:
        ValueError: if method is neither 'td' nor 'mc'
    """
    episode_runners = {'td': _run_td_episode, 'mc': _run_mc_episode}
    if method not in episode_runners:
        raise ValueError(f"unknown method {method!r}; expected 'td' or 'mc'")
    run_episode = episode_runners[method]

    value_function = _initial_value_function(env)
    value_history = []
    for _ in range(num_episodes):
        run_episode(env, value_function, alpha)
        # copy, so later episodes do not rewrite earlier snapshots
        value_history.append(dict(value_function))

    return value_history


def run_experiment_for_rms_plot(alpha_values_td, alpha_values_mc,
                                num_episodes, num_runs, env_factory=None):
    """
    run multiple experiments to generate rms error curves

    from textbook section 6.2 figure 6.2 caption:
    "the data are averaged over 100 runs"

    this produces the right plot comparing td vs mc performance

    run number r builds every environment with seed=r, so within one run all
    step sizes and both methods are driven by an identically seeded generator.

    args:
        alpha_values_td: list of learning rates to test for td
        alpha_values_mc: list of learning rates to test for mc
        num_episodes: episodes per run
        num_runs: number of independent runs to average over
        env_factory: callable accepting a `seed` keyword and returning an
            environment; defaults to RandomWalkEnvironment

    returns:
        td_errors: dict mapping alpha -> array of rms errors per episode
        mc_errors: dict mapping alpha -> array of rms errors per episode
        (entry k of each array is the average error after k + 1 episodes)

    raises:
        ValueError: if num_runs is not positive (the average would be undefined)
    """
    if num_runs <= 0:
        raise ValueError("num_runs must be a positive integer")
    if env_factory is None:
        env_factory = RandomWalkEnvironment

    true_values = env_factory(seed=0).get_true_values()

    td_errors = {alpha: np.zeros(num_episodes) for alpha in alpha_values_td}
    mc_errors = {alpha: np.zeros(num_episodes) for alpha in alpha_values_mc}

    for run_idx in range(num_runs):
        for method, errors_by_alpha in (('td', td_errors), ('mc', mc_errors)):
            for alpha, error_sum in errors_by_alpha.items():
                env = env_factory(seed=run_idx)
                value_history = run_single_training_sequence(
                    env, method, num_episodes, alpha
                )
                # in-place add: accumulates into the array stored in the dict
                error_sum += [
                    compute_rms_error(snapshot, true_values)
                    for snapshot in value_history
                ]

    # turn the accumulated sums into averages over runs (in place)
    for errors_by_alpha in (td_errors, mc_errors):
        for error_sum in errors_by_alpha.values():
            error_sum /= num_runs

    return td_errors, mc_errors


def plot_figure_6_2():
    """
    recreate figure 6.2 from the textbook

    left plot: value estimates at different points during training
    right plot: rms error comparison between td and mc

    the figure is saved to 'figure_6_2_random_walk.png' and then shown
    """
    fig, (ax_left, ax_right) = plt.subplots(1, 2, figsize=(14, 5))
    fig.patch.set_facecolor('white')

    num_episodes = 100

    # ===== LEFT PLOT: value estimates over states =====

    env = RandomWalkEnvironment(seed=0)
    true_values = env.get_true_values()
    states = sorted(true_values)
    state_labels = [env.state_names[state] for state in states]

    left_alpha = 0.1
    value_history = run_single_training_sequence(
        env, 'td', num_episodes, alpha=left_alpha
    )

    for episodes_seen in (0, 1, 10, 100):
        # value_history[k] holds the estimates after k + 1 episodes, so the
        # "0 episodes" curve comes from the untouched initial table
        if episodes_seen == 0:
            snapshot = _initial_value_function(env)
        else:
            snapshot = value_history[episodes_seen - 1]
        ax_left.plot(state_labels, [snapshot[state] for state in states],
                     marker='o', label=f'{episodes_seen} episodes')

    ax_left.plot(state_labels, [true_values[state] for state in states],
                 color='black', marker='o', linewidth=2, label='true values')

    ax_left.set_xlabel('state')
    ax_left.set_ylabel('estimated value')
    ax_left.set_title(f'td(0) value estimates (alpha = {left_alpha})')
    ax_left.legend()
    ax_left.grid(True, alpha=0.3)

    # ===== RIGHT PLOT: rms error comparison =====

    alpha_values_td = [0.05, 0.1, 0.15]
    alpha_values_mc = [0.01, 0.02, 0.03, 0.04]

    td_errors, mc_errors = run_experiment_for_rms_plot(
        alpha_values_td, alpha_values_mc,
        num_episodes=num_episodes, num_runs=100,
    )

    episode_axis = np.arange(1, num_episodes + 1)

    for alpha, errors in td_errors.items():
        ax_right.plot(episode_axis, errors, linestyle='-',
                      label=f'TD, alpha = {alpha}')

    for alpha, errors in mc_errors.items():
        ax_right.plot(episode_axis, errors, linestyle='--',
                      label=f'MC, alpha = {alpha}')

    ax_right.set_xlabel('walks / episodes')
    ax_right.set_ylabel('rms error, averaged over states')
    ax_right.set_title('empirical rms error: td(0) vs constant-alpha mc')
    ax_right.legend()
    ax_right.grid(True, alpha=0.3)

    # tag one curve of each family near its right-hand end
    for tag, curve in (('TD', td_errors[alpha_values_td[0]]),
                       ('MC', mc_errors[alpha_values_mc[0]])):
        ax_right.annotate(tag, xy=(episode_axis[-1], curve[-1]),
                          xytext=(-30, 10), textcoords='offset points',
                          fontsize=12, fontweight='bold')

    plt.tight_layout()
    plt.savefig('figure_6_2_random_walk.png', dpi=300, bbox_inches='tight')
    plt.show()


if __name__ == "__main__":
    # script entry point: print a banner, list the analytic state values,
    # then build, save and show the figure
    print("=" * 60)
    print("example 6.2: random walk - td(0) vs monte carlo")
    print("=" * 60)

    env = RandomWalkEnvironment()

    # the analytic values are the reference the learned estimates are compared to
    true_values = env.get_true_values()
    print("\ntrue state values:")
    for state, value in true_values.items():
        print(f"  V({env.state_names[state]}) = {value:.4f}")

    plot_figure_6_2()

    print("\ndone!")

example 6.2: random walk - td(0) vs monte carlo

done!
