In [1]:
import numpy as np

In [8]:
class MDP:
  """Finite Markov decision process solved with policy iteration.

  Attributes:
    states: Sequence of state identifiers (used as dictionary keys).
    actions: Indexable sequence of action identifiers.
    rewards: Mapping where rewards[s][a] is the reward for taking action a
      in state s.
    transition_probs: Mapping where transition_probs[s][a][s'] is the
      probability of moving from s to s' under action a. Successors that
      are not listed are treated as having probability 0.
    discount_factor: Discount factor (gamma) applied to future rewards.
    policy: Dict mapping each state to its currently selected action.
  """

  def __init__(self, states, actions, rewards, transition_probs, discount_factor=0.9):
    """Store the model and start from a random deterministic policy."""
    self.states = states # list of states
    self.actions = actions # list of actions
    self.rewards = rewards # rewards[s][a]
    self.transition_probs = transition_probs # transition_probs[s][a][s']
    self.discount_factor = discount_factor # gamma
    # Draw a random index rather than calling np.random.choice(actions):
    # choice() converts its argument to a NumPy array first, which raises
    # for tuple actions and coerces mixed-type actions to a single dtype.
    self.policy = {s: actions[np.random.randint(len(actions))] for s in states}

  def _q_value(self, s, a, V):
    """Return the one-step Bellman backup for taking action a in state s.

    Computes sum over s' of P(s'|s,a) * (R(s,a) + gamma * V[s']), visiting
    successors in the order of self.states.
    """
    reward = self.rewards[s][a]
    probs = self.transition_probs[s][a]
    total = 0
    for s_prime in self.states:
      try:
        p = probs[s_prime]
      except KeyError:
        # Successor omitted from a sparse transition table: probability 0.
        continue
      total += p * (reward + self.discount_factor * V[s_prime])
    return total

  def policy_evaluation(self, theta=1e-6):
    """Compute the state-value function V(s) of the current policy.

    Repeats in-place sweeps of the Bellman expectation equation until the
    largest change to any state's value within one sweep is below theta.

    Args:
      theta: Convergence threshold on the per-sweep maximum value change.

    Returns:
      Dict mapping each state to its estimated value under self.policy.
    """
    V = {s: 0 for s in self.states} # initialize state values with 0

    while True:
      delta = 0
      for s in self.states:
        previous = V[s]
        V[s] = self._q_value(s, self.policy[s], V)
        delta = max(delta, abs(previous - V[s])) # largest change this sweep
      if delta < theta:
        return V

  def policy_improvement(self, V):
    """Make the policy greedy with respect to the value function V.

    Updates self.policy in place. When several actions share the maximal
    value, the one that appears first in self.actions is selected.

    Args:
      V: Dict mapping each state to its value.

    Returns:
      True if no state's action changed, False otherwise.
    """
    policy_stable = True
    for s in self.states:
      old_action = self.policy[s]

      # max() returns the first maximal item, so ties go to the earliest action.
      best_action = max(self.actions, key=lambda a: self._q_value(s, a, V))
      self.policy[s] = best_action

      if old_action != best_action:
        policy_stable = False
    return policy_stable

  def policy_iteration(self):
    """Alternate evaluation and improvement until the policy stops changing.

    Returns:
      Tuple (policy, V): self.policy (the live dict, not a copy) and the
      value function computed for it by the last evaluation.
    """
    while True:
      V = self.policy_evaluation()
      if self.policy_improvement(V):
        return self.policy, V


In [9]:

# Example problem: three states, two actions.
states = ['s1', 's2', 's3']
actions = ['a1', 'a2']

# rewards[s][a] -> reward for taking action a in state s
rewards = {
    's1': {'a1': 5, 'a2': 10},
    's2': {'a1': -1, 'a2': 2},
    's3': {'a1': 0, 'a2': 0},
}

# transition_probs[s][a][s'] -> probability of landing in s' after taking a in s
transition_probs = {
    's1': {
        'a1': {'s1': 0.7, 's2': 0.3, 's3': 0.0},
        'a2': {'s1': 0.4, 's2': 0.6, 's3': 0.0},
    },
    's2': {
        'a1': {'s1': 0.1, 's2': 0.6, 's3': 0.3},
        'a2': {'s1': 0.0, 's2': 0.9, 's3': 0.1},
    },
    's3': {
        # both actions keep the process in s3 with probability 1
        'a1': {'s1': 0.0, 's2': 0.0, 's3': 1.0},
        'a2': {'s1': 0.0, 's2': 0.0, 's3': 1.0},
    },
}

# Build the MDP and solve it with policy iteration.
mdp = MDP(
    states,
    actions,
    rewards,
    transition_probs,
    discount_factor=0.9,
)
optimal_policy, optimal_value = mdp.policy_iteration()

print("Optimal Policy:", optimal_policy)
print("Optimal State Values:", optimal_value)

Optimal Policy: {'s1': 'a2', 's2': 'a2', 's3': 'a1'}
Optimal State Values: {'s1': 24.506574930441033, 's2': 10.526312442034197, 's3': 0.0}


MDP Initialization:

`states`, `actions`, `rewards`, and `transition_probs` are the inputs that define the environment. The initial policy assigns a randomly chosen action to every state.

Policy Evaluation:

Iteratively calculates the value function $V(s)$ for each state under the current policy. Convergence is reached when the largest change to $V(s)$ in a sweep is smaller than the threshold `theta`.

Policy Improvement:

Updates the policy by choosing, for each state, the action that maximizes the expected return computed from $V(s)$. If no state's action changes, the policy is considered optimal.

Policy Iteration:

Alternates between policy evaluation and policy improvement until the policy stops changing. This procedure is known as policy iteration; for a finite MDP with a discount factor below 1, it converges to an optimal policy and the corresponding state values.