In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# PARAMs
p = 0.05
# p = 0.2
# p = 0.45

discount = 0.95
theta = 0.01

In [3]:
# States Space
S_space = []

#num of states
N = 16  
for i in range(16):
    num = "{0:04b}".format(i)
    S_space.append([int(i) for i in num])
S_space = np.array(S_space)

In [4]:
# Value
V = np.zeros(16)

In [5]:
# Action Space
A = np.array([[0,0,0,0],
               [0,0,0,1],
               [0,0,1,0],
               [0,1,0,0],
               [1,0,0,0]])

print(A.shape[0])

5


In [6]:
# Policy
policy = []
for i in range(N):
    policy.append(A[0])     # Initialize: all a1=[0,0,0,0]

# no control policy
no_control_policy = policy

In [7]:
# Connectivity Matrix
C = np.array([[0,0,-1,0],
               [1,0,-1,-1],
               [0,1,0,0],
               [-1,1,1,0]])

In [8]:
# v_bar mapping
def bar(vec):
    for i in range(len(vec)):
        if vec[i] > 0:          # map greater than 0 to 1
            vec[i] = 1
        else:                   # map smaller or equal to 0 to 0
            vec[i] = 0
    return vec

In [9]:
# ⊕ component wise XOR
def mod2add(a, b):
    result = []
    for i, j in zip(a, b):
        if i==j:
            result.append(0)
        else:
            result.append(1)
    return np.array(result)

In [10]:
# r(s, a, s')
def reward(state, action, next_state):
    return np.sum(next_state)*5 - np.sum(np.absolute(action))

In [11]:
# Transition Matrix P(a)
matrix_list = []
def P(p, S_space, C, action):
    Matrix = np.zeros((N, N))

    for i in range(N):              # P(a)
        for j in range(N):
            exp = np.sum(np.absolute(S_space[j] - mod2add(bar(np.matmul(C, S_space[i])), action)))
            Matrix[i][j] = np.power(p, exp) * np.power(1-p, 4-exp)
    return Matrix

In [12]:
# P(a) list
matrix_list = []
for action in A:
    mat = P(p, S_space, C, action)
    matrix_list.append(mat)


# P(π)
def get_policy_matrix(policy):
    mat_policy = np.zeros((N,N))

    for row in range(len(policy)):
        for i in range(A.shape[0]):
            if (policy[row] == A[i]).all():
                mat_policy[row] = matrix_list[i][row]

    return mat_policy

# pp = get_policy_matrix(policy)
# print(pp.shape)

In [13]:
# Reward Matrix R(a)
def R(S_space, action):
    Matrix = np.zeros((N, N))

    for i in range(N):              # M(a)
        for j in range(N):
            Matrix[i][j] = reward(S_space[i], action, S_space[j])
    return Matrix

In [14]:
# R(a) list
reward_list = []
for action in A:
    rew = R(S_space, action)
    reward_list.append(rew)

# R(π)
def get_policy_reward(policy):
    rew_policy = np.zeros((N,N))

    for row in range(len(policy)):                  # for π(s)
        for i in range(A.shape[0]):                 # for a
            if (policy[row] == A[i]).all():         # if  π(s)==a
                rew_policy[row] = reward_list[i][row]
    return rew_policy

# rr = get_policy_reward(policy)
# print(rr)

In [15]:

iteration_count = 0
policy_stable = False

while not policy_stable:
    iteration_count = iteration_count+1

    #------- Policy Evaluation -------#
    mat = get_policy_matrix(policy)
    rew = get_policy_reward(policy)
    R_expected = np.matmul(np.multiply(mat, rew), np.ones(N))
    I = np.identity(N)

    # Value
    V_new = np.matmul(np.linalg.inv(I - discount * mat), R_expected)
    V = V_new

    # print(V_new)

    #------- Policy Improvement -------#
    V_list = []
    for action in A:
        mat = P(p, S_space, C, action)
        rew = R(S_space, action)
        R_expected = np.matmul(np.multiply(mat, rew), np.ones(N))

        # Value
        V_new = R_expected + discount* np.matmul(mat, V)
        V_list.append(V_new)

    V_list = np.array(V_list)
    new_action = np.argmax(V_list, axis=0)

    new_policy = []
    for i in range(N):
        new_policy.append(A[new_action[i]])

    # print(new_action)

    # check if π = π' 
    equal = []
    for i in range(N):
        if (policy[i] == new_policy[i]).all():
            equal.append(True)
        else:
            equal.append(False)

    policy_stable = all(equal)

    # update policy
    policy = new_policy


# Optimal
optimal_policy = policy

optimal_action = np.zeros(N)
for row in range(len(optimal_policy)):
    for i in range(A.shape[0]):
        if (optimal_policy[row] == A[i]).all():
            optimal_action[row] = i

# print(optimal_policy)
print(optimal_action)
print("Num of Iterations = ", iteration_count)


[3. 3. 3. 3. 3. 3. 3. 3. 2. 3. 3. 3. 1. 3. 3. 3.]
Num of Iterations =  3


In [16]:
#  Performance Evaluation

def AvgA(policy, matrix_list):

    A_sum = 0
    episode_num = 100

    # 100 episodes
    for j in range(episode_num):

        # 1 episode
        i_start = np.random.randint(0, 15)
        i_curr = i_start

        sum=0
        for i in range(200):
            # action
            action = policy[i_curr]
            sum = sum + np.sum(S_space[i_curr])

            # next state
            matrix = matrix_list[A.tolist().index(action.tolist())]
            p_next_state = matrix[i_curr]                       # probability vector of next state
            i_next = np.random.choice(16, 1, p=p_next_state)[0]    # choose next state

            # update
            i_curr = i_next

        Ai = sum/200
        A_sum = A_sum + Ai

    Avg = A_sum/episode_num
    return Avg

# print(len(matrix_list))
# print(matrix_list[4])

no_control = AvgA(no_control_policy, matrix_list)
optimal = AvgA(optimal_policy, matrix_list)

print(no_control)
print(optimal)

0.4907500000000002
2.8419500000000006
