In [119]:
import numpy as np
import pandas as pd
from tqdm import tqdm

# Intro

### Markov Poperty
$P(S_{t+1}\vert S_t, A_t) = P(S_{T+1}\vert S_t, A_t, S_{t-1}, A_{t-1},\ldots)$

### Transition Function
$p(s'\vert s, a) = P(S_t=s'\vert S_{t-1}=s, A_{t-1}=a)$

### Reward Function
$r(s,a)=\mathbb{E} [R_t\vert S_{t-1}=s, A_{t-1}=a]$<br>
$r(s,a,s')=\mathbb{E} [R_t\vert S_{t-1}=s, A_{t-1}=a, S_t=s']$

### Return
$G_t=R_{t+1}+\gamma R_{t+2}+\gamma^2 R_{t+3}+\ldots+\gamma^{T-1}R_T$

### MDP
$\mathcal{MDP(S,A,T,R,S_\theta,\gamma,H)}$<br>
$\mathcal{POMDP(S,A,T,R,S_\theta,\gamma,H,O,E)}$

# Bellman Equations

### State-Value Function
$v_\pi(s) = \sum_a [\pi(a|s) \sum_{s',r} [p(s',r|s, a) \left[r + \gamma v_\pi(s') \right],\forall s \in S]]$

### Action-Value Function
$q_\pi(s,a) = \sum_{s',r} p(s',r|s, a) \left[r + \gamma v_\pi(s') \right],\forall s \in S,\forall a \in A$

### Action Advantage
$a_\pi(s,a)=q_\pi(s,a)-v_\pi(s)$

### Bellman optimality equations

$v_\star(s)=\displaystyle\max_\pi v_\pi(s),\forall s \in S$<br>
$q_\star(s,a)=\sum_{s',r}p(s',r\vert s,a)[r+\gamma \displaystyle\max_{a'}q_\star(s',a')]$

# Policy Iteration

### Policy Evaluation
$v_{k+1}(s)=\sum_a \pi(a\vert s) \sum_{s',r}p(s',r\vert s,a)[r+\gamma v_k(s')]$

### Policy Improvement
$\pi'(s)=\text{argmax}_a \sum_{s',r}p(s',r\vert s,a)[r+\gamma v_k(s')]$

# Value Iteration
$v_{k+1}(s)=\displaystyle\max_a\sum_{s',r}p(s',r\vert s,a)[r+\gamma v_k(s')]$

# Total Regret

$\mathcal{T}=\sum_{e=1}^E \mathbb{E}[v_\star-q_\star(A_e)]$

# Softmax Exploration

$\pi(a)=\frac{exp(\frac{Q(a)}{\mathcal{T}})}{\sum_{b=0}^B exp(\frac{Q(b)}{\mathcal{T}})}$

In [126]:
LEFT, RIGHT = 0, 1

P = {
    #   state: {action: [(prob. of transition, next state, reward, if statement is terminal)]}
    0: {
        0: [
            (0.5000, 0, 0.0, True),
            (0.3333, 0, 0.0, True),
            (0.1666, 0, 0.0, True),
        ],
        1: [
            (0.5000, 0, 0.0, True),
            (0.3333, 0, 0.0, True),
            (0.1666, 0, 0.0, True),
        ],
    },
    1: {
        0: [
            (0.5000, 0, 0.0, True),
            (0.3333, 1, 0.0, False),
            (0.1666, 2, 0.0, False),
        ],
        1: [
            (0.5000, 2, 0.0, False),
            (0.3333, 1, 0.0, False),
            (0.1666, 0, 0.0, True),
        ],
    },
    2: {
        0: [
            (0.5000, 1, 0.0, False),
            (0.3333, 2, 0.0, False),
            (0.1666, 3, 0.0, False),
        ],
        1: [
            (0.5000, 3, 0.0, False),
            (0.3333, 2, 0.0, False),
            (0.1666, 1, 0.0, False),
        ],
    },
    3: {
        0: [
            (0.5000, 2, 0.0, False),
            (0.3333, 3, 0.0, False),
            (0.1666, 4, 0.0, False),
        ],
        1: [
            (0.5000, 4, 0.0, False),
            (0.3333, 3, 0.0, False),
            (0.1666, 2, 0.0, False),
        ],
    },
    4: {
        0: [
            (0.5000, 3, 0.0, False),
            (0.3333, 4, 0.0, False),
            (0.1666, 5, 0.0, False),
        ],
        1: [
            (0.5000, 5, 0.0, False),
            (0.3333, 4, 0.0, False),
            (0.1666, 3, 0.0, False),
        ],
    },
    5: {
        0: [
            (0.5000, 4, 0.0, False),
            (0.3333, 5, 0.0, False),
            (0.1666, 6, 1.0, True),
        ],
        1: [
            (0.5000, 6, 1.0, True),
            (0.3333, 5, 0.0, False),
            (0.1666, 4, 0.0, False),
        ],
    },
    6: {
        0: [
            (0.5000, 6, 0.0, True),
            (0.3333, 6, 0.0, True),
            (0.1666, 6, 0.0, True),
        ],
        1: [
            (0.5000, 6, 0.0, True),
            (0.3333, 6, 0.0, True),
            (0.1666, 6, 0.0, True),
        ],
    },
}

pi = lambda s: {0: LEFT, 1: LEFT, 2: LEFT, 3: LEFT, 4: LEFT, 5: LEFT, 6: LEFT}[s]

In [128]:
def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    prev_V = np.zeros(len(P))
    while True:
        V = np.zeros(len(P))
        for s in range(len(P)):
            for prob, next_state, reward, done in P[s][pi(s)]:
                V[s] += prob * (reward + gamma * prev_V[next_state] * (not done))
        if np.max(np.abs(prev_V - V)) < theta:
            break
        prev_V = V.copy()
    return V

In [130]:
def policy_improvement(V, P, gamma=1.0):
    Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
    for s in range(len(P)):
        for a in range(len(P[s])):
            for prob, next_state, reward, done in P[s][a]:
                Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
    new_pi = lambda s: {s: a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
    return new_pi

In [132]:
for i in tqdm(range(10)):
    print(V, "\n", [pi(_) for _ in range(6)])
    V = policy_evaluation(pi, P)
    pi = policy_improvement(V, P)

100%|██████████████████████████████████████████| 10/10 [00:00<00:00, 249.47it/s]

[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 0, 0, 0, 0, 0]
[0.         0.002739   0.01096093 0.03564318 0.10974096 0.33218911
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1, 1, 1, 1]
[0.         0.66690987 0.88925762 0.96352174 0.98845925 0.99696612
 0.        ] 
 [0, 1, 1,




In [61]:
def policy_iteration(P, gamma=1.0, theta=1e-10):
    random_actions = np.random.choice(tuple(P[0].keys()), len(P))
    pi = lambda s: {s: a for s, a in enumerate(random_actions)}[s]
    while True:
        old_pi = {s: pi(s) for s in range(len(P))}
        V = policy_evaluation(pi, P, gamma, theta)
        pi = policy_improvement(V, P, gamma)
        if old_pi == {s: pi(s) for s in range(len(P))}:
            break
    return V, pi