In [1]:
import numpy as np
from model import NSFrozenLake
from amalearn.agent import AgentBase

In [24]:
#%% allowed actions
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

ACTIONS = [LEFT, DOWN, RIGHT, UP]

#%% hyperparameters
REPS = 20
EPISODES = 100
EPSILON = 0.1
LEARNING_RATE = 0.1
DISCOUNT = 0.9
STUDENT_NUM = 810896059
THETA = 0.1

In [3]:
environment = NSFrozenLake(studentNum=STUDENT_NUM)
environment.reset()

(0, 0)

In [4]:
print("you can see the environment in each step by render command :")
environment.render()

you can see the environment in each step by render command :

------------------------------
| [44m0.000[0m | 0.001 | 0.311 | 0.248 | 
------------------------------
| 0.510 | 0.001 | 0.001 | 0.001 | 
------------------------------
| 0.630 | 0.652 | 0.681 | 0.001 | 
------------------------------
| 0.674 | 0.607 | 0.645 | 0.000 | 
------------------------------


In [5]:
print("\n\nand this is the bare map for debugging :")
print(environment.map)



and this is the bare map for debugging :
[[0.         0.001      0.31141408 0.24825232]
 [0.51044688 0.001      0.001      0.001     ]
 [0.63032472 0.6521101  0.68109729 0.001     ]
 [0.67382383 0.60654236 0.64488509 0.        ]]


### get environment informations

In [6]:
temp = []
env_info = []
for s0 in range(4):
    for s1 in range(4):
        dict = {}
        for action in ACTIONS:
            states, probs, fail_probs, dones = environment.possible_consequences(action=action ,state_now=(s0,s1))
            dict["states"] = states
            dict["probs"] = probs
            dict["fail_probs"] = fail_probs
            dict["dones"] = dones
            temp.append(dict)
    env_info.append(temp)

In [7]:
env_info[0][0]

{'states': [(0, 0), (0, 1), (1, 0)],
 'probs': array([0.95 , 0.025, 0.025]),
 'fail_probs': array([0.        , 0.001     , 0.51044688]),
 'dones': array([False, False, False])}

In [29]:
class Q1Agent(AgentBase):
    def __init__(self, id, environment):
        super(Q1Agent, self).__init__(id, environment)
        self.V = np.zeros((4, 4))
        self.policy = np.zeros((4, 4))

    def Q_value(self, s0, s1):
        q_values = []
        for a in ACTIONS:
            next_states, probs, fail_probs, dones = self.environment.possible_consequences(action=a, state_now=(s0, s1))
            q_value = 0
            for i,s in enumerate(next_states):
                r = -1
                if dones[i]:
                    r += 50
                q_value +=  probs[i] * (fail_probs[i]*(r-10) + ((1-fail_probs[i])*r +  DISCOUNT*(self.V[s])))
            q_values.append(q_value)
        return q_values


    def optimal_policy(self):
        for s0 in range(4):
            for s1 in range(4):
                max_q = 0
                max_a = -1
                for a in ACTIONS:
                    next_states, probs, fail_probs, dones = self.environment.possible_consequences(action=a, state_now=(s0, s1))
                    q_value = 0
                    for i, s in enumerate(next_states):
                        r = -1
                        if dones[i]:
                            r += 50
                        q_value +=  probs[i] * (fail_probs[i]*(r-10) + ((1-fail_probs[i])*r +  DISCOUNT*(self.V[s])))
                    if q_value > max_q:
                        max_q = q_value
                        max_a = a
                self.policy[s0][s1] = max_a

                
    def value_iteration(self):
        iter=0
        while True:
            iter+=1
#             print(iter)
            delta = 0
            for s0 in range(4):
                for s1 in range(4):
                    v = self.V[s0][s1]
                    self.V[s0][s1] = max(self.Q_value(s0, s1))
                    delta = max(delta, abs(v - self.V[s0][s1]))
            if delta < THETA:
                break
        self.optimal_policy()
        print(self.policy)

    def select_action(self, s0, s1):
        selected_action = self.policy[s0][s1]
        return selected_action

    def take_action(self, s0, s1) -> (object, float, bool, object):
        action = self.select_action(s0, s1)
        obs, r, d, i = self.environment.step(action)
        self.environment.render()
        return obs, r, d, i

In [30]:
environment = NSFrozenLake(studentNum=STUDENT_NUM)
agent = Q1Agent("1", environment)
current_state = environment.reset()
agent.value_iteration()
done = 0
while done == 0:
    current_state, reward, done, information = agent.take_action(current_state[0], current_state[1])
#     print(agent.environment.render())

[[2. 1. 1. 1.]
 [2. 2. 2. 1.]
 [2. 2. 2. 1.]
 [2. 2. 2. 1.]]

------------------------------
| 0.000 | [44m0.001[0m | 0.311 | 0.248 | 
------------------------------
| 0.510 | 0.001 | 0.001 | 0.001 | 
------------------------------
| 0.630 | 0.652 | 0.681 | 0.001 | 
------------------------------
| 0.674 | 0.607 | 0.645 | 0.000 | 
------------------------------

------------------------------
| 0.000 | 0.001 | 0.311 | 0.248 | 
------------------------------
| 0.510 | [44m0.001[0m | 0.001 | 0.001 | 
------------------------------
| 0.630 | 0.652 | 0.681 | 0.001 | 
------------------------------
| 0.674 | 0.607 | 0.645 | 0.000 | 
------------------------------

------------------------------
| 0.000 | 0.001 | 0.311 | 0.248 | 
------------------------------
| 0.510 | 0.001 | [44m0.001[0m | 0.001 | 
------------------------------
| 0.630 | 0.652 | 0.681 | 0.001 | 
------------------------------
| 0.674 | 0.607 | 0.645 | 0.000 | 
------------------------------

-----------------------

In [None]:
print(observation, reward, done, info)