<a href="https://colab.research.google.com/github/riaverma96/boost-python-examples/blob/master/project1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project 2: Probabilistic Inference

## TAs TODO/CHECKLIST:
* make powerpoint template
* complete instructions and ground-truth solution here
* make sure submission instructions are great
* Ria: do final read-through to make sure it is unambiguous/doable.
* see if we can use this notebook instead of project1.md, without solution of course (do not commit solution to github!)

Remember: the more work we put in to make the assignment clear, doable, unambiguous, the less blwoback we'll get on Piazza. We do not want to update the instructions if at all possible!

In [0]:
import numpy as np
import unittest

# import gtsam



This part is a team-effort. Sign up to one of the sessions.

TODO: how do they proof it's done? Something has to end up in the powerpoint template.

## Probabilistic Inference

### Simulation
* Define a prior on the state
* Define a sensor model
* Define a transition model
* Sample from it

In [0]:
# Actions and states
ACTIONS = ["Left", "Right", "Up", "Down"]
STATES = [(x, y) for x in range(10) for y in range(10)]

# Implement a function that maps from a string representing the action to the
# probability of that action.
# Hint: action is the key, and probability is the value.
def action_prior(action):
  """Return prior probability distribution on an action.
    arguments:
      action (string): "Left", "Right", "Up", "Down"
    returns:
      probability of given action (float).
  """
  p = {"Left": 0.2, "Right": 0.6, "Up": 0.1, "Down": 0.1}
  return p[action]

# Implement a function that maps from a coordinate pair (x,y) to the
# probability of the robot being in that location.
def state_prior(state):
  """Return prior probability distribution on a state.
    arguments:
      state (pair): (x: int, y: int)
    returns:
      probability of given state (float).
  """
  # Use np.array to create a 2D array that represents the prior on the state
  # (i,j) where i is the row index and j is the column index of the agent's
  # position.
  state_prior_matrix = np.array([
      [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
      [0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
      [1, 0, 0, 0, 0, 1, 0, 0, 1, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
      [0, 0, 1, 0, 0, 0, 1, 0, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
      [0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
  ], dtype=np.float) / 10.0
  return state_prior_matrix[state]


def calculate_cdf(pmf, ordered_outcomes):
  """Calculate cumulative distribution function.
    arguments:
      pmf (function): probability mass function.
      ordered_outcomes (list): all outcomes in an ordered list.
    returns:
      a function that represents the CDF
    note:
      The ordered_outcomes defines how the CDF behaves.
  """
  cd_values = np.cumsum([pmf(outcome) for outcome in ordered_outcomes])

  def cdf(outcome):
    return cd_values[ordered_outcomes.index(outcome)]

  return cdf


# Fill in the body of the following function to sample from an arbitrary PMF.
def sample_from_pmf(pmf, ordered_outcomes):
  """Sample from a PMF
    arguments:
      pmf (np.array): a probability mass function
      ordered_outcomes (list): ordered list of all outcomes
    returns:
      if 1D: an index i \in [0.0,N-1[ where N is number of outcomes.
      if 2D: a pair (i,j) in the array
  """
  # create CDF
  cdf = calculate_cdf(pmf, ordered_outcomes)

  # sample u uniformly
  rand_val = np.random.rand()
  ordered_cdf = np.array([cdf(outcome) for outcome in ordered_outcomes])
  sample = (-np.argmax(ordered_cdf[::-1] <= rand_val)) % ordered_cdf.size
  # return index according to inverse transform method
  # convert to 2D indices if 2D
  return ordered_outcomes[sample]


# Define the sensor model conditional distribution, P(O|S), in functional form,
# i.e., fill in the body of the function below.
def sensor_model(k, S):
  """Give value of CPT for sensor model P(O|S).
    arguments:
      k: the value of the sensor, a vertical coordinate.
      S (int,int): a pair (i,j), representing the position in the grid.
    returns:
      P(k|i,j).
    notes:
      Sensor reports noisy observation of vertical coordinate i.
  """
  i, j = S
  return (0.91 if k == i else 0.01)


# Define the state transition model P(T|S,A)
def transition_model(t, S, A):
  """Give value of CPT for transition model P(T|S,A).
    arguments:
      t (int,int): a pair (i,j), representing the next position in the grid
      S (int,int): a pair (i,j), representing the position in the grid.
      A (str): a string representing the action to be taken
    returns:
      P(T|S,A).
  """
  next_pos_tbl = {
      "Left": (i-1, j),
      "Right": (i+1, j),
      "Up": (i, j-1),
      "Down": (i, j+1)
  }
  next_pos = next_pos_tbl[A]
  next_pos[0] = max(l, min(10, next_pos[0]))
  next_pos[1] = max(l, min(10, next_pos[1]))
  if t == next_pos:
    return 1.0
  else:
    return 0.0
# Define a function to sample from the DBN, three steps.


### Implement Maximum Probable Explanation (MPE)

Above you sampled from the Dynamic Bayes net, yielding actions, states, and observations. In ths section, you will implement MPE, which infers the most probable values of the states, given the actions and observations. I.e., you are asked to maximize the posterior probability
$$P(S_1, S_2, S_3|A_1=a_1, O_1=o_1, A_2=a_2, O_2=o_2, O_3=o_3)$$
To do this, you will implement a highly specialized version of the max-product algorithm.

In [0]:
# Max-Product Algorithm

def maximum_probable_explanation(actions, observations):
  """Calculate the most probable states given the actions and observations
    arguments:
      actions ([str]): Array of all actions taken
      observations ([int]): Sensor measurements
    returns:
      list of moast probable states.
  """

  return []

### Do MPE with GTSAM

Here, you will implement the same calculation by creating a Bayes net in GTSAM, and using it to do the MPE.

In [0]:
# DO MPE code

In [0]:
# Unit tests
from collections import Counter

class TestProject1(unittest.TestCase):
  """All (public) unit tests for project 1."""

  def test_action_prior(self):
    """Check the prior on actions."""
    sum = 0.0
    for action in ACTIONS:
      p = action_prior(action)
      self.assertGreaterEqual(p, 0)
      self.assertLessEqual(p, 1.0)
      sum += p
    self.assertEqual(sum, 1.0)

  @unittest.skip
  def test_state_prior(self):
    """Check the state prior."""
    self.assertEqual(state_prior.shape, (10, 10))
    self.assertTrue(np.min(action_prior) >= 0)
    self.assertEqual(np.sum(state_prior), 1.0)

  def test_action_cdf(self):
    """Check calculating of cumulative distribution functions."""
    action_cdf = calculate_cdf(action_prior, ACTIONS)
    self.assertEqual(action_cdf("Left"), 0.2)
    self.assertEqual(action_cdf("Right"), 0.8)
    self.assertEqual(action_cdf("Up"), 0.9)
    self.assertEqual(action_cdf("Down"), 1.0)

  def test_sample_action_sanity(self):
    """Check sampling from discrete distributions."""
    sampled_action = sample_from_pmf(action_prior, ACTIONS)
    self.assertTrue(sampled_action in ACTIONS)

  def test_sample_action_accuracy(self):
    """Check actual sampling from discrete distributions."""
    samples = [sample_from_pmf(action_prior, ACTIONS) for x in range(1000)]
    hist = np.array([Counter(samples)[act] for act in ACTIONS])
    hist = hist / np.sum(hist)  # normalize
    pmf_flattened = np.array([action_prior(act) for act in ACTIONS])
    np.testing.assert_allclose(hist, pmf_flattened, atol=1e-1, rtol=1e-1)

  def test_sample_state(self):
    """Check sampling from discrete distribution where PMF is 2D."""
    sampled_state = sample_from_pmf(state_prior, STATES)
    self.assertGreaterEqual(sampled_state[0], 0)
    self.assertLess(sampled_state[0], 100)

  def test_sample_state_accuracy(self):
    """Check actual sampling from discrete distributions."""
    samples = [sample_from_pmf(state_prior, STATES) for x in range(1000)]
    hist = np.array([Counter(samples)[act] for act in STATES])
    hist = hist / np.sum(hist)  # normalize
    pmf_flattened = np.array([state_prior(act) for act in STATES])
    np.testing.assert_allclose(hist, pmf_flattened, atol=1e-1, rtol=1e-1)

  def test_sensor_model(self):
    """Check that sensor model is correct."""
    S = 2, 5
    self.assertEqual(sensor_model(0, S), 0.01)
    self.assertEqual(sensor_model(1, S), 0.01)
    self.assertEqual(sensor_model(2, S), 0.91)
    self.assertEqual(sensor_model(3, S), 0.01)
    self.assertEqual(sensor_model(4, S), 0.01)


if __name__ == '__main__':
  unittest.main(argv=['first-arg-is-ignored'], exit=False)


........
----------------------------------------------------------------------
Ran 8 tests in 1.650s

OK
