In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

### 1 - op/wis/rec – Recursive formula for weighted importance sampling for off-policy learning.

In weighted importance sampling, we calculate the estimate:
\begin{equation*}
    V_{n + 1}^\pi := \frac{\sum_{k = 1}^n W_k G_k}{\sum_{k = 1}^n W_k}, \;\;\; n \geq 1,
\end{equation*}
given the returns $G_1, G_2, ...$ and the weights $W_1, W_2, ...$.

Show that the iteration
\begin{equation*}
    V_{n + 1} := V_n + \frac{W_n}{C_n}(G_n - V_n), \;\;\; n \geq 1,
\end{equation*}

where

\begin{align*}
    C_0 &= 0 \\
    C_{n + 1} &:= C_n + W_{n + 1}, \;\;\; n \geq 0
\end{align*}

yields the same values $V_n^\pi = V_n$ for all $n \in \{2, 3, ...\}$.


We have $C_0 = 0$ and $C_{n} = \sum_{k = 1}^n W_k$, therefore:

\begin{align*}
    V_{n + 1} &= \frac{\sum_{k = 1}^n W_k G_k}{C_n} \\
    V_{n + 1}C_n &= \sum_{k = 1}^n W_k G_k \\
    V_{n + 1}C_n &= W_n G_n + \sum_{k = 1}^{n - 1} W_k G_k \\
    V_{n + 1}C_n &= W_n G_n + V_n \sum_{k = 1}^{n - 1} W_k \\
    V_{n + 1}C_n &= W_n G_n + V_n C_{n - 1} \\ 
    V_{n + 1}C_n &= W_n G_n + V_n (C_n - W_n) \\
    V_{n + 1}C_n &= W_n G_n + V_n C_n - V_n W_n \\
    V_{n + 1} &= V_n + \frac{W_n}{C_n} (G_n - V_n) \\

\end{align*}

### 2 - mc/ctrl/impl – Off-policy MC control (programming).
Implement off-policy MC control and use it to estimate $\pi_*$ for the grid world in Exercise gw/simple.

In [3]:
class GridWorld:

    def __init__(self, size = 4, gamma = 1, max_steps = 100):
        
        self.size = size
        self.gamma = gamma
        self.max_steps =  max_steps
        self.terminal_state = [(0, 0), (size - 1, size - 1)]
        self.actions = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1)
        }
        self.reward = -1
        self.current_step = 0
        self.current_x, self.current_y = self.reset()
        self.policy = self.init_policy()

    def reset(self):

        self.current_step = 0
        while True:
            x = np.random.randint(0, self.size)
            y = np.random.randint(0, self.size)
            if not self.is_terminal((x, y)):
                break
        self.current_x, self.current_y = x, y
        self.done = False
        return x, y

    def print_current_state(self):

        for i in range(0, self.size):
            for j in range(0, self.size):

                if (i, j) in self.terminal_state:
                    print('T', end=' ')
                elif (i, j) == (self.current_x, self.current_y):
                    print('X', end=' ')
                else:
                    print('-', end=' ')

            print()   
    
    def is_terminal(self,  state):

        return state in self.terminal_state
    

    def step(self, action):

        current_state = (self.current_x, self.current_y)

        if (self.is_terminal(current_state)) or (self.current_step >= self.max_steps):
            return 0, True  
        
        dx, dy = self.actions[action]
        new_x = max(0, min(self.size - 1, self.current_x + dx)) # if it hits wall it stays in current state
        new_y = max(0, min(self.size - 1, self.current_y + dy)) # if it hits wall it stays in current state
        new_state = (new_x, new_y)

        if (self.is_terminal(new_state)) or (self.current_step == self.max_steps - 1) : 
            self.done = True

        self.current_x, self.current_y = new_x, new_y 
        self.current_step += 1

        return self.reward, self.done 

    def init_policy(self):

        policy = {}

        for i in range(self.size):
            for j in range(self.size):
                
                if (i, j) not in self.terminal_state:
                    policy[(i, j)] = {action: 1 / len(self.actions) for action in self.actions}
        
        return policy 

In [4]:
grid_world = GridWorld(max_steps = 100) 

current_state = grid_world.current_x, grid_world.current_y
print(f"Initial state: {current_state}")
grid_world.print_current_state()

if (grid_world.is_terminal(current_state)):
   print("Initial state is terminal")
elif (grid_world.max_steps <= 0):
   print("Invalid time steps")

while (not grid_world.is_terminal(current_state)) and (grid_world.current_step < grid_world.max_steps):
    
    action_probabilities = 0.25 * np.ones(4) # equal probabilities for taking actions
    action = np.random.choice(list(grid_world.actions.keys()), p=action_probabilities) 

    reward, done = grid_world.step(action)
    new_state = grid_world.current_x, grid_world.current_y

    print(f"Step {grid_world.current_step}: s {current_state}, a {action} -> s' {new_state}, r {reward}")
    grid_world.print_current_state()

    if done:
        print(f"Terminated after {grid_world.current_step} steps")
        if (grid_world.is_terminal(new_state)):
          print("Terminal state end")
        else:
          print("Max steps end")

    current_state = new_state

Initial state: (1, 3)
T - - - 
- - - X 
- - - - 
- - - T 
Step 1: s (1, 3), a up -> s' (0, 3), r -1
T - - X 
- - - - 
- - - - 
- - - T 
Step 2: s (0, 3), a left -> s' (0, 2), r -1
T - X - 
- - - - 
- - - - 
- - - T 
Step 3: s (0, 2), a left -> s' (0, 1), r -1
T X - - 
- - - - 
- - - - 
- - - T 
Step 4: s (0, 1), a left -> s' (0, 0), r -1
T - - - 
- - - - 
- - - - 
- - - T 
Terminated after 4 steps
Terminal state end


In [5]:
def off_policy_mc_control(env, num_episodes=200000):
    Q = defaultdict(lambda: {a: np.random.uniform(-1, 1) for a in env.actions})
    C = defaultdict(lambda: {a: 0.0 for a in env.actions})
    target_policy = {}

    actions = list(env.actions.keys())
    epsilon = 0.2

    def behavior_policy(s):
        probs = {a: epsilon / len(actions) for a in actions}
        best_action = max(Q[s], key=lambda a: Q[s][a]) 
        probs[best_action] += 1 - epsilon
        total = sum(probs.values())
        norm_probs = [probs[a] / total for a in actions]

        a = np.random.choice(actions, p=norm_probs)
        return a, probs[a]

    for _ in range(num_episodes):
        episode = []
        env.reset()

        for _ in range(env.max_steps):
            s = (env.current_x, env.current_y)
            a, prob = behavior_policy(s)
            r, done = env.step(a)
            episode.append((s, a, r, prob))
            if done:
                break

        G = 0
        W = 1

        for s, a, r, b_prob in reversed(episode):
            G = env.gamma * G + r
            C[s][a] += W
            Q[s][a] += (W / C[s][a]) * (G - Q[s][a])

            target_policy[s] = max(Q[s], key=Q[s].get)

            if a != target_policy[s]:
                break
            if b_prob == 0:  # prevent divide-by-zero
                break
            W /= b_prob

    return target_policy, Q

In [6]:
env = GridWorld()
policy, Q = off_policy_mc_control(env)

# Print greedy policy
for i in range(env.size):
    for j in range(env.size):
        s = (i, j)
        if s in env.terminal_state:
            print("T", end="  ")
        else:
            print(policy.get(s, "-")[0].upper(), end="  ")
    print()

T  L  U  U  
U  U  U  D  
U  U  D  D  
D  R  R  T  


In [7]:
for i in range(env.size):
    for j in range(env.size):
        s = (i, j)
        if s in Q:
            q_vals = Q[s]
            best_q = max(q_vals.values())
            print(f"{best_q:5.1f}", end=" ")
        else:
            print("  N/A", end=" ")
    print()

  N/A  -1.0  -1.3  -1.9 
 -1.0  -2.0  -2.0  -2.0 
 -2.0  -3.0  -2.0  -1.0 
 -2.4  -2.0  -1.0   N/A 


### 3. mc/ctrl/ratio – Importance-sampling ratio in off-policy MC control.

In the off-policy MC-control algorithm in [1, Section 5.7], the importance sampling ratio is updated according to

\begin{equation*}
    W := \frac{W}{b(A_t|S_t)}
\end{equation*}

although the definition of the importance-sampling ratio $\rho_{t:T-1}$ implies

\begin{equation*}
    W := \frac{\pi(A_t|S_t)}{b(A_t|S_t)}W
\end{equation*}

Why is the update in the algorithm nevertheless correct?

In the off-policy MC algorithm target policy $\pi$ is deterministic and greedy:

\begin{equation*}
    \pi(A_t | S_t) = \begin{cases}
                        1, \;\; \text{if} A_t = \argmax_{a} Q(S_t, a) \\
                        0, \;\; \text{otherwise}
                     \end{cases} 
\end{equation*}

So for the trajectory to contribute to learning, the actions must match the greedy policy. If $A_t \neq \argmax_{a} Q(S_t, a)$, then the importance weight becomes zero and the episode's return is ignored beyond that point. But when the action matches greedy policy we have $\pi(A_t | S_t) = 1$ and update becomes:

\begin{equation*}
    W := \frac{1}{b(A_t|S_t)}W
\end{equation*}

### 4. impl – Windy Grid World.

Implement Windy Grid World (see Section 2.4.2).

In [6]:
class WindyGridWorld:

    def __init__(self, width=10, height=7, gamma=1, max_steps=100):
        self.width = width
        self.height = height
        self.gamma = gamma
        self.max_steps = max_steps
        self.terminal_state = (3, 7)  # default from Sutton's book
        self.start_state = (3, 0)

        self.wind = {3: 1, 4: 1, 5: 1, 6: 2, 7: 2, 8: 1}  # wind strength by column

        self.actions = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1)
        }
        self.reward = -1
        self.current_step = 0
        self.current_x, self.current_y = self.reset()
        self.policy = self.init_policy()

    def reset(self):
        self.current_step = 0
        self.done = False
        self.current_x, self.current_y = self.start_state
        return self.current_x, self.current_y

    def is_terminal(self, state):
        return state == self.terminal_state

    def step(self, action):

        current_state = (self.current_x, self.current_y)
        if (self.is_terminal(current_state)) or (self.current_step >= self.max_steps):
            return 0, True

        dx, dy = self.actions[action]
        new_x = self.current_x + dx
        new_y = self.current_y + dy

        # Apply wind (vertical movement)
        wind_strength = self.wind.get(new_y, 0)
        new_x -= wind_strength  # wind pushes up

        # Clamp to grid boundaries
        new_x = max(0, min(self.height - 1, new_x))
        new_y = max(0, min(self.width - 1, new_y))

        self.current_x, self.current_y = new_x, new_y
        self.current_step += 1

        if self.is_terminal((new_x, new_y)) or self.current_step >= self.max_steps:
            self.done = True

        return self.reward, self.done

    def print_current_state(self):
        for i in range(self.height):
            for j in range(self.width):
                if (i, j) == self.terminal_state:
                    print('T', end=' ')
                elif (i, j) == (self.current_x, self.current_y):
                    print('X', end=' ')
                else:
                    print('-', end=' ')
            print()

    def init_policy(self):
        policy = {}
        for i in range(self.height):
            for j in range(self.width):
                if (i, j) != self.terminal_state:
                    policy[(i, j)] = {a: 1 / len(self.actions) for a in self.actions}
        return policy


In [10]:
grid_world = WindyGridWorld(max_steps = 50000) 

current_state = grid_world.current_x, grid_world.current_y
print(f"Initial state: {current_state}")
grid_world.print_current_state()

if (grid_world.is_terminal(current_state)):
   print("Initial state is terminal")
elif (grid_world.max_steps <= 0):
   print("Invalid time steps")

while (not grid_world.is_terminal(current_state)) and (grid_world.current_step < grid_world.max_steps):
    
    action_probabilities = 0.25 * np.ones(4) # equal probabilities for taking actions
    action = np.random.choice(list(grid_world.actions.keys()), p=action_probabilities) 

    reward, done = grid_world.step(action)
    new_state = grid_world.current_x, grid_world.current_y

    print(f"Step {grid_world.current_step}: s {current_state}, a {action} -> s' {new_state}, r {reward}")
    grid_world.print_current_state()

    if done:
        print(f"Terminated after {grid_world.current_step} steps")
        if (grid_world.is_terminal(new_state)):
          print("Terminal state end")
        else:
          print("Max steps end")

    current_state = new_state

Initial state: (3, 0)
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
X - - - - - - T - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
Step 1: s (3, 0), a up -> s' (2, 0), r -1
- - - - - - - - - - 
- - - - - - - - - - 
X - - - - - - - - - 
- - - - - - - T - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
Step 2: s (2, 0), a left -> s' (2, 0), r -1
- - - - - - - - - - 
- - - - - - - - - - 
X - - - - - - - - - 
- - - - - - - T - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
Step 3: s (2, 0), a left -> s' (2, 0), r -1
- - - - - - - - - - 
- - - - - - - - - - 
X - - - - - - - - - 
- - - - - - - T - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
Step 4: s (2, 0), a left -> s' (2, 0), r -1
- - - - - - - - - - 
- - - - - - - - - - 
X - - - - - - - - - 
- - - - - - - T - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
Step 5: s (2, 0), a right -> s' (2, 1), r -1
- - - - - - - - - - 
- -

### 5. impl – Cliff Walking
Implement Cliff Walking (see Section 2.4.3).

In [11]:
class CliffWalking:
    def __init__(self, width=12, height=4):
        self.height = height
        self.width = width
        self.start = (height - 1, 0)
        self.goal = (height - 1, width - 1)
        self.cliff = [(height - 1, i) for i in range(1, width - 1)]  # the cliff region
        self.actions = ['up', 'down', 'left', 'right'] 
        self.state = self.start # current state
        self.rewards = -1  # constant reward of -1 for each step
        self.state_values = np.zeros((width, height))
        self.action_space = len(self.actions)

    def reset(self):
        self.state = self.start
        return self.state

    def is_terminal(self, state):
        return state == self.goal

    def step(self, action): # make one step in the environment S_t, A_t -> R_t, S'_t
        if self.state in self.cliff:
            return self.start, -100  # if you step on a cliff -> go back to the start with -100 reward 

        # calculate next position based on current state and action
        x, y = self.state
        if action == 'up':
            x = max(x - 1, 0)
        elif action == 'down':
            x = min(x + 1, self.height - 1)
        elif action == 'left':
            y = max(y - 1, 0)
        elif action == 'right':
            y = min(y + 1, self.width - 1)

        next_state = (x, y)
        # check if the next state is the cliff
        if next_state in self.cliff:
            self.reset()
            return self.start, -100

        self.state = next_state
        return next_state, self.rewards # const -1

In [12]:
def run_episode(gridworld, max_trials):
    state = gridworld.reset() # come to the start
    total_reward = 0
    steps = []
    while not gridworld.is_terminal(state): # as long as the goal isn't reached
        action = np.random.choice(gridworld.actions)  # random policy for demonstration
        next_state, reward = gridworld.step(action)
        total_reward += reward
        steps.append((state, action))
        state = next_state
        if state == gridworld.start and reward == -100:
            print("Fell into the cliff, back to start. Current reward: ", total_reward)
        if gridworld.is_terminal(state):
            print("Reached goal, episode ends. Final reward: ", total_reward)
            break
        # not cliff, not goal -> just continue moving
    return total_reward, steps

cliff_grid_world = CliffWalking(width=12, height=4)
episode_reward, steps = run_episode(cliff_grid_world, max_trials=20)

Fell into the cliff, back to start. Current reward:  -107
Fell into the cliff, back to start. Current reward:  -246
Fell into the cliff, back to start. Current reward:  -350
Fell into the cliff, back to start. Current reward:  -466
Fell into the cliff, back to start. Current reward:  -567
Fell into the cliff, back to start. Current reward:  -671
Fell into the cliff, back to start. Current reward:  -772
Fell into the cliff, back to start. Current reward:  -874
Fell into the cliff, back to start. Current reward:  -981
Fell into the cliff, back to start. Current reward:  -1117
Fell into the cliff, back to start. Current reward:  -1219
Fell into the cliff, back to start. Current reward:  -1324
Fell into the cliff, back to start. Current reward:  -1483
Fell into the cliff, back to start. Current reward:  -1589
Fell into the cliff, back to start. Current reward:  -1708
Fell into the cliff, back to start. Current reward:  -1808
Fell into the cliff, back to start. Current reward:  -1911
Fell i

### 6. impl - Frozen Lake.

Implement Frozen Lake (see Section 2.4.4).

In [None]:
class FrozenLake:

    def __init__(self, size=4, gamma=1.0, max_steps=100, slip=True):
        self.size = size
        self.gamma = gamma
        self.max_steps = max_steps
        self.slip = slip

        self.actions = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1)
        }

        self.frozen = [(i, j) for i in range(size) for j in range(size)]
        self.holes = [(1, 1), (1, 3), (3, 0), (2, 3)]  # classic layout
        self.goal = (3, 3)
        self.start_state = (0, 0)
        for tile in self.holes + [self.goal]:
            if tile in self.frozen:
                self.frozen.remove(tile)

        self.current_step = 0
        self.current_x, self.current_y = self.reset()
        self.done = False
        self.policy = self.init_policy()

    def reset(self):
        self.current_step = 0
        self.done = False
        self.current_x, self.current_y = self.start_state
        return self.current_x, self.current_y

    def is_terminal(self, state):
        return state in self.holes or state == self.goal

    def perpendicular_actions(self, action):
        if action in ['up', 'down']:
            return ['left', 'right']
        else:
            return ['up', 'down']
        
    def step(self, action):
        if self.done or self.current_step >= self.max_steps:
            return 0, True

        if self.slip:
            all_options = [action] + self.perpendicular_actions(action)
            action = np.random.choice(all_options, p=[1/3, 1/3, 1/3])

        dx, dy = self.actions[action]
        new_x = max(0, min(self.size - 1, self.current_x + dx))
        new_y = max(0, min(self.size - 1, self.current_y + dy))
        new_state = (new_x, new_y)

        self.current_x, self.current_y = new_x, new_y
        self.current_step += 1

        if new_state in self.holes:
            self.done = True
            return 0, True
        elif new_state == self.goal:
            self.done = True
            return 1, True
        elif self.current_step >= self.max_steps:
            self.done = True
            return 0, True

        return 0, False

    def print_current_state(self):
        for i in range(self.size):
            for j in range(self.size):
                pos = (i, j)
                if pos == (self.current_x, self.current_y):
                    print('X', end=' ')
                elif pos in self.holes:
                    print('H', end=' ')
                elif pos == self.goal:
                    print('G', end=' ')
                else:
                    print('-', end=' ')
            print()

    def init_policy(self):
        policy = {}
        for i in range(self.size):
            for j in range(self.size):
                if (i, j) not in self.holes and (i, j) != self.goal:
                    policy[(i, j)] = {a: 1 / len(self.actions) for a in self.actions}
        return policy


In [21]:
grid_world = FrozenLake(max_steps = 10000) 

current_state = grid_world.current_x, grid_world.current_y
print(f"Initial state: {current_state}")
grid_world.print_current_state()

if (grid_world.is_terminal(current_state)):
   print("Initial state is terminal")
elif (grid_world.max_steps <= 0):
   print("Invalid time steps")

while (not grid_world.is_terminal(current_state)) and (grid_world.current_step < grid_world.max_steps):
    
    action_probabilities = 0.25 * np.ones(4) # equal probabilities for taking actions
    action = np.random.choice(list(grid_world.actions.keys()), p=action_probabilities) 

    reward, done = grid_world.step(action)
    new_state = grid_world.current_x, grid_world.current_y

    print(f"Step {grid_world.current_step}: s {current_state}, a {action} -> s' {new_state}, r {reward}")
    grid_world.print_current_state()

    if done:
        print(f"Terminated after {grid_world.current_step} steps")
        if (grid_world.is_terminal(new_state)):
          print("Terminal state end")
        else:
          print("Max steps end")

    current_state = new_state

Initial state: (0, 0)
X - - - 
- H - H 
- - - H 
H - - G 
Step 1: s (0, 0), a up -> s' (0, 0), r 0
X - - - 
- H - H 
- - - H 
H - - G 
Step 2: s (0, 0), a down -> s' (1, 0), r 0
- - - - 
X H - H 
- - - H 
H - - G 
Step 3: s (1, 0), a down -> s' (1, 1), r 0
- - - - 
- X - H 
- - - H 
H - - G 
Terminated after 3 steps
Terminal state end


### 7. impl – Blackjack.
Implement Blackjack (see Section 2.4.5).

In [None]:
class BlackjackEnv:

    def __init__(self, gamma=1.0, max_steps=100):
        self.gamma = gamma
        self.max_steps = max_steps
        self.actions = {
            'hit': 0,
            'stick': 1
        }
        self.action_list = list(self.actions.keys())

        self.done = False
        self.current_step = 0
        self.reset()
        self.policy = self.init_policy()

    def draw_card(self):
        card = np.random.randint(1, 14)
        return min(card, 10)  # JQK cards = 10

    def draw_hand(self):
        return [self.draw_card(), self.draw_card()]

    def usable_ace(self, hand):
        return 1 in hand and sum(hand) + 10 <= 21

    def sum_hand(self, hand):
        total = sum(hand)
        if self.usable_ace(hand):
            return total + 10
        return total

    def is_bust(self, hand):
        return self.sum_hand(hand) > 21

    def has_natural(self, hand):
        return sorted(hand) == [1, 10]

    def reset(self):
        self.player = self.draw_hand()
        self.dealer = self.draw_hand()
        self.done = False
        self.current_step = 0

        # check natural
        if self.has_natural(self.player):
            if self.has_natural(self.dealer):
                self.done = True
                self.reward = 0
            else:
                self.done = True
                self.reward = 1
        else:
            self.reward = 0

        return self.get_state()

    def step(self, action):
        if self.done:
            return self.reward, True

        if action == 'hit':
            self.player.append(self.draw_card())
            self.current_step += 1
            if self.is_bust(self.player):
                self.done = True
                self.reward = -1
        else: 
            self.done = True
            # dealers turn
            while self.sum_hand(self.dealer) < 17:
                self.dealer.append(self.draw_card())

            player_sum = self.sum_hand(self.player)
            dealer_sum = self.sum_hand(self.dealer)

            if self.is_bust(self.dealer):
                self.reward = 1
            elif dealer_sum > player_sum:
                self.reward = -1
            elif dealer_sum < player_sum:
                self.reward = 1
            else:
                self.reward = 0

        return self.reward, self.done

    def get_state(self):
        return (self.sum_hand(self.player),
                self.dealer[0],
                self.usable_ace(self.player))

    def print_current_state(self):
        player_sum = self.sum_hand(self.player)
        usable = self.usable_ace(self.player)
        print(f"Player hand: {self.player} (sum={player_sum}, usable_ace={usable})")
        print(f"Dealer showing: {self.dealer[0]}")
        if self.done:
            print(f"Dealer full hand: {self.dealer} (sum={self.sum_hand(self.dealer)})")

    def init_policy(self):
        policy = {}
        for total in range(12, 22):  # below 12 always hit
            for dealer_card in range(1, 11):
                for usable in [True, False]:
                    policy[(total, dealer_card, usable)] = {
                        a: 1 / len(self.actions) for a in self.actions
                    }
        return policy


In [None]:
env = BlackjackEnv()
state = env.get_state()
print("Initial state:", state)
env.print_current_state()

while not env.done:
    print("-------------------------")
    action = np.random.choice(['hit', 'stick']) # random play
    reward, done = env.step(action)
    env.print_current_state()
    print(f"Action: {action}, Reward: {reward}")


Initial state: (13, 6, False)
Player hand: [3, 10] (sum=13, usable_ace=False)
Dealer showing: 6
-------------------------
Player hand: [3, 10, 2] (sum=15, usable_ace=False)
Dealer showing: 6
Action: hit, Reward: 0
-------------------------
Player hand: [3, 10, 2] (sum=15, usable_ace=False)
Dealer showing: 6
Dealer full hand: [6, 2, 2, 1] (sum=21)
Action: stick, Reward: -1


### 8. impl – Autoregressive Trend Process.
Implement an autoregressive trend process (see Section 2.4.7).

In [None]:
class AutoregressiveStockEnv:

    def __init__(self, length=100, lam=3, kappa=0.9, gamma=1.0, max_steps=99):
        self.length = length
        self.lam = lam
        self.kappa = kappa
        self.gamma = gamma
        self.max_steps = max_steps

        self.actions = {
            'short': -1,
            'neutral': 0,
            'long': 1
        }

        self.reward = 0
        self.prices = []
        self.current_step = 0
        self.done = False

        self.generate_prices()
        self.reset()
        self.policy = self.init_policy()

    def generate_prices(self):
        a = [1.0]
        b = [np.random.normal(0, 1)]

        for k in range(1, self.length):
            a_k = a[-1] + b[-1] + self.lam * np.random.normal(0, 1)
            b_k = self.kappa * b[-1] + np.random.normal(0, 1)
            a.append(a_k)
            b.append(b_k)

        a = np.array(a)
        scaled = a / (np.max(a) - np.min(a))
        self.prices = np.exp(scaled)

    def reset(self):
        self.current_step = 0
        self.done = False
        self.position = 0  # 0 = neutral, -1 = short, 1 = long
        return self.get_state()

    def get_state(self):
        return (self.current_step, self.position)

    def step(self, action):
        if self.done or self.current_step >= self.max_steps:
            return 0, True

        next_step = self.current_step + 1
        price_change = self.prices[next_step] - self.prices[self.current_step]
        position = self.actions[action] 

        reward = position * price_change - 0.01 * abs(position) * self.prices[self.current_step]
        self.position = position
        self.current_step = next_step

        if self.current_step >= self.max_steps:
            self.done = True

        return reward, self.done

    def print_current_state(self):
        print(f"Step: {self.current_step}, Price: {self.prices[self.current_step]:.3f}, Position: {self.position}")

    def init_policy(self):
        policy = {}
        for t in range(self.length):
            for p in [-1, 0, 1]:
                policy[(t, p)] = {a: 1 / len(self.actions) for a in self.actions}
        return policy

In [53]:
stock_env = AutoregressiveStockEnv(max_steps=99)

current_state = stock_env.get_state()
print(f"Initial state: {current_state}")
stock_env.print_current_state()

if stock_env.max_steps <= 0:
    print("Invalid time steps")

while not stock_env.done and stock_env.current_step < stock_env.max_steps:
    action_probabilities = 1/3 * np.ones(3)  # short, neutral, long
    action = np.random.choice(list(stock_env.actions.keys()), p=action_probabilities)

    reward, done = stock_env.step(action)
    new_state = stock_env.get_state()

    print(f"Step {stock_env.current_step}: s {current_state}, a {action} -> s' {new_state}, r {reward:.4f}")
    stock_env.print_current_state()

    if done:
        print(f"Terminated after {stock_env.current_step} steps (end of time or episode)")

    current_state = new_state


Initial state: (0, 0)
Step: 0, Price: 1.015, Position: 0
Step 1: s (0, 0), a neutral -> s' (1, 0), r -0.0000
Step: 1, Price: 0.938, Position: 0
Step 2: s (1, 0), a short -> s' (2, -1), r -0.0111
Step: 2, Price: 0.940, Position: -1
Step 3: s (2, -1), a neutral -> s' (3, 0), r 0.0000
Step: 3, Price: 0.981, Position: 0
Step 4: s (3, 0), a short -> s' (4, -1), r 0.0378
Step: 4, Price: 0.933, Position: -1
Step 5: s (4, -1), a neutral -> s' (5, 0), r 0.0000
Step: 5, Price: 1.013, Position: 0
Step 6: s (5, 0), a short -> s' (6, -1), r 0.0418
Step: 6, Price: 0.961, Position: -1
Step 7: s (6, -1), a short -> s' (7, -1), r 0.0318
Step: 7, Price: 0.920, Position: -1
Step 8: s (7, -1), a long -> s' (8, 1), r 0.0305
Step: 8, Price: 0.959, Position: 1
Step 9: s (8, 1), a neutral -> s' (9, 0), r -0.0000
Step: 9, Price: 0.930, Position: 0
Step 10: s (9, 0), a neutral -> s' (10, 0), r 0.0000
Step: 10, Price: 1.009, Position: 0
Step 11: s (10, 0), a long -> s' (11, 1), r 0.0453
Step: 11, Price: 1.064, P