In [1]:
import numpy as np

In [29]:
arr = np.ones((6, 6, 6))

print([a[0][b] for a in arr for b in range(2)])

[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]


In [2]:
class CustomCombinedExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: spaces.Dict):
        # We do not know features-dim here before going over all the items,
        # so put something dummy for now. PyTorch requires calling
        # nn.Module.__init__ before adding modules
        super().__init__(observation_space, features_dim=68)

        extractors = {}

        total_concat_size = 0
        # We need to know size of the output of this extractor,
        # so go over all the spaces and compute output feature sizes
        for key, subspace in observation_space.spaces.items():
            if key == "grid":
                # We will just downsample one channel of the image by 4x4 and flatten.
                # Assume the image is single-channel (subspace.shape[0] == 0)
                extractors[key] = nn.Sequential(
            nn.Conv2d(subspace.shape[0] * subspace.shape[1], 32, kernel_size=6, stride=2, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=2, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )
                
                total_concat_size += (subspace.shape[0] // 4 - 3) * (subspace.shape[1] // 4 - 3)
            else:
                # Run through a simple MLP
                extractors[key] = nn.Linear(subspace.shape[0], 16)
                total_concat_size += 16
        
        # Now concatenate the obtained features
        self.extractors = nn.ModuleDict(extractors)
        # Update the features dim manually
        self._features_dim = total_concat_size

    def forward(self, observations) -> th.Tensor:
        encoded_tensor_list = []

        # self.extractors contain nn.Modules that do all the processing.
        for key, extractor in self.extractors.items():
            print(observations)
            print(key)
            print(observations[key].shape)
            print(extractor)
            encoded_tensor_list.append(extractor(observations[key]))
        # Return a (B, self._features_dim) PyTorch tensor, where B is batch dimension.
        return th.cat(encoded_tensor_list, dim=1)

In [3]:

    def __init__(self, len, render_mode = True):
        self.render_mode = render_mode
        self.len = len
        if render_mode == True:
            pygame.init()
            self.screen = pygame.display.set_mode((720, 720))      
        # Actions we can up down left right
        self.action_space = Discrete(4)
        # 0=visited, 1=notvisited
        #gym.spaces.space.Space.seed(1)

        
        self.observation_space = Dict({'grid': Box(low=0, high=1, shape=(self.len,self.len), dtype=np.uint8), 'loc': MultiDiscrete([self.len, self.len]), 'bounds': MultiDiscrete([self.len, self.len, self.len, self.len])})
        
        # Set length
        self.direct = [[-1, 0], [0, 1], [1, 0], [0, -1]]
        
    def step(self, action):
        reward = 0
        arr = self.state['grid']
        pos = self.state['loc']
        # Apply action (no walls for now)
        x = pos[0] + self.direct[action][0]
        y = pos[1] + self.direct[action][1]
        if x < 0 or x >= self.len or y < 0 or y >= self.len:
            reward = -100.0
            self.running_length -= 1 
        else:
            if arr[x][y] == 1:
                reward = 5.0
            elif arr[x][y] == 0:
                reward = 0.1
                self.running_length -= 1 
            arr[pos[0]][pos[1]] = 0
            pos[0] = x
            pos[1] = y

        # Reduce shower length by 1 second
        
        # Check if shower is done
        if self.running_length <= 0: 
            done = True
        else:
            done = False
        if self.finished():
            reward = self.len * self.len * 6
            done = True
        # Set placeholder for info
        info = {}
        
        # Return step information
        return (self.state, reward, done, False, info)

    def leastDist(self, x, y):
        ret = 999999999
        arr = self.state['grid']
        for i in range(self.len):
            for j in range(self.len):
                if arr[i][j] == 1:
                    ret = min(ret, abs(i - x) + abs(j - y))
        return ret

    def finished(self):
        arr = self.state['grid']
        for i in range(self.len):
            for j in range(self.len):
                if arr[i][j] == 1:
                    return False
        return True
    
    def render(self):
        # Implement viz
        if self.render_mode == True:
            pygame.event.get()
            gap = 720//self.len
            arr = self.state['grid']
            pos = self.state['loc']
            for i in range(0, 720, gap):
                for j in range(0, 720, gap):
                    if arr[i//gap][j//gap] == 0:
                        color = (255, 255, 255)
                    elif arr[i//gap][j//gap] == 1:
                        color = (255, 255,0)
                    if i//gap == pos[0] and j//gap == pos[1]:
                        color = (255, 0, 0)
                    pygame.draw.rect(self.screen, color, (j, i, gap, gap))
            pygame.display.update()
    
    
    def reset(self, seed=None, options=None):
        # Set starting state
        self.state = self.observation_space.sample()
        image = self.state['grid']
        position = self.state['loc']
        bounds = self.state['bounds']
        for i in range(self.len):
            for j in range(self.len):
                image[i][j] = 1
        image[0][0] = 0
        position[0] = position[1] = 0
        # Set length
        self.running_length = (self.len)*(self.len)//2
        bounds[0] = bounds[1] = 0
        bounds[2] = bounds[3] = self.len-1
        return self.state, {}
    
    def close(self):
        pass

In [4]:
env = MowerEnv(36, True)
env.reset()

(OrderedDict([('bounds', array([ 0,  0, 35, 35], dtype=int64)),
              ('grid',
               array([[0, 1, 1, ..., 1, 1, 1],
                      [1, 1, 1, ..., 1, 1, 1],
                      [1, 1, 1, ..., 1, 1, 1],
                      ...,
                      [1, 1, 1, ..., 1, 1, 1],
                      [1, 1, 1, ..., 1, 1, 1],
                      [1, 1, 1, ..., 1, 1, 1]], dtype=uint8)),
              ('loc', array([0, 0], dtype=int64))]),
 {})

In [12]:
policy_kwargs = dict(
        features_extractor_class=CustomCombinedExtractor,
        features_extractor_kwargs=dict(features_dim=128))

In [13]:
model = PPO("MultiInputPolicy", env, gamma=0.999, ent_coef=0.0001,verbose = 1, seed = 112, policy_kwargs=policy_kwargs)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


TypeError: CustomCombinedExtractor.__init__() got an unexpected keyword argument 'grid'

In [8]:
model.learn(total_timesteps=1500000,  reset_num_timesteps=True)

{'bounds': tensor([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.]],
       device='cuda:0'), 'grid': tensor([[[0., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.],
         ...,
         [1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.]]], device='cuda:0'), 'loc': tensor([[1., 0., 0.

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x144 and 4x16)