In [1]:
import random
import pandas as pd
import numpy as np
from numpy.random import choice
from typing import Dict

import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import animation

%matplotlib qt

In [2]:
state_probability_matrix = {
  "rain": {
    "rain": .9,
    "cloudy": .1,
    "sunny": 0,
    "meteor": 0
  },
  "cloudy": {
    "rain": .2,
    "cloudy": .5,
    "sunny": .3,
    "meteor": 0
  },
  "sunny": {
    "rain": 0,
    "cloudy": .3,
    "sunny": .6,
    "meteor": .1
  },
  "meteor": {
    "rain": 0,
    "cloudy": 0,
    "sunny": 0,
    "meteor": 0
  }
}

rewards = {
  "rain": -2,
  "cloudy": 0,
  "sunny": 3,
  "meteor": -10
}

# 1. Prediction
## 1.1 Markov Chain
**Uitwerking: https://observablehq.com/@selenecodes/avkas-inleveropgave-1-1**

## 1.2 Markov Reward Process
**Uitwerking: https://observablehq.com/@selenecodes/avkas-inleveropgave-1-1-2**

## 1.3 Sampling. Een voorbereiding voor Monte-Carlo Policy Evaluation
Pak twee mogelijke samples van je MRP en leg uit wat de return $G_{t}$  was voor elke sample. Een sample is een reeks transities vanaf een arbitrair gekozen beginstate tot de eindstate. Dus je begint in state s, en loopt een mogelijk pad door het MRP tot aan de eindstate. We beginnen voor het gemak met γ=1. De formule voor de return is als volgt:

$$
G_t\:=  \sum_{k=0}^{\infty} \gamma^{k}  R_{t+k+1}
$$

Voor deze opdracht heb ik gekozen om dmv python een random start_state te pakken en daarna een random pad te doorlopen.
P.S. Men kan dit codeblock meerdere keren runnen om andere samples te krijgen.

In [3]:
def calculate_gt(state, probability_matrix, rewards, gt = 0, k = 0, discount = 1):
    # If you reached the meteor state you're done.
    if state == 'meteor':
        return gt
    
    # Choose one of the paths to the next state based on the probability for that state
    # This is basically random.choice but weighted for the probabilities.
    state_probabilities = list(probability_matrix[state].values())
    next_state = choice(list(probability_matrix[state].keys()), replace=False, p=state_probabilities)

    # Calculate the Gt
    # This goes as follows:
    # current gt + discount^k * value(next_state)
    probability = probability_matrix[state][next_state]
    next_value = (probability * rewards[next_state])
    gt = gt + discount ** k * next_value
    print(f"{state}->{next_state}; R={gt:.2f} ({probability} * {rewards[next_state]})")
    return calculate_gt(
        next_state,
        probability_matrix,
        rewards,
        gt,
        k + 1,
        discount
    )

start_state = random.sample(list(state_probability_matrix), 1)[0]
print("Start state:", start_state)

reward = calculate_gt(
    start_state,
    state_probability_matrix,
    rewards,
    discount=1
)

print(f"Final reward: {reward:.2f}")

Start state: rain
rain->rain; R=-1.80 (0.9 * -2)
rain->rain; R=-3.60 (0.9 * -2)
rain->rain; R=-5.40 (0.9 * -2)
rain->rain; R=-7.20 (0.9 * -2)
rain->rain; R=-9.00 (0.9 * -2)
rain->rain; R=-10.80 (0.9 * -2)
rain->rain; R=-12.60 (0.9 * -2)
rain->rain; R=-14.40 (0.9 * -2)
rain->rain; R=-16.20 (0.9 * -2)
rain->rain; R=-18.00 (0.9 * -2)
rain->rain; R=-19.80 (0.9 * -2)
rain->rain; R=-21.60 (0.9 * -2)
rain->rain; R=-23.40 (0.9 * -2)
rain->rain; R=-25.20 (0.9 * -2)
rain->rain; R=-27.00 (0.9 * -2)
rain->rain; R=-28.80 (0.9 * -2)
rain->cloudy; R=-28.80 (0.1 * 0)
cloudy->cloudy; R=-28.80 (0.5 * 0)
cloudy->cloudy; R=-28.80 (0.5 * 0)
cloudy->rain; R=-29.20 (0.2 * -2)
rain->cloudy; R=-29.20 (0.1 * 0)
cloudy->cloudy; R=-29.20 (0.5 * 0)
cloudy->cloudy; R=-29.20 (0.5 * 0)
cloudy->cloudy; R=-29.20 (0.5 * 0)
cloudy->cloudy; R=-29.20 (0.5 * 0)
cloudy->cloudy; R=-29.20 (0.5 * 0)
cloudy->cloudy; R=-29.20 (0.5 * 0)
cloudy->sunny; R=-28.30 (0.3 * 3)
sunny->sunny; R=-26.50 (0.6 * 3)
sunny->meteor; R=-27.50 (0.1

## 1.4 De value-function bepalen
Bepaal nu voor alle states wat de value is na 2 iteraties. De value voor alle states worden geinitialiseerd op 0. Gebruik hiervoor de Bellman expectation equation met $\gamma = 1$. De Bellman Expectation Equation gaat als volgt:
$$V\left(s\right)\:=\:E\left[G_t\:\mid S_t\:=\:t\right]$$
en (negeer subscript $\pi$ voor nu even in de onderstaande afleiding)
![image.png](attachment:image.png)
Lees eventueel hoofdstuk 4.1 van http://incompleteideas.net/book/RLbook2020.pdf



In [4]:
# Decided that working with numpy is nicer than
# working with dicts.
rewards = np.array([-2, 0, 3, -10])
state_prob_matrix = np.array([
    [.9, .1, 0, 0], # Rain
    [.2, .5, .3, 0], # Cloudy
    [0, .3, .6, .1], # Sunny
    [0, 0, 0, 0], # Meteor (why is meteor a weather type???)
])

def calculate_values(matrix, rewards, iterations, gamma = 1):
    # Stupid python iteration fix
    iterations = iterations + 1
    
    n_states = len(matrix)
    V = np.zeros((iterations, n_states))
    for k in range(1, iterations):
        for state in range(0, n_states):
            probability = matrix[state]
            discount = gamma**k
            prev_value = V[k-1]
            V[k][state] = sum(probability * (rewards + (discount * prev_value)))
    
    # Transpose the matrix from iterations x states -> states x iterations
    return V.T

In [5]:
values = calculate_values(
    state_prob_matrix,
    rewards,
    iterations = 2,
    gamma = 1
)

pd.DataFrame(data={
    "Rain": values[0, :],
    "Cloudy": values[1, :],
    "Sunny": values[2, :],
    "Meteor": values[3, :],
})

Unnamed: 0,Rain,Cloudy,Sunny,Meteor
0,0.0,0.0,0.0,0.0
1,-1.8,0.5,0.8,0.0
2,-3.37,0.63,1.43,0.0


## 1.5 Markov Reward Process
The discount factor basically works like reverse interest. The further away your reward is the less it's going to be worth. So a discount factor of 1 would mean that you would optimize for the highest possible future reward. This has the following two problems.
1. The future usually holds more uncertainty, so only focusing on an infinitely far future which might be subject to change could actually yield lower rewards overall.
2. Infinite loops suck, if you have a circulair process and Y=1 you might as well use `while True`
3. In cases where you would get the same reward given path x or path y the gamma function would be the determining factor of choosing the best path. (let's say path x takes 4 turns and gives you 1k points, and path y takes 10 turns and gives you 1k points) Both would give you 1k points with $\gamma = 1$. However path x would yield higher points if $\gamma \lt 1$ due to it taking less turns.

# 2. Control met Value Iteration
Control gaat om het verbeteren van de policy. Value Iteration is een algoritme dat nou juist precies dat doet.  Een policy π is een functie van states naar actions. Dat wil zeggen: een policy ontvangt een state en geeft een action terug. We hadden tot nu toe nog geen acties in onze Markov Processen. Laten we deze stap nemen. We gaan nu over op de MDP (Markov Decision Process) , een MDP is een MRP maar dan met acties. We zien hier een zeer simpel MDP met 3 states. Vanuit 1 state kan de agent kiezen naar links of naar rechts te gaan. In de rest van de states is er geen keuze. De rechter state is de terminal state, oftewel eindstaat. Het doel van een reinforcement learning agent is om de optimale policy gegeven een bepaalde MDP te vinden. Value iteration is een vorm van dynamic programming - programming, staat hier niet voor code schrijven maar voor het vinden van het optimale programma - waar stapsgewijs de valuefunction van de MDP wordt berekend. De value van een state wordt ook wel de utility genoemd. Als we eenmaal alle utilities hebben gevonden aan de hand van de optimale value function, kunnen we een greedy policy toepassen die de verwachtte return Gt maximaliseert. Greedy wil zeggen dat je altijd de actie kiest met de hoogste verwachtte return. Voer nu stapsgewijs value iteration handmatig uit op de onderstaande MDP. Voor meer info lees hoofdstuk 4.4 van http://incompleteideas.net/book/RLbook2020.pdf
<br>
En:
https://youtu.be/Nd1-UUMVfz4?t=3707
<br>
Bepaal de utility van elke state (value function) van onderstaande MDP, je mag zelf kiezen wanneer je stopt met itereren maar beargumenteer waarom. Te vroeg stoppen is niet goed. Neem een discount factor van $\gamma = 1$

De value voor alle states worden geinitialiseerd op 0.
![image.png](attachment:image.png)

In [6]:
rewards = np.array([-.1, -.1, -1])
state_prob_matrix = np.array([
    [0, 1, 0], # S1
    [1, 0, 1], # S2
    [0, 0, 0,] # S3
])

# State transition matrix
# In state -> next_state_left, next_state_right form
actions = np.array([
    [0, 1],
    [0, 2],
    [2, 2]
])

# A modified version of
# https://annisap.medium.com/searching-for-optimal-policies-in-python-an-intro-to-optimization-7182d6fe4dba
def value_iteration(matrix, rewards,  n_actions, theta=0.0001, discount_factor=1.0):
    n_states = len(matrix)
    def one_step_lookahead(state, V):
        A = np.zeros(n_actions)
        for a in range(n_actions):
            next_state = actions[state][a]
            prob = matrix[state]
            A[a] += sum(prob * (rewards + discount_factor * V[next_state]))
        return A
    
    V = np.zeros(n_states)
    while True:
        delta = 0
        for s in range(n_states):
            # Do a one-step lookahead to find the best action
            A = one_step_lookahead(s, V)
            best_action_value = np.max(A)
            # Calculate delta across all states seen so far
            delta = max(delta, np.abs(best_action_value - V[s]))
            # Update the value function. Ref: Sutton book eq. 4.10.
            V[s] = best_action_value
        print(V)
        # Check if we can stop 
        if delta < theta:
            break
    
    return V

In [7]:
value_iteration(
    state_prob_matrix,
    rewards,
    n_actions=2 # left, right
)

[-0.1 -1.1  0. ]
[-0.2 -1.1  0. ]
[-0.3 -1.1  0. ]
[-0.4 -1.1  0. ]
[-0.5 -1.1  0. ]
[-0.6 -1.1  0. ]
[-0.7 -1.1  0. ]
[-0.8 -1.1  0. ]
[-0.9 -1.1  0. ]
[-1.  -1.1  0. ]
[-1.1 -1.1  0. ]
[-1.2 -1.1  0. ]
[-1.2 -1.1  0. ]


array([-1.2, -1.1,  0. ])

# 3. Implementatie
Voor deze opdracht mag je elke programmeertaal kiezen, je mag ook zelf besluiten of je het OO implementeert of niet. Er zijn allerlei voorbeelden te vinden online, maak daar gerust gebruik van. Maar er wordt wel verwacht dat je in je eigen woorden kan uitleggen wat er gebeurt in je code en hoe die gestructureerd is.

## 3.1 De betoverde doolhof
Implementeer de doolhof en een agent die op elk legaal veld in de doolhof kan staan en de acties onder, boven, links en rechts kan uitvoeren. De kans dat je de actie uitvoert die je van plan was is altijd 0.7. Alle andere richtingen op zijn uniform verdeeld over de resterende 3 richtingen. Als je tegen een muur aanloopt blijf je op dezelfde positie staan.
![image-2.png](attachment:image-2.png)

In [8]:
# Actions as indexors in y,x form.
actions = {
    "UP": (-1, 0),
    "DOWN": (1, 0),
    "LEFT": (0, -1),
    "RIGHT": (0, 1),
}

rewards = np.array([
    [-1, -1, -1, 40],
    [-1, -1, -10, -10],
    [-1, -1, -1, -1],
    [10, -2, -1, -1],
])

exits = [(3, 0), (0, 3), (3, 1)]
start = (3, 2)

In [9]:
def get_action(desired_action: str, actions: Dict[str, tuple], chosen_prob = .7):
    # Probability for all actions - the probability of the one you chose
    remaining_actions = list(set(actions.keys())- {desired_action})
    remaining_prob = round((1 - chosen_prob) / len(remaining_actions), 1)
    remaining_probabilities = [remaining_prob for _ in range(0, len(remaining_actions))]
    
    return choice(
        [desired_action, *remaining_actions],
        replace=False,
        p=[chosen_prob, *remaining_probabilities]
    )

def get_next_position(location, action, matrix):
    """Calculate next position based on the action and the current position."""   
    game_height, game_width = matrix.shape
    y, x = location
    next_y, next_x = actions[action]
    next_x += x
    next_y += y
    
    # Check if the new position is within our game grid
    if (next_x >= 0 and next_x < game_width):
        if (next_y >= 0 and next_y < game_height):
            return (next_y, next_x)
    
    # If not in the game grid return the original location
    return location

def create_grid(location, matrix):
    """Calculate next position based on the action and the current position."""
    grid = np.zeros(matrix.shape)
    y, x = location
    grid[location] = 1
    return grid

## 3.2 Value Iteration
Implementeer nu value iteration. Deze functie geeft de optimale policy terug, als je er een MDP ingooit. Maak een vorm van visualisatie waardoor gecheckt kan worden of je functie goed werkt. Zowel de utility van alle states na convergentie van value iteration als de actie gegeven alle states (policy) moeten gevisualiseerd worden (hoe je dat doet maakt niet uit, als het maar goed (af)leesbaar is). Als je acties deterministisch zijn (dus wat je van plan bent, gebeurt ook altijd), en $\gamma = 1$     dan zou je wel moeten kunnen raden wat de value van alle states is (de state direct naar de beste eindstate heeft dan een waarde van 40). Zo kan je alvast testen of je algoritme goed convergeert. Verder zou ik gaan voor value van alle states op 0 te zetten in eerste instantie en een vrij kleine delta (0.1 of 0.01).

![image.png](attachment:image.png)

In [10]:
# A modified version of
# https://annisap.medium.com/searching-for-optimal-policies-in-python-an-intro-to-optimization-7182d6fe4dba
def value_iteration(rewards, actions, prob = .7, theta=0.01, discount_factor=1.0):
    n_actions = len(actions.keys())

    def one_step_lookahead(location, V):
        A = np.zeros(n_actions)
        a_idx = 0
        for a in actions.keys():
            next_state = get_next_position(location, a, rewards)
            if next_state == location:
                continue
            A[a_idx] = prob * (rewards[next_state] + discount_factor * V[next_state])
            a_idx += 1
        return A
    
    V = np.zeros(rewards.shape)
    while True:
        delta = 0
#         print(V)
        for y in range(rewards.shape[0]):
            for x in range(rewards.shape[1]):  
                location = (y, x)
                if location in exits:
                    continue
                # Do a one-step lookahead to find the best action
                A = one_step_lookahead(location, V)
                best_action_value = np.max(A)
                # Calculate delta across all states seen so far
                delta = max(delta, np.abs(best_action_value - V[location]))
                # Update the value function. Ref: Sutton book eq. 4.10.
                V[location] = best_action_value
        # Check if we can stop 
        if delta < theta:
            break
    
    V = np.add(V, rewards)

    # Create a deterministic policy using the optimal value function
    policy = np.zeros([rewards.shape[0], rewards.shape[1], n_actions])
    for y in range(rewards.shape[0]):
        for x in range(rewards.shape[1]):
            location = (y, x)
            # One step lookahead to find the best action for this state
            A = one_step_lookahead(location, V)
            best_action = np.argmax(A)
            # Always take the best action
            policy[y, x, best_action] = 1.0
    
    return policy, V

In [11]:
policy, v = value_iteration(rewards, actions)

In [15]:
def evaluate_policy(location, list_actions):
    grids = []
    points = 0
    while True:
        grids.append(create_grid(location, rewards))
        points += rewards[location]
        if location in exits:
            return grids, points
        desired_action = np.argmax(policy[location])
        actual_action = get_action(list_actions[desired_action], actions)
        location = get_next_position(location, actual_action, rewards)

In [19]:
# ACTUAL POLICY EVALUATION
list_actions = list(actions.keys())
grids, points = evaluate_policy(start, list_actions)
print(f"Congratulation you received: {points}")

# ANIMATION OF GAME
fig = plt.figure()
sns.heatmap(grids[0], vmax=1, square=True)

def init():
    plt.clf()
    sns.heatmap(grids[0], vmax=.8, square=True, cbar=False)

def animate(i):
    plt.clf()
    sns.heatmap(grids[i], vmax=.8, square=True, cbar=False)

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(grids), repeat = False)
plt.show()

Congratulation you received: -3
