In [None]:
import numpy as np

# cleaning robot example
class MopBot:
  def __init__(self,
               layout = np.array([[0, 0, 0, 0, 0]]),
               prob_spill = 0.1):
    # elements of the map are:
    # 0: unobstructed cell
    # 1: obstructed cell
    # 5: unobstructed cell with spill (5 looks like S)
    # 6: unobstructed cell with agent (think Baymax of Big Hero 6)
    self.layout = layout
    self.map = layout.copy()
    self.map[0,0] = 6
    self.prob_spill = prob_spill
  
  def step(self, action):
    # determine current and target location
    location = np.where(self.map == 6)
    location = [int(location[0]),int(location[1])]
    target = location.copy()
    if action == 'left':
      target[1] -= 1
    elif action == 'right':
      target[1] += 1
    elif action == 'up':
      target[0] -= 1
    elif action == 'down':
      target[0] += 1

    # move agent if unobstructed
    height,width = np.shape(self.map)
    if (0 <= target[0] < height) and (0 <= target[1] < width) and (self.map[target[0],target[1]] != 1):
      self.map[location[0],location[1]] = 0
      self.map[target[0],target[1]] = 6

    # new spill
    spills = np.random.binomial(1, self.prob_spill, (height, width))
    self.map += (self.map == 0) * spills * 5

    observation = self.map
    return observation

  def map_to_state(self, map):
    # state = concatenate([row, col], spill_configuration_array)
    row,col = np.where(map == 6)
    spills = (map == 5).flatten()
    state = np.concatenate((row, col, spills))
    return tuple(state)

  def state_to_map(self, state):
    location = state[:2]
    spills = np.array(state[2:])
    map = self.layout.copy()
    map[location[0],location[1]] = 6
    spill_map = spills.reshape(map.shape)
    map += (map == 0) * spill_map * 5
    return map

  def state_value_table(self):
    # allocate table with one value per state, initialized to zero
    height,width = np.shape(self.map)
    table = np.zeros([height,width] + height*width*[2])
    return table

  def state_list(self):
    height,width = np.shape(self.map)
    spill_list = [[]]
    for idx in range(height*width):
      spill_list = [s + [0] for s in spill_list] + [s + [1] for s in spill_list]
    return [tuple(np.concatenate(([row], [col], spills)))
            for row in range(height) 
            for col in range(width)
            for spills in spill_list]

  def transition_probs(self, state, action):
    # compute state transition probabilities
    # transition probs is an array with one probability per state
    map = self.state_to_map(state)
    location = np.where(map == 6)
    location = [int(location[0]),int(location[1])]

    # move agent
    target = location.copy()
    if action == 'left':
      target[1] -= 1
    elif action == 'right':
      target[1] += 1
    elif action == 'up':
      target[0] -= 1
    elif action == 'down':
      target[0] += 1
    # move agent if unobstructed
    height,width = np.shape(map)
    if (0 <= target[0] < height) and (0 <= target[1] < width) and (map[target[0],target[1]] != 1):
      map[location[0],location[1]] = 0
      map[target[0],target[1]] = 6

    transition_probs = self.state_value_table()

    # iterate through possible spill configurations
    spill_list = [np.array([], dtype='int64')]
    for idx in range(height*width):
      spill_list = [np.concatenate((s,[0])) for s in spill_list] + [np.concatenate((s,[1])) for s in spill_list]
    for spills in spill_list:
      spill_map = spills.reshape(map.shape)
      new_map = map.copy()
      new_map += (new_map == 0) * spill_map * 5
      new_state = self.map_to_state(new_map)
      print(spills)
      transition_probs[new_state] += np.power(self.prob_spill, np.sum(spills)) * np.power(1-self.prob_spill, np.sum(spills==0))      
    return transition_probs


def reward(map):
  return -np.sum(map==5)


In [None]:
## code for planning via value iteration in the simple default case of a single hallway
mb = MopBot()

# compute optimal action value function
action_list = ['left','right','stay'] # note that these actions are specialized to the default case of a single hallway
Q = {a:mb.state_value_table() for a in action_list}
for iter in range(100):
  Q_update = {a:mb.state_value_table() for a in action_list}
  for a in action_list:
    for state in mb.state_list():
      probs = mb.transition_probs(state, a)
      Q_max = np.maximum.reduce([Q[b] for b in action_list])
      Q_update[a][state] = reward(mb.state_to_map(state)) + np.sum(np.multiply(probs, Q_max))
  Q = Q_update

# print map of optimal actions at time 0, with no spills
policy = [None, None, None, None, None] # note that the dimensions are specialized to the default case of a single hallway
for location in range(len(policy)):
  map = mb.layout.copy()
  map[0,location] = 6
  state = mb.map_to_state(map)
  policy[location] = action_list[np.argmax([Q[a][state] for a in action_list])]
print(policy)

# print map of optimal actions at time 0, with one spill at the left wall
policy = [None, None, None, None, None] # note that the dimensions are specialized to the default case of a single hallway
for location in range(len(policy)):
  map = mb.layout.copy()
  map[0,0] = 5
  map[0,location] = 6
  state = mb.map_to_state(map)
  policy[location] = action_list[np.argmax([Q[a][state] for a in action_list])]
print(policy)



['right', 'right', 'stay', 'left', 'left']
['right', 'left', 'left', 'left', 'left']


In [None]:
## code for planning via Q-learning in the simple default case of a single hallway
stepsize = 0.01
discount = 0.99

mb = MopBot()
action_list = ['left','right','stay'] # note that these actions are specialized to the default case of a single hallway
Q = {a:mb.state_value_table() for a in action_list}
state = mb.map_to_state(mb.map)
for iter in range(10000000):
  action = action_list[np.argmax([Q[a][state] for a in action_list])]
  observation = mb.step(action)
  next_state = mb.map_to_state(observation)
  Q_max = np.maximum.reduce([Q[b] for b in action_list])
  Q[action][state] += stepsize * (reward(observation) + discount*Q_max[next_state] - Q[action][state])
  state = next_state

# print map of optimal actions at time 0, with no spills
policy = [None, None, None, None, None] # note that the dimensions are specialized to the default case of a single hallway
for location in range(len(policy)):
  map = mb.layout.copy()
  map[0,location] = 6
  state = mb.map_to_state(map)
  policy[location] = action_list[np.argmax([Q[a][state] for a in action_list])]
print(policy)

# print map of optimal actions at time 0, with one spill at the left wall
policy = [None, None, None, None, None] # note that the dimensions are specialized to the default case of a single hallway
for location in range(len(policy)):
  map = mb.layout.copy()
  map[0,0] = 5
  map[0,location] = 6
  state = mb.map_to_state(map)
  policy[location] = action_list[np.argmax([Q[a][state] for a in action_list])]
print(policy)


['right', 'right', 'stay', 'left', 'left']
['right', 'left', 'left', 'left', 'left']
