In [0]:
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import random

rank = 10

# Define Scenario


In [0]:
def generateOffice(rank):
  G = nx.Graph()

  #food truck
  target = rank ** 2
  G.add_node(target)

  # open space desks
  for i in range(rank):
    for j in range(rank):
      n = i * rank + j
      G.add_node(n)
      if i > 0: G.add_edge(n - rank, n)
      if j > 0: G.add_edge(n - 1, n)
      if i == rank - 1: G.add_edge(n, target)
  
  return G, target

In [0]:
def getSource(rank):
  return random.choice(range(rank))

def getOccupiedDesks(G, source, target, p):
  available_desks = set(G.nodes).difference(set([source, target]))
  return random.sample(available_desks, int(round(p * len(available_desks))))

def removeOccupiedDesks(G, source, target, p):
  occupied_desks = getOccupiedDesks(G, source, target, p)
  G.remove_nodes_from(occupied_desks)

def generateEnvironment(G, target, p):
  # todo initialise random seed
  source = getSource(rank)
  removeOccupiedDesks(G, source, target, p)
  return source

# Deep Reinforcement Learning Setup

In [0]:
# actions - cardinal directions clockwise from 12 as per problem statement
actions = range(4)

# env = (graph, target)

# next state
# todo rank is global, should be in env
def move_options(env, state):
  nbrs = set(env[0].neighbors(state))
  options = [0] * 4
  options[0] = int(env[1] in nbrs or (state + rank) in nbrs)
  options[1] = int((state + 1) in nbrs)
  options[2] = int((state - rank) in nbrs)
  options[3] = int((state - 1) in nbrs)
  return options

def move_action(env, state, action):
  next_state = state
  if action == 0:
    if state // rank == rank - 1:
      next_state = env[1]
    else:
      next_state = state + rank
  if action == 1:
    next_state = state + 1
  if action == 2:
    next_state = state - rank
  if action == 3:
    next_state = state - 1

  return next_state

# decided to not move anywhere if you try to move through block
# there are alternative ways to do this (eg, more semantic actions, fail)
def next_state(env, state, action):
  next_state = state
  if move_options(env, state)[action]:
    next_state = move_action(env, state, action)

  return next_state

def reward(env, state, action=None, next_state=None):
  r = 0
  if next_state == env[1]:
    r = 1
  if next_state == state:
    r = -0.1

  return r

# DRL Training

In [5]:
# scaffolding for training a simple NN to move forward
import tensorflow as tf
from tensorflow import keras
from keras.utils import plot_model
import random

replay_memory = []

q_model = keras.models.Sequential()
q_model.add(keras.layers.InputLayer(batch_input_shape=(1,1)))
q_model.add(keras.layers.Dense(4,
                               activation='relu',
                               kernel_initializer='random_uniform',
                               bias_initializer='zeros'))
q_model.compile(loss='mse', optimizer='adam', metrics=['mae'])

qn_model = keras.models.clone_model(q_model)
minibatch_size = 20

eps = 0.2
gamma = 0.3
copy_steps = 5

episodes = 20
steps = 20
for n in range(episodes):
  env = generateOffice(rank)
  source = generateEnvironment(env[0], env[1], 0)
  state = source
  for t in range(steps):
    action = random.choice(actions)
    if random.random() > eps:
      options = q_model.predict([state])
      action = np.argmax(options[0])
    
    sn = next_state(env, state, action) 
    r = reward(env, state, next_state=sn)
    is_dead = (t == steps - 1)
    replay_memory.append((state, action, r, sn, is_dead))

    # sample minibatches to estimate y
    batch_size = min(minibatch_size, len(replay_memory))
    batch = random.sample(replay_memory, batch_size)
    
    # get next states and predict
    batch_nx = np.array([m[3] for m in batch])
    batch_nx.reshape(batch_size,) #[[s1],[s2],...,[sn]]?
    qn_options = qn_model.predict(batch_nx)
    y = np.zeros((batch_size,)) 
    for i in range(batch_size):
      if batch[i][4]:
        y[i] = -1
      else:
        y[i] = r + gamma * np.amax(qn_options[i])

    # need to convert to one-hot
    batch_actions = np.array([m[1] for m in batch])
    oh_actions = np.eye(4)[batch_actions]
    oh_targets = oh_actions * y[:,None]

    # perform fit/optimisation for q_model with x & y
    q_model.fit(batch_nx[:,None], oh_targets)

    # occasionally reset qn = q
    if (t + 1) % copy_steps == 0:
      qn_model = keras.models.clone_model(q_model)

    # finally
    state = sn
    if state == env[1]:
      break


Using TensorFlow backend.




# Now Try the Model

In [0]:
def action_model(env, state):
  options = q_model.predict([state])
  return np.argmax(options[0])

In [7]:
# evaluation loop

env = generateOffice(rank)
source = generateEnvironment(env[0], env[1], 0)
s = source

path = [s]
for t in range(steps):
  a = action_model(env, s)
  s = next_state(env, s, a) 
  path.append(s)
  if s == env[1]:
    break

print("Success? {} - path {}".format(s == env[1], path))


Success? True - path [9, 19, 29, 39, 49, 59, 69, 79, 89, 99, 100]
