<a href="https://colab.research.google.com/github/mathrhino/reinforcement/blob/main/reinforcement_MDP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
import sys; sys.path.append('..')
import io
import numpy as np
import sys
from gym.envs.toy_text import discrete
from copy import deepcopy as dc

In [17]:
# Action indices understood by the gridworld; the order fixes the numbering
# (UP=0, RIGHT=1, DOWN=2, LEFT=3).
UP, RIGHT, DOWN, LEFT = range(4)


class GridworldEnv(discrete.DiscreteEnv):
    """
    Grid World environment from Sutton's Reinforcement Learning book chapter 4.
    You are an agent on an MxN grid and your goal is to reach the terminal
    state at the top left or the bottom right corner.
    For example, a 4x4 grid looks as follows:
    T  o  o  o
    o  x  o  o
    o  o  o  o
    o  o  o  T
    x is your position and T are the two terminal states.
    You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
    Actions going off the edge leave you in your current state.
    You receive a reward of -1 at each step until you reach a terminal state.

    Attributes set by the constructor:
        shape: [rows, cols] of the grid (a private copy of the argument).
        P: dict mapping state -> action -> [(prob, next_state, reward, is_done)].
        P_tensor: array of shape (nA, nS, nS); P_tensor[a, s, s'] is the
            probability of moving from s to s' under action a.
        R_tensor: array of shape (nS, nA); R_tensor[s, a] is the reward for
            taking action a in state s.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, shape=[4, 4]):
        """Build the transition model for a grid of the given shape.

        Args:
            shape: list/tuple ``(rows, cols)``.

        Raises:
            ValueError: if ``shape`` is not a list/tuple of length 2.
        """
        if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
            raise ValueError('shape argument must be a list/tuple of length 2')

        # Keep a private copy so that neither the shared default list nor the
        # caller's own sequence is aliased by (and mutable through) the env.
        self.shape = list(shape)

        nS = np.prod(shape)
        nA = 4

        MAX_Y = shape[0]
        MAX_X = shape[1]

        # Defined once, outside the loop: the terminal states are the first
        # (top-left) and the last (bottom-right) cell in row-major order.
        def is_done(s): return s == 0 or s == (nS - 1)

        P = {}
        grid = np.arange(nS).reshape(shape)
        it = np.nditer(grid, flags=['multi_index'])

        while not it.finished:
            s = it.iterindex
            y, x = it.multi_index

            reward = 0.0 if is_done(s) else -1.0

            # P[s][a] = [(prob, next_state, reward, is_done)]
            if is_done(s):
                # We're stuck in a terminal state: every action is a self-loop.
                P[s] = {a: [(1.0, s, reward, True)] for a in range(nA)}
            else:
                # Moves that would leave the grid keep the agent in place.
                ns_up = s if y == 0 else s - MAX_X
                ns_right = s if x == (MAX_X - 1) else s + 1
                ns_down = s if y == (MAX_Y - 1) else s + MAX_X
                ns_left = s if x == 0 else s - 1
                P[s] = {
                    UP: [(1.0, ns_up, reward, is_done(ns_up))],
                    RIGHT: [(1.0, ns_right, reward, is_done(ns_right))],
                    DOWN: [(1.0, ns_down, reward, is_done(ns_down))],
                    LEFT: [(1.0, ns_left, reward, is_done(ns_left))],
                }

            it.iternext()

        # Initial state distribution is uniform
        isd = np.ones(nS) / nS

        # We expose the model of the environment for educational purposes
        # This should not be used in any model-free learning algorithm
        self.P = P

        # Prepare state transition tensor and reward tensor
        self.P_tensor = np.zeros(shape=(nA, nS, nS))
        self.R_tensor = np.zeros(shape=(nS, nA))

        for s, transitions_by_action in self.P.items():
            for a, transitions in transitions_by_action.items():
                # The dynamics are deterministic, so each (s, a) pair has
                # exactly one transition entry.
                p_sa, s_prime, r, done = transitions[0]
                self.P_tensor[a, s, s_prime] = p_sa
                self.R_tensor[s, a] = r

        super(GridworldEnv, self).__init__(nS, nA, P, isd)

    def observe(self):
        """Return a copy of the current state index."""
        return dc(self.s)

    def _render(self, mode='human', close=False):
        """ Renders the current gridworld layout
         For example, a 4x4 grid with the mode="human" looks like:
            T  o  o  o
            o  x  o  o
            o  o  o  o
            o  o  o  T
        where x is your position and T are the two terminal states.

        Args:
            mode: 'ansi' renders into a string; any other value writes to
                ``sys.stdout``.
            close: when true, nothing is rendered.

        Returns:
            The rendered text when ``mode == 'ansi'``, otherwise ``None``.
        """
        if close:
            return

        outfile = io.StringIO() if mode == 'ansi' else sys.stdout

        n_rows, n_cols = self.shape[0], self.shape[1]
        border = '==' * n_cols + '==\n'
        last_state = self.nS - 1

        outfile.write(border)

        for y in range(n_rows):
            cells = []
            for x in range(n_cols):
                s = y * n_cols + x  # row-major state index
                if self.s == s:
                    cells.append(" x ")
                elif s == 0 or s == last_state:
                    cells.append(" T ")
                else:
                    cells.append(" o ")
            # Stripping the joined row trims the padding of the first and the
            # last cell only, exactly like the per-cell lstrip/rstrip did.
            outfile.write(''.join(cells).strip() + "\n")

        outfile.write(border)

        # Without this the 'ansi' buffer was built and then thrown away.
        if mode == 'ansi':
            return outfile.getvalue()

In [19]:
# Build a 4x4 gridworld (T: terminal state, x: current position, o: any other cell).
num_y = 4
num_x = 4
env = GridworldEnv(shape=[num_y, num_x])

In [25]:
observation_space = env.observation_space
action_space = env.action_space
# Check the number of states and actions.
# str.format has to be called on the template itself; the previous
# `print("...{}", format(x))` passed two separate arguments to print and
# therefore emitted the literal "{}" placeholder.
print("number of states: {}".format(observation_space))
print("number of actions: {}".format(action_space))

number of states: {} Discrete(16)
number of actions: {} Discrete(4)


In [27]:
# Draw a num_row x num_col matrix of random floats to use as a rank-2 example.
num_row, num_col = 2, 2
rank2_tensor = np.random.random(size=(num_row, num_col))

In [28]:
# Show the randomly generated 2x2 matrix.
print(rank2_tensor)

[[0.03372781 0.85223185]
 [0.18153947 0.37649096]]


In [30]:
# The rank of an array is the number of axes, i.e. the length of its shape.
tensor_shape = rank2_tensor.shape
tensor_rank = rank2_tensor.ndim
print("Tensor shape: {}". format(tensor_shape))
print("Tensor rank: {}". format(tensor_rank))

Tensor shape: (2, 2)
Tensor rank: 2


In [31]:
# Transition tensor built by the environment, indexed as
# [action, state, next_state].
P = env.P_tensor
print("P shape : {}" . format(P.shape))

P shape : (4, 16, 16)


In [32]:
# Slice out the state -> next-state transition matrix for action index 0 (UP).
action_up_prob = P[0, : , :]

In [33]:
# Rows are current states, columns are next states.
print(action_up_prob)

[[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]


In [34]:
# Check element-wise whether every value is greater than or equal to 0.
action_up_prob>=0

array([[ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  Tru

In [36]:
# Collapse the element-wise mask into a single flag: it is True only when
# every entry of the UP transition matrix is non-negative.
is_greater_than_0 = action_up_prob >= 0
is_all_greater_than_0 = is_greater_than_0.all()

In [37]:
# True means that every entry of the matrix is greater than or equal to 0.
is_all_greater_than_0

True

In [38]:
# Check that every row sums to 1 (each row is a distribution over next states).
action_up_prob.sum(axis=1)

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

In [39]:
# NOTE(review): this repeats the row-sum check from the previous cell
# unchanged; it looks redundant -- confirm whether a different check was meant.
action_up_prob.sum(axis=1)

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

In [40]:
# Reward matrix: rows are states, columns are actions. Every action taken in
# a non-terminal state yields -1; terminal states yield 0.
R = env.R_tensor
print(R)

[[ 0.  0.  0.  0.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [ 0.  0.  0.  0.]]


In [41]:
# Reset the environment and print the state index held in env.s afterwards.
_ = env.reset()
print("Current position index : {}". format(env.s))

Current position index : 4


In [42]:
# Human-readable label for each action index (0..3), used when logging steps.
action_mapper = dict(enumerate(('UP', 'RIGHT', 'DOWN', 'LEFT')))

In [43]:
# Roll out a single episode from the current state, picking a random action
# index in [0, 4) at every step and logging each transition until the
# environment reports that the episode is done.
step_counter = 0
done = False
while not done:
    print('At t = {}'. format(step_counter))
    env._render()

    cur_state = env.s
    action = np.random.randint(low=0, high=4)
    next_state, reward, done, info = env.step(action)

    print('state : {}'.format(cur_state))
    print('action: {}'.format(action_mapper[action]))
    print('reward: {}'. format(reward))
    print("next state: {} \n".format(next_state))
    step_counter += 1

At t = 0
T  o  o  o
x  o  o  o
o  o  o  o
o  o  o  T
state : 4
action: RIGHT
reward: -1.0
next state: 5 

At t = 1
T  o  o  o
o  x  o  o
o  o  o  o
o  o  o  T
state : 5
action: DOWN
reward: -1.0
next state: 9 

At t = 2
T  o  o  o
o  o  o  o
o  x  o  o
o  o  o  T
state : 9
action: DOWN
reward: -1.0
next state: 13 

At t = 3
T  o  o  o
o  o  o  o
o  o  o  o
o  x  o  T
state : 13
action: DOWN
reward: -1.0
next state: 13 

At t = 4
T  o  o  o
o  o  o  o
o  o  o  o
o  x  o  T
state : 13
action: LEFT
reward: -1.0
next state: 12 

At t = 5
T  o  o  o
o  o  o  o
o  o  o  o
x  o  o  T
state : 12
action: DOWN
reward: -1.0
next state: 12 

At t = 6
T  o  o  o
o  o  o  o
o  o  o  o
x  o  o  T
state : 12
action: DOWN
reward: -1.0
next state: 12 

At t = 7
T  o  o  o
o  o  o  o
o  o  o  o
x  o  o  T
state : 12
action: LEFT
reward: -1.0
next state: 12 

At t = 8
T  o  o  o
o  o  o  o
o  o  o  o
x  o  o  T
state : 12
action: LEFT
reward: -1.0
next state: 12 

At t = 9
T  o  o  o
o  o  o  o
o  o  o  o

In [45]:
# Helper for simulating an episode many times.
def run_episode(env, s0):
    """Play one episode with random actions and return its length.

    Args:
        env: environment exposing ``reset()``, ``step(action)`` returning
            ``(next_state, reward, done, info)``, a writable ``s`` attribute
            holding the current state, and ``action_space.n``.
        s0: state index to start the episode from.

    Returns:
        Number of steps taken until ``step`` reported ``done``.
    """
    env.reset()
    # Override whatever start state reset() picked with the requested one.
    env.s = s0

    # Ask the environment how many actions it has instead of assuming 4, so
    # the helper also works for environments with a different action count.
    n_actions = env.action_space.n

    step_counter = 0
    done = False
    while not done:
        action = np.random.randint(low=0, high=n_actions)
        _, _, done, _ = env.step(action)
        step_counter += 1
    return step_counter

In [46]:
# Run several random-action episodes from the same start state and report
# how many steps each one took.
n_episodes, s0 = 10, 6

for i in range(n_episodes):
    len_ep = run_episode(env, s0)
    print("Episode {} | Length of episode : {}". format(i, len_ep))

Episode 0 | Length of episode : 3
Episode 1 | Length of episode : 19
Episode 2 | Length of episode : 15
Episode 3 | Length of episode : 10
Episode 4 | Length of episode : 6
Episode 5 | Length of episode : 17
Episode 6 | Length of episode : 5
Episode 7 | Length of episode : 38
Episode 8 | Length of episode : 21
Episode 9 | Length of episode : 28
