# Exercise: Smart Factory Warehouse with a Markov Model

<b>Goal:</b> Optimization of the robots route for pick-up and storage of
items in a warehouse  
* Size of warehouse: 2x2 containers
* Three types of items: white, blue, red
* Two tasks: store, restore
* Probabilities of these items being stored or restored: 0.25, 0.25, 0.5
* Use the markov decision process toolbox for your solution => Choose the best performing MDP

In [1]:
from mdptoolbox import mdp
import numpy as np

In [2]:
# Definitions:
size_warehouse = (2, 2)  # size of warehouse
coord_warehouse = [(0, 0), (0, 1), (1, 0), (1, 1)]  # coordinates of all warehouse containers
single_reward = [30, 20, 20, 10]  # plain reward for each warehouse container, ordered by coordinates, high reward = short way
items = ['white', 'blue', 'red']  # all items
containers = ['empty'] + items  # possible states of one container
probabilities = [0.25, 0.25, 0.5]  # probabilities of each item that it is being stored or restored
tasks = ['store', 'restore']  # possible tasks the roboter can perform
start_pos = (0, 0)  # starting position of the roboter

In [3]:
# Create a list of all possible warehouse states
# A state is defined by the items that occupy the containers in the warehouse
# For example: ['white', 'empty', 'blue', 'empty'] which means that the container at (0, 0) is filled with a white item, the container at (0, 1) is empty, ...
states = []

# go through the containers
for c in coord_warehouse:
    pre = []
    
    # for the first container: all possible item occupations of this container
    if states == []:
        for i in containers:
            pre += [[i]]
    
    # for the other containers: expand the list by adding each possible occupation of this container
    else:
        for s in states:
            for i in containers:
                pre += [s + [i]]
            
    states = pre
            
print("Number of states: " + str(len(states)))
print("Example state: " + str(states[0]))

Number of states: 256
Example state: ['empty', 'empty', 'empty', 'empty']


In [4]:
# Create a list of all possible agent actions that can be performed on the warehouse
# An action is defined by the task the agent performes and which item it does the task with
# For example: ('store', 'blue') which means that the agent should store a blue item in the warehouse
actions = []

# go through the tasks and items
for t in tasks:
    for i in items:
        actions += [(t, i)]

print("Number of actions: " + str(len(actions)))
print("All actions: " + str(actions))

Number of actions: 6
All actions: [('store', 'white'), ('store', 'blue'), ('store', 'red'), ('restore', 'white'), ('restore', 'blue'), ('restore', 'red')]


In [5]:
# Define the empty transition and reward matrices
transition = np.zeros((len(actions), len(states), len(states)))
reward = np.zeros((len(actions), len(states), len(states)))

In [6]:
# Custom function to that searches a list for a value and returns all positions of the list where the value was found
f = lambda list, value: [i for i in range(len(list)) if list[i] == value]

In [7]:
# Fill the transition and reward matrices with values

# go through the actions
for a in range(len(actions)):
    # get the task and the item of this action
    task, item = actions[a]
    
    # go through the states
    for r in range(len(states)):
        # get the current state
        row_state = states[r]
        
        # catch some special cases:
        if (task == 'store') and ('empty' not in row_state):
            # if an item should be stored but the warehouse is full: do no change the state (reject the request)
            transition[a, r, r] = 1
            continue
            
        if (task == 'restore') and ((['empty', 'empty', 'empty', 'empty'] == row_state) or (item not in row_state)):
            # if an item should be restored but the warehouse is empty or the requested item is not in the warehouse:
            # do not change the state (reject the request)
            transition[a, r, r] = 1
            continue
        
        # normal case: the state of the warehouse can be changed
        idx = None
        
        if task == 'store':
            # if an item should be stored: search for all possible empty containers in the warehouse
            idx = f(row_state, 'empty')
            # calculate the probability that one of the containers will be chosen (equally distributed)
            prob = 1 / len(idx)
            
            # go through all empty containers
            for el in idx:
                # this container is chosen, so store the item in this container which creates a new state
                new_state = row_state.copy()
                new_state[el] = item
                # search this new state in the list of all possible states
                new_s_idx = states.index(new_state)
                # write the probability of transiting from the old state into this new state into the transition matrix 
                transition[a, r, new_s_idx] = prob
                
                # write the plain reward multiplied by the probability of this item appearing into the reward matrix
                reward[a, r, new_s_idx] = single_reward[el] * probabilities[items.index(item)]
            
        else:
            # if an item should be restored: search for all possible containers that contain this item
            idx = f(row_state, item)
            # calculate the probability that one of the containers will be chosen (equally distributed)
            prob = 1 / len(idx)
            
            # go through the containers with this item
            for el in idx:
                # this container is chosen, so restore the item from this container which leaves it empty which creates a new state
                new_state = row_state.copy()
                new_state[el] = 'empty'
                # search this new state in the list of all possible states
                new_s_idx = states.index(new_state)
                # write the probability of transiting from the old state into this new state into the transition matrix 
                transition[a, r, new_s_idx] = prob
                
                # write the plain reward multiplied by the probability of this item appearing into the reward matrix
                reward[a, r, new_s_idx] = single_reward[el] * probabilities[items.index(item)]


In [8]:
# Show matrices for the empty state ['empty', 'empty', 'empty', 'empty'] and the action (store, white):
print(str(states[0]) + " + " + str(actions[0]) + " ->\n")
print("Transition matrix")
print(transition[0, 0])
white_idx = f(transition[0, 0], 0.25)
print("\nIndicies of all state transitions: " + str(white_idx))
print("Resulting next states:")
for wi in white_idx:
    print(states[wi])

['empty', 'empty', 'empty', 'empty'] + ('store', 'white') ->

Transition matrix
[0.   0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.25 0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
 0.   0.   0.   0.   0.

In [9]:
print("Reward matix: ")
print(reward[0, 0])
print("\nResulting rewards orderd by next state:")
for wi in white_idx:
    print(reward[0, 0, wi])

Reward matix: 
[0.  2.5 0.  0.  5.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  5.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  7.5 0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  

# Markov Decision Process Toolbox
I chose the three most popular algorithms for reinforcement learning:
* Policy iteration
* Q-learning
* Value iteration  

The algorithms will be evaluated with a example

In [10]:
# PolicyIteration
policyIt = mdp.PolicyIteration(transition, reward, 0.9)
policyIt.run()

In [11]:
# Q-Learning
qlearning = mdp.QLearning(transition, reward, 0.9)
qlearning.run()

In [12]:
# Value Iteration
valIt = mdp.ValueIteration(transition, reward, 0.9)
valIt.run()

In [13]:
# read in test file
fileactions = []
with open('warehousetest.txt', 'r') as file:
    for line in file:
        split = line.split("\t")
        faction = (split[0], split[1][:-1])
        fileactions += [actions.index(faction)]
        
print("Number of test actions: " + str(len(fileactions)))

Number of test actions: 65


In [14]:
# Helper function to evaluate the different markov models
def eval(markov):
    # Beginning from empty state
    current_state = 0
    test_reward = 0

    # go through the actions in the file
    for a in fileactions:
        print("Current state: " + str(states[current_state]) + " - perform action: " + str(actions[a]))
        
        # get the possible next states reachable through action a from the current state
        next_idx = [i for i in range(len(transition[a, current_state])) if transition[a, current_state, i] != 0]
        # Get the values for this next states from the value function of the model
        values = [markov.V[i] for i in next_idx]
        
        # Choose the state with the max value as next state
        next_state = next_idx[values.index(max(values))]
        # add the reward of this choice on the reward counter
        test_reward += reward[a, current_state, next_state]
        # for next round: current state = next state
        current_state = next_state
        
    print("Last state: " + str(states[current_state]))
    print("Total Reward: " + str(test_reward))

In [15]:
# Evaluate Policy Iteration
print("Policy Iteration")
eval(policyIt)

Policy Iteration
Current state: ['empty', 'empty', 'empty', 'empty'] - perform action: ('store', 'red')
Current state: ['red', 'empty', 'empty', 'empty'] - perform action: ('store', 'white')
Current state: ['red', 'empty', 'empty', 'white'] - perform action: ('restore', 'white')
Current state: ['red', 'empty', 'empty', 'empty'] - perform action: ('store', 'red')
Current state: ['red', 'empty', 'red', 'empty'] - perform action: ('store', 'blue')
Current state: ['red', 'empty', 'red', 'blue'] - perform action: ('store', 'red')
Current state: ['red', 'red', 'red', 'blue'] - perform action: ('restore', 'red')
Current state: ['empty', 'red', 'red', 'blue'] - perform action: ('restore', 'red')
Current state: ['empty', 'empty', 'red', 'blue'] - perform action: ('restore', 'red')
Current state: ['empty', 'empty', 'empty', 'blue'] - perform action: ('store', 'red')
Current state: ['red', 'empty', 'empty', 'blue'] - perform action: ('restore', 'blue')
Current state: ['red', 'empty', 'empty', 'em

In [16]:
# Evaluate Q-Learning
print("Q-Learning")
eval(qlearning)

Q-Learning
Current state: ['empty', 'empty', 'empty', 'empty'] - perform action: ('store', 'red')
Current state: ['empty', 'empty', 'red', 'empty'] - perform action: ('store', 'white')
Current state: ['white', 'empty', 'red', 'empty'] - perform action: ('restore', 'white')
Current state: ['empty', 'empty', 'red', 'empty'] - perform action: ('store', 'red')
Current state: ['empty', 'empty', 'red', 'red'] - perform action: ('store', 'blue')
Current state: ['empty', 'blue', 'red', 'red'] - perform action: ('store', 'red')
Current state: ['red', 'blue', 'red', 'red'] - perform action: ('restore', 'red')
Current state: ['empty', 'blue', 'red', 'red'] - perform action: ('restore', 'red')
Current state: ['empty', 'blue', 'empty', 'red'] - perform action: ('restore', 'red')
Current state: ['empty', 'blue', 'empty', 'empty'] - perform action: ('store', 'red')
Current state: ['empty', 'blue', 'empty', 'red'] - perform action: ('restore', 'blue')
Current state: ['empty', 'empty', 'empty', 'red'] 

In [17]:
# Evaluate Value Iteration
print("Value Iteration")
eval(valIt)

Value Iteration
Current state: ['empty', 'empty', 'empty', 'empty'] - perform action: ('store', 'red')
Current state: ['red', 'empty', 'empty', 'empty'] - perform action: ('store', 'white')
Current state: ['red', 'empty', 'empty', 'white'] - perform action: ('restore', 'white')
Current state: ['red', 'empty', 'empty', 'empty'] - perform action: ('store', 'red')
Current state: ['red', 'empty', 'red', 'empty'] - perform action: ('store', 'blue')
Current state: ['red', 'empty', 'red', 'blue'] - perform action: ('store', 'red')
Current state: ['red', 'red', 'red', 'blue'] - perform action: ('restore', 'red')
Current state: ['empty', 'red', 'red', 'blue'] - perform action: ('restore', 'red')
Current state: ['empty', 'empty', 'red', 'blue'] - perform action: ('restore', 'red')
Current state: ['empty', 'empty', 'empty', 'blue'] - perform action: ('store', 'red')
Current state: ['red', 'empty', 'empty', 'blue'] - perform action: ('restore', 'blue')
Current state: ['red', 'empty', 'empty', 'emp