In [1]:
import csv
import numpy as np
from mdptoolbox import mdp
from itertools import product
from scipy.sparse import csr_matrix

## Define states

In [2]:
cells = ["white", "red", "blue", "empty"]
actions = [("store","white"), ("store","red"), ("store","blue"),
           ("restore","white"), ("restore","red"), ("restore","blue")]

states = tuple(product(cells, cells, cells, cells, actions))
states_dict = {s:i for i,s in enumerate(states)}

## Generate transition probability matrix

In [3]:
def getNextStates(state, action):
    
    next_states = []
    valid = False
    order = state[-1][0]
    
    if order == "store":
        valid = state[action] == "empty"
    else:
        valid = state[action] == state[-1][-1]
    
    if valid:
        if order == "store":
            next_states = [i for i,s in enumerate(states) if (s[action] == state[-1][-1]) and
                           ([x for j,x in enumerate(s) if j!=action][:-1] == [x for j,x in enumerate(state) if j!=action][:-1])]
        else:
            next_states = [i for i,s in enumerate(states) if (s[action] == "empty") and
                           ([x for j,x in enumerate(s) if j!=action][:-1] == [x for j,x in enumerate(state) if j!=action][:-1])]   
    else:
        next_states = [i for i,s in enumerate(states) if (s[:-1] == state[:-1])]
    
    return next_states

In [4]:
prob = 1/6
prob_end = 1 - (1/6)*5

trans_prob_matrix = []

for i in range(2*2):
    
    trans_prob = np.zeros((1536, 1536))
    
    for j in range(len(trans_prob)):
        
        next_states = getNextStates(states[j], i)
          
        for k,n in enumerate(next_states):
            
            if k == (len(next_states)-1):
                trans_prob[j][n] = prob_end
            else:
                trans_prob[j][n] = prob

    trans_prob_matrix.append(csr_matrix(trans_prob))

## Generate reward matrix

In [5]:
rewards = {0: 10, 1: 5, 2: 5, 3: 0, "fail": -10}

reward_matrix = []

for state in states:
    
    reward = np.zeros(4, dtype=np.float16)
    
    order = str()
    if state[-1][0] == "store":
        order = "store"
    else:
        order = "restore"
    
    if order == "store":
        for i,x in enumerate(reward):
            if state[i]=="empty":
                reward[i] = rewards[i]
            else:
                reward[i] = rewards["fail"]
    else:
        for i,x in enumerate(reward):
            if state[i] == state[-1][-1]:
                reward[i] = rewards[i]
            else:
                reward[i] = rewards["fail"]
    reward_matrix.append(reward)
    
reward_matrix = np.array(reward_matrix)

## Define and run MDPs

In [6]:
max_it = 10000
dscnt = 0.99999

### Policy Iteration

In [7]:
mdp_pol_it = mdp.PolicyIteration(transitions=trans_prob_matrix, reward=reward_matrix, discount=dscnt, max_iter=max_it)
mdp_pol_it.run()
print(mdp_pol_it.time)

919.6292378902435


### Value Iteration

In [8]:
mdp_val_it = mdp.ValueIteration(transitions=trans_prob_matrix, reward=reward_matrix, discount=dscnt, max_iter=max_it)
mdp_val_it.run()
print(mdp_val_it.time)

0.020993947982788086


### Policy Iteration Modified

In [9]:
mdp_pol_mod = mdp.PolicyIterationModified(transitions=trans_prob_matrix, 
                                          reward=reward_matrix, discount=dscnt, max_iter=max_it)
mdp_pol_mod.run()
print(mdp_pol_mod.time)

38.259228467941284


### Relative Value Iteration

In [10]:
mdp_val_rel = mdp.RelativeValueIteration(transitions=trans_prob_matrix, reward=reward_matrix, max_iter=max_it)
mdp_val_rel.run()
print(mdp_val_rel.time)

0.006995677947998047


### Q-Learning

In [11]:
mdp_ql = mdp.QLearning(transitions=trans_prob_matrix, reward=reward_matrix, discount=dscnt, n_iter=10000)
mdp_ql.run()
print(mdp_ql.time)

129.89526057243347


## Generate greedy policy for comparison

In [12]:
greedy = []

for state in states:
    
    failed = True
    order = state[-1][0]

    if order == "store":
        for i in range(4):
            if state[i] == "empty":
                failed = False
                greedy.append(i)
                break
    else: # order == "restore"
        for i in range(4):
            if state[i] == state[-1][-1]:
                failed = False
                greedy.append(i)
                break
    if failed:
        greedy.append(3)

## Evaluate policy performance with test data

In [13]:
def getPolPerf(orders, policy):

    state = ["empty","empty",
             "empty","empty",
             ("action","placeholder")]
    
    distance = 0

    for order in orders:
        
        state[-1] = order
        cell = policy[states_dict[tuple(state)]]

        if state[-1][0] == "store":
            state[cell] = state[-1][-1]
        else:
            state[cell] = "empty"

        if cell == 0:
            distance += 2
        elif cell == 1 or cell == 2:
            distance += 4
        else: #cell == 3
            distance += 6

    return distance

In [14]:
orders = []

with open("warehouseorder2x2.txt") as file:
    rdr = csv.reader(file, delimiter='\t')
    for row in rdr:
        orders.append((row[0], row[1]))

### Performance per policy, based on the total number of cells the robot needs to traverse. Less is better.

In [15]:
print("MDP Policy Iteration :", getPolPerf(orders, mdp_pol_it.policy))
print("MDP Value Iteration  :", getPolPerf(orders, mdp_val_it.policy))
print("MDP Pol.It. Modified :", getPolPerf(orders, mdp_pol_mod.policy))
print("MDP Relative Val.It. :", getPolPerf(orders, mdp_val_rel.policy))
print("MDP Q-Learning       :", getPolPerf(orders, mdp_ql.policy))
print("Greedy               :", getPolPerf(orders, greedy))

MDP Policy Iteration : 208
MDP Value Iteration  : 208
MDP Pol.It. Modified : 208
MDP Relative Val.It. : 208
MDP Q-Learning       : 222
Greedy               : 228
