In [None]:
!pip install gymnasium

In [None]:
import random
import pygame
import random
import numpy as np
from time import sleep
import matplotlib.pyplot as plt
from gymnasium import Env, spaces, register, make

In [None]:
class RandomWalkEnv(Env):
    """One-dimensional slippery random-walk environment.

    The world is a row of ``size`` cells. Cell ``0`` is the dead state
    (terminal, reward 0) and cell ``size - 1`` is the target (terminal,
    reward 1). On every step the agent asks to move one cell left (action 0)
    or right (action 1); with probability ``slip_prob`` the move is reversed.

    Args:
        render_mode: ``None``, ``"human"`` or ``"rgb_array"``.
        size: Number of cells in the row.
        slip_prob: Probability that the requested direction is flipped.
        start_loc: Cell the agent is placed in by ``reset``.
        render: When True and ``render_mode == "human"``, a frame is drawn
            after every ``step`` / ``set_start_loc``.
        seed: Seed applied to the environment RNG on the first ``reset``.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode=None, size=7, slip_prob = 0.5, start_loc=1,
                 render=False, seed=31):

        self.size = size  # The size of the 1D grid
        self.window_size = 512  # The width of the PyGame window in pixels

        # We have 2 actions, corresponding to "left" & "right"
        self.action_space = spaces.Discrete(2)

        # Observations are the dict built by `_get_obs`: agent and target cell.
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Discrete(size),
                "target": spaces.Discrete(size),
            }
        )

        # Maps abstract actions from `self.action_space` to the direction we
        # walk in if that action is taken: 0 is "left" and 1 is "right".
        self._action_to_direction = {
            0: -1,
            1: 1
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        # `self.window` is the PyGame window and `self.clock` the clock used to
        # hold the frame rate in human mode. Both stay `None` until the first
        # human-mode frame is rendered.
        self.window = None
        self.clock = None

        # The probability of the slip
        self.slip_prob = slip_prob

        self.start_loc = start_loc
        self._agent_location = self.start_loc
        # The target is the right-most cell from construction onwards, so the
        # environment is consistent even before the first `reset`.
        self._target_location = self.size - 1
        self._dead_state = 0

        self.perform_render = render
        self.seed = seed
        # Becomes True once the RNG has been seeded, so later resets continue
        # the random stream instead of replaying it.
        self._seeded = False

    @property
    def agent_loc(self):
        """Current cell index of the agent."""
        return self._agent_location

    def _get_obs(self):
        """Return the observation dict with the agent and target cells."""
        return {"agent": self._agent_location, "target": self._target_location}


    def _get_info(self):
        """Return auxiliary info: the distance between agent and target."""
        return {
            "distance": abs(self._agent_location - self._target_location)
        }

    def set_start_loc(self, start_loc=None):
        """Place the agent on its start cell.

        Args:
            start_loc: New start cell; when given it also replaces the stored
                ``start_loc`` used by later resets.

        Returns:
            tuple: ``(observation, info)`` for the new position.
        """
        if start_loc is not None:
            self.start_loc = start_loc
        self._agent_location = self.start_loc
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human" and self.perform_render is True:
            self._render_frame()

        return observation, info


    def reset(self, options=None, *, seed=None):
        """Start a new episode.

        The RNG is seeded with the constructor's ``seed`` on the first call
        only; pass ``seed`` explicitly to re-seed at any later point.

        Args:
            options: Unused; accepted for API compatibility.
            seed: Optional seed overriding the stored one for this reset.

        Returns:
            tuple: ``(observation, info)`` of the initial state.
        """
        if seed is None and not self._seeded:
            seed = self.seed
        # The base class only re-seeds `self.np_random` when seed is not None.
        super().reset(seed=seed)
        self._seeded = True

        self._target_location = self.size-1
        self._dead_state = 0
        self.set_start_loc()

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

    def step(self, action):
        """Apply one (possibly slipped) move.

        Args:
            action: 0 to request a move left, 1 to request a move right.

        Returns:
            tuple: ``(observation, reward, terminated, truncated, info)``.
            ``reward`` is 1 only when the target is reached, ``truncated`` is
            always False, and ``info["log"]`` records the transition.
        """
        direction = self._action_to_direction[action]

        # Draw from the environment's own RNG so that the seed actually
        # controls the dynamics.
        random_sample = self.np_random.random()

        # With probability `slip_prob` the move is flipped.
        if random_sample < self.slip_prob:
            slip_direction = -direction
        else:
            slip_direction = direction

        prev_location = self._agent_location

        # No clipping: both ends of the row are terminal states.
        self._agent_location = self._agent_location + slip_direction

        # An episode is done iff the agent has reached the target or the dead_state
        goal_reached = self._agent_location == self._target_location
        dead_state_reached = self._agent_location == self._dead_state
        terminated = bool(goal_reached or dead_state_reached)

        reward = 1 if goal_reached else 0
        observation = self._get_obs()
        info = self._get_info()
        info["log"] = {"current_state": prev_location,
                       "action":action, "direction":direction,
                        "slipped_direction":slip_direction,
                        "next_state": self._agent_location}

        if self.render_mode == "human" and self.perform_render is True:
            self._render_frame()

        # Note: truncated can be set when we have empty trajectories
        return observation, reward, terminated, False, info

    def render(self):
        """Return an RGB frame in ``rgb_array`` mode; otherwise return None."""
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        """Draw the row, the terminal cells and the agent.

        In human mode the frame is shown in the PyGame window; otherwise the
        pixels are returned as an array.
        """
        # The size of a single grid square in pixels
        pix_square_size = (
            self.window_size / self.size
        )
        # Window and canvas dimensions are given in whole pixels.
        strip_height = int(pix_square_size)

        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (self.window_size, strip_height)
            )

        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, strip_height))
        canvas.fill((255, 255, 255))

        # First we draw the target
        pygame.draw.rect(
            canvas,
            (0, 255, 0),
            pygame.Rect(
                pix_square_size * np.array([self._target_location, 0]),
                (pix_square_size, pix_square_size),
            ),
        )

        # Then we draw the dead state
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * np.array([self._dead_state, 0]),
                (pix_square_size, pix_square_size),
            ),
        )

        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (np.array([self._agent_location, 0]) + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, strip_height),
                width=3,
            )

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )

    def close(self):
        """Shut down PyGame and drop the window handles (safe to call twice)."""
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()
            self.window = None
            self.clock = None

In [None]:
# Register the custom environment
# register(id='RandomWalk-v0', entry_point=RandomWalkEnv)

# # Create and use the environment
# env = make('RandomWalk-v0')

In [None]:
# Shared 7-cell environment for the single-run experiments. No frames are
# drawn because render=False.
environment = RandomWalkEnv(
    render_mode="human",
    size=7,
    start_loc=3,
    slip_prob=0.5,
    render=False,
)

### Consider the actions based on the policy

In [None]:
# Number of cells in the walk; same value as the size=7 passed to RandomWalkEnv.
NUM_STATES = 7

In [None]:
# For the left and right state make it as slippery
def get_policy(state=None):
    """Deterministic policy that selects action 0 in every state.

    Args:
        state: Current state. Accepted so the function can be used as a
            ``policy_fn(state)`` callback; it is ignored.

    Returns:
        int: Always ``0``.
    """
    chosen_action = 0
    return chosen_action

In [None]:
def generate_trajectory(env, maxSteps, policy_fn):
    """Sample one episode from ``env`` under ``policy_fn``.

    The environment is reset and the agent is placed on a start cell drawn
    uniformly from ``1 .. env.size - 2``.

    Args:
        env: Environment exposing ``size``, ``reset()``, ``set_start_loc()``,
            ``agent_loc`` and ``step()``.
        maxSteps: Maximum number of steps to take.
        policy_fn: Callable mapping a state to an action.

    Returns:
        list: ``(state, action, reward, next_state, terminated)`` tuples for
        an episode that terminated within ``maxSteps`` steps, or ``[]`` when
        the episode did not terminate in time (such episodes are discarded).
    """
    env.reset()

    # Sample the start from the environment that was passed in, not from a
    # notebook-level global, so the function works for any environment.
    init = random.choice(range(1, env.size - 1))
    env.set_start_loc(init)
    state = env.agent_loc

    trajectory = []
    for _ in range(maxSteps):
        action = policy_fn(state)
        observation, reward, terminated, truncated, info = env.step(action)
        next_state = observation["agent"]

        # Record the experience tuple for this transition.
        trajectory.append((state, action, reward, next_state, terminated))

        if terminated:
            return trajectory

        state = next_state

    # Step budget exhausted without reaching a terminal state.
    return []

In [None]:
# Smoke test: sample one episode capped at 10 steps under the fixed policy.
generate_trajectory(env=environment, maxSteps=10, policy_fn=get_policy)

## Step-size (alpha) decay

In [None]:
def decayAlpha(initialValue, finalValue, maxSteps, decayType):
    """
    Decay the step size parameter (α) from initialValue to finalValue over maxSteps steps.

    Args:
        initialValue: Initial value of the step size parameter.
        finalValue: Final value of the step size parameter.
        maxSteps: Number of values in the schedule.
        decayType: Type of decay, either 'linear' or 'exponential'.

    Returns:
        The schedule of step sizes: a NumPy array for 'linear' decay and a
        Python list for 'exponential' decay. With ``maxSteps == 1`` the
        schedule holds only ``initialValue``.

    Raises:
        ValueError: If ``decayType`` is not 'linear' or 'exponential'.
    """
    if decayType not in ('linear', 'exponential'):
        raise ValueError("Invalid decayType. Should be 'linear' or 'exponential'.")

    if decayType == 'linear':
        return np.linspace(initialValue, finalValue, maxSteps)

    # Exponential decay. A one-step schedule has no interval to decay over;
    # return it directly instead of dividing by (maxSteps - 1) == 0.
    if maxSteps == 1:
        return [initialValue]

    # Constant ratio between consecutive values so that the last entry
    # equals finalValue.
    decay_factor = (finalValue / initialValue) ** (1 / (maxSteps - 1))
    return [initialValue * (decay_factor ** t) for t in range(maxSteps)]

# Sanity-check both decay schedules and plot them side by side.
initial_alpha, final_alpha, max_steps = 0.1, 0.01, 100
decayTypes = ['linear', 'exponential']

alphas_linear = decayAlpha(initial_alpha, final_alpha, max_steps,
                           decayType='linear')
alphas_exp = decayAlpha(initial_alpha, final_alpha, max_steps,
                        decayType='exponential')

# Print the last value of each schedule, then the first value of each.
for position in (-1, 0):
    print(alphas_linear[position])
    print(alphas_exp[position])
    print("#################")

print(len(alphas_exp))

# Plot the results
for schedule, legend_text in ((alphas_linear, 'Linear Decay'),
                              (alphas_exp, 'Exponential Decay')):
    plt.plot(schedule, label=legend_text)
plt.xlabel('Time Step')
plt.ylabel('Alpha Value')
plt.legend()
plt.title('Decay of Alpha over Time Steps')
plt.show()

### Monte Carlo Prediction

In [None]:
def monteCarloPrediction(env, gamma, maxSteps, max_episodes, alpha_values,
                         visitType="FVMC"):
    """Estimate state values with first-visit or every-visit Monte Carlo.

    Episodes are sampled with ``generate_trajectory`` under ``get_policy``.
    After each episode, ``V[s]`` is moved towards the observed discounted
    return with step size ``alpha_values[episode]``: once per episode for the
    first visit of ``s`` ('FVMC'), or once per visit ('EVMC').

    Args:
        env: Environment exposing ``size`` and usable by ``generate_trajectory``.
        gamma: Discount factor.
        maxSteps: Step cap per episode; episodes that do not terminate in time
            are discarded.
        max_episodes: Number of episodes to run.
        alpha_values: Step size per episode; needs at least ``max_episodes``
            entries.
        visitType: 'FVMC' (first visit) or 'EVMC' (every visit).

    Returns:
        tuple: ``(V, V_episodes, GT_over_episodes_sum)`` where ``V`` is the
        final estimate, ``V_episodes[e]`` the estimate after episode ``e`` and
        ``GT_over_episodes_sum[e, s]`` the return used as target for state
        ``s`` in episode ``e`` (the mean over visits for 'EVMC', 0 if the
        state was not visited).

    Raises:
        ValueError: If ``visitType`` is not 'FVMC' or 'EVMC'.
    """
    if visitType not in ['FVMC', 'EVMC']:
        raise ValueError("Invalid visitType. Should be 'FVMC' or 'EVMC'.")

    # Initialize state value estimates
    V = np.zeros(env.size)
    V_episodes = np.zeros((max_episodes, env.size))
    GT_over_episodes_sum = np.zeros((max_episodes, env.size))

    for episode in range(max_episodes):
        trajectory = generate_trajectory(env, maxSteps, get_policy)
        if not trajectory:
            # Incomplete episode: nothing is learned, but the current estimate
            # is still recorded so the learning curve has no all-zero rows.
            V_episodes[episode] = V
            continue

        # Discounted return from every time step, computed in one backward
        # pass: G_t = r_t + gamma * G_{t+1}.
        returns = [0.0] * len(trajectory)
        G = 0.0
        for t in range(len(trajectory) - 1, -1, -1):
            G = trajectory[t][2] + gamma * G
            returns[t] = G

        visited = np.zeros(env.size)
        alpha = alpha_values[episode]
        for t, (s, _a, _r, _next_s, _done) in enumerate(trajectory):
            if visitType == "FVMC" and visited[s] != 0:
                continue  # first-visit: later visits of s are ignored
            G = returns[t]
            GT_over_episodes_sum[episode][s] += G
            visited[s] += 1
            V[s] = V[s] + (alpha * (G - V[s]))

        if visitType == "EVMC":
            # Report the mean return per state. V was already updated once
            # per visit above, so no further update is applied here.
            seen = visited > 0
            GT_over_episodes_sum[episode, seen] /= visited[seen]

        V_episodes[episode] = V
    return V, V_episodes, GT_over_episodes_sum

In [None]:
# Hyper-parameters shared by the prediction experiments.
MAX_EPISODES, MAX_STEPS = 10_000, 1_000   # episodes per run, step cap per episode
GAMMA = 0.99                              # discount factor
ALPHA_INIT, ALPHA_FIN = 0.1, 0.01         # first and last step size of the schedule

In [None]:
# One linearly decaying step size per episode.
alpha_values = decayAlpha(
    initialValue=ALPHA_INIT,
    finalValue=ALPHA_FIN,
    maxSteps=MAX_EPISODES,
    decayType="linear",
)

In [None]:
# Full-length Monte Carlo run with the default visit type.
(state_values,
 state_values_per_episode,
 gt_per_episode) = monteCarloPrediction(
    env=environment,
    gamma=GAMMA,
    maxSteps=MAX_STEPS,
    max_episodes=MAX_EPISODES,
    alpha_values=alpha_values,
)
print("Estimated State Values:", state_values)
print("###############")
print()
state_values_per_episode

In [None]:
# Display the per-episode target array (third value returned by monteCarloPrediction).
gt_per_episode

In [None]:
# Run both Monte Carlo variants and print their final estimates.
visitTypes = ['FVMC', 'EVMC']
# Readable names for the headings; str.capitalize() would print "Fvmc"/"Evmc".
visit_labels = {'FVMC': 'First', 'EVMC': 'Every'}
for visitType in visitTypes:
    # Only the final estimate (first returned value) is needed here.
    V, _, _ = monteCarloPrediction(environment, GAMMA, MAX_STEPS, MAX_EPISODES, alpha_values, visitType)
    print(f"Estimated State Values using {visit_labels[visitType]} Visit Monte Carlo Prediction:")
    print(V)

### Temporal Difference (TD) Prediction

In [None]:
def TemporalDifferencePrediction(env, gamma, alpha_values, max_episodes):
    """Estimate state values with one-step temporal-difference learning, TD(0).

    Each episode starts from a cell drawn uniformly from ``1 .. env.size - 2``
    and follows ``get_policy`` until the environment terminates. After every
    step, ``V[state]`` is moved towards ``r + gamma * V[next_state]`` (just
    ``r`` on termination) with step size ``alpha_values[episode]``.

    Args:
        env: Environment exposing ``size``, ``reset()``, ``set_start_loc()``,
            ``agent_loc`` and ``step()``.
        gamma: Discount factor.
        alpha_values: Step size per episode; needs at least ``max_episodes``
            entries.
        max_episodes: Number of episodes to run.

    Returns:
        tuple: ``(V, V_episodes, GT_over_episodes_sum)`` where ``V`` is the
        final estimate, ``V_episodes[e]`` the estimate after episode ``e`` and
        ``GT_over_episodes_sum[e, s]`` the mean TD target used for state ``s``
        in episode ``e`` (0 if the state was never updated).
    """
    # Initialize state values
    V = np.zeros(env.size)
    V_episodes = np.zeros((max_episodes, env.size))
    GT_over_episodes_sum = np.zeros((max_episodes, env.size))

    # Loop over episodes
    for episode in range(max_episodes):
        # Number of TD updates applied to each state in this episode.
        state_visited_count = np.zeros(env.size)

        env.reset()
        # Sample the start from the environment that was passed in rather
        # than from a notebook-level global.
        init = random.choice(range(1, env.size - 1))
        env.set_start_loc(init)
        state = env.agent_loc
        alpha = alpha_values[episode]
        done = False

        # Loop until episode terminates
        while not done:
            # Select action using policy
            action = get_policy(state)

            observation, r, terminated, truncated, info = env.step(action)
            next_state = observation["agent"]

            # Terminal states have value 0, so no bootstrap term is added.
            td_target = r if terminated else r + (gamma * V[next_state])

            GT_over_episodes_sum[episode][state] += td_target
            # Count the state that was updated (not its successor), so the
            # averaging below divides each sum by its own number of targets.
            state_visited_count[state] += 1

            td_error = td_target - V[state]
            V[state] = V[state] + (alpha * td_error)

            state = next_state
            done = terminated

        # Turn the per-state target sums into per-state means.
        updated = state_visited_count > 0
        GT_over_episodes_sum[episode, updated] /= state_visited_count[updated]

        V_episodes[episode] = V

    return V, V_episodes, GT_over_episodes_sum

In [None]:
# Quick check of TD prediction on the random-walk environment (1000 episodes).
V_td, V_td_per_episode, gt_per_episode = TemporalDifferencePrediction(
    env=environment,
    gamma=GAMMA,
    alpha_values=alpha_values,
    max_episodes=1000,
)
print("Estimated State Values using Temporal Difference Prediction:")
print(V_td)


In [None]:
def _finish_episode_plot(title, log_scale):
    """Apply the shared title, axis labels, scale, legend and grid, then show."""
    plt.title(title)
    plt.xlabel('Episode')
    plt.ylabel('Value Estimate')
    if log_scale:
        plt.xscale('log')
    plt.legend()
    plt.grid(True)
    plt.show()


def plot_episodes(data, true, environment, total_episodes=500, period=50, name="MC-FVMC", log_scale=False):
    """Plot per-episode value estimates of the non-terminal states.

    One figure is drawn per window of ``period`` episodes, followed by a
    summary figure covering episodes ``0 .. total_episodes``. Every figure
    shows the estimate of each state ``1 .. environment.size - 2`` together
    with a dashed red reference line at ``true[state]``.

    Args:
        data: 2-D array indexed as ``data[episode, state]``.
        true: Reference value per state, indexed by state.
        environment: Object whose ``size`` gives the number of states.
        total_episodes: Number of episodes to plot.
        period: Number of episodes per windowed figure.
        name: Method name used in the figure titles.
        log_scale: When True, the x axis is logarithmic.
    """
    states = range(1, environment.size - 1)

    for episode_range_low in range(0, total_episodes, period):
        # Clamp the last window so x and y always have the same length, even
        # when total_episodes is not a multiple of period.
        episode_range_high = min(episode_range_low + period, total_episodes)
        episodes = range(episode_range_low, episode_range_high)

        for i in states:
            plt.plot(episodes, data[episode_range_low:episode_range_high, i], label=f'State {i}')
            plt.plot(episodes, [true[i]] * len(episodes),
                     label=f'True Estimate - State{i}', linestyle='--', color='red')

        _finish_episode_plot(
            f'{name} Estimate - Episode {episode_range_low}-{episode_range_high}', log_scale)

    # Summary figure; lengths and title follow total_episodes instead of a
    # hard-coded 500.
    episodes = range(0, total_episodes)
    for i in states:
        plt.plot(episodes, data[:total_episodes, i], label=f'State {i}')
        plt.plot(episodes, [true[i]] * total_episodes,
                 label=f'True Estimate - State{i}', linestyle='--', color='red')

    _finish_episode_plot(f'{name} Estimate - Episode 0-{total_episodes}', log_scale)

## Question 5 : MC-FVMC for non terminal states. 500 episodes

In [None]:
# Exponential decay from 0.5 to 0.01 over the first 250 episodes, then the
# last value is held for another 250 entries.
alpha_values = decayAlpha(initialValue=0.5, finalValue=0.01, maxSteps=250,
                          decayType='exponential')
alpha_values.extend([alpha_values[-1]] * 250)
len(alpha_values)

In [None]:
# First-visit Monte Carlo: 500 episodes, at most 200 steps each.
V, V_per_episode, _ = monteCarloPrediction(
    env=environment,
    gamma=GAMMA,
    maxSteps=200,
    max_episodes=500,
    alpha_values=alpha_values,
    visitType="FVMC",
)

In [None]:
# Display the final value estimates from the run above.
V

In [None]:
# Reference state values, indexed by state, used as the dashed lines in the plots.
# The commented-out list holds the values i/6; the list in use is presumably
# the discounted (gamma = 0.99) counterpart -- TODO confirm.
# true_values = [0, 1/6, 2/6, 3/6, 4/6, 5/6, 0]
true_values = [0,0.15008, 0.3032, 0.46244,0.63103,0.81236,0]

In [None]:
# Learning curves of the FVMC estimates against the reference values.
plot_episodes(
    data=V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="MC-FVMC",
)

## Question 6 : MC-EVMC for non terminal states. 500 episodes

In [None]:
# Every-visit Monte Carlo: 500 episodes, at most 200 steps each.
V, V_per_episode, _ = monteCarloPrediction(
    env=environment,
    gamma=GAMMA,
    maxSteps=200,
    max_episodes=500,
    alpha_values=alpha_values,
    visitType="EVMC",
)

In [None]:
# Learning curves of the EVMC estimates against the reference values.
plot_episodes(
    data=V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="MC-EVMC",
)

## Question 7: TD with 500 episodes

In [None]:
# TD prediction over 500 episodes.
V_td, V_td_per_episode, _ = TemporalDifferencePrediction(
    env=environment,
    gamma=GAMMA,
    alpha_values=alpha_values,
    max_episodes=500,
)

In [None]:
# Learning curves of the TD estimates against the reference values.
plot_episodes(
    data=V_td_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="TD",
)

## Question 8 : Smoother Curves

In [None]:
# Seeds handed to the per-seed environments whose runs are averaged below.
possible_seeds = [27, 36, 45, 64, 67]

### MC-FVMC

In [None]:
# Average the per-episode FVMC estimates over one run per seed.
sum_over_V_per_episode = None
for seed in possible_seeds:
    env1 = RandomWalkEnv(
        render_mode="human", size=7, start_loc=3, slip_prob=0.5,
        render=False, seed=seed,
    )
    V, V_per_episode, _ = monteCarloPrediction(
        env=env1, gamma=GAMMA, maxSteps=200, max_episodes=500,
        alpha_values=alpha_values, visitType="FVMC",
    )

    if sum_over_V_per_episode is not None:
        sum_over_V_per_episode += V_per_episode
    else:
        # First run: the running total starts as this array and is then
        # accumulated in place.
        sum_over_V_per_episode = V_per_episode

sum_over_V_per_episode /= len(possible_seeds)

plot_episodes(
    data=sum_over_V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="MC-FVMC Averaged",
)

### MC-EVMC

In [None]:
# Average the per-episode EVMC estimates over one run per seed.
sum_over_V_per_episode = None
for seed in possible_seeds:
    env1 = RandomWalkEnv(
        render_mode="human", size=7, start_loc=3, slip_prob=0.5,
        render=False, seed=seed,
    )
    V, V_per_episode, _ = monteCarloPrediction(
        env=env1, gamma=GAMMA, maxSteps=200, max_episodes=500,
        alpha_values=alpha_values, visitType="EVMC",
    )

    if sum_over_V_per_episode is not None:
        sum_over_V_per_episode += V_per_episode
    else:
        # First run: the running total starts as this array and is then
        # accumulated in place.
        sum_over_V_per_episode = V_per_episode

sum_over_V_per_episode /= len(possible_seeds)


plot_episodes(
    data=sum_over_V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="MC-EVMC Averaged",
)

### TD

In [None]:
# Average the per-episode TD estimates over one run per seed.
sum_over_V_per_episode = None
for seed in possible_seeds:
    env1 = RandomWalkEnv(
        render_mode="human", size=7, start_loc=3, slip_prob=0.5,
        render=False, seed=seed,
    )
    V_td, V_td_per_episode, _ = TemporalDifferencePrediction(
        env=env1, gamma=GAMMA, alpha_values=alpha_values, max_episodes=500,
    )

    if sum_over_V_per_episode is not None:
        sum_over_V_per_episode += V_td_per_episode
    else:
        # First run: the running total starts as this array and is then
        # accumulated in place.
        sum_over_V_per_episode = V_td_per_episode

sum_over_V_per_episode /= len(possible_seeds)


plot_episodes(
    data=sum_over_V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="TD Averaged",
)

## Question - 9 - MC - FVMC with logscale

In [None]:
# FVMC learning curves with a logarithmic episode axis.
V, V_per_episode, _ = monteCarloPrediction(
    env=environment, gamma=GAMMA, maxSteps=200, max_episodes=500,
    alpha_values=alpha_values, visitType="FVMC",
)

plot_episodes(
    data=V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="MC-FVMC",
    log_scale=True,
)

## Question - 10 - MC - EVMC with logscale

In [None]:
# EVMC learning curves with a logarithmic episode axis.
V, V_per_episode, _ = monteCarloPrediction(
    env=environment, gamma=GAMMA, maxSteps=200, max_episodes=500,
    alpha_values=alpha_values, visitType="EVMC",
)

plot_episodes(
    data=V_per_episode,
    true=true_values,
    environment=environment,
    total_episodes=500,
    period=50,
    name="MC-EVMC Log",
    log_scale=True,
)

## Question - 11 - TD with logscale

In [None]:
# TD learning curves with a logarithmic episode axis. The shared
# `environment` is used (as in the FVMC/EVMC log-scale cells) instead of
# `env1`, which only exists as a leftover loop variable of an earlier cell.
V, V_per_episode, _ = TemporalDifferencePrediction(environment, GAMMA, alpha_values, 500)

plot_episodes(V_per_episode, true_values, environment, total_episodes=500, period=50, name="TD Log", log_scale=True)

## Question - 13 - MC-FVMC - Target Value (Gt)

In [None]:
# State whose per-episode targets are plotted in the next cell.
state = 4

In [None]:
_, _, gt_per_episode = monteCarloPrediction(environment, GAMMA, 200, 500, alpha_values, "FVMC")

gt_state_per_episode = gt_per_episode[:, state].flatten()

# One point per episode. Episodes in which the state was not visited keep
# their initial target of 0 and are plotted as such.
episodes = list(range(len(gt_state_per_episode)))
values = list(gt_state_per_episode)

plt.figure(figsize=(12, 6))
plt.scatter(episodes, values)
# Reference line at the tabulated value of this state, so it follows `state`
# instead of being hard-coded for state 4.
plt.axhline(y=true_values[state], color='r', linestyle='--', label='Optimal Value')
plt.title(f'MC-FVMC Estimate - State {state}')
plt.xlabel('Episode')
plt.ylabel('Target Value (Gt)')
plt.legend()  # without this the line's label is never displayed
plt.grid(True)
plt.show()

## Question - 14 - MC-EVMC - Target Value (Gt)

In [None]:
# State whose per-episode targets are plotted in the next cell.
state = 4

In [None]:
_, _, gt_per_episode = monteCarloPrediction(environment, GAMMA, 200, 500, alpha_values, "EVMC")

gt_state_per_episode = gt_per_episode[:, state].flatten()

# Keep only episodes with a non-zero target for this state.
# NOTE(review): this also drops episodes whose return was genuinely 0.
episodes = [episode for episode, val in enumerate(gt_state_per_episode) if val != 0]
values = [val for val in gt_state_per_episode if val != 0]

plt.figure(figsize=(12, 6))
plt.scatter(episodes, values)
# Reference line at the tabulated value of this state, so it follows `state`
# instead of being hard-coded for state 4.
plt.axhline(y=true_values[state], color='r', linestyle='--', label='Optimal Value')
plt.title(f'MC-EVMC Estimate - State {state}')
plt.xlabel('Episode')
plt.ylabel('Target Value (Gt)')
plt.legend()  # without this the line's label is never displayed
plt.grid(True)
plt.show()

## Question - 15 - TD - Target Value (Gt)

In [None]:
# State whose per-episode targets are plotted in the next cell.
state = 4

In [None]:
# The shared `environment` is used instead of `env1`, which only exists as a
# leftover loop variable of an earlier cell.
_, _, gt_per_episode = TemporalDifferencePrediction(environment, GAMMA, alpha_values, 500)

gt_state_per_episode = gt_per_episode[:, state].flatten()

# Keep only episodes with a non-zero target for this state.
# NOTE(review): this also drops episodes whose target was genuinely 0.
episodes = [episode for episode, val in enumerate(gt_state_per_episode) if val != 0]
values = [val for val in gt_state_per_episode if val != 0]

plt.figure(figsize=(12, 6))
plt.scatter(episodes, values)
# Reference line at the tabulated value of this state, so it follows `state`
# instead of being hard-coded for state 4.
plt.axhline(y=true_values[state], color='r', linestyle='--', label='Optimal Value')
plt.title(f'TD Estimate - State {state}')
plt.xlabel('Episode')
plt.ylabel('Target Value (Gt)')
plt.legend()  # without this the line's label is never displayed
plt.grid(True)
plt.show()