# SAKI Exercise 4: Smart Factory

Import relevant libraries and set warehouse properties and encodings for colors and actions.

In [1]:
import mdptoolbox
import numpy as np
import itertools

color_to_index = {
    "red": 0,
    "blue": 1,
    "white": 2,
    "empty": 3
}

index_to_color = ["red", "blue", "white", "empty"]

actions_to_index = {
    "store": 0,
    "restore": 1
}

index_to_action = ["store", "restore"]

probability_distribution = {
    "red": 1/3,
    "blue": 1/3,
    "white": 1/3
}

# Assert probabilities sum up to 1
assert round(sum(probability_distribution.values())) == 1, "Probabilities of colors must sum up to 1!"

Define a class for a virtual representation of a warehouse.

In [2]:
class Warehouse(object):
    """
    Represents warehouse and handles optimizing.
    """
    def __init__(self, layout, possible_fillings, actions, possible_next_colors):
        """
        Constructor method for warehouse class.
        """
        self.layout = layout
        self.possible_fillings = possible_fillings
        self.actions = actions
        self.possible_next_colors = possible_next_colors
        
        self.state_matrix = self.generate_state_matrix()
        self.transition_matrix = None
        self.reward_matrix = None
        self.policy = None
        
        # Hyperparameter
        self.reward_parameter = 10
        
    def generate_state_matrix(self):
        """
        Generates a state matrix for the warehouse properties given.
        """

        # Multiply warehouse rows and columns to get number of fields
        fields = self.layout[0] * self.layout[1]

        # Get ranges for warehouse properties
        possible_fillings = range(self.possible_fillings)
        possible_actions = range(self.actions)
        possible_next_colors = range(self.possible_next_colors)

        # Get cartesian product of fields and action and next_color
        states_product = itertools.product(itertools.product(possible_fillings, repeat=fields),
                                           itertools.product(possible_actions, possible_next_colors))

        # Put everything together and return state matrix as numpy array
        return np.array([field_state + action_color_state for field_state, action_color_state in states_product])
    
    def generate_transition_matrix(self):
        """
        Generate transition probability matrix for the states and the warehouse layout given.
        """

        def normalize(field):
            """
            Helper function to normalize rows to make sum of probabilities = 1
            """

            row_sum = np.sum(transition_matrix[field, state_1, :])

            if row_sum > 0:
                transition_matrix[field, state_1, :] /= row_sum
            # If there is no 1, we need to make the diagonal value to 1 to create a stochastic matrix in the end
            else:
                transition_matrix[field, state_1, state_1] = 1

        # Multiply warehouse rows and columns to get number of fields
        fields = self.layout[0] * self.layout[1]

        # Get number of states
        number_states = self.state_matrix.shape[0]

        # Initialize transition matrix with 0s
        # Get fields + 1 as shape for 3rd dimension to have start/stop position outside of warehouse
        transition_matrix = np.zeros((fields + 1, 
                                      number_states,  # Matrix should have shape states x states
                                      number_states),
                                    dtype=np.float16)

        # Write to transition matrix
        for state_1 in range(number_states):
            for field in range(fields):
                for state_2 in range(number_states):
                    state_1_array = self.state_matrix[state_1]
                    state_2_array = self.state_matrix[state_2]

                    if index_to_action[state_1_array[fields]] == "store":
                        # If field in state_1 is empty and next color of state_1 and color in field of state_2 match
                        if state_1_array[field] == color_to_index["empty"] and state_2_array[field] == state_1_array[fields + 1]:
                            # Make transition possible
                            color_prob = probability_distribution[index_to_color[state_1_array[fields + 1]]]
                            transition_matrix[field, state_1, state_2] = color_prob
                        else:
                            # If it's not possible, stay in start/stop position
                            transition_matrix[fields, state_1, state_2] = 1
                    else: # Restore instruction
                        # If color in field in state_1 is the next to restore and this field is empty in state_2
                        if state_1_array[field] == state_1_array[fields + 1] and state_2_array[field] == color_to_index["empty"]:
                            # Make transition possible
                            color_prob = probability_distribution[index_to_color[state_1_array[fields + 1]]]
                            transition_matrix[field, state_1, state_2] = color_prob
                        else:
                            # If it's not possible, stay in start/stop position
                            transition_matrix[fields, state_1, state_2] = 1

                # Normalize row for field
                normalize(field)

            # Normalize start/stop position
            # Depends on state_1
            normalize(fields)
        
        self.transition_matrix = transition_matrix
        
        return transition_matrix
    
    def generate_reward_matrix(self, transition_matrix=None):
        """
        Generate reward matrix for the transition matrix and the warehouse layout given.
        """
        
        if self.transition_matrix is None and transition_matrix is None:
            raise ValueError("No transition matrix found. Please run generate_transition_matrix() or pass one.")
        elif self.transition_matrix is not None:
            transition_matrix = self.transition_matrix

        # Get Manhattan distance of warehouse
        distances = np.ones(self.layout)
        distances += np.arange(self.layout[1])
        distances = (distances.T + np.arange(self.layout[0])).T

        distances = distances.flatten()

        # Add distance for start/stop position
        # This parameter can be tuned
        distances = np.append(distances, 7)
        
        # Iterate through fields-dimension of transition_matrix
        # Use list because appending to a list is more efficient
        rewards = []
        for field_index, states_matrix in enumerate(transition_matrix):
            # Init with 0s to account for actions without reward
            rewards_for_field = np.zeros(states_matrix.shape, dtype=np.float16)
            reward = self.reward_parameter - distances[field_index]
            # Write reward to each index where probability is higher than 0 but less than 1
            rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix < 1))] = reward
            
            # Give more reward for moving less if transition is more likely than expected and vice versa
            # This parameters can be tuned
            if field_index == 0:
                rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix >= 1/3, states_matrix < 1))] += 2
                rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix < 1/3, states_matrix < 1))] -= 1
            elif field_index == 1 or field_index == 2:
                rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix >= 1/3, states_matrix < 1))] -= 1
                rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix < 1/3, states_matrix < 1))] += 1             
            elif field_index == 3:
                rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix >= 1/3, states_matrix < 1))] -= 2
                rewards_for_field[np.where(np.logical_and(states_matrix > 0, states_matrix < 1/3, states_matrix < 1))] += 2
                
            rewards.append(rewards_for_field)

        rewards = np.array(rewards)
        self.reward_matrix = rewards
        
        return rewards
    
    def get_optimal_policy(self, discount, verbose=False):
        """
        Run Value Iteration to find optimal policy. 
        """
        
        if self.transition_matrix is None:
            self.generate_transition_matrix()
        if self.reward_matrix is None:
            self.generate_reward_matrix()
        
        vi = mdptoolbox.mdp.ValueIteration(transitions=self.transition_matrix, reward=self.reward_matrix, discount=discount)
        
        if verbose:
            vi.setVerbose()
        
        vi.run()
        
        self.policy = vi.policy
        
        return vi.policy
    
    def evaluate(self, test_file):
        """
        Run evaluation of optimal policy against a greedy strategy. Only works for a 2x2 matrix for now.
        """
        
        def get_next_shelf(state):
            """
            Helper function for greedy strategy.
            """
            filling = 3 if state[4] == 0 else state[5]
            for i in range(len(state) - 2):
                if state[i] == filling:
                    return i
            return 4
                
        # Set distances for evaluating
        distances = np.array([1,2,2,3,0])
        
        # Read test file with instructions
        test_instructions = []
        with open(test_file, "r") as file:
            for line in file:
                action, color = line.rstrip("\n").split("\t")

                test_instructions.append((actions_to_index[action], color_to_index[color]))
        
        # Init counter and states with empty shelves
        distance_moved_mdp = 0
        distance_moved_greedy = 0
        state_mdp = np.array((3,) * self.layout[0] * self.layout[1] + (0, 0))
        state_greedy = np.array((3,) * self.layout[0] * self.layout[1] + (0, 0))
        
        # Follow test instructions
        for instr in test_instructions:
            state_mdp[4] = instr[0] # Set instruction
            state_mdp[5] = instr[1] # Set color of item
            state_greedy[4] = instr[0] # Set instruction
            state_greedy[5] = instr[1] # Set color of item
            
            # Get move from policy
            next_move_mdp = self.policy[np.where((self.state_matrix == state_mdp).all(axis=1))[0][0]]
            # Get move for greedy
            next_move_greedy = get_next_shelf(state_greedy)
            if instr[0] == 0: # store
                state_mdp[next_move_mdp] = instr[1] if next_move_mdp != 4 else state_mdp[next_move_mdp]
                state_greedy[next_move_greedy] = instr[1] if next_move_greedy != 4 else state_greedy[next_move_greedy]
            else: # restore
                state_mdp[next_move_mdp] = color_to_index["empty"] if next_move_mdp != 4 else state_mdp[next_move_mdp]
                state_greedy[next_move_greedy] = color_to_index["empty"] if next_move_greedy != 4 else state_greedy[next_move_greedy]
            
            # Add up distance moved
            distance_moved_mdp += distances[next_move_mdp]
            distance_moved_greedy += distances[next_move_greedy]

        print("Total Manhattan distance moved using optimal policy: " + str(distance_moved_mdp))
        print("Total Manhattan distance moved using greedy strategy: " + str(distance_moved_greedy))

Initialize warehouse with properties.

In [3]:
my_warehouse = Warehouse(layout=(2,2), possible_fillings=4, actions=2, possible_next_colors=3)

Get transition matrices.

In [4]:
my_warehouse.generate_transition_matrix()

array([[[1.00e+00, 0.00e+00, 0.00e+00, ..., 0.00e+00, 0.00e+00,
         0.00e+00],
        [0.00e+00, 1.00e+00, 0.00e+00, ..., 0.00e+00, 0.00e+00,
         0.00e+00],
        [0.00e+00, 0.00e+00, 1.00e+00, ..., 0.00e+00, 0.00e+00,
         0.00e+00],
        ...,
        [0.00e+00, 0.00e+00, 0.00e+00, ..., 1.00e+00, 0.00e+00,
         0.00e+00],
        [0.00e+00, 0.00e+00, 0.00e+00, ..., 0.00e+00, 1.00e+00,
         0.00e+00],
        [0.00e+00, 0.00e+00, 0.00e+00, ..., 0.00e+00, 0.00e+00,
         1.00e+00]],

       [[1.00e+00, 0.00e+00, 0.00e+00, ..., 0.00e+00, 0.00e+00,
         0.00e+00],
        [0.00e+00, 1.00e+00, 0.00e+00, ..., 0.00e+00, 0.00e+00,
         0.00e+00],
        [0.00e+00, 0.00e+00, 1.00e+00, ..., 0.00e+00, 0.00e+00,
         0.00e+00],
        ...,
        [0.00e+00, 0.00e+00, 0.00e+00, ..., 1.00e+00, 0.00e+00,
         0.00e+00],
        [0.00e+00, 0.00e+00, 0.00e+00, ..., 0.00e+00, 1.00e+00,
         0.00e+00],
        [0.00e+00, 0.00e+00, 0.00e+00, ..., 0.00

Get reward matrices.

In [5]:
my_warehouse.generate_reward_matrix()

array([[[ 2.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  2.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  2., ...,  0.,  0.,  0.],
        ...,
        [ 0.,  0.,  0., ...,  2.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  2.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  2.]],

       [[-1.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0., -1.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0., -1., ...,  0.,  0.,  0.],
        ...,
        [ 0.,  0.,  0., ..., -1.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0., -1.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0., -1.]],

       [[-1.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0., -1.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0., -1., ...,  0.,  0.,  0.],
        ...,
        [ 0.,  0.,  0., ..., -1.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0., -1.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0., -1.]],

       [[-2.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0., -2.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0., -2., ...,  0.,  0.,  0.],
        ..

Find optimal policy.

In [7]:
np.array(my_warehouse.get_optimal_policy(discount=0.9, verbose=True))

  Iteration		V-variation
    1		  6.0
    2		  0.8013668060302734
    3		  0.08138541736752813
    4		  0.008398637719601254
    5		  0.0008824624667838066
Iterating stopped, epsilon-optimal policy found.


array([4, 4, 4, ..., 4, 4, 4])

Evaluate it against a greedy strategy.

In [8]:
my_warehouse.evaluate("test.txt")

Total Manhattan distance moved using optimal policy: 135
Total Manhattan distance moved using greedy strategy: 114
