In [None]:
import numpy as np
import random
import statistics

# Dynamic Programming

In [None]:
class FoodTruck:
    def __init__(self):
        self.v_demand = [100, 200, 300, 400]
        self.p_demand = [0.3, 0.4, 0.2, 0.1]
        self.capacity = self.v_demand[-1]
        self.days = ['Mon', 'Tue', 'Wed', 
                     'Thu', 'Fri', "Weekend"]
        self.unit_cost = 4
        self.net_revenue = 7
        self.action_space = [0, 100, 200, 300, 400]
        self.state_space = [("Mon", 0)] \
                            + [(d, i) for d in self.days[1:] 
                                for i in [0, 100, 200, 300]]
    
    def get_next_state_reward(self, state, action, demand):
        day, inventory = state
        result = {}
        result['next_day'] = self.days[self.days.index(day) \
                                       + 1]
        result['starting_inventory'] = min(self.capacity, 
                                           inventory 
                                           + action)
        result['cost'] = self.unit_cost * action 
        result['sales'] = min(result['starting_inventory'], 
                              demand)
        result['revenue'] = self.net_revenue * result['sales']
        result['next_inventory'] \
            = result['starting_inventory'] - result['sales']
        result['reward'] = result['revenue'] - result['cost']
        return result
    
    def get_transition_prob(self, state, action):
        next_s_r_prob = {}
        for ix, demand in enumerate(self.v_demand):
            result = self.get_next_state_reward(state, 
                                                action, 
                                                demand)
            next_s = (result['next_day'],
                      result['next_inventory'])
            reward = result['reward']
            prob = self.p_demand[ix]
            if (next_s, reward) not in next_s_r_prob:
                next_s_r_prob[next_s, reward] = prob
            else:
                next_s_r_prob[next_s, reward] += prob
        return next_s_r_prob
    
    def reset(self):
        self.day = "Mon"
        self.inventory = 0
        state = (self.day, self.inventory)
        return state
    
    def is_terminal(self, state):
        day, inventory = state
        return day == "Weekend"
    
    def step(self, action):
        demand = random.choices(self.v_demand, self.p_demand)[0]
        result = self.get_next_state_reward((self.day, 
                                             self.inventory), 
                                       action, 
                                       demand)
        self.day = result['next_day']
        self.inventory = result['next_inventory']
        state = (self.day, self.inventory)
        reward = result['reward']
        done = self.is_terminal(state)
        info = {'demand': demand, 'sales': result['sales']}
        return state, reward, done, info

In [None]:
# Simulating an arbitrary policy
foodtruck = FoodTruck()
rewards = []
for i_episode in range(10000):
    state = foodtruck.reset()
    done = False
    ep_reward = 0
    while not done:
        day, inventory = state
        action = max(0, 300 - inventory)
        state, reward, done, info = foodtruck.step(action) 
        ep_reward += reward
    rewards.append(ep_reward)
statistics.mean(rewards)

In [None]:
# Single day expected reward
ucost = 4
uprice = 7
v_demand = [100, 200, 300, 400]
p_demand = [0.3, 0.4, 0.2, 0.1]
inv = 400
profit = uprice * sum(p *min(v, inv) for p, v in zip(v_demand, p_demand)) - inv * ucost
print(profit)

## Policy Evaluation

In [None]:
def base_policy(states):
    policy = {}
    for s in states:
        day, inventory = s
        prob_a = {} 
        if inventory >= 300:
            prob_a[0] = 1
        else:
            prob_a[200 - inventory] = 0.5
            prob_a[300 - inventory] = 0.5
        policy[s] = prob_a
    return policy

In [None]:
def expected_update(env, v, s, prob_a, gamma):
    expected_value = 0
    for a in prob_a:
        prob_next_s_r = env.get_transition_prob(s, a)
        for next_s, r in prob_next_s_r:
            expected_value += prob_a[a] \
                            * prob_next_s_r[next_s, r] \
                            * (r + gamma * v[next_s])
    return expected_value

In [None]:
def policy_evaluation(env, policy, max_iter=100, 
                      v = None, eps=0.1, gamma=1):
    if not v:
        v = {s: 0 for s in env.state_space}
    k = 0
    while True:
        max_delta = 0
        for s in v:
            if not env.is_terminal(s):
                v_old = v[s]
                prob_a = policy[s]
                v[s] = expected_update(env, v, 
                                       s, prob_a, 
                                       gamma)
                max_delta = max(max_delta, 
                                abs(v[s] - v_old))
        k += 1
        if max_delta < eps:
            print("Converged in", k, "iterations.")
            break
        elif k == max_iter:
            print("Terminating after", k, "iterations.")
            break
    return v

In [None]:
foodtruck = FoodTruck()
policy = base_policy(foodtruck.state_space)

In [None]:
v = policy_evaluation(foodtruck, policy)
print("Expected weekly profit:", v["Mon", 0])

In [None]:
print("The state values:")
v

In [None]:
def choose_action(state, policy):
    prob_a = policy[state]
    action = np.random.choice(a=list(prob_a.keys()), 
                              p=list(prob_a.values()))
    return action

def simulate_policy(policy, n_episodes):
    np.random.seed(0)
    foodtruck = FoodTruck()
    rewards = []
    for i_episode in range(n_episodes):
        state = foodtruck.reset()
        done = False
        ep_reward = 0
        while not done:
            action = choose_action(state, policy)
            state, reward, done, info = foodtruck.step(action) 
            ep_reward += reward
        rewards.append(ep_reward)
    print("Expected weekly profit:", np.mean(rewards))

In [None]:
simulate_policy(policy, 1000)

## Policy Iteration

In [None]:
def policy_improvement(env, v, s, actions, gamma):
    prob_a = {}
    if not env.is_terminal(s):
        max_q = np.NINF
        best_a = None
        for a in actions:
            q_sa = expected_update(env, v, s, {a: 1}, gamma)
            if q_sa >= max_q:
                max_q = q_sa
                best_a = a
        prob_a[best_a] = 1
    else:
        max_q = 0
    return prob_a, max_q

In [None]:
def policy_iteration(env,  eps=0.1, gamma=1):
    np.random.seed(1)
    states = env.state_space
    actions = env.action_space
    policy = {s: {np.random.choice(actions): 1}
             for s in states}
    v = {s: 0 for s in states}
    while True:
        v = policy_evaluation(env, policy, v=v, 
                          eps=eps, gamma=gamma)
        old_policy = policy
        policy = {}
        for s in states:
            policy[s], _ = policy_improvement(env, v, s, 
                                    actions, gamma)
        if old_policy == policy:
            break
    print("Optimal policy found!")
    return policy, v

In [None]:
policy, v = policy_iteration(foodtruck)
print("Expected weekly profit:", v["Mon", 0])

In [None]:
print(policy)

## Value Iteration

In [None]:
def value_iteration(env, max_iter=100, eps=0.1, gamma=1):
    states = env.state_space
    actions = env.action_space
    v = {s: 0 for s in states}
    policy = {}
    k = 0
    while True:
        max_delta = 0
        for s in states:
            old_v = v[s]
            policy[s], v[s] = policy_improvement(env, 
                                                 v, 
                                                 s, 
                                                 actions, 
                                                 gamma)
            max_delta = max(max_delta, abs(v[s] - old_v))
        k += 1
        if max_delta < eps:
            print("Converged in", k, "iterations.")
            break
        elif k == max_iter:
            print("Terminating after", k, "iterations.")
            break
    return policy, v

In [None]:
policy, v = value_iteration(foodtruck)
print("Expected weekly profit:", v["Mon", 0])

In [None]:
print(policy)

In [None]:
def generalized_policy_iteration(env, max_iter=2, eps=0.1, gamma=1):
    np.random.seed(1)
    states =  env.state_space
    actions = env.action_space
    policy = {s: {np.random.choice(actions): 1}
             for s in states}
    v = {s: 0 for s in states}
    k = 0
    while True:
        v_old = v.copy()
        policy = {}
        for s in states:
            policy[s], v[s] = policy_improvement(env, v, s, 
                                    actions, gamma)
        v = policy_evaluation(env, policy, 
                              max_iter=max_iter, v=v, 
                              eps=eps, gamma=gamma)
        max_delta = np.amax([abs(v[s] - v_old[s]) for s in v])
        k += 1
        if max_delta < eps:
            print("GPI converged in", k, "iterations.")
            print([abs(v[s] - v_old[s]) for s in v])
            break
            
    print("Optimal policy found!")
    return policy, v

In [None]:
policy, v = generalized_policy_iteration(foodtruck, max_iter=2, eps=0.1, gamma=1)

In [None]:
print("Expected weekly profit:", v["Mon", 0])
print(policy)

In [None]:
v

# Monte Carlo Methods

## MC Prediction

In [None]:
def first_visit_return(returns, trajectory, gamma):
    G = 0
    T = len(trajectory) - 1
    for t, sar in enumerate(reversed(trajectory)):
        s, a, r = sar
        G = r + gamma * G
        first_visit = True
        for j in range(T - t):
            if s == trajectory[j][0]:
                first_visit = False
        if first_visit:
            if s in returns:
                returns[s].append(G)
            else:
                returns[s] = [G]
    return returns

In [None]:
def get_trajectory(env, policy):
    trajectory = []
    state = env.reset()
    done = False
    sar = [state]
    while not done:
        action = choose_action(state, policy)
        state, reward, done, info = env.step(action)
        sar.append(action)
        sar.append(reward)
        trajectory.append(sar)
        sar = [state]
    return trajectory

In [None]:
def first_visit_mc(env, policy, gamma, n_trajectories):
    np.random.seed(0)
    returns = {}
    v = {}
    for i in range(n_trajectories):
        trajectory = get_trajectory(env, policy)
        returns = first_visit_return(returns, 
                                     trajectory, 
                                     gamma)
    for s in env.state_space:
        if s in returns:
            v[s] = np.round(np.mean(returns[s]), 1)
    return v

In [None]:
foodtruck = FoodTruck()
policy = base_policy(foodtruck.state_space)

In [None]:
v_est = first_visit_mc(foodtruck, policy, 1, 10000)
v_est

In [None]:
v_true = policy_evaluation(foodtruck, policy)

In [None]:
v_true

In [None]:
# v_est = first_visit_mc(foodtruck, policy, 1, 5)
# {s: v_est[s] for s in sorted(v_est)}

In [None]:
# v_est = first_visit_mc(foodtruck, policy, 1, 10)
# {s: v_est[s] for s in sorted(v_est)}

In [None]:
# v_est = first_visit_mc(foodtruck, policy, 1, 100)
# {s: v_est[s] for s in sorted(v_est)}

In [None]:
# v_est = first_visit_mc(foodtruck, policy, 1, 1000)
# {s: v_est[s] for s in sorted(v_est)}

In [None]:
# v_est = first_visit_mc(foodtruck, policy, 1, 10000)
# {s: v_est[s] for s in sorted(v_est)}

## On-policy Monte Carlo Control

In [None]:
import operator

In [None]:
def get_eps_greedy(actions, eps, a_best):
    prob_a = {}
    n_a = len(actions)
    for a in actions:
        if a == a_best:
            prob_a[a] = 1 - eps + eps/n_a
        else:
            prob_a[a] = eps/n_a
    return prob_a

In [None]:
def get_random_policy(states, actions):
    policy = {}
    n_a = len(actions)
    for s in states:
        policy[s] = {a: 1/n_a for a in actions}
    return policy

In [None]:
def on_policy_first_visit_mc(env, n_iter, eps, gamma):
    np.random.seed(0)
    states =  env.state_space
    actions = env.action_space
    policy =  get_random_policy(states, actions)
    Q = {s: {a: 0 for a in actions} for s in states}
    Q_n = {s: {a: 0 for a in actions} for s in states}
    for i in range(n_iter):
        if i % 10000 == 0:
            print("Iteration:", i)
        trajectory = get_trajectory(env, policy)
        G = 0
        T = len(trajectory) - 1
        for t, sar in enumerate(reversed(trajectory)):
            s, a, r = sar
            G = r + gamma * G
            first_visit = True
            for j in range(T - t):
                s_j = trajectory[j][0]
                a_j = trajectory[j][1]
                if (s, a) == (s_j, a_j):
                    first_visit = False
            if first_visit:
                Q[s][a] = Q_n[s][a] * Q[s][a] + G
                Q_n[s][a] += 1
                Q[s][a] /= Q_n[s][a]
                a_best = max(Q[s].items(), 
                             key=operator.itemgetter(1))[0]
                policy[s] = get_eps_greedy(actions, 
                                           eps, 
                                           a_best)
    return policy, Q, Q_n

In [None]:
policy, Q, Q_n = on_policy_first_visit_mc(foodtruck, 
                                          300000, 
                                          0.05, 
                                          1)

In [None]:
policy

In [None]:
Q

## Off-policy Monte Carlo Control

In [None]:
def off_policy_mc(env, n_iter, eps, gamma):
    np.random.seed(0)
    states =  env.state_space
    actions = env.action_space
    Q = {s: {a: 0 for a in actions} for s in states}
    C = {s: {a: 0 for a in actions} for s in states}
    target_policy = {}
    behavior_policy = get_random_policy(states, 
                                        actions)
    for i in range(n_iter):
        if i % 10000 == 0:
            print("Iteration:", i)
        trajectory = get_trajectory(env, 
                                    behavior_policy)
        G = 0
        W = 1
        T = len(trajectory) - 1
        for t, sar in enumerate(reversed(trajectory)):
            s, a, r = sar
            G = r + gamma * G
            C[s][a] += W
            Q[s][a] += (W/C[s][a]) * (G - Q[s][a])
            a_best = max(Q[s].items(), 
                         key=operator.itemgetter(1))[0]
            target_policy[s] = a_best
            behavior_policy[s] = get_eps_greedy(actions, 
                                                eps, 
                                                a_best)
            if a != target_policy[s]:
                break
            W = W / behavior_policy[s][a]
    target_policy = {s: target_policy[s] for s in states
                                   if s in target_policy}
    return target_policy, Q

In [None]:
policy, Q = off_policy_mc(foodtruck, 300000, 0.05, 1)

In [None]:
policy

In [None]:
Q

# TD Learning

## TD Prediction

In [None]:
def one_step_td_prediction(env, policy, gamma, alpha, n_iter):
    np.random.seed(0)
    states = env.state_space
    v = {s: 0 for s in states}
    s = env.reset()
    for i in range(n_iter):
        a = choose_action(s, policy)
        s_next, reward, done, info = env.step(a)
        v[s] += alpha * (reward + gamma * v[s_next] - v[s])
        if done:
            s = env.reset()
        else:
            s = s_next
    return v

In [None]:
policy = base_policy(foodtruck.state_space)
v = one_step_td_prediction(foodtruck, policy, 1, 0.01, 100000)
v

In [None]:
print({s: np.round(v[s]) for s in v})

True values
{('Mon', 0): 2515.0,
 ('Tue', 0): 1960.0,
 ('Tue', 100): 2360.0,
 ('Tue', 200): 2760.0,
 ('Tue', 300): 3205.0,
 ('Wed', 0): 1405.0,
 ('Wed', 100): 1805.0,
 ('Wed', 200): 2205.0,
 ('Wed', 300): 2650.0,
 ('Thu', 0): 850.0000000000001,
 ('Thu', 100): 1250.0,
 ('Thu', 200): 1650.0,
 ('Thu', 300): 2095.0,
 ('Fri', 0): 295.00000000000006,
 ('Fri', 100): 695.0000000000001,
 ('Fri', 200): 1095.0,
 ('Fri', 300): 1400.0,
 ('Weekend', 0): 0,
 ('Weekend', 100): 0,
 ('Weekend', 200): 0,
 ('Weekend', 300): 0}

In [None]:
def sarsa(env, gamma, eps, alpha, n_iter):
    np.random.seed(0)
    states = env.state_space
    actions = env.action_space
    Q = {s: {a: 0 for a in actions} for s in states}
    policy = get_random_policy(states, actions)
    s = env.reset()
    a = choose_action(s, policy)
    for i in range(n_iter):
        if i % 100000 == 0:
            print("Iteration:", i)
        s_next, reward, done, info = env.step(a)
        a_best = max(Q[s_next].items(), 
                     key=operator.itemgetter(1))[0]
        policy[s_next] = get_eps_greedy(actions, eps, a_best)
        a_next = choose_action(s_next, policy)
        Q[s][a] += alpha * (reward 
                            + gamma * Q[s_next][a_next] 
                            - Q[s][a])
        if done:
            s = env.reset()
            a_best = max(Q[s].items(), 
                         key=operator.itemgetter(1))[0]
            policy[s] = get_eps_greedy(actions, eps, a_best)
            a = choose_action(s, policy)
        else:
            s = s_next
            a = a_next
    return policy, Q

In [None]:
policy, Q = sarsa(foodtruck, 1, 0.1, 0.01, 1000000)

In [None]:
policy

In [None]:
Q[('Mon', 0)]

In [None]:
def q_learning(env, gamma, eps, alpha, n_iter):
    np.random.seed(0)
    states =  env.state_space
    actions = env.action_space
    Q = {s: {a: 0 for a in actions} for s in states}
    policy = get_random_policy(states, actions)
    s = env.reset()
    for i in range(n_iter):
        if i % 100000 == 0:
            print("Iteration:", i)
        a_best = max(Q[s].items(), 
                     key=operator.itemgetter(1))[0]
        policy[s] = get_eps_greedy(actions, eps, a_best)
        a = choose_action(s, policy)
        s_next, reward, done, info = env.step(a)
        Q[s][a] += alpha * (reward 
                            + gamma * max(Q[s_next].values()) 
                            - Q[s][a])
        if done:
            s = env.reset()
        else:
            s = s_next
    policy = {s: {max(policy[s].items(), 
                 key=operator.itemgetter(1))[0]: 1}
                 for s in states}
    return policy, Q

In [None]:
policy, Q = q_learning(foodtruck, 1, 0.1, 0.01, 1000000)
policy

In [None]:
q_learning(foodtruck, 1, 0.1, 0.01, 20000000)


In [None]:
Q

{('Mon', 0): 2880.0,
 ('Tue', 0): 2250.0,
 ('Tue', 100): 2650.0,
 ('Tue', 200): 3050.0,
 ('Tue', 300): 3450.0,
 ('Wed', 0): 1620.0,
 ('Wed', 100): 2020.0,
 ('Wed', 200): 2420.0,
 ('Wed', 300): 2820.0,
 ('Thu', 0): 990.0,
 ('Thu', 100): 1390.0,
 ('Thu', 200): 1790.0,
 ('Thu', 300): 2190.0,
 ('Fri', 0): 390.00000000000006,
 ('Fri', 100): 790.0000000000001,
 ('Fri', 200): 1190.0,
 ('Fri', 300): 1400.0,
 ('Weekend', 0): 0,
 ('Weekend', 100): 0,
 ('Weekend', 200): 0,
 ('Weekend', 300): 0}