In [1]:
import numpy, pandas, numbers

# Markov Process

In [2]:
class MarkovProcess:
    def __init__(self, state_space: list, transition_probability_matrix: numpy.ndarray) -> None:
        '''Markov Proces is a tuple (S, P), where S is a state space and P is a transition probability matrix.'''
        self.state_space = state_space
        self.transition_probability_matrix = transition_probability_matrix

        self.validate_inputs()

    def validate_inputs(self):

        ### Dimensions
        assert len(self.state_space) == len(set(self.state_space)), 'Some state names are not unique!'
        self.num_states = len(self.state_space)

        # Transition prob shape
        assert self.transition_probability_matrix.shape[0] == self.num_states
        assert self.transition_probability_matrix.shape[0] == self.transition_probability_matrix.shape[1]

        # Transition prob entries
        assert numpy.all(self.transition_probability_matrix <= 1)
        assert numpy.all(self.transition_probability_matrix >= 0)
        probs_sum = numpy.sum(self.transition_probability_matrix, axis=1)
        assert numpy.all(numpy.isclose(probs_sum, 1) + numpy.isclose(probs_sum, 0)), 'Sum of probabilities must be 0 (for terminal states) or 1 in each row!'
        

In [3]:
state_space = [f's{i}' for i in range(4)]
transition_probability_matrix = numpy.array(
    [
        [5/6, 1/6, 0,     0],
        [0 ,  1/6, 4/6, 1/6],
        [0 ,  0,   3/4, 1/4],
        [1,   0,   0,     0],
    ]
)

markov_process = MarkovProcess(state_space=state_space, transition_probability_matrix=transition_probability_matrix)

## Ergodicity TODO

$$
P(S_{j}) = \sum_{i} P(S_{j} | S_{i}) * P(S_{i}) \quad \forall j=1,...,N \\
P^{T} \bold{p} = \bold{p}\\
$$

This implies that $\bold{p}$ is the eignevector of $P$ corresponding to the eigenvalue of 1.

In [7]:
eigevalues, eigenvectors = numpy.linalg.eigh(transition_probability_matrix)

In [9]:
eigevalues

array([-0.66666667,  0.16666667,  0.75      ,  1.5       ])

In [10]:
numpy.where(numpy.isclose(eigevalues, 1)) # TODO: why is there no unit eigenvalue?

(array([], dtype=int64),)

In [12]:
# unit_eigenvalue_idx = numpy.where(numpy.isclose(eigevalues, 1))[0][0]
# eigenvectors[unit_eigenvalue_idx]

# Markov Reward Process

In [13]:
class MarkovRewardProcess:
    def __init__(
        self, reward_vector: numpy.ndarray, discount_factor: numbers.Real,
        markov_process: MarkovProcess = None, state_space: list = None, transition_probability_matrix: numpy.ndarray = None, 
    ) -> None:
        '''
        A Markov Reward Process (MRP) is a tuple (S, P, R, gamma). Note: there is no action space!
        '''
        
        assert (markov_process is not None) or (state_space is not None and transition_probability_matrix is not None)
        if markov_process is None:
            self._markov_process = MarkovProcess(
                state_space=state_space, transition_probability_matrix=transition_probability_matrix
            ) # NOTE: this validates state space and TPM
        self.state_space = self._markov_process.state_space
        self.num_states = self._markov_process.num_states
        self.transition_probability_matrix = self._markov_process.transition_probability_matrix
        self.reward_vector = reward_vector
        self.discount_factor = discount_factor

        self.validate_inputs()

        # Results
        self.value_function = None

    def validate_inputs(self):

        # Reward vector shape
        if len(self.reward_vector.shape) == 2:
            assert self.reward_vector.shape[1] == 1
            self.reward_vector = self.reward_vector.reshape(-1,)
        assert len(self.reward_vector) == self.num_states

        ### Discount factor
        assert 0 <= self.discount_factor <= 1

    def calculate_value_function(self, convert_to_series: bool = False):
        identity_matrix = numpy.diag(numpy.ones(self.num_states))
        inv_matrix = numpy.linalg.inv(identity_matrix - self.discount_factor * self.transition_probability_matrix)
        self.value_function = inv_matrix @ self.reward_vector
        if convert_to_series:
            self.value_function = pandas.Series(data=self.value_function, index=self.state_space)
        return self.value_function                   

In [14]:
reward_vector = numpy.array(
    [70, 50, -20, -100]
)
discount_factor = 0.6


mrp = MarkovRewardProcess(
    state_space=state_space, transition_probability_matrix=transition_probability_matrix, reward_vector=reward_vector, discount_factor=discount_factor
)
mrp.calculate_value_function(convert_to_series=True)

s0    147.339983
s1     36.699917
s2    -39.526185
s3    -11.596010
dtype: float64

# Environment

In [15]:
class Environment:
    def __init__(
        self, state_space: list, action_space: list,
        transition_probability_matrix: numpy.ndarray, reward_matrix: numpy.ndarray, discount_factor: numbers.Real
    ) -> None:
        '''Environment is a Markov Decision Process without policy.'''
        
        self.state_space = state_space
        self.action_space = action_space
        self.transition_probability_matrix = transition_probability_matrix
        self.reward_matrix = reward_matrix
        self.discount_factor = discount_factor

        self.validate_inputs()

    def validate_inputs(self):

        ### State space shape
        assert len(self.state_space) == len(set(self.state_space)), 'Some state names are not unique!'
        self.num_states = len(self.state_space)

        ### Action space shape
        assert len(self.action_space) == len(set(self.action_space)), 'Some state names are not unique!'
        self.num_actions = len(self.action_space)

        # Transition prob shape: num_states x num_states x num_actions
        assert len(self.transition_probability_matrix.shape) == 3
        assert self.transition_probability_matrix.shape[0] == self.num_states
        assert self.transition_probability_matrix.shape[1] == self.num_states
        assert self.transition_probability_matrix.shape[2] == self.num_actions

        # Reward matrix shape: num_states x num_states x num_actions OR num_states x num_actions
        assert len(self.reward_matrix.shape) in [2, 3]
        assert self.reward_matrix.shape[0] == self.num_states
        if len(self.reward_matrix.shape) == 3:
            assert self.reward_matrix.shape[1] == self.num_states
            assert self.reward_matrix.shape[2] == self.num_actions
        else:
            assert self.reward_matrix.shape[1] == self.num_actions

        # Transition prob entries
        assert numpy.all(self.transition_probability_matrix <= 1)
        assert numpy.all(self.transition_probability_matrix >= 0)
        probs_sum = numpy.sum(self.transition_probability_matrix, axis=1)
        assert numpy.all(
            numpy.isclose(probs_sum, 1) + numpy.isclose(probs_sum, 0)
        ), 'Sum of probabilities must be 0 (for terminal states) or 1 in each row!'

        ### Discount factor
        assert 0 <= self.discount_factor <= 1

In [16]:
env_transition_probability_matrix = numpy.array([
    [ # action 0
        [0, 5/6, 1/6, 0  ],
        [0, 1/6, 4/6, 1/6],
        [0, 0,   2/3, 1/3],
        [0, 0,   0,   1  ],
    ],
    [ # action 1
        [5/6, 1/6, 0,    0  ],
        [0,   4/5, 1/5,  0  ],
        [0,   0,   3/4,  1/4],
        [0,   0,   0,    1  ],
    ],
    [ # action 2
        [1, 0, 0, 0],
        [1, 0, 0, 0],
        [1, 0, 0, 0],
        [1, 0, 0, 0],
    ]
])
env_transition_probability_matrix = numpy.swapaxes(env_transition_probability_matrix, axis1=0, axis2=1)
env_transition_probability_matrix = numpy.swapaxes(env_transition_probability_matrix, axis1=1, axis2=2)
env_transition_probability_matrix.shape # number of initial states x number of arriving states x number of actions

(4, 4, 3)

In [17]:
env_reward_matrix_3d = numpy.array([
    [ # action 0
        [numpy.nan, 100,       100,       numpy.nan],
        [numpy.nan, 50,        50,        50       ],
        [numpy.nan, numpy.nan, 10,        10       ],
        [numpy.nan, numpy.nan, numpy.nan, 0        ],
    ],
    [ # action 1
        [70,        70,        numpy.nan, numpy.nan],
        [numpy.nan, 20,        20,        numpy.nan],
        [numpy.nan, numpy.nan, -20,       -20      ],
        [numpy.nan, numpy.nan, numpy.nan, -30      ],
    ],
    [ # action 2 (renovate)
        [0,    numpy.nan, numpy.nan, numpy.nan],
        [-50,  numpy.nan, numpy.nan, numpy.nan],
        [-90,  numpy.nan, numpy.nan, numpy.nan],
        [-100, numpy.nan, numpy.nan, numpy.nan],
    ]
])
env_reward_matrix_3d = numpy.swapaxes(env_reward_matrix_3d, axis1=0, axis2=1)
env_reward_matrix_3d = numpy.swapaxes(env_reward_matrix_3d, axis1=1, axis2=2)
env_reward_matrix_3d.shape # number of actions x number of initial states x number of arriving states

(4, 4, 3)

In [18]:
# NOTE: we can represent rewards in 2D becase the depend only on initial states and actions, they do not depend on arriving states
env_reward_matrix_2d = numpy.array([
    [100, 50, 10, 0],  # action 0
    [70, 20, -20, -30], # action 1
    [0, -50, -90, -100],  # action 2 (renovate)
])
env_reward_matrix_2d = numpy.swapaxes(env_reward_matrix_2d, axis1=0, axis2=1)

env_reward_matrix_2d.shape # number of initial states x number of actions

(4, 3)

In [19]:
env_state_space = [f's{i}' for i in range(4)]
env_action_space = [f's{i}' for i in range(3)]

In [20]:
environment = Environment(
    state_space=env_state_space, 
    action_space=env_action_space,
    transition_probability_matrix=env_transition_probability_matrix,
    reward_matrix=env_reward_matrix_2d,
    discount_factor=0.6
)

In [21]:
environment_all3d = Environment(
    state_space=env_state_space, 
    action_space=env_action_space,
    transition_probability_matrix=env_transition_probability_matrix,
    reward_matrix=env_reward_matrix_3d,
    discount_factor=0.6
)

# Markov Decision Process

In [22]:
class MarkovDecisionProcess:
    def __init__(self, environment: Environment, policy: numpy.ndarray) -> None:
        self.environment = environment
        self.policy = policy
        self.deterministic_policy: bool = None 
        
        self.validate_inputs()
    
    def validate_inputs(self):

        assert len(self.policy.shape) <= 2
        assert self.policy.shape[0] == self.environment.num_states

        if len(self.policy.shape) == 1:
            self.deterministic_policy = True
        elif self.policy.shape[1] == 1:
            self.policy = self.policy.reshape(-1,)
            self.deterministic_policy = True
        else:
            self.deterministic_policy = False
        
        if not self.deterministic_policy:
            assert self.policy.shape[1] == self.environment.num_actions
            probs_sum = numpy.sum(self.policy, axis=1)
            assert numpy.all(
                numpy.isclose(probs_sum, 1) + numpy.isclose(probs_sum, 0)
            ), 'In your policy, the sum of action probabilities must be 0 (for terminal states) or 1 (in each row)!'


    ### TODO
    def _calculate_policy_tpm(self):
        pass

    def _calculate_policy_rewards(self):
        pass

    def calculate_policy_value_function(self, convert_to_series: bool = False):
        pass

In [23]:
policy = numpy.array([1, 0, 1, 2])
mdp = MarkovDecisionProcess(environment=environment, policy=policy)
mdp.deterministic_policy

True