# 3x2 warehouse layout

## training data and layout

I obtained the distribution of actions from `warehousetraining3x2.txt` (see jupyter notebook `overview.ipynb` for that):       

```
store white      0.1278
store blue       0.1253
store red        0.2469
restore white    0.1278
restore blue     0.1253
restore red      0.2469
```

For the 3x2 warehouse I choose the following storage layout:

```
0 1 2
3 4 5
```

The robot arm starts from field 0 (x=0,y=0) which already counts as a distance of 1. Storing or restoring in 1 or 3 costs a distance of 2, in 2 and 4 it costs 3. Traveling to 5 has a distance of 4:

```
1 2 3
2 3 4
```

## Code

In [1]:
import pandas as pd
import numpy as np
import itertools
from typing import List, Tuple, Union

data_dir_3x2 = 'data_3x2'

In [2]:
distance_array = [1, 2, 3, 2, 3, 4]
reward_array = [4, 3, 2, 3, 2, 1]

# do next action on position...
action_positions = [0, 1, 2, 3, 4, 5]

field_map = {
    0: 'none',
    1: 'white',
    2: 'blue',
    3: 'red'
}

field_map_inverse = {v: k for k, v in field_map.items()}

action_map = {
    0: ('store', 'white'),
    1: ('store', 'blue'),
    2: ('store', 'red'),
    3: ('restore', 'white'),
    4: ('restore', 'blue'),
    5: ('restore', 'red')
}

action_map_inverse = {v: k for k, v in action_map.items()}

training_action_dist = {
    0: 0.1278, # store   white
    1: 0.1253, # store   blue
    2: 0.2469, # store   red
    3: 0.1278, # restore white
    4: 0.1253, # restore blue
    5: 0.2469  # restore red
}

In [3]:
def generate_statelist() -> List[Tuple[int]]:
    # 0...3 := none, white, blue, red
    state_per_field = [0, 1, 2, 3]
    # 0...5 := store {white, blue, red}, restore {white, blue, red}
    actions = [0, 1, 2, 3, 4, 5]
    #storage_states = [p for p in itertools.product(storage_state, repeat=4)]
    statelist = []
    for action in actions:
        for x0 in state_per_field:
            for x1 in state_per_field:
                for x2 in state_per_field:
                    for x3 in state_per_field:
                        for x4 in state_per_field:
                            for x5 in state_per_field:
                                state = (x0, x1, x2, x3, x4, x5, action)
                                statelist.append(state)
    print(f'generated {len(statelist)} states')
    return statelist

In [4]:
statelist = generate_statelist()

generated 24576 states


In [5]:
# helper functions

# For a given state: get action
def get_state_action(state_tup: Tuple[int], humanreadable: bool=False) -> Union[str, int]:
    # last element in state tuple is the action
    action = state_tup[-1]
    if humanreadable:
        return action_map[action]
    else:
        return action

# For a given state: list of indizes of empty slots
def get_state_free_slots(state_tup: Tuple[int]) -> List[int]:
    return [index for index, slot in enumerate(state_tup[:-1]) if slot == 0]

# For a given state: does it have empty slots?
def state_has_any_free_slots(state_tup: Tuple[int]) -> bool:
    return len(get_state_free_slots(state_tup)) > 0

# For a given state: all slots free?
def state_has_only_free_slots(state_tup: Tuple[int]) -> bool:
    return len([index for index, slot in enumerate(state_tup[:-1]) if slot == 0]) == len(state_tup[:-1])

# For a given state: list of indizes of certain color slots
def get_state_slots_of_color(state_tup: Tuple[int], color_to_find: Union[int, str]) -> List[int]:
    if isinstance(color_to_find, str):
        color_int = field_map_inverse[color_to_find]
    else:
        color_int = color_to_find
    return [index for index, slot in enumerate(state_tup[:-1]) if slot == color_int]

# For a given state: does it have certain color slots?
def state_has_any_slots_of_color(state_tup: Tuple[int], color_to_find: Union[int, str]) -> bool:
    return len(get_state_slots_of_color(state_tup, color_to_find)) > 0

# For two given states: list of indizes of varying slots
def get_states_varying_slots(state_tup0: Tuple[int], state_tup1: Tuple[int]) -> List[int]:
    state0_slots = state_tup0[:-1]
    state1_slots = state_tup1[:-1]
    indizes_varying_slots = []
    for index, slot_content in enumerate(state0_slots):
        if slot_content != state1_slots[index]:
            indizes_varying_slots.append(index)
    return indizes_varying_slots

# For two given states: do they have varying slots?
def states_have_varying_slots(state_tup0: Tuple[int], state_tup1: Tuple[int]) -> bool:
    return len(get_states_varying_slots(state_tup0, state_tup1)) > 0


# For two given states: is state1 a valid state after state0?
def states_are_valid_successors(state_tup0: Tuple[int], state_tup1: Tuple[int], action_position: int) -> Tuple[bool, str]:
    if state_tup0[action_position] == state_tup1[action_position]:
        # nothing changed at action_position...
        return False
    # check rest if it is the same
    for ind, field in enumerate(state_tup0[:-1]):
        # skip field that is where action should take place
        if ind == action_position:
            continue
        if field != state_tup1[ind]:
            return False
    # store
    if state_tup0[-1] in [0, 1, 2]: # ['store white', 'store blue', 'store red']
        if (state_tup0[-1] + 1) == state_tup1[action_position] and state_tup0[action_position] == 0:
            return True
        else:
            return False
    # restore
    if state_tup0[-1] in [3, 4, 5]: # ['restore white', 'restore blue', 'restore red']
        if 0 == state_tup1[action_position] and state_tup1[action_position] == (state_tup0[-1] - 2):
            return True
        else:
            return False
    raise Exception(f'state_tup0: {state_tup0}, state_tup1: {state_tup1}')

# call states_are_valid_successors beforehand!
def distance_between_successor_states(state_tup0: Tuple[int], state_tup1: Tuple[int]) -> int:
    #if not states_are_valid_successors(state_tup0, state_tup1):
    #    raise Error('states_are_valid_successors returned False')
    try:
        changed_slot = get_states_varying_slots(state_tup0, state_tup1)[0]
    except IndexError:
        print(f'state_tup0: {state_tup0}, state_tup1: {state_tup1}')
    return distance_array[changed_slot]

In [6]:
statelist[50]

(0, 0, 0, 3, 0, 2, 0)

In [7]:
# test helper functions
# state0: (0, 0, 0, 3, 0, 2, 0)
state0 = statelist[50]
state1 = (1, 0, 0, 3, 0, 2, 0)

print(f'state0:\t\t\t\t{state0}')
print(f'get_state_action:\t\t{get_state_action(state0, humanreadable=True)}')
print(f'get_state_free_slots:\t\t{get_state_free_slots(state0)}')
print(f'state_has_any_free_slots:\t{state_has_any_free_slots(state0)}')
print(f'state_has_only_free_slots:\t{state_has_only_free_slots(state0)}')
print(f'get_state_slots_of_color:\t{get_state_slots_of_color(state0, 2)}')
print(f'get_state_slots_of_color:\t{get_state_slots_of_color(state0, "blue")}')
print(f'state_has_any_slots_of_color:\t{state_has_any_slots_of_color(state0, "blue")}')
print()
print(f'state0:\t\t\t\t{state0}')
print(f'action of state0:\t\t{get_state_action(state0)}')
print(f'action of state0:\t\t{get_state_action(state0, humanreadable=True)}')
print(f'statelist[51]:\t\t\t{state1}')
print(f'get_states_varying_slots:\t{get_states_varying_slots(state0, state1)}')
print(f'states_have_varying_slots:\t{states_have_varying_slots(state0, state1)}')
print(f'states_are_valid_successors:\t{states_are_valid_successors(state0, state1, 0)}')

state0:				(0, 0, 0, 3, 0, 2, 0)
get_state_action:		('store', 'white')
get_state_free_slots:		[0, 1, 2, 4]
state_has_any_free_slots:	True
state_has_only_free_slots:	False
get_state_slots_of_color:	[5]
get_state_slots_of_color:	[5]
state_has_any_slots_of_color:	True

state0:				(0, 0, 0, 3, 0, 2, 0)
action of state0:		0
action of state0:		('store', 'white')
statelist[51]:			(1, 0, 0, 3, 0, 2, 0)
get_states_varying_slots:	[0]
states_have_varying_slots:	True
states_are_valid_successors:	True


In [8]:
def fix_tpm_empty_row(tpm):
    for index, row_vector in enumerate(tpm):
        sum_ = np.sum(row_vector)
        if sum_ == 0.0:
            # it should stay in the state then since the sum of each row has to equal one
            tpm[index, index] = 1.0
            continue
    return tpm

def create_tpms(statelist):
    tpms = []
    inner_counter = 0
    for idx, action_position in enumerate(action_positions):
        print(f'{idx + 1} / {len(action_positions)} tpms... ', end='')
        tpm = np.zeros((len(statelist), len(statelist)), dtype=np.float16)
        for x, state0 in enumerate(statelist):
            for y, state1 in enumerate(statelist):
                valid_succ = states_are_valid_successors(state0, state1, action_position)
                if valid_succ:
                    inner_counter += 1
                    action = get_state_action(state1)
                    tpm[x, y] = training_action_dist[action]
                    if inner_counter == 5:
                        inner_counter = 0
                        continue
        fix_tpm_empty_row(tpm)
        tpms.append(tpm)
        print('ok')
    return tpms

In [9]:
def generate_reward_matrix(statelist):
    rm = np.zeros((len(statelist), len(action_positions)), dtype=np.float16)
    for idx, action_position in enumerate(action_positions):
        print(f'{idx + 1} / {len(action_positions)} rm columns... ', end='')
        for ind0, state0 in enumerate(statelist):
            for ind1, state1 in enumerate(statelist):
                if state1 == state0:
                    continue
                valid_succ = states_are_valid_successors(state0, state1, action_position)
                if valid_succ:
                    rm[ind0, action_position] = reward_array[distance_between_successor_states(state0, state1)-1]
        for ind0, state0 in enumerate(statelist):
            # punish store when non-empty
            if get_state_action(state0) in [0, 1, 2] and state0[action_position] != 0:
                rm[ind0, action_position] = -100
                continue
            # punish restore when empty
            if get_state_action(state0) == 3 and state0[action_position] != 1:
                rm[ind0, action_position] = -100
                continue
            elif get_state_action(state0) == 4 and state0[action_position] != 2:
                rm[ind0, action_position] = -100
                continue
            elif get_state_action(state0) == 5 and state0[action_position] != 3:
                rm[ind0, action_position] = -100
                continue
        print('ok')
    return rm

In [10]:
import mdptoolbox

states = generate_statelist()
tpms = create_tpms(statelist)
rm = generate_reward_matrix(statelist)

generated 24576 states
1 / 6 tpms... ok
2 / 6 tpms... ok
3 / 6 tpms... ok
4 / 6 tpms... ok
5 / 6 tpms... ok
6 / 6 tpms... ok
1 / 6 rm columns... ok
2 / 6 rm columns... ok
3 / 6 rm columns... ok
4 / 6 rm columns... ok
5 / 6 rm columns... ok
6 / 6 rm columns... ok


In [14]:
import numpy as np
np.save(f'{data_dir_3x2}/.~tpms_list.npy', tpms)
#tpms = np.load(f'{data_dir_3x2}/.~tpms_list.npy')
np.save(f'{data_dir_3x2}/.~rm.npy', rm)
#rm = np.load(f'{data_dir_3x2}/.~rm.npy')


In [15]:
import mdptoolbox
mdp_result_policy = mdptoolbox.mdp.PolicyIteration(tpms, rm, 0.999, max_iter=1000)

In [19]:
import mdptoolbox
mdp_result_value = mdptoolbox.mdp.ValueIteration(tpms, rm, 0.95, max_iter=1000)

In [16]:
mdp_result_policy.run()
print('PolicyIteration: done')
#print(mdp_result_policy.policy)
#print(mdp_result_policy.V)
#print(mdp_result_policy.iter)

PolicyIteration: done


In [20]:
mdp_result_value.run()
print('ValueIteration: done')
#print(mdp_result_value.policy)
#print(mdp_result_value.V)
#print(mdp_result_value.iter)

ValueIteration: done


## Evaluation

For the 3x2 storage layout, both MDP solution (PolicyIteration: discount=0.999, ValueIteration: discount=0.95) perform worse for the training data set, but outperform the greedy algo when running on the order data set

### Greedy

The greedy implementation walks 26144 steps on the training set and 130 on the order set.

In [22]:
import pandas as pd
import numpy as np
from GreedyAlgo import GreedyAlgo

data_dir_3x2 = 'data_3x2'
file_train_3x2 = 'warehousetraining3x2.txt'
file_order_3x2 = 'warehouseorder3x2.txt'
csv_data_file_train = f'{data_dir_3x2}\\{file_train_3x2}'
csv_data_file_order = f'{data_dir_3x2}\\{file_order_3x2}'

df_3x2_train = pd.read_csv(csv_data_file_train, sep='\t', header=None, names=["operation_type", "color"])
df_3x2_order = pd.read_csv(csv_data_file_order, sep='\t', header=None, names=["operation_type", "color"])

actions_3x2_train = list(df_3x2_train.itertuples(index=False))
actions_3x2_order = list(df_3x2_order.itertuples(index=False))

greedy_algo = GreedyAlgo(6)
for action in actions_3x2_train:
    greedy_algo.run_next_action(action)
print(f'greedy_algo.distance_walked:\t{greedy_algo.distance_walked}')

greedy_algo = GreedyAlgo(6)
for action in actions_3x2_order:
    greedy_algo.run_next_action(action)
print(f'greedy_algo.distance_walked:\t{greedy_algo.distance_walked}')

greedy_algo.distance_walked:	26144
greedy_algo.distance_walked:	130


## PolicyIteration

The policy from PolicyIteration walks 26456 steps on the training set and 126 on the order set (discount=0.999). 

In [18]:
# mdp_result_policy
import pandas as pd
import numpy as np
from AIAlgo import AIAlgo

ai_algo = AIAlgo(mdp_result_policy.policy, statelist, 6)
for action in actions_3x2_train:
    ai_algo.run_next_action(action)
print(f'ai_algo.distance_walked:\t{ai_algo.distance_walked}')

ai_algo = AIAlgo(mdp_result_policy.policy, statelist, 6)
for action in actions_3x2_order:
    ai_algo.run_next_action(action)
print(f'ai_algo.distance_walked:\t{ai_algo.distance_walked}')

ai_algo.distance_walked:	26456
ai_algo.distance_walked:	126


## ValueIteration

The policy from ValueIteration walks 26422 steps on the training set and 126 on the order set (discount=0.95).

In [21]:
# mdp_result_value
import pandas as pd
import numpy as np
from AIAlgo import AIAlgo

ai_algo = AIAlgo(mdp_result_value.policy, statelist, 6)
for action in actions_3x2_train:
    ai_algo.run_next_action(action)
print(f'ai_algo.distance_walked:\t{ai_algo.distance_walked}')

ai_algo = AIAlgo(mdp_result_value.policy, statelist, 6)
for action in actions_3x2_order:
    ai_algo.run_next_action(action)
print(f'ai_algo.distance_walked:\t{ai_algo.distance_walked}')

ai_algo.distance_walked:	26422
ai_algo.distance_walked:	126
