In [3]:
!pip install torch torchvision




In [4]:
!pip install torch torchvision torchaudio -f https://download.pytorch.org/whl/cu102/torch_stable.html


Looking in links: https://download.pytorch.org/whl/cu102/torch_stable.html


In [5]:
pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.


In [6]:
pip install pygame

Note: you may need to restart the kernel to use updated packages.


In [7]:
pip install gymnasium

Note: you may need to restart the kernel to use updated packages.


In [8]:
import numpy as np
import pygame

import gymnasium as gym
from gymnasium import spaces

In [9]:

pip install stable_baselines3

Note: you may need to restart the kernel to use updated packages.


In [10]:
pip install gym

Note: you may need to restart the kernel to use updated packages.


In [11]:
class GridWorldEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode="human", size=5):
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window

        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`}^2, i.e. MultiDiscrete([size, size]).
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )

        # We have 4 actions, corresponding to "right", "up", "left", "down"
        self.action_space = spaces.Discrete(8,start=1)

        """
        The following dictionary maps abstract actions from `self.action_space` to
        the direction we will walk in if that action is taken.
        I.e. 0 corresponds to "right", 1 to "up" etc.
        """
        self._action_to_direction = {
            0: np.array([-1, 0]),
            1: np.array([0, -1]),
            2: np.array([1, 0]),
            3: np.array([0, 1]),
            4: np.array([1,-1]),
            5: np.array([1,1]),
            6: np.array([-1,1]),
            7: np.array([-1,-1]),
            8: np.array([0,0])
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None


    def step(self, action):
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )
        # An episode is done iff the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1 if terminated else -1  # Binary sparse rewards
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, reward, terminated, False, info

    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}

# %%
# We can also implement a similar method for the auxiliary information
# that is returned by ``step`` and ``reset``. In our case, we would like
# to provide the manhattan distance between the agent and the target:

    def _get_info(self):
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            )
        }


    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # Choose the agent's location uniformly at random
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # We will sample the target's location randomly until it does not coincide with the agent's location
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(
                0, self.size, size=2, dtype=int
            )

        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, info


    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (self.window_size, self.window_size)
            )
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # First we draw the target
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )
        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
# In many cases, you don’t actually have to bother to
# implement this method. However, in our example ``render_mode`` may be
# ``"human"`` and we might need to close the window that has been opened:

    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

In [12]:
pip install shimmy

Note: you may need to restart the kernel to use updated packages.


In [13]:
env = GridWorldEnv()
dic = env.reset()
dic

({'agent': array([1, 3]), 'target': array([3, 4])}, {'distance': 3.0})

In [14]:
env.step(2)
env.step(0)

({'agent': array([1, 3]), 'target': array([3, 4])},
 -1,
 False,
 False,
 {'distance': 3.0})

In [15]:
obs, info = env.reset()
obs, info

({'agent': array([3, 4]), 'target': array([1, 1])}, {'distance': 5.0})

In [16]:
env.step(0)
env.step(1)

({'agent': array([2, 3]), 'target': array([1, 1])},
 -1,
 False,
 False,
 {'distance': 3.0})

In [17]:
# USING DQN(Deep Queue Network) RL Model

In [18]:
# import sys
# sys.stdout = sys.stdout.bufferimport gymnasium as gym
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
# env = gym.make("CartPole-v1", render_mode="human")

# Create the environment
env = GridWorldEnv()
env = DummyVecEnv([lambda: env])

# Create the DQN model
model = DQN("MultiInputPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=15000, log_interval=4)

# Save the model
model.save("dqn_grid_world")

# Load the model
model = DQN.load("dqn_grid_world")

obs, info = env.reset()
x = input("enter random input")

Using cpu device
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.329    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 3        |
|    time_elapsed     | 89       |
|    total_timesteps  | 353      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 3        |
|    time_elapsed     | 167      |
|    total_timesteps  | 658      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 12       |
|    fps              | 3        |
|    time_elapsed     | 277      |
|    total_timesteps  | 1091     |
----------------------------------
----------------------------------
| r

In [21]:
# Continue interacting with the environment using the trained model
env = GridWorldEnv()
obs, info = env.reset()
train_rate = 0
while True:
    action, _states = model.predict(obs, deterministic=True)
    int_action = int(action)
    obs, reward, terminated, truncated, info = env.step(int_action)
    print(obs, reward, terminated, truncated, info)
    train_rate += 1
    if terminated or truncated:
        obs, info = env.reset()
    elif(train_rate==100):
        obs, info = env.reset()
        break
        

{'agent': array([2, 2]), 'target': array([0, 4])} -1 False False {'distance': 4.0}
{'agent': array([3, 1]), 'target': array([0, 4])} -1 False False {'distance': 6.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}
{'ag

{'agent': array([4, 0]), 'target': array([0, 4])} -1 False False {'distance': 8.0}


In [18]:
env.close()

In [92]:
# Manual Automation by hard Code

In [None]:
pos_dict = dict(dic.__getitem__(0))
print(pos_dict)
dist_dict = dict(dic.__getitem__(1))
print(dist_dict)
from numpy import array as arr
pos_agent = arr(pos_dict.get('agent'))
pos_target = arr(pos_dict.get('target'))
print(pos_agent, pos_target)
distance_agent_target = dist_dict.get('distance')
print(int(distance_agent_target))
vector = [int(abs(pos_agent[0]-pos_target[0])),int(abs(pos_agent[1]-pos_target[1]))]
vec1 = vector[0]
vec2 = vector[1]
if(pos_target[0] != pos_agent[0]):
    if(pos_target[0]<pos_agent[0]):
        while(vec1!=0):
            env.step(0)
            vec1 -= 1
    else:
        while(vec1!=0):
            env.step(2)
            vec1 -= 1

if(pos_target[1] != pos_agent[1]):
    if(pos_target[1]<pos_agent[1]):
        while(vec2!=0):
            env.step(1)
            vec2 -= 1
    else:
        while(vec2!=0):
            env.step(3)
            vec2 -= 1

print(env._get_obs(),env._get_info())
# env.step(0)
# env.step(1)
# env.step(2)
# env.step(3)
# env.step(4)
# env.step(5)
# env.step(6)
# env.step(1)
# if(pos_target[1] != pos_agent[1]):
#     env.step(pos_target[1])
# env.render()
# env._render_frame()
import time as stark
stark.sleep(10)

In [None]:
# USING A2C RL Model
!pip install torch torchvision torchaudio

In [22]:
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
# Parallel environments
vec_env = env #make_vec_env(env, n_envs=4)

model = A2C("MultiInputPolicy", vec_env, verbose=1)
model.learn(total_timesteps=25000)
model.save("a2c_grid_world")

# del model # remove to demonstrate saving and loading

model = A2C.load("a2c_grid_world")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 25.2     |
|    ep_rew_mean        | -23.2    |
| time/                 |          |
|    fps                | 3        |
|    iterations         | 100      |
|    time_elapsed       | 129      |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -2       |
|    explained_variance | -0.0639  |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -4.62    |
|    value_loss         | 7.88     |
------------------------------------
-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 43        |
|    ep_rew_mean        | -41       |
| time/                 |           |
|    fps                | 3         |
|    iterations         | 200       |
|    time_e

In [24]:
model = A2C.load("a2c_grid_world")
obs, info = env.reset()
print(obs,info)
train_rate = 0
while True:
    action, _states = model.predict(obs,deterministic=True)
    int_action = int(action)
    obs, reward, terminated, truncated, info = env.step(int_action)
    print(obs, reward, terminated, truncated, info)
    train_rate += 1
    if terminated or truncated:
        obs, info = env.reset()
        vec_env.render("human")
    elif(train_rate==100):
        obs, info = env.reset()
        break
    

{'agent': array([2, 3]), 'target': array([0, 0])} {'distance': 5.0}
{'agent': array([3, 3]), 'target': array([0, 0])} -1 False False {'distance': 6.0}
{'agent': array([4, 3]), 'target': array([0, 0])} -1 False False {'distance': 7.0}
{'agent': array([4, 2]), 'target': array([0, 0])} -1 False False {'distance': 6.0}
{'agent': array([4, 1]), 'target': array([0, 0])} -1 False False {'distance': 5.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4,

{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
{'agent': array([4, 0]), 'target': array([0, 0])} -1 False False {'distance': 4.0}
