In [47]:

import gymnasium as gym
import numpy as np
import pygame as pg
from collections import deque
from gym.spaces import Box,Discrete
from collections import defaultdict
from gridworld.modules import Agent, Wall, Goal, State, Hole, Block
import numpy as np
import matplotlib.pyplot as plt
import torch
import os.path as osp
import random
from typing import Dict,List
import gym.spaces as spaces
import hydra
import numpy as np
from typing import Tuple
import yaml
from collections import defaultdict
from typing import Dict, Optional

from hydra.utils import instantiate as hydra_instantiate
from omegaconf import DictConfig, OmegaConf
from rl_utils.common import (Evaluator, compress_dict, get_size_for_space,
                             set_seed)
from rl_utils.envs import create_vectorized_envs
from rl_utils.logging import Logger


import numpy as np
from gym.utils import seeding
from gym.envs.registration import register

ModuleNotFoundError: No module named 'gridworld.modules'

In [None]:
class GridWorld(gym.Env):
    """Deterministic 2-D grid navigation task.

    The agent starts at ``start_position`` and moves one cell per step using
    four discrete actions (0 = Up, 1 = Right, 2 = Down, 3 = Left). A move that
    would leave the grid or enter an obstacle leaves the agent where it is.
    The episode is done once the agent stands on ``goal_position``.

    Positions are ``(row, col)`` pairs: the first coordinate is bounded by
    ``grid_size[0]`` and the second by ``grid_size[1]``.

    ``reset`` returns only the observation and ``step`` returns the 4-tuple
    ``(observation, reward, done, info)``.

    Args:
        grid_size: ``(rows, cols)`` of the grid. A single int or a 1-tuple is
            accepted as shorthand for a square grid.
        start_position: cell the agent occupies after ``reset``.
        goal_position: cell that ends the episode.
        obstacles: optional iterable of blocked cells.

    Raises:
        ValueError: if ``grid_size`` is not a positive 2-D size, or if the
            start or goal position is not a cell inside the grid.
    """

    # (row, col) displacement applied by each discrete action.
    _MOVES = {
        0: (-1, 0),  # Up
        1: (0, 1),   # Right
        2: (1, 0),   # Down
        3: (0, -1),  # Left
    }

    def __init__(self, grid_size=(5, 5), start_position=(0, 0), goal_position=(4, 4), obstacles=None):
        super(GridWorld, self).__init__()

        # The previous default, (4, ), was a 1-tuple: step() failed on
        # grid_size[1] and the default goal (4, 4) lay outside the grid.
        self.grid_size = self._normalize_grid_size(grid_size)
        self.start_position = self._checked_cell(start_position, "start_position")
        self.goal_position = self._checked_cell(goal_position, "goal_position")
        # Stored as tuples so the membership test in step() also works when
        # the caller passed lists or arrays.
        self.obstacles = [tuple(int(c) for c in cell) for cell in obstacles] if obstacles else []

        # Actions: 0 = Up, 1 = Right, 2 = Down, 3 = Left
        self.action_space = spaces.Discrete(4)
        # Observation: agent's current (row, col) position in the grid.
        self.observation_space = spaces.Box(low=0, high=max(self.grid_size)-1, shape=(2,), dtype=np.int32)

        # Same dtype as the observation space so observations are members of it.
        self.state = np.array(self.start_position, dtype=np.int32)

        # Initialize the random seed
        self.seed()

    @staticmethod
    def _normalize_grid_size(grid_size):
        """Return ``grid_size`` as a ``(rows, cols)`` tuple of ints."""
        if np.isscalar(grid_size):
            grid_size = (grid_size,)
        dims = tuple(int(d) for d in grid_size)
        if len(dims) == 1:
            dims = dims * 2  # square-grid shorthand
        if len(dims) != 2 or min(dims) < 1:
            raise ValueError(f"grid_size must describe a 2-D grid with positive sides, got {grid_size!r}")
        return dims

    def _in_bounds(self, position):
        """Tell whether ``position`` is a cell inside the grid."""
        return (0 <= position[0] < self.grid_size[0]) and (0 <= position[1] < self.grid_size[1])

    def _checked_cell(self, position, name):
        """Return ``position`` as an int tuple, rejecting cells outside the grid."""
        cell = tuple(int(c) for c in position)
        if len(cell) != 2 or not self._in_bounds(cell):
            raise ValueError(f"{name}={position!r} is not a cell of a grid of size {self.grid_size}")
        return cell

    def seed(self, seed=None):
        """Seed the environment's random number generator and return ``[seed]``."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        """Move the agent back to the start cell and return its position."""
        self.state = np.array(self.start_position, dtype=np.int32)
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done, info)``.

        The reward is 1 on the step that leaves the agent on the goal and
        -0.1 on every other step. ``info`` is always an empty dict.

        Raises:
            KeyError: if ``action`` is not one of 0, 1, 2, 3.
        """
        # int() lets NumPy integers and 0-d tensors be used as actions.
        delta = self._MOVES[int(action)]
        new_position = self.state + np.asarray(delta, dtype=self.state.dtype)

        # Only move when the target cell is inside the grid and not blocked.
        if self._in_bounds(new_position) and tuple(new_position.tolist()) not in self.obstacles:
            self.state = new_position

        done = bool(np.array_equal(self.state, self.goal_position))
        reward = 1 if done else -0.1  # Reward for reaching goal, penalty otherwise

        return self.state, reward, done, {}

    def render(self, mode='human'):
        """Print the grid: 'G' goal, 'X' obstacles, 'A' agent, ' ' empty.

        ``mode`` is accepted but not used.
        """
        grid = np.full(self.grid_size, ' ')
        grid[tuple(self.goal_position)] = 'G'  # Goal
        for cell in self.obstacles:
            grid[tuple(cell)] = 'X'  # Obstacles
        grid[tuple(self.state)] = 'A'  # Agent (drawn last, so it hides the goal marker)
        print("\n".join(["".join(row) for row in grid]))
        print()

    def close(self):
        """Nothing to release."""
        pass

def set_seed(seed: int) -> None:
    """
    Seed PyTorch (CPU, plus every CUDA device when CUDA is available),
    NumPy's global RNG and Python's `random` module with the same value.
    """
    torch.manual_seed(seed)
    cuda_ready = torch.cuda.is_available()
    if cuda_ready:
        torch.cuda.manual_seed_all(seed)
    for seeder in (np.random.seed, random.seed):
        seeder(seed)


In [48]:
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
import os.path as osp
from collections import defaultdict
from typing import Dict, Optional

import gym.spaces as spaces
import hydra
import numpy as np
import torch
import torch.nn as nn
from hydra.utils import instantiate as hydra_instantiate
from omegaconf import DictConfig, OmegaConf
from rl_utils.common import (Evaluator, compress_dict, get_size_for_space,
                             set_seed)
from rl_utils.envs import create_vectorized_envs
from rl_utils.logging import Logger

from imitation_learning.policy_opt.policy import Policy
from imitation_learning.policy_opt.ppo import PPO
from imitation_learning.policy_opt.storage import RolloutStorage

ModuleNotFoundError: No module named 'torchrl'

In [55]:
# Load the default imitation-learning configuration into an OmegaConf DictConfig.
# NOTE(review): this is an absolute, machine-specific path; the notebook will
# not run elsewhere until it is made relative or configurable.
# The `with` block closes the file handle, which the previous bare open() never did.
with open("/Users/williamhuang/Documents/Projects/Tobin 2024/Code/code/bc-irl-main/imitation_learning/config/default.yaml", 'r') as config_file:
    # yaml.safe_load(stream) is yaml.load(stream, Loader=yaml.SafeLoader).
    cfg = DictConfig(yaml.safe_load(config_file))

In [56]:
import gymnasium as gym
import numpy as np
import torch
import random
import yaml
from typing import List, Tuple, Dict
from gymnasium.core import Env
from omegaconf import DictConfig
import gym
import gym_minigrid as minigrid

# Define the vectorized environment class
class VectorizedEnv:
    """Steps a list of environments in lockstep and batches results as tensors.

    The observation and action spaces are copied from the first environment.
    Both environment API styles are accepted:

    * ``reset()`` returning ``obs`` or ``(obs, info)``;
    * ``step()`` returning ``(obs, reward, done, info)`` or
      ``(obs, reward, terminated, truncated, info)``.

    Args:
        envs: non-empty list of already constructed environments.
        obs_key: optional key. When set and an environment returns a dict
            observation, only ``obs[obs_key]`` is batched (for example
            ``"image"``). With the default ``None`` observations are batched
            as returned, so they must be array-like.

    Raises:
        ValueError: if ``envs`` is empty.
    """

    def __init__(self, envs: List["Env"], obs_key=None):
        if len(envs) == 0:
            raise ValueError("VectorizedEnv requires at least one environment")
        self.envs = envs
        self.num_envs = len(self.envs)
        self.observation_space = self.envs[0].observation_space
        self.action_space = self.envs[0].action_space
        self.obs_key = obs_key

    def _select_obs(self, obs):
        """Pick ``obs_key`` out of a dict observation; pass anything else through."""
        if self.obs_key is not None and isinstance(obs, dict):
            return obs[self.obs_key]
        return obs

    @staticmethod
    def _batch(values, dtype):
        """Stack per-env values into one tensor of ``dtype``."""
        # Going through a single ndarray avoids torch.tensor's slow path for
        # lists of arrays.
        return torch.as_tensor(np.asarray(values), dtype=dtype)

    def reset(self):
        """Reset every environment and return the stacked float32 observations."""
        observations = []
        for env in self.envs:
            result = env.reset()
            # (obs, info) style reset: keep the observation only. Indexing [0]
            # unconditionally would slice the observation of an env whose
            # reset() returns obs alone.
            if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
                result = result[0]
            observations.append(self._select_obs(result))
        return self._batch(observations, torch.float32)

    def step(self, actions) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, List[Dict]]:
        """Apply ``actions[i]`` to environment ``i``.

        Returns:
            ``(observations, rewards, dones, infos)``: float32 observations and
            rewards, a bool ``dones`` tensor, and the list of per-env infos.
            For 5-tuple step results ``done`` is ``terminated or truncated``.
        """
        observations, rewards, dones, infos = [], [], [], []
        for i, env in enumerate(self.envs):
            result = env.step(actions[i])
            if len(result) == 5:
                # In the 5-tuple form index 3 is `truncated`, not the info dict.
                obs, reward, terminated, truncated, info = result
                done = bool(terminated) or bool(truncated)
            else:
                obs, reward, done, info = result
            observations.append(self._select_obs(obs))
            rewards.append(reward)
            dones.append(bool(done))
            infos.append(info)
        return (
            self._batch(observations, torch.float32),
            self._batch(rewards, torch.float32),
            self._batch(dones, torch.bool),
            infos,
        )

# Function to set random seeds for reproducibility
def set_seed(seed: int) -> None:
    """Seed Python's `random`, NumPy and PyTorch (plus all CUDA devices when available)."""
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Read the experiment settings from YAML and wrap them in a DictConfig so
# they can be accessed as attributes (cfg.seed, cfg.device, ...).
with open("bc-irl-mouse.yaml", 'r') as config_file:
    cfg_dict = yaml.safe_load(config_file)
cfg = DictConfig(cfg_dict)

# Seed the RNGs first, then resolve the torch device named in the config.
set_seed(cfg.seed)
device = torch.device(cfg.device)

# Function to create a single environment instance
def make_env(seed=None):
    """Return a zero-argument factory that builds one MiniGrid-Empty-5x5 env.

    Args:
        seed: optional integer. When given, the created environment and its
            action space are seeded with it. Previously the argument was
            accepted but never applied, so every environment was unseeded.

    Returns:
        A callable that creates (and, when ``seed`` is given, seeds) the env.
    """
    def _init():
        env = gym.make("MiniGrid-Empty-5x5-v0")
        if seed is not None:
            try:
                # Seeding through reset() is the API of newer gym releases.
                env.reset(seed=seed)
            except TypeError:
                # Older gym: reset() takes no seed, use Env.seed instead.
                env.seed(seed)
            # Make action_space.sample() reproducible as well.
            env.action_space.seed(seed)
        return env
    return _init

# Build one environment per slot; slot k is created by the factory for seed cfg.seed + k.
num_envs = cfg.num_envs
envs = [build_env() for build_env in map(make_env, (cfg.seed + k for k in range(num_envs)))]

# Wrap the individual environments so they can be stepped as one batch.
vec_env = VectorizedEnv(envs)


In [57]:
from rl_utils.common import (Evaluator, compress_dict, get_size_for_space,
                             set_seed)
import gym.spaces as spaces

# Environment steps collected per policy update, and how many updates fit
# into the total step budget (integer division drops any remainder).
steps_per_update = cfg.num_steps * cfg.num_envs
num_updates = int(cfg.num_env_steps) // steps_per_update

# Record the env-derived dimensions on the config.
cfg.obs_shape = vec_env.observation_space['image'].shape
cfg.action_dim = get_size_for_space(vec_env.action_space)
# Check the action space itself. cfg.action_dim holds the result of
# get_size_for_space, not the space, so the earlier isinstance test on it
# could not recognise a Discrete action space.
cfg.action_is_discrete = isinstance(vec_env.action_space, spaces.Discrete)
cfg.total_num_updates = num_updates

In [58]:
# Show the observation space that VectorizedEnv copied from its first environment.
print(vec_env.observation_space)

Dict('direction': Discrete(4), 'image': Box(0, 255, (7, 7, 3), uint8), 'mission': MissionSpace(<function EmptyEnv.__init__.<locals>.<lambda> at 0x12afdd790>, None))


In [59]:
# Shape of the 'image' observation entry, then the action space, one per line.
print(vec_env.observation_space['image'].shape, vec_env.action_space, sep="\n")

(7, 7, 3)
Discrete(7)


In [60]:
from imitation_learning.policy_opt.policy import Policy
from imitation_learning.policy_opt.ppo import PPO
from imitation_learning.policy_opt.storage import RolloutStorage

# Experiment logger, built from cfg.logger and handed the complete config.
logger: Logger = hydra_instantiate(cfg.logger, full_cfg=cfg)

# Rollout storage, built from cfg.storage and handed the torch device.
storage: RolloutStorage = hydra_instantiate(cfg.storage, device=device)

# NOTE(review): the recorded run fails on this instantiation because
# cfg.policy supplies `num_envs`, which Policy.__init__ does not accept.
# Confirm the policy section of the config against the Policy signature.
policy: Policy = hydra_instantiate(cfg.policy).to(device)

Assigning full prefix 118-3-d6uqDM


InstantiationException: Error in call to target 'imitation_learning.policy_opt.policy.Policy':
TypeError("__init__() got an unexpected keyword argument 'num_envs'")
full_key: policy