In [25]:
import pandas as pd
import numpy as np
import seaborn as sns                       #visualisation
import matplotlib.pyplot as plt             #visualisation
# %matplotlib inline     
sns.set(color_codes=True)

import random

# import scipy


In [None]:
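# Logic of a dice-game example.
# NOTE(review): these notes describe a dice game, not the TramMDP defined in this notebook - confirm they are still wanted.
# - Start the game.
# - "Play": get a reward of 4, then roll a die; if it shows 1 or 2 the game ends, otherwise the game continues.
# - "Quit": stop and get a reward of 10.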

In [116]:
class TramMDP:
    """Walk-or-tram MDP over the integer states 1..N.

    An episode starts in state 1 and ends in state N. Two actions exist:

    * ``'walk'``: moves to ``state + 1`` with probability 1.0, reward -1.0.
    * ``'tram'``: moves to ``min(2 * state, N)`` with probability 0.5 and
      stays in place with probability 0.5; reward -2.0 either way.

    Attributes:
        N: number of states.
        states: the list ``[1, ..., N]``.
        current_state: state of the simulated episode (starts at 1).
        endState: the terminal state, equal to ``N``.
        gamma: discount factor, fixed to 1.0.
        total_reward: reward accumulated by the simulated episode.
        transitions: ``{state: {action: [(next_state, prob, reward), ...]}}``
            for every non-terminal state.
    """

    def __init__(self, N):
        """Create the MDP with ``N`` states and build its transition table."""
        self.N = N  # Number of states
        self.states = list(range(1, N + 1))
        self.current_state = 1
        self.endState = N
        self.gamma = 1.0
        self.total_reward = 0

        # Transition probabilities and rewards live in one combined dictionary.
        self.transitions = self._create_transitions()

    def isEnd(self, state):
        """Return True when ``state`` is the terminal state.

        ``playGame`` relies on this method to decide when an episode is over.
        """
        return state == self.endState

    def actions(self, state):
        """Return the list of valid actions for ``state``.

        The terminal state yields ``[None]``. Otherwise ``'walk'`` is offered
        when ``state + 1 <= N`` and ``'tram'`` when ``state * 2 <= N``.
        """
        if self.isEnd(state):
            return [None]  # No actions are possible in the end state

        possible_actions = []
        if state + 1 <= self.N:
            possible_actions.append("walk")
        if state * 2 <= self.N:
            possible_actions.append("tram")
        return possible_actions

    def _create_transitions(self):
        """Build ``{state: {action: [(next_state, prob, reward), ...]}}``.

        The terminal state gets no entry. Note that the table holds a
        ``'tram'`` entry for every non-terminal state (destination clamped to
        ``N``), even for states where ``actions()`` does not offer ``'tram'``.
        """
        return {
            state: {
                'walk': [(state + 1, 1.0, -1.0)],
                'tram': [(min(state * 2, self.N), 0.5, -2.0), (state, 0.5, -2.0)],
            }
            for state in self.states[:-1]  # Exclude the terminal state
        }

    def getTransition(self, state, action):
        """Return the ``(next_state, prob, reward)`` outcomes of ``action`` in ``state``.

        Raises:
            KeyError: if ``state`` is the terminal state (or otherwise absent
                from the table) or ``action`` is not ``'walk'``/``'tram'``.
        """
        return self.transitions[state][action]

    def take_action(self, action):
        """Sample one step of the episode and update the simulation state.

        Returns:
            ``(new_state, reward)``. When the episode is already in the
            terminal state, or the action has no outcomes, nothing changes and
            the reward is 0.0 - the result is still a 2-tuple so callers can
            always unpack it.
        """
        if self.isEnd(self.current_state):
            return self.endState, 0.0

        outcomes = self.getTransition(self.current_state, action)
        if not outcomes:
            return self.current_state, 0.0

        next_states, probabilities, rewards = zip(*outcomes)

        # Draw the index of the outcome according to the transition probabilities.
        chosen = random.choices(range(len(next_states)), weights=probabilities, k=1)[0]
        reward = rewards[chosen]

        self.current_state = next_states[chosen]
        self.total_reward += reward

        return self.current_state, reward

    def playGame(self, policy):
        """Simulate one episode from state 1 following ``policy``.

        Args:
            policy: mapping ``state -> action`` covering every non-terminal
                state the episode can visit.

        Returns:
            The total reward accumulated until the terminal state is reached.
        """
        self.reset()
        while not self.isEnd(self.current_state):
            self.take_action(policy[self.current_state])
        return self.total_reward

    def reset(self):
        """Put the simulation back in state 1 with zero accumulated reward."""
        self.current_state = 1
        self.total_reward = 0

    def getRandomPolicy(self):
        """Return a policy that picks one valid action uniformly at random per state.

        The terminal state maps to ``None`` because ``actions()`` returns
        ``[None]`` for it.
        """
        policy = {}
        for s in self.states:
            available_actions = self.actions(s)
            if available_actions:
                policy[s] = random.choice(available_actions)
            else:
                policy[s] = None  # Placeholder when no action is available
        return policy


In [117]:
# Build a 5-state MDP, then print (one per line, in this order): the 'tram'
# outcomes for state 1, the result of one sampled 'tram' step, and the same
# outcomes again.
tram = TramMDP(5)

print(
    tram.getTransition(1, 'tram'),
    tram.take_action('tram'),
    tram.getTransition(1, 'tram'),
    sep='\n',
)


# print(tram2.take_action_prob('tram'))

[(2, 0.5, -2.0), (1, 0.5, -2.0)]
(1, -2.0)
[(2, 0.5, -2.0), (1, 0.5, -2.0)]


In [75]:
def policy_evaluation(policy, mdp, theta=0.01):
    """Estimate the state values of a fixed policy by repeated in-place sweeps.

    Args:
        policy: mapping ``state -> action`` for every non-terminal state.
        mdp: object exposing ``states``, ``endState``, ``gamma`` and
            ``getTransition(state, action)`` returning
            ``(next_state, prob, reward)`` triples.
        theta: sweeps stop once the largest value change in a sweep is
            below this threshold.

    Returns:
        Dict ``state -> value``; the terminal state maps to 0. The values are
        also printed, one ``Value of state ...`` line per state.
    """
    terminal = mdp.endState

    # Non-terminal states start at 0; the terminal entry is added by the first sweep.
    V_pi = dict.fromkeys((s for s in mdp.states if s != terminal), 0)

    def backup(state):
        """Expected one-step return of the policy's action in ``state``."""
        return sum(prob * (reward + mdp.gamma * V_pi.get(s_prime, 0))
                   for (s_prime, prob, reward) in mdp.getTransition(state, policy[state]))

    converged = False
    while not converged:
        largest_change = 0  # biggest value change seen during this sweep

        for state in mdp.states:
            if state == terminal:
                V_pi[state] = 0
                continue

            new_value = backup(state)
            largest_change = max(largest_change, abs(new_value - V_pi.get(state, 0)))
            # Updated in place, so later states in this sweep already see it.
            V_pi[state] = new_value

        converged = largest_change < theta

    # Report the final state-value function.
    for state in mdp.states:
        print(f"Value of state {state}: {V_pi.get(state, 0)}")

    return V_pi


In [118]:
# Draw a random policy for the existing `tram` MDP and show it.
policy = tram.getRandomPolicy()
print(policy)

# policy_evaluation prints its own per-state lines; the dict it returns is printed afterwards.
print(policy_evaluation(policy, mdp=tram))

{1: 'walk', 2: 'tram', 3: 'walk', 4: 'walk', 5: None}
Value of state 1: -5.994140625
Value of state 2: -4.9970703125
Value of state 3: -2.0
Value of state 4: -1.0
Value of state 5: 0
{1: -5.994140625, 2: -4.9970703125, 3: -2.0, 4: -1.0, 5: 0}


In [123]:
def value_iteration_with_policy(mdp, theta=1e-10):
    """Run value iteration on ``mdp`` and read out a greedy policy.

    Args:
        mdp: object exposing ``states``, ``endState``, ``gamma``,
            ``actions(state)`` and ``getTransition(state, action)`` returning
            ``(next_state, prob, reward)`` triples.
        theta: sweeps stop once the largest value change in a sweep is
            below this threshold.

    Returns:
        ``(V, pi)``: the value per state (0 for the terminal state) and the
        chosen action per state (the string ``'none'`` for the terminal
        state). A table of both is printed as well.
    """
    terminal = mdp.endState

    # Non-terminal states start at 0; the terminal entry is added by the first sweep.
    V = dict.fromkeys((s for s in mdp.states if s != terminal), 0)

    def q_value(state, action):
        """Expected one-step return of ``action`` in ``state`` under the current V."""
        return sum(prob * (reward + mdp.gamma * V.get(s_prime, 0))
                   for s_prime, prob, reward in mdp.getTransition(state, action))

    # Sweep until no state value moves by theta or more.
    while True:
        biggest_update = 0
        for state in mdp.states:
            if state == terminal:
                V[state] = 0
                continue

            best = max(q_value(state, action) for action in mdp.actions(state))
            biggest_update = max(biggest_update, abs(best - V.get(state, 0)))
            V[state] = best

        if biggest_update < theta:
            break

    # Greedy read-out; on ties the action listed first by mdp.actions wins.
    pi = {}
    for state in mdp.states:
        if state == terminal:
            pi[state] = 'none'
        else:
            pi[state] = max(mdp.actions(state), key=lambda action: q_value(state, action))

    # Print the policy and values as a three-column table.
    row = '{:15} {:15} {:15}'
    print(row.format('s', 'V(s)', 'pi(s)'))
    for state in mdp.states:
        print(row.format(state, V.get(state, 0), pi.get(state, 'none')))

    return V, pi


In [125]:

# Solve a fresh 20-state MDP; the call prints the value/policy table itself.
V_opt, optimal_policy = value_iteration_with_policy(mdp=TramMDP(20))


s               V(s)            pi(s)          
              1 -11.999999999905413 walk           
              2 -10.999999999951797 walk           
              3 -9.999999999975444 walk           
              4 -8.999999999987494 walk           
              5 -7.9999999999936335 tram           
              6 -7.999999999998181 walk           
              7 -6.9999999999990905 walk           
              8 -5.999999999999545 walk           
              9 -4.999999999999773 walk           
             10 -3.9999999999998863 tram           
             11            -9.0 walk           
             12            -8.0 walk           
             13            -7.0 walk           
             14            -6.0 walk           
             15            -5.0 walk           
             16            -4.0 walk           
             17            -3.0 walk           
             18            -2.0 walk           
             19            -1.0 walk           
     