In [13]:
# System packages: Xvfb (X virtual framebuffer) is a display server that performs
# all graphical operations in virtual memory, emulating a screen.
!apt-get install -y xvfb x11-utils

# Python packages installed from within the notebook; no versions are pinned here.
!pip install pyvirtualdisplay
!pip install moviepy
# Imports for the rest of the notebook.
import gymnasium as gym
import random
import time
from pyvirtualdisplay import Display
from IPython.display import clear_output

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
x11-utils is already the newest version (7.7+5build2).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.16).
0 upgraded, 0 newly installed, 0 to remove and 1 not upgraded.


In [14]:
# Install the Python packages imported below.
!pip install gymnasium numpy
# Optional: Install dependencies for video recording in Colab
!apt-get install -y xvfb x11-utils
# NOTE(review): this pins pyvirtualdisplay to 0.2.*, while the earlier setup cell
# installs it without a version pin — confirm which version is intended.
!pip install pyvirtualdisplay==0.2.*

import gymnasium as gym
import numpy as np
import random
from pyvirtualdisplay import Display
import os

# Start virtual display
# The display is created hidden (visible=False) with a 1400x900 screen size.
display = Display(visible=False, size=(1400, 900))
# The return value of start() is bound to `_`, presumably so it is not echoed
# as the cell's output.
_ = display.start()

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
x11-utils is already the newest version (7.7+5build2).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.16).
0 upgraded, 0 newly installed, 0 to remove and 1 not upgraded.


In [15]:
class PartiallyObservableGridEnv(gym.Env):
    """Grid world in which the agent only observes a small window around itself.

    The agent moves on a ``maze_size[0] x maze_size[1]`` grid and must reach the
    goal located in the last cell ``(maze_size[0] - 1, maze_size[1] - 1)``.
    Positions are ``(x, y)`` tuples where ``x`` indexes the first grid dimension
    and ``y`` the second.

    Observation: an integer array of shape ``(window_size, window_size)`` that is
    all zeros except for a single ``1`` at the goal's location when the goal lies
    inside the window. Cells outside the grid are encoded as ``0`` as well, so
    they are indistinguishable from empty cells.

    Actions: ``0`` up (x - 1), ``1`` down (x + 1), ``2`` left (y - 1),
    ``3`` right (y + 1). Moves that would leave the grid are clamped to the edge.

    Reward: ``1.0`` on the step that reaches the goal, ``0.0`` otherwise.
    """

    def __init__(self, maze_size=(10, 10), observation_window_size=3):
        """Create the environment.

        Args:
            maze_size: ``(rows, cols)`` extent of the grid.
            observation_window_size: Side length of the square observation window.

        Raises:
            ValueError: If the grid has fewer than two cells, because the agent
                must start on a cell different from the goal.
        """
        super().__init__()
        rows, cols = maze_size
        if rows < 1 or cols < 1 or rows * cols < 2:
            raise ValueError(
                f"maze_size must describe a grid with at least 2 cells, got {maze_size!r}"
            )

        self.maze_size = maze_size
        self.window_size = observation_window_size
        self.agent_pos = None
        self.goal_pos = (maze_size[0] - 1, maze_size[1] - 1)

        # Define Action space: 0: up, 1: down, 2: left, 3: right
        self.action_space = gym.spaces.Discrete(4)

        # Define the observation space as a local window
        # The observation will be a 2D array representing the local area
        self.observation_space = gym.spaces.Box(low=0, high=1,
                                                shape=(self.window_size, self.window_size),
                                                dtype=int)

    def reset(self, seed=None, options=None):
        """Start a new episode with the agent on a random non-goal cell.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset``; it seeds
                ``self.np_random``, which is used to draw the start position, so
                the same seed yields the same start cell.
            options: Unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)

        rows, cols = self.maze_size
        # The goal is the last cell in row-major order (index rows * cols - 1),
        # so drawing from the first rows * cols - 1 indices samples uniformly
        # over every non-goal cell without a rejection loop.
        flat_index = int(self.np_random.integers(rows * cols - 1))
        self.agent_pos = divmod(flat_index, cols)

        observation = self._get_observation()
        info = {}
        return observation, info

    def step(self, action):
        """Move the agent one cell and report the outcome.

        Args:
            action: One of ``0`` (up), ``1`` (down), ``2`` (left), ``3`` (right).
                Any other value leaves the agent where it is.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. ``terminated``
            is true once the agent stands on the goal; ``truncated`` is always
            ``False`` because the environment imposes no step limit.
        """
        # Define movements; each move is clamped so the agent stays on the grid.
        x, y = self.agent_pos
        if action == 0: # Up
            x = max(0, x - 1)
        elif action == 1: # Down
            x = min(self.maze_size[0] - 1, x + 1)
        elif action == 2: # Left
            y = max(0, y - 1)
        elif action == 3: # Right
            y = min(self.maze_size[1] - 1, y + 1)

        self.agent_pos = (x, y)

        # Check if goal is reached
        terminated = (self.agent_pos == self.goal_pos)
        reward = 1.0 if terminated else 0.0

        observation = self._get_observation()
        truncated = False # No episode length limit in this example
        info = {}

        return observation, reward, terminated, truncated, info

    def _get_observation(self):
        """Return the local window around the agent with the goal marked as 1."""
        obs = np.zeros((self.window_size, self.window_size), dtype=int)
        half_window = self.window_size // 2

        # Window cell (i, j) corresponds to grid cell
        # (agent_x + i - half_window, agent_y + j - half_window), so the goal's
        # window coordinates can be computed directly instead of scanning.
        goal_row = self.goal_pos[0] - self.agent_pos[0] + half_window
        goal_col = self.goal_pos[1] - self.agent_pos[1] + half_window
        if 0 <= goal_row < self.window_size and 0 <= goal_col < self.window_size:
            obs[goal_row, goal_col] = 1 # 1 indicates the goal is in this spot
        # Out-of-grid cells are implicitly left as 0 (same as empty space)
        return obs

    def render(self):
        """Print the agent and goal positions and the current observation window."""
        # In a real scenario, this would visualize the full state or observation
        print(f"Agent Pos: {self.agent_pos}, Goal Pos: {self.goal_pos}")
        print(f"Current Observation Window:\n{self._get_observation()}")

In [16]:
# Create an instance of the custom environment
env = PartiallyObservableGridEnv(maze_size=(10, 10), observation_window_size=3)

# The environment never truncates on its own and a random walk can take a very
# long time to reach the goal, so cap the demo episode to keep the run (and the
# printed output) bounded.
MAX_STEPS = 200

# Run a sample episode with random actions
observation, info = env.reset()
terminated = False
truncated = False
total_reward = 0
step_count = 0

while not terminated and not truncated:
    env.render() # See the agent's limited view
    action = env.action_space.sample() # Replace with your RL agent's policy
    observation, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    step_count += 1
    print(f"Action taken: {action}, Reward received: {reward}\n")
    # End the episode as truncated once the step budget is used up.
    if step_count >= MAX_STEPS and not terminated:
        truncated = True

print(f"Episode finished. Total Reward: {total_reward}")
env.close()

Agent Pos: (2, 4), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 3, Reward received: 0.0

Agent Pos: (2, 5), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 3, Reward received: 0.0

Agent Pos: (2, 6), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 1, Reward received: 0.0

Agent Pos: (3, 6), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 2, Reward received: 0.0

Agent Pos: (3, 5), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 1, Reward received: 0.0

Agent Pos: (4, 5), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 1, Reward received: 0.0

Agent Pos: (5, 5), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 [0 0 0]]
Action taken: 3, Reward received: 0.0

Agent Pos: (5, 6), Goal Pos: (9, 9)
Current Observation Window:
[[0 0 0]
 [0 0 0]
 