In [1]:
import numpy as np
import gym

In [16]:
class FoodTruck(gym.Env):
    def __init__(self):
        self.v_demand = [100, 200, 300, 400]
        self.p_demand = [0.3, 0.4, 0.2, 0.1]
        self.capacity = self.v_demand[-1]
        self.days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Weekend']
        self.unit_cost = 4
        self.net_revenue = 7
        self.action_space = [0, 100, 200, 300, 400]
        self.state_space = [("Mon", 0)] + [(d, i) for d in self.days[1:] for i in [0, 100, 200, 300]]
        
    def get_next_state_reward(self, state, action, demand):
        day, inventory = state
        result = {}
        result['next_day'] = self.days[self.days.index(day) + 1]
        result['starting_inventory'] = min(self.capacity, inventory + action)
        result['cost'] = self.unit_cost * action
        result['sales'] = min(result['starting_inventory'], demand)
        result['revenue'] = self.net_revenue * result['sales']
        result['next_inventory'] = result['starting_inventory'] - result['sales']
        result['reward'] = result['revenue'] - result['cost']
        return result
    
    def get_transition_prob(self, state, action):
        next_s_r_prob = {}
        for ix, demand in enumerate(self.v_demand):
            result = self.get_next_state_reward(state, action, demand)
            next_s = (result['next_day'], result['next_inventory'])
            reward = result['reward']
            prob = self.p_demand[ix]
            if (next_s, reward) not in next_s_r_prob:
                next_s_r_prob[next_s, reward] = prob
            else:
                next_s_r_prob[next_s, reward] += prob
        return next_s_r_prob
    
    def reset(self):
        self.day = "Mon"
        self.inventory = 0
        state = (self.day, self.inventory)
        return state
    
    def is_terminal(self, state):
        day, inventory = state
        if day == "Weekend":
            return True
        else:
            return False
        
    def step(self, action):
        demand = np.random.choice(self.v_demand, p=self.p_demand)
        result = self.get_next_state_reward((self.day, self.inventory), action, demand)
        self.day = result['next_day']
        self.inventory = result['next_inventory']
        state = (self.day, self.inventory)
        reward = result['reward']
        done = self.is_terminal(state)
        info = {'demand': demand, 'sales': result['sales']}
        return state, reward, done, info

In [17]:
def base_policy(states):
    policy = {}
    for s in states:
        day, inventory = s
        prob_a = {}
        if inventory >= 300:
            prob_a[0] = 1
        else:
            prob_a[200 - inventory] = 0.5
            prob_a[300 - inventory] = 0.5
        policy[s] = prob_a
    return policy

In [18]:
def expected_update(env, v, s, prob_a, gamma):
    expected_value = 0
    for a in prob_a:
        prob_next_s_r = env.get_transition_prob(s, a)
        for next_s, r in prob_next_s_r:
            expected_value += prob_a[a] * prob_next_s_r[next_s, r] * (r + gamma * v[next_s])
    return expected_value

In [19]:
def policy_evaluation(env, policy, max_iter=100, v=None, eps=0.1, gamma=1):
    if not v:
        v = {s: 0 for s in env.state_space}
    k = 0
    while True:
        max_delta = 0
        for s in v:
            if not env.is_terminal(s):
                v_old = v[s]
                prob_a = policy[s]
                v[s] = expected_update(env, v, s, prob_a, gamma)
                max_delta = max(max_delta, abs(v[s] - v_old))
        k += 1
        if max_delta < eps:
            print("Converged in ", k, " iterations.")
            break
        elif k == max_iter:
            print("Terminating after ", k, " iterations.")
            break
    return v

In [20]:
foodtruck = FoodTruck()
policy = base_policy(foodtruck.state_space)

In [21]:
#TODO Implement is_terminal for FoodTruck
v = policy_evaluation(foodtruck, policy)
print("Expected weekly profit:", v["Mon", 0])

Converged in  6  iterations.
Expected weekly profit: 2515.0


In [22]:
def choose_action(state, policy):
    prob_a = policy[state]
    action = np.random.choice(a=list(prob_a.keys()), p=list(prob_a.values()))
    return action

In [23]:
def simulate_policy(policy, n_episodes):
    np.random.seed(0)
    foodtruck = FoodTruck()
    rewards = []
    for i_episode in range(n_episodes):
        state = foodtruck.reset()
        done = False
        ep_reward = 0
        while not done:
            action = choose_action(state, policy)
            state, reward, done, info = foodtruck.step(action)
            ep_reward += reward
        rewards.append(ep_reward)
    print("Expected weekly profit:", np.mean(rewards))

In [24]:
simulate_policy(policy, 1000)

Expected weekly profit: 2518.1


In [25]:
def policy_improvement(env, v, s, actions, gamma):
    prob_a = {}
    if not env.is_terminal(s):
        max_q = np.NINF
        best_a = None
        for a in actions:
            q_sa = expected_update(env, v, s, {a: 1}, gamma)
            if q_sa >= max_q:
                max_q = q_sa
                best_a = a
        prob_a[best_a] = 1
    else:
        max_q = 0
    return prob_a, max_q

In [26]:
def policy_iteration(env, eps=0.1, gamma=1):
    np.random.seed(1)
    states = env.state_space
    actions = env.action_space
    policy = {s: {np.random.choice(actions): 1} for s in states}
    v = {s: 0 for s in states}
    while True:
        v = policy_evaluation(env, policy, v=v, eps=eps, gamma=gamma)
        old_policy = policy
        policy = {}
        for s in states:
            policy[s], _ = policy_improvement(env, v, s, actions, gamma)
        if old_policy == policy:
            break
    print("Optimal policy found")
    return policy, v