In [10]:
from gymnasium import Env
import time
from math import gamma

import gymnasium as gym
import numpy as np
from gymnasium.envs.toy_text import frozen_lake

size = 100
seed =1

map = frozen_lake.generate_random_map(size,seed=seed,p=0.9)


class CustomGymEnv(frozen_lake.FrozenLakeEnv):
    def __init__(self,render_mode,desc = map,is_slippery = False,start_at_goal = False):
        self.desc = desc = np.asarray(desc, dtype="c")
        super().__init__(desc = self.desc,is_slippery=is_slippery,render_mode=render_mode)
        #self.desc = desc = np.asarray(desc, dtype="c")
        self.start_at_goal = start_at_goal
        self.reward_range=(-1,1)
        self.nA = 8
        self.action_space = gym.spaces.Discrete(self.nA)
        self.initial_state_distrib = np.array(desc == b"F").astype("float64").ravel()
        self.initial_state_distrib /= self.initial_state_distrib.sum()
        self.size = size
        self.s = 0
        nrow, ncol = self.nrow, self.ncol
        nS = nrow * ncol
        self.P = {s: {a: [] for a in range(self.nA)} for s in range(nS)}

        def to_s(row,col):
            return row*ncol+col


        def inc(row,col,action):
          if action == 0:  # LEFT
            col = max(col-1, 0)
          elif action == 1:  # DOWN
            row = min(row+1, self.size-1)
          elif action == 2:  # RIGHT
            col = min(col+1, self.size-1)
          elif action == 3:  # UP
            row = max(row-1, 0)
          elif action == 4:  # DOWN-LEFT
            row = min(row+1, self.size-1)
            col = max(col-1, 0)
          elif action == 5:  # DOWN-RIGHT
           row = min(row+1, self.size-1)
           col = min(col+1, self.size-1)
          elif action == 6:  # UP-LEFT
           row = max(row-1, 0)
           col = max(col-1, 0)
          elif action == 7:  # UP-RIGHT
           row = max(row-1, 0)
           col = min(col+1, self.size-1)
          return row, col
        def phi(state, goal=(99, 99)):
         return -(abs(state%size - goal[0]) + abs(state/size - goal[1]))

        def shaped_reward(reward, state, next_state, gamma=0.5):
         return reward + gamma * phi(next_state) - phi(state)
        def update_probability_matrix(row, col, action, state):
           old_state = to_s(row, col)
           new_row, new_col = inc(row, col, action)
           if (new_row, new_col) == (row, col):
             edge_penalty = -10
             return old_state, shaped_reward(edge_penalty, state, old_state), False

           new_state = to_s(new_row, new_col)
           new_letter = desc[new_row, new_col]
           terminated = bytes(new_letter) in b"GH"

           if new_letter == b"G":
            reward = float(1)
           elif new_letter == b"H":
            reward = float(-100)
           elif new_letter == b"F":
            reward = shaped_reward(1, state, new_state)
           elif new_letter == b"S":
            reward = 0
           return new_state, reward, terminated
        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                for a in range(self.nA):
                    li = self.P[s][a]
                    letter = desc[row, col]
                    if letter in b"GH":
                        li.append((1.0, *update_probability_matrix(row,col,a,s)))
                    else:
                      #this part is wrong but is not used since i  have turend off slippery since it was not converging in most case but we could assume the slippery concept works like this                        if is_slippery:
                            for b in [(a - 1) % 4, a, (a + 1) % 4]:
                                li.append(
                                    (1.0 / 3.0, *update_probability_matrix(row, col, b,s))
                                )
                        else:
                            li.append((1.0, *update_probability_matrix(row, col, a,s)))

    def step(self, a):  # Parameter is named 'a'
    # Use 'a' instead of 'action'
     transitions = self.P[self.s][a]  # Changed 'action' to 'a'

    # Choose a transition (for deterministic or stochastic)
     i = self.np_random.choice(len(transitions), p=[t[0] for t in transitions])
     p, s, r, t = transitions[i]

     self.s = s
     if(a<=3):
      self.lastaction = a
     else:
      self.lastaction = a%4

     if self.render_mode == "human":
        self.render()

     return (int(s), r, t, False, {"prob": p})
    def reset(self,seed=seed,options=None):
        Env.reset(self,seed = seed)
        start_at_goal = self.start_at_goal
        if start_at_goal:
            self.s = 0
            self.lastaction = None
        else:
            self.s = np.random.choice(np.arange(1,self.size**2))
            self.lastaction = None

        if self.render_mode == "human":
            self.render()
        return int(self.s), {"prob": 1}



gym.register("MyENV","__main__:CustomGymEnv",kwargs={'render_mode': 'ansi', 'start_at_goal': False} )
env = CustomGymEnv(render_mode = "human")
env = gym.make("MyENV",render_mode = "ansi")
state = env.reset()

action = [1,2,3,4,5,6,7,0]



In [5]:
# from collections import defaultdict
# value_state = np.zeros(size*size)
# return_state = np.zeros(size*size)
#
# env = gym.make("MyENV",render_mode = "none")
# state = env.reset()
# returns_sum = np.zeros(size*size)
# returns_count = np.zeros(size*size)
# V = np.zeros(size*size)
#
# gamma = 0.99
#
# for epochs in range(150000):
#     #random walks
#     episode_end = False
#     episode_history = []
#     current_state,_=env.reset(seed=100)
#     if(epochs % 10000 == 0):
#         print("epochs: ",epochs)
#         #print("current state: ",current_state)
#     i = 0
#     while not episode_end:
#         actionspace = range(8)
#
#         current_action = env.action_space.sample()
#         observation, reward, terminated, truncated,info = env.step(action=current_action)
#         episode_history.append((current_state,reward-i))
#         current_state = observation
#         if terminated or truncated:
#             episode_end = True
#             # if(current_state == size**2-1):
#             #     print("goal reached")
#         i+=1
#
#     G = 0
#     for state, reward in reversed(episode_history):
#         G = reward + gamma * G
#         returns_sum[state] += G
#         returns_count[state] += 1
#         nonzero_mask = returns_count != 0
#         V[nonzero_mask] = returns_sum[nonzero_mask] / returns_count[nonzero_mask]






In [12]:
Q = np.zeros([env.observation_space.n, env.action_space.n])
print(Q)
n_visits = np.zeros([env.observation_space.n, env.action_space.n])

num_episodes = 100000
epsilon = 0.8
stats = 0.0

for episode in range(num_episodes):
  state,_ = env.reset()
  done = False
  results_list = []
  result_sum = 0.0

  while not done:
    if np.random.rand() > epsilon:
      action = np.argmax(Q[state])
    else:
      action = env.action_space.sample()

    new_state, reward, done, info ,_= env.step(action)
    results_list.append((state, action))
    result_sum += reward
    state = new_state

  for (state, action) in results_list:
    n_visits[state, action] += 1.0
    alpha = 0.1
    Q[state, action] += alpha * (result_sum - Q[state, action])
  if state == size**2-1:
    stats += 1
    print("goal reached")
  if episode % 1000 == 0 and episode != 0:
    print(f"success: {stats/episode}, epsilon: {epsilon}")

print(f"success: {stats/episode}, epsilon: {epsilon}")

env.close()

[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success: 0.0, epsilon: 0.8
success

In [402]:
import numpy as np

def extract_policy(V, P, gamma=0.99):
    n_states = len(P)
    n_actions = len(P[0])
    policy = np.zeros(n_states, dtype=int)

    for s in range(n_states):
        q_values = np.zeros(n_actions)
        for a in range(n_actions):
            for (prob, next_state, reward, done) in P[s][a]:
                next_value = 0 if done else V[next_state]
                q_values[a] += prob * (reward + gamma * next_value)

        policy[s] = np.argmax(q_values)

    return policy

In [13]:
optimal_policy = np.array([np.argmax(q) for q in Q])
for s in optimal_policy:
    print(s)


4
0
4
5
2
2
2
4
3
7
7
3
1
5
1
2
3
7
2
2
4
0
0
6
5
2
1
4
4
1
5
2
6
0
0
1
1
2
5
7
2
7
1
2
4
3
6
3
6
6
6
2
7
1
7
0
2
7
4
4
0
6
7
1
3
6
4
4
0
2
7
7
2
3
2
3
4
6
4
5
6
0
6
3
7
4
2
2
1
1
2
0
7
1
2
0
1
3
6
6
1
5
5
2
7
0
6
7
0
5
2
6
1
7
7
7
7
5
6
2
0
0
0
5
5
2
1
1
5
4
1
0
6
1
5
5
7
1
7
2
7
5
6
2
0
0
0
1
3
4
7
7
5
5
7
5
4
6
4
7
6
1
4
2
6
0
7
3
3
0
2
7
0
3
0
6
4
6
5
7
7
3
0
4
3
3
3
3
5
2
7
3
3
3
2
4
2
6
7
5
5
0
7
1
1
3
5
2
1
5
7
1
7
3
3
6
6
2
1
4
0
0
5
7
5
5
2
2
4
2
6
0
7
3
4
5
3
7
7
1
3
5
6
3
0
2
6
4
6
6
7
7
3
6
6
1
5
3
3
4
1
2
1
4
0
6
5
1
4
1
3
2
6
0
0
6
3
3
7
0
3
0
1
7
1
4
1
0
3
1
7
6
3
0
7
7
2
7
6
6
0
2
7
1
2
3
0
5
1
0
4
2
7
7
3
6
6
2
2
5
6
6
4
5
2
3
3
3
4
0
0
0
5
4
4
0
5
5
5
2
5
2
3
3
0
3
4
7
6
3
4
7
3
7
2
1
7
0
7
2
0
4
0
6
2
3
0
5
2
5
3
2
7
6
0
6
0
5
5
7
4
3
4
2
2
7
3
2
7
7
2
6
6
6
1
2
3
7
6
3
7
7
7
7
0
6
7
1
3
4
0
1
1
2
3
0
0
7
2
5
3
4
4
1
2
7
7
6
4
6
6
6
4
1
0
7
2
3
5
6
7
7
3
6
6
1
6
1
3
1
6
1
2
7
7
0
4
1
4
4
0
3
6
6
5
7
2
5
6
4
3
2
6
6
4
6
6
7
7
1
0
5
1
5
7
7
0
0
7
7
2
6
1
4
4
1
2
7
1
3


In [19]:
env = gym.make("MyENV",render_mode = "human",start_at_goal = True)
state,_ = env.reset()
max_steps = 100
R=0
epsilon = 0.8
win = 0
for i in range(100):
 while True:
    if np.random.rand() > epsilon:
      action = optimal_policy[state]
    else:
      action = env.action_space.sample()
    #print(f"State {state} Action {action}")
    observation, reward, terminated, truncated,info = env.step(action)
    state = observation
    R += reward
    print(f"state: {state}")
    if terminated or truncated:
        print(state,R,i)
        if state == size**2-1:
            win += 1
        break
env.close()


state: 100
state: 0
state: 1
1 0.5 0
state: 102
state: 2
state: 1
1 98.99000000000001 1
state: 100
state: 200
state: 100
state: 100
state: 100
state: 1
1 473.98 2
state: 1
state: 102
state: 203
state: 104
state: 3
3 759.96 3
state: 104
state: 205
state: 206
state: 107
state: 8
state: 7
state: 8
state: 9
state: 9
state: 10
state: 11
state: 10
state: 11
state: 11
state: 12
state: 112
state: 212
state: 213
state: 313
state: 214
state: 113
state: 12
state: 12
state: 13
state: 13
state: 13
state: 14
state: 13
state: 113
state: 13
state: 114
state: 214
state: 114
state: 113
state: 14
state: 114
state: 14
state: 13
state: 114
state: 214
state: 113
state: 114
state: 215
state: 114
state: 14
state: 114
state: 115
state: 15
state: 14
state: 14
state: 14
state: 115
state: 16
state: 17
state: 118
state: 117
state: 217
state: 318
state: 217
state: 218
state: 119
state: 219
state: 118
state: 17
state: 117
state: 116
state: 16
state: 17
state: 18
state: 19
state: 120
state: 119
state: 120
state: 119


KeyboardInterrupt: 

In [5]:
print(map[401//100][401%100])

H
