In [3]:
import numpy as np
import time

# TODO
## transition prob matrix (s,a,s')
## reward matrix (s,a)
## implement: random environment, random policy, synchronous/asynchronous for-loop evaluation, matrix-form evaluation, linear-equation solution


# Env, Random policy

In [11]:
def env(s, a):
    '''
    Sample the dynamics of a random tabular MDP.

    Args:
        s: number of states
        a: number of actions

    Returns:
        (p, r) where
        p: array of shape (s, a, s); every p[state, action, :] is normalised
           so that it sums to 1 over the next-state axis
        r: array of shape (s, a) with entries drawn uniformly from [0, 1)
    '''
    # Unnormalised positive weights for every (state, action, next_state).
    weights = np.random.random((s, a, s))
    # Divide by the sum over the last (next-state) axis to get distributions.
    transition = weights / weights.sum(axis=-1, keepdims=True)

    # Rewards are drawn after the transitions, one per (state, action) pair.
    reward = np.random.random((s, a))

    return transition, reward

# class Env:
#     def __init__(self, s, a):
#         self.state_dim = s
#         self.action_dim = a
#         self.current_state = 0  # initial state
#         self._generate_tp_(s, a)
#         self._generate_r_(s, a)
    
#     def _generate_tp_(self, s, a):
#         self.p = np.random.random(s*a*s).reshape(s,a,s)
#         self.p = self.p / self.p.sum(axis=2, keepdims=True) 
    
#     def _generate_r_(self, s, a):
#         self.r = np.random.random(s*a).reshape(s,a)
    
#     def step(self, action):
#         next_state_probs = self.p[self.current_state, action]
#         next_state = np.random.choice(self.state_dim, p=next_state_probs)
#         reward = self.r[self.current_state, action]
#         self.current_state = next_state
#         return next_state, reward
    
#     def reset(self):
#         self.current_state = 0
#         return self.current_state
    
#     def get_dynamics(self):
#         return self.p, self.r
        

def random_policy(s,a):
    '''
    Draw a random stochastic policy.

    Args:
        s: number of states
        a: number of actions

    Returns:
        array of shape (s, a); each row is a probability distribution
        over actions (non-negative, sums to 1)
    '''
    # Uniform random preferences, then normalise each state's row.
    preferences = np.random.random((s, a))
    row_totals = preferences.sum(axis=1, keepdims=True)
    return preferences / row_totals

In [12]:
# Problem size: 10 states, 5 actions.
s, a = 10, 5

# Random dynamics: P is the transition tensor, R the reward matrix.
P, R = env(s, a)
# env = Env(s,a)
# P,R = env.get_dynamics()

# Random stochastic policy over the same state/action space.
policy = random_policy(s, a)

# Sanity check of the three shapes, one per line.
print(policy.shape, P.shape, R.shape, sep="\n")

(10, 5)
(10, 5, 10)
(10, 5)


# Policy Evaluation

In [13]:
# linear equation(evaluation)
def linear_equation(P,R, policy, gamma):
    """
    Exact policy evaluation by solving the Bellman expectation equation.

    Solves (I - gamma * P_pi) V = r_pi, where
        r_pi[s]     = sum_a policy[s, a] * R[s, a]
        P_pi[s, s'] = sum_a policy[s, a] * P[s, a, s']

    Args:
        P: transition probability matrix (S x A x S)
        R: reward matrix (S x A)
        policy: policy matrix (S x A)
        gamma: discount factor

    Returns:
        V: state-value function (S)

    Raises:
        numpy.linalg.LinAlgError: if (I - gamma * P_pi) is singular
    """
    P = np.asarray(P)
    R = np.asarray(R)
    policy = np.asarray(policy)

    # Size the system from the inputs, not from a notebook-level global.
    n_states = P.shape[0]

    r_pi = np.sum(policy * R, axis=1)
    p_pi = np.sum(policy[:, :, np.newaxis] * P, axis=1)

    # Solve the linear system directly; cheaper and numerically safer than
    # forming the explicit inverse and multiplying.
    V = np.linalg.solve(np.eye(n_states) - gamma * p_pi, r_pi)
    return V

# policy evaluation matrix
def policy_evaluation_matrix(p, r, policy, env=None, gamma=0.9, theta=1e-6):
    """
    Iterative policy evaluation in matrix form.

    Repeats V <- r_pi + gamma * P_pi @ V, starting from V = 0, until the
    largest per-state change in one sweep is below ``theta``.

    Args:
        p: transition probability matrix (S x A x S)
        r: reward matrix (S x A)
        policy: policy matrix (S x A)
        env: unused; kept only so the signature stays backward-compatible.
            It occupies the fourth positional slot, so pass ``gamma`` and
            ``theta`` by keyword.
        gamma: discount factor
        theta: threshold

    Returns:
        V: state-value function (S), the result of the last sweep performed
    """
    n_states = p.shape[0]

    # Both quantities depend only on (p, r, policy), so compute them once
    # instead of on every sweep.
    # expected_rewards[s] = sum_a policy(s,a) * r(s,a)
    expected_rewards = np.sum(policy * r, axis=1)
    # p_policy[s,s'] = sum_a policy(s,a) * p(s,a,s')
    p_policy = np.sum(policy[:, :, np.newaxis] * p, axis=1)

    # Initialize value function
    V = np.zeros(n_states)

    while True:
        V_new = expected_rewards + gamma * (p_policy @ V)

        # Measure the change before overwriting V, then keep the newest
        # iterate so the value returned is the most recent one.
        converged = np.max(np.abs(V_new - V)) < theta
        V = V_new

        if converged:
            break

    return V

# policy evaluation for iteration sync
def policy_evaluation_for_sync(p, r, policy, gamma=0.9, theta=1e-6):
    """
    Synchronous iterative policy evaluation written with explicit loops.

    Every sweep builds a fresh value vector from the previous sweep's values
    only, then replaces the old vector in one step.

    Args:
        p: transition probability matrix (S x A x S)
        r: reward matrix (S x A)
        policy: policy matrix (S x A)
        gamma: discount factor
        theta: threshold on the largest per-state change in a sweep

    Returns:
        V: state-value function (S)
    """
    state_count, action_count = p.shape[0], p.shape[1]

    # Start from the all-zero value function.
    values = np.zeros(state_count)

    converged = False
    while not converged:
        largest_change = 0
        updated = np.zeros(state_count)

        for state in range(state_count):
            backup = 0

            for action in range(action_count):
                # Expected value of the successor state under the OLD values.
                lookahead = sum(
                    p[state, action, nxt] * values[nxt]
                    for nxt in range(state_count)
                )
                # Weight the one-step return by the policy probability.
                backup += policy[state, action] * (r[state, action] + gamma * lookahead)

            updated[state] = backup
            largest_change = max(largest_change, abs(values[state] - updated[state]))

        # Swap in the new vector only after the whole sweep is done.
        values = updated
        converged = largest_change < theta

    return values

# policy evaluation for iteration async
def policy_evaluation_for_async(p, r, policy, gamma=0.9, theta=1e-6):
    """
    Asynchronous (in-place) iterative policy evaluation with explicit loops.

    A single value vector is updated state by state, so states later in a
    sweep already see the values written earlier in the same sweep.

    Args:
        p: transition probability matrix (S x A x S)
        r: reward matrix (S x A)
        policy: policy matrix (S x A)
        gamma: discount factor
        theta: threshold on the largest per-state change in a sweep

    Returns:
        V: state-value function (S)
    """
    state_count, action_count = p.shape[0], p.shape[1]
    values = np.zeros(state_count)

    def backup(state):
        # One-step expected return for `state`, read from the live vector.
        acc = 0
        for action in range(action_count):
            lookahead = sum(
                p[state, action, nxt] * values[nxt]
                for nxt in range(state_count)
            )
            acc += policy[state, action] * (r[state, action] + gamma * lookahead)
        return acc

    converged = False
    while not converged:
        sweep_change = 0

        for state in range(state_count):
            previous = values[state]
            # Overwrite immediately so later states use the fresh value.
            values[state] = backup(state)
            sweep_change = max(sweep_change, abs(previous - values[state]))

        converged = sweep_change < theta

    return values

# Policy Improvement

In [7]:
# policy improvement matrix
def policy_improvement_matrix(p, r, V, gamma=0.9):
    """
    Greedy policy improvement with respect to a value function.

    Args:
        p: transition probability matrix (S x A x S)
        r: reward matrix (S x A)
        V: current value function (S)
        gamma: discount factor

    Returns:
        new_policy: deterministic policy matrix (S x A) with a single 1 per
            row at the action of largest Q(s, a); ties resolve to the
            lowest action index (np.argmax behaviour)
    """
    # E[V(s') | s, a] = sum_s' P(s'|s,a) * V(s')
    expected_next_value = (p * V[np.newaxis, np.newaxis, :]).sum(axis=2)

    # Q(s,a) = R(s,a) + gamma * E[V(s') | s, a]
    action_values = r + gamma * expected_next_value

    greedy_actions = action_values.argmax(axis=1)

    # One-hot encode the greedy action for every state.
    improved = np.zeros_like(action_values)
    state_index = np.arange(action_values.shape[0])
    improved[state_index, greedy_actions] = 1

    return improved

## policy iteration, value iteration

In [None]:
# policy iteration
def policy_iteration_matrix(p, r, policy, gamma=0.9, theta=1e-6, max_iterations=1000):
    """
    Policy iteration: alternate evaluation and greedy improvement until the
    greedy policy stops changing.

    Args:
        p: transition probability matrix (S x A x S)
        r: reward matrix (S x A)
        policy: initial policy matrix (S x A)
        gamma: discount factor
        theta: threshold used by the iterative policy evaluation
        max_iterations: upper bound on evaluate/improve rounds. Evaluation is
            only accurate to ``theta``, so near-tied actions could otherwise
            make the greedy policy flip back and forth forever.

    Returns:
        policy: final policy matrix (S x A)
        V: state-value function (S) of the returned policy

    Raises:
        ValueError: if ``max_iterations`` is smaller than 1
    """
    if max_iterations < 1:
        raise ValueError("max_iterations must be at least 1")

    for _ in range(max_iterations):
        # gamma/theta go by keyword: the evaluator's fourth positional
        # parameter is `env`, so positional passing would shift them and
        # silently evaluate with the wrong discount factor.
        V = policy_evaluation_matrix(p, r, policy, gamma=gamma, theta=theta)
        new_policy = policy_improvement_matrix(p, r, V, gamma=gamma)

        # Element-wise comparison of the two policy matrices. Comparing
        # `.all()` of each side would only compare two scalars.
        if np.array_equal(new_policy, policy):
            break

        policy = new_policy
    else:
        # Iteration cap reached: re-evaluate so V matches the returned policy.
        V = policy_evaluation_matrix(p, r, policy, gamma=gamma, theta=theta)

    return policy, V

# value iteration
def value_iteration_matrix(p, r, gamma=0.9, theta=1e-6):
    """
    Value iteration in matrix form.

    Repeats V <- max_a [ R(s,a) + gamma * sum_s' P(s'|s,a) * V(s') ],
    starting from V = 0, until the largest per-state change in one sweep is
    below ``theta``.

    Args:
        p: transition probability matrix (S x A x S)
        r: reward matrix (S x A)
        gamma: discount factor
        theta: convergence threshold

    Returns:
        V: state-value function (S) from the last sweep performed
        policy: deterministic policy (S x A), one-hot on the greedy action of
            that same sweep, so V[s] is the Q-value of the chosen action
    """
    n_states = p.shape[0]
    n_actions = p.shape[1]

    # Initialize value function
    V = np.zeros(n_states)

    while True:
        # Q(s,a) = R(s,a) + gamma * sum_s' P(s'|s,a) * V(s')
        # (S x A x S) @ (S,) contracts the next-state axis -> (S x A)
        Q = r + gamma * (p @ V)

        V_new = np.max(Q, axis=1)

        # Measure the change before overwriting V, then keep the newest
        # iterate so the returned V and the greedy policy come from one Q.
        converged = np.max(np.abs(V_new - V)) < theta
        V = V_new

        if converged:
            break

    policy = np.zeros((n_states, n_actions))
    best_actions = np.argmax(Q, axis=1)
    policy[np.arange(n_states), best_actions] = 1

    return V, policy

In [4]:
# state, action, gamma
# Experiment configuration: number of states, number of actions, discount factor.
s, a = 10, 5
gamma = 0.9

# Sample the random dynamics: transition probabilities P and rewards R.
P, R = env(s, a)

# Draw a random stochastic policy for the same state/action space.
policy = random_policy(s, a)