# Final Programming Assignment

## Problem Definition

In [1]:
x = 6

In [2]:
import numpy as np
import itertools

def get_states(n, police, medics):
  '''return all possible states of size n*n'''
  states = []
  symbols = ['S', 'I', 'H', 'Q0', 'Q1']
  if police == 0:
    symbols.remove('Q1')
    symbols.remove('Q0')
  if medics == 0:
    symbols.remove('I')

  for permutation in itertools.product(symbols, repeat=n ** 2):
    permutation = np.array(permutation, dtype='U25')
    s = np.reshape(permutation, [n, n])
    states.append(to_tuple(s))
  return states

In [3]:
def collect_locations(state, symbol):
  '''given a state and a symbol, return a set of all coordinates in state with symbol'''
  res = set()
  for i in range(len(state)):
    for j in range(len(state[0])):
      if state[i][j] == symbol:
        res.add((i, j))
  return res

def get_possible_actions(state, police, medics):
  '''given a state and number of police and medics teams, return all possible actions from state'''
  sick_set = collect_locations(state, 'S')
  healthy_set = collect_locations(state, 'H')
  coordinate_dict = {
      'S': sick_set,
      'H': healthy_set
  }

  medics_comb = []
  police_comb = []
  '''
  In optimal policy, vaccinate is always at least as good as doing nothing.
  Therefore one can assume that maximum number of medics teams are activated in each given state.
  (This is NOT true for police teams, due to the quarantined locations penalty...)
  '''
  # for i in range(min(medics, len(coordinate_dict['H'])) + 1):
  medics_comb = list(itertools.combinations(coordinate_dict['H'], min(medics, len(coordinate_dict['H']))))
  for j in range(min(police, len(coordinate_dict['S'])) + 1):
    police_comb += list(itertools.combinations(coordinate_dict['S'], j))

  actions = []
  for m in medics_comb:
      for p in police_comb:
          medic_coords = list(m)
          police_coords = list(p)
          action = []
          for medic_coord in medic_coords:
              action.append(('vaccinate', medic_coord))
          for police_coord in police_coords:
              action.append(('quarantine', police_coord))
          actions.append(tuple(action))

  return actions

def get_all_actions(states, police, medics):
  '''return a dictionary which maps each state to all possible actions (given the number of police and medics teams)'''
  res = {}
  for s in states:
    res[to_tuple(s)] = get_possible_actions(s, police, medics)
  return res

In [4]:
from copy import deepcopy

def to_tuple(state_as_list):
  '''convert a 2d list into a 2d tuple'''
  state_as_tuple = []
  for row in state_as_list:
    state_as_tuple.append(tuple(row))
  return tuple(state_as_tuple)

def to_list(state_as_tuple):
  '''convert a 2d tuple into a 2d list'''
  state_as_list = []
  for row in state_as_tuple:
    state_as_list.append(list(row))
  return state_as_list

def end_of_quarantine(state, just_quarantined):
  '''update Q0 -> Q1, Q1 -> H, excluding location which turned Q0 in current round (just_quarantined)'''
  new_state = to_list(deepcopy(state))
  for i in range(len(state)):
    for j in range(len(state[0])):
      if (i, j) in just_quarantined:
        continue
      if state[i][j] == 'Q0':
        new_state[i][j] = 'Q1'
      elif state[i][j] == 'Q1':
        new_state[i][j] = 'H'
  return to_tuple(new_state)

def powerset(iterable):
    '''powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3). taken from HW utils.py file (changed to include the empty set)'''
    s = list(iterable)
    return list(itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1)))

def take_atomic_action(state, a):
  '''given a state and an atomic action, return a new state reached from applying the atomic action on the given state'''
  operation, i, j = a[0], a[1][0], a[1][1]
  new_state = to_list(deepcopy(state))
  if operation == 'vaccinate' and state[i][j] == 'H':
    new_state[i][j] = 'I'
  if operation == 'quarantine' and state[i][j] == 'S':
    new_state[i][j] = 'Q0'
  return new_state

def count_S_neighbors(state, loc):
  '''count the number of loc's sick neighbors in state''' 
  count, i, j = 0, loc[0], loc[1]
  for a, b in [(i+1, j), (i, j+1), (i-1, j), (i, j-1)]:
    if 0 <= a < len(state) and 0 <= b < len(state[0]):
      if state[a][b] == 'S':
        count += 1
  return count

def infection_probability(state, loc, x):
  '''given a state, a (healthy) location and parameter x, return the probability of loc to be infected'''
  n_S_neighbors = count_S_neighbors(state, loc)
  if n_S_neighbors == 0:
    return 0
  elif n_S_neighbors == 1:
    return 0.1 + 0.01*x
  elif n_S_neighbors == 2:
    return 0.3 + 0.01*x
  elif n_S_neighbors == 3:
    return 0.7 + 0.01*x
  elif n_S_neighbors == 4:
    return 0.9 + 0.01*x

def transition_function(state, action, resulted_state, x):
  '''return the probability of reaching from state to resulted_state after applying action (given parameter x)'''
  probs_dict = transition_function_aux(state, action, x)
  if to_tuple(resulted_state) in probs_dict.keys():
    return probs_dict[resulted_state]
  return 0.0

def transition_function_aux(original_state, action, x):
  '''given a state and an action, return all possible outcome states with their probabilities'''
  possible_states = {} # state: (probability, just_got_infected - locations set)
  just_quarantined = set()

  # 1. take action
  state = to_list(deepcopy(original_state))
  for atomic_action in action:
    operation, i, j = atomic_action[0], atomic_action[1][0], atomic_action[1][1]
    if operation == 'quarantine':
      just_quarantined.add((i, j))
    state = take_atomic_action(state, atomic_action)
    
  # 2. stochastic spread
  healthy_set = collect_locations(state, 'H')
  healthy_set_with_probs = [(loc, infection_probability(state, loc, x)) for loc in healthy_set] # set of (H_loc, infection_prob) pairs (infection_probability should return this pair!)
  for to_infect in powerset(healthy_set_with_probs):
    new_state = deepcopy(state)
    new_prob = 1
    new_just_got_infected = set()
    for ((i, j), p) in healthy_set_with_probs:
      if ((i, j), p) in to_infect:
        new_state[i][j] = 'S'
        new_prob *= p
        new_just_got_infected.add((i, j))
      else:
        new_prob *= (1-p)
    possible_states[to_tuple(new_state)] = (new_prob, new_just_got_infected)

  # 3. stochastic heal
  possible_states_2 = {s: p for (s, (p, _)) in possible_states.items()}
  for s, (p, just_got_infected) in possible_states.items():
    q = 0.3 + 0.01*x # healing prob.
    candidates_for_healing = collect_locations(s, 'S') - just_got_infected
    for to_heal in powerset(candidates_for_healing):
      new_state = to_list(deepcopy(s))
      new_prob = p
      for (i, j) in candidates_for_healing:
        if (i, j) in to_heal:
          new_state[i][j] = 'H'
          new_prob *= q
        else:
          new_prob *= (1-q)
      possible_states_2[to_tuple(new_state)] = new_prob
  possible_states = possible_states_2 # state: probability

  # 4. deterministic end-of-quarantine
  res = {}
  for s in possible_states.keys():
    if possible_states[s] > 0.0:
      res[end_of_quarantine(s, just_quarantined)] = possible_states[s]

  return res

In [5]:
def reward(state):
  '''return the reward earned from visiting state'''
  res = 0
  for i in range(len(state)):
    for j in range(len(state[0])):
      if state[i][j] == 'S':
        res -= 1
      elif state[i][j] == 'Q0' or state[i][j] == 'Q1':
        res -= 5
      elif state[i][j] == 'H' or state[i][j] == 'I':
        res += 1
  return res

## MDP Implementation

In [6]:
import numpy as np
from copy import deepcopy
from random import choice

class MDP:
    '''
    Implementation is highly inspired by the q-learning notebook provided on Moodle.
    '''

    def __init__(self, states, actions, P, R, x, gamma):
        self.gamma = gamma  # discount constant
        self.states = states
        self.actions = actions
        self.x = x
        self.P = P # transition function
        self.R = R # reward function
        self.V = {} # current est. V*
        self.optimal_policy = {}
        print('MDP initialized.')

    def value_iteration_finize_horizon(self, T=5):
      policy = {}
      old_V = {s: self.R(s) for s in self.states}
      i = 0
      while i < T:
        print(f'i = {i}')
        new_V = deepcopy(old_V)
        for s in self.states:
          max_a_exp = float("-inf") # sum ( P(s' | s , a) * V(s') )
          max_a = None # a which maximizing max_a_exp
          for a in self.actions[s]:
            exp = sum([p * old_V[s2] for s2, p in transition_function_aux(s, a, self.x).items()])
            if exp > max_a_exp:
              max_a_exp = exp
              max_a = a
          policy[(s, i)] = max_a
          new_V[s] = self.R(s) + max_a_exp
        old_V = deepcopy(new_V)
        self.V = deepcopy(new_V)
        i += 1
      self.optimal_policy = deepcopy(policy)

    def value_iteration(self, eps=1e-6): # seen in class: | v_k - v_(k-1) | <= eps ensures | v* - v_k | <= eps*gamma / (1-gamma)
      policy = {}
      old_V = {s: self.R(s) for s in self.states}
      i = 0
      while True:
        print(f'i = {i}')
        new_V = deepcopy(old_V)
        for s in self.states:
          max_a_exp = float("-inf") # sum ( P(s' | s , a) * V(s') )
          max_a = None # a which maximizing max_a_exp
          for a in self.actions[s]:
            exp = sum([p * old_V[s2] for s2, p in transition_function_aux(s, a, self.x).items()])
            if exp > max_a_exp:
              max_a_exp = exp
              max_a = a
          policy[s] = max_a
          new_V[s] = self.R(s) + self.gamma * max_a_exp
        diff = self.inf_norm(old_V, new_V)
        if (diff < eps):
          self.V = deepcopy(new_V)
          self.optimal_policy = deepcopy(policy)
          break
        old_V = deepcopy(new_V)
        self.V = deepcopy(new_V)
        i += 1
      self.optimal_policy = deepcopy(policy)

    def inf_norm(self, v1, v2):
      diff = float("-inf")
      for s in v1.keys():
        diff = max(diff, abs(v1[s] - v2[s]))
      return diff

## Solutions

In [7]:
# Q 1.1

state = [['S', 'H'], ['H', 'H']]
police = 0
medics = 0

states = get_states(n=2, police=police, medics=medics)
actions = get_all_actions(states=states, police=police, medics=medics)
P = transition_function
R = reward
x = x
gamma = 0.9 + 0.01*x

print('problem initialized.')

mdp = MDP(
    states=states, 
    actions=actions, 
    P=P, 
    R=R,
    x=x, 
    gamma=gamma
  )

mdp.value_iteration()
val = mdp.V[to_tuple(state)]
print(val)
print(mdp.optimal_policy[to_tuple(state)])

problem initialized.
MDP initialized.
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
i = 30
i = 31
i = 32
i = 33
i = 34
i = 35
i = 36
i = 37
i = 38
i = 39
i = 40
i = 41
i = 42
i = 43
i = 44
i = 45
i = 46
i = 47
i = 48
i = 49
i = 50
i = 51
i = 52
i = 53
i = 54
i = 55
i = 56
i = 57
i = 58
i = 59
i = 60
i = 61
i = 62
i = 63
i = 64
i = 65
i = 66
i = 67
i = 68
i = 69
i = 70
i = 71
i = 72
i = 73
i = 74
i = 75
i = 76
i = 77
i = 78
i = 79
i = 80
i = 81
i = 82
i = 83
i = 84
i = 85
i = 86
i = 87
i = 88
i = 89
i = 90
i = 91
i = 92
i = 93
i = 94
i = 95
i = 96
i = 97
i = 98
i = 99
i = 100
i = 101
i = 102
i = 103
i = 104
i = 105
i = 106
i = 107
i = 108
i = 109
i = 110
i = 111
i = 112
i = 113
i = 114
i = 115
i = 116
i = 117
i = 118
i = 119
i = 120
i = 121
i = 122
i = 123
i = 124
i = 125
i = 126
i = 127
i = 128
i = 129
i = 130
i = 131
i = 132
i = 133


In [8]:
# Q 1.2

state = [['S', 'H'], ['H', 'H']]
police = 1
medics = 0

states = get_states(n=2, police=police, medics=medics)
actions = get_all_actions(states=states, police=police, medics=medics)
P = transition_function
R = reward
x = x
gamma = 0.9 + 0.01*x

print('problem initialized.')

mdp = MDP(
    states=states, 
    actions=actions, 
    P=P, 
    R=R,
    x=x, 
    gamma=gamma
  )

mdp.value_iteration()
val = mdp.V[to_tuple(state)]
print(val)
print(mdp.optimal_policy[to_tuple(state)])

problem initialized.
MDP initialized.
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
i = 30
i = 31
i = 32
i = 33
i = 34
i = 35
i = 36
i = 37
i = 38
i = 39
i = 40
i = 41
i = 42
i = 43
i = 44
i = 45
i = 46
i = 47
i = 48
i = 49
i = 50
i = 51
i = 52
i = 53
i = 54
i = 55
i = 56
i = 57
i = 58
i = 59
i = 60
i = 61
i = 62
i = 63
i = 64
i = 65
i = 66
i = 67
i = 68
i = 69
i = 70
i = 71
i = 72
i = 73
i = 74
i = 75
i = 76
i = 77
i = 78
i = 79
i = 80
i = 81
i = 82
i = 83
i = 84
i = 85
i = 86
i = 87
i = 88
i = 89
i = 90
i = 91
i = 92
i = 93
i = 94
i = 95
i = 96
i = 97
i = 98
i = 99
i = 100
i = 101
i = 102
i = 103
i = 104
i = 105
i = 106
i = 107
i = 108
i = 109
i = 110
i = 111
i = 112
i = 113
i = 114
i = 115
i = 116
i = 117
i = 118
i = 119
i = 120
i = 121
i = 122
i = 123
i = 124
i = 125
i = 126
i = 127
i = 128
i = 129
i = 130
i = 131
i = 132
i = 133


In [9]:
# Q 1.3

state = [['S', 'H'], ['H', 'H']]
police = 0
medics = 2

states = get_states(n=2, police=police, medics=medics)
actions = get_all_actions(states=states, police=police, medics=medics)
P = transition_function
R = reward
x = x
gamma = 0.9 + 0.01*x

print('problem initialized.')

mdp = MDP(
    states=states, 
    actions=actions, 
    P=P, 
    R=R,
    x=x, 
    gamma=gamma
  )

mdp.value_iteration()
val = mdp.V[to_tuple(state)]
print(val)
print(mdp.optimal_policy[to_tuple(state)])

problem initialized.
MDP initialized.
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
i = 30
i = 31
i = 32
i = 33
i = 34
i = 35
i = 36
i = 37
i = 38
i = 39
i = 40
i = 41
i = 42
i = 43
i = 44
i = 45
i = 46
i = 47
i = 48
i = 49
i = 50
i = 51
i = 52
i = 53
i = 54
i = 55
i = 56
i = 57
i = 58
i = 59
i = 60
i = 61
i = 62
i = 63
i = 64
i = 65
i = 66
i = 67
i = 68
i = 69
i = 70
i = 71
i = 72
i = 73
i = 74
i = 75
i = 76
i = 77
i = 78
i = 79
i = 80
i = 81
i = 82
i = 83
i = 84
i = 85
i = 86
i = 87
i = 88
i = 89
i = 90
i = 91
i = 92
i = 93
i = 94
i = 95
i = 96
i = 97
i = 98
i = 99
i = 100
i = 101
i = 102
i = 103
i = 104
i = 105
i = 106
i = 107
i = 108
i = 109
i = 110
i = 111
i = 112
i = 113
i = 114
i = 115
i = 116
i = 117
i = 118
i = 119
i = 120
i = 121
i = 122
i = 123
i = 124
i = 125
i = 126
i = 127
i = 128
i = 129
i = 130
i = 131
i = 132
i = 133


In [10]:
# Q 2.1

state = [['H', 'H', 'H'], ['H', 'S', 'H'], ['H', 'H', 'H']]
police = 0
medics = 0

states = get_states(n=3, police=police, medics=medics)
actions = get_all_actions(states=states, police=police, medics=medics)
P = transition_function
R = reward
x = x
gamma = 1

print('problem initialized.')

mdp = MDP(
    states=states, 
    actions=actions, 
    P=P, 
    R=R,
    x=x, 
    gamma=gamma
  )

mdp.value_iteration_finize_horizon()
val = mdp.V[to_tuple(state)]
print(val)
print(mdp.optimal_policy[(to_tuple(state), 0)])

problem initialized.
MDP initialized.
i = 0
i = 1
i = 2
i = 3
i = 4
38.19291112034395
()


In [11]:
# Q 2.2

state = [['H', 'H', 'H'], ['H', 'S', 'H'], ['H', 'H', 'H']]
police = 1
medics = 0

states = get_states(n=3, police=police, medics=medics)
actions = get_all_actions(states=states, police=police, medics=medics)
P = transition_function
R = reward
x = x
gamma = 1

print('problem initialized.')

mdp = MDP(
    states=states, 
    actions=actions, 
    P=P, 
    R=R,
    x=x, 
    gamma=gamma
  )

mdp.value_iteration_finize_horizon()
val = mdp.V[to_tuple(state)]
print(val)
print(mdp.optimal_policy[(to_tuple(state), 0)])

problem initialized.
MDP initialized.
i = 0
i = 1
i = 2
i = 3
i = 4
40
()


In [12]:
# Q 2.3

state = [['H', 'H', 'H'], ['H', 'S', 'H'], ['H', 'H', 'H']]
police = 0
medics = 4

states = get_states(n=3, police=police, medics=medics)
actions = get_all_actions(states=states, police=police, medics=medics)
P = transition_function
R = reward
x = x
gamma = 1

print('problem initialized.')

mdp = MDP(
    states=states, 
    actions=actions, 
    P=P, 
    R=R,
    x=x, 
    gamma=gamma
  )

mdp.value_iteration_finize_horizon()
val = mdp.V[to_tuple(state)]
print(val)
print(mdp.optimal_policy[(to_tuple(state), 0)])

problem initialized.
MDP initialized.
i = 0
i = 1
i = 2
i = 3
i = 4
48.8262193152
(('vaccinate', (0, 1)), ('vaccinate', (1, 2)), ('vaccinate', (2, 1)), ('vaccinate', (1, 0)))
