# Notes

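The next thing to work out is how to return the reward so that RLlib doesn't report `nan` as output (RLlib reports `episode_reward_mean` as `nan` when no episode has finished yet, so the episodes probably need to terminate for a real value to appear). Otherwise everything seems to be working as expected!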


The game currently runs at roughly 33 steps per second of game time.

It would be interesting to be able to record a video of one learning episode of the agent - look into how to do this (maybe ask around).

### My wrapper for Light Game

In [3]:
# !pip install gym

In [1]:
import gym
import random, math
import numpy as np
import arcade
from skimage import data, color
from skimage.transform import rescale, resize, downscale_local_mean
from PIL import Image

        
# from LightEnvCopy import LightEnv
from LightEnvCopy import LightEnv

import gym.spaces
from gym.spaces import Discrete, Box

from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog

from collections import namedtuple

# Screen dimensions and window title
SCREEN_WIDTH = 800
SCREEN_HEIGHT = 600
SCREEN_TITLE = "Game 1: Let There Be Light!"

SPRITE_SCALING = 0.25

# Camera pan rate towards the player; a value of 1.0 is instant.
CAMERA_SPEED = 0.1

PLAYER_MOVEMENT_SPEED = 7
BOMB_COUNT = 5
TORCH_COUNT = 1
PLAYING_FIELD_WIDTH = 800  # 1600
PLAYING_FIELD_HEIGHT = 600  # 1600
REWARD_COUNT = 1  # TF - Add in reward
END_GAME = False
torch_collected = False

# TF Start - Adding in actions for action conversion

# Record describing one action: its keyword, its numeric index and the
# (delta_i, delta_j) offsets associated with it.
Action = namedtuple('Action', ['name', 'index', 'delta_i', 'delta_j'])

up = Action(name='up', index=0, delta_i=-1, delta_j=0)
down = Action(name='down', index=1, delta_i=1, delta_j=0)
left = Action(name='left', index=2, delta_i=0, delta_j=-1)
right = Action(name='right', index=3, delta_i=0, delta_j=1)

# Lookup tables: numeric index -> Action and keyword -> Action.
index_to_actions = {}
str_to_actions = {}
for action in (up, down, left, right):
    index_to_actions[action.index] = action
    str_to_actions[action.name] = action
# print(index_to_actions[0].name)
# TF End - Adding in actions for action conversion


class LightEnvWrapper(gym.Env, LightEnv):
    """Gym-style wrapper that makes the Lights environment usable from RLlib.

    The wrapper inherits from ``LightEnv`` and keeps the ``LightEnv`` class
    itself in ``self.mygame``; game methods are invoked through that class
    with this instance passed explicitly as ``self``.

    * Actions: ``Discrete(4)`` -- 0-up, 1-down, 2-left, 3-right.
    * Observations: the game image resized to 84x84 and converted to a
      NumPy array (declared as ``Box(0, 255, (84, 84, 4), uint8)``).
    """

    metadata = {"render.modes": ["rgb_array", "state_pixels"]}

    def __init__(self, config: EnvContext):
        """Create the game window and declare the action/observation spaces.

        Args:
            config: RLlib environment context. It is currently not read; the
                original code fetched ``"size_env"`` from it but never used
                the value.
        """
        super().__init__(SCREEN_WIDTH, SCREEN_HEIGHT, SCREEN_TITLE)

        # Number of step() calls made so far (never cleared by reset()).
        self.counting = 0

        # The LightEnv *class*, not an instance: its methods are called
        # unbound with this wrapper passed as ``self``.
        self.mygame = LightEnv

        # The action space is a choice of 4 actions: U/D/L/R. It could be
        # extended to 9 (U/D/L/R/UR/DR/DL/UL/DO NOTHING). It is not continuous
        # because the speed of the agent is fixed.
        self.action_space = Discrete(4)

        # The observation space is a fixed image of the current game screen -
        # fully observable. It could be limited to a view around the player
        # using arcade.set_viewport. The obs space needs to be either
        # 42,42,x or 84,84,x to be compatible with rllib.
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 4), dtype=np.uint8)

    def reset(self):
        """Start a new episode and return the initial observation.

        Returns:
            The observation produced by the game's ``reset``, resized to
            84x84 and converted to a NumPy array.
        """
        print("resetting in wrapper")
        self.render(drawing=True)

        # Reset the state of the underlying game for a new episode.
        obs_mygame = self.mygame.reset(self)

        # Debug aid: open the freshly reset image to verify it looks right.
        obs_mygame.show()

        # Convert observation to 84x84 resolution and np array for rllib.
        return self.convert_observations(obs_mygame)

    def step(self, action):
        """Apply one action to the game and return the Gym step tuple.

        Args:
            action: Integer in ``[0, 1, 2, 3]`` (0-up, 1-down, 2-left, 3-right).

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is the
            84x84 NumPy array, ``reward`` and ``done`` come from the game's
            ``step`` and ``info`` is an empty dict.
        """
        self.counting += 1

        assert action in [0, 1, 2, 3], action  # 0-up,1-down,2-left,3-right.

        # Convert the numeric action to a keyword: up, down, left, right.
        actions_myenv = index_to_actions[action].name

        # Update and redraw the window. (The original passed ``self`` as the
        # unused ``mode`` argument here.)
        self.render()

        # Compute observation extracted from the window (800x600), with
        # reward and done flag.
        obs, reward, done = self.mygame.step(self, actions_myenv)

        # Debug aid: every 33 steps (one second of game time) report the
        # score and display the raw observation.
        if self.counting % 33 == 0:
            print("self.counting is now divisible by 33(ie. 1 second has passed), showing obs now")
            print(f"total score is {self.score} at time: {self.mygame.time_taken_reported(self)}")
            obs.show()

        # Convert observation to 84x84 resolution and np array for rllib.
        obs_mygame = self.convert_observations(obs)

        # Do NOT reset here: under the Gym contract the caller (RLlib) calls
        # reset() after receiving done=True. Resetting inside step() reset
        # the game twice per episode and threw the first reset observation
        # away.
        if done:
            print(f"done is {done}, episode finished in wrapper.")

        return obs_mygame, reward, done, {}

    def seed(self, seed=None):
        """Seed Python's ``random`` module with ``seed``."""
        random.seed(seed)

    def convert_observations(self, obs_mygame):
        """Resize an observation to 84x84 and return it as a NumPy array.

        No normalisation or concatenation is performed.

        Args:
            obs_mygame: Image object providing ``resize`` (presumably a PIL
                image coming from the game -- confirm against ``LightEnv``).
        """
        obs_resized = obs_mygame.resize((84, 84))
        return np.array(obs_resized)

    def render(self, mode='state_pixels', drawing=False):
        """Run one game update with a 1/60 time delta and redraw the window.

        ``mode`` and ``drawing`` are accepted but not used.
        """
        self.mygame.on_update(self, 1/60)
        self.mygame.on_draw(self)
        # Result is not needed; the call is kept in case it has side effects
        # inside LightEnv (not visible from this file).
        self.mygame.time_taken_reported(self)

pygame 2.1.2 (SDL 2.0.18, Python 3.6.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


### Now run the rllib script to train the agent

In [2]:
# !pip install ray==1.11.0
import gym
import ray.rllib.agents.ppo.ppo as ppo


import ray
import ray.rllib.agents.dqn as dqn
from ray.tune.logger import pretty_print

# Start from RLlib's default DQN settings and override only what this
# experiment needs.
config = dqn.DEFAULT_CONFIG.copy()
config.update({
    "preprocessor_pref": "rllib",
    "framework": "torch",
    "dueling": False,
    "double_q": False,
    "env": LightEnvWrapper,
    # "env_config": {"size_env": 15},
    "model": {
        "fcnet_hiddens": [64, 64],
        "fcnet_activation": "relu",
    },
})

trainer = dqn.DQNTrainer(config=config)

# Can optionally call trainer.restore(path) to load a checkpoint.

# Mean episode reward reported after each training iteration.
avg_rewards = []

for i in range(100):
    # Perform one iteration of training the policy with DQN
    result = trainer.train()
    #print(pretty_print(result))
    mean_reward = result['episode_reward_mean']
    print(mean_reward)
    avg_rewards.append(mean_reward)

    # Save a checkpoint on iteration 0 and then every 10th iteration.
    if not i % 10:
        checkpoint = trainer.save()
        print("checkpoint saved at", checkpoint)




resetting in wrapper
resetting


  elif np.issubdtype(ret.dtype, int):
Install gputil for GPU system monitoring.


resetting in wrapper
resetting




self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -1 at time: 1
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 1
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 2
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 2
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 3
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 3
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 4
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 4
self.counting is now divisible by 33(ie. 1 second has passed), showing obs now
total score is -3 at time: 5
self.counting is now divisib

KeyboardInterrupt: 

In [None]:
# import os
# os.environ["ARCADE_HEADLESS"] = "true"
# import arcade

# class App(arcade.Window):

#     def __init__(self):
#         super().__init__(200, 200)
#         self.frame = 0
#         self.sprite = arcade.Sprite(
#             ":resources:images/animated_characters/female_adventurer/femaleAdventurer_idle.png",
#             center_x=self.width / 2,
#             center_y=self.height / 2,
#         )

#     def on_draw(self):
#         self.clear()
#         self.sprite.draw()

#         # Dump the window framebuffer to disk
#         image = arcade.get_image(0, 0, *self.get_size())
#         image.save("framebuffer.png")

#     def on_update(self, delta_time: float):
#         # Close the window on the second frame
#         if self.frame == 2:
#             self.close()

#         self.frame += 1

# App().run()

In [2]:
# obs_mygame = LightEnvWrapper.reset
# print(obs_mygame)

In [3]:
# # importing image class from PIL package
# from PIL import Image
  
# # creating image object
# obs_test = LightEnvWrapper.reset
# print(obs_test)
# img = Image.open(obs_test)
  
# # using convert method for img1
# img1 = img.convert("L")
# img1.show()
  
# # using convert method for img2
# img2 = img.convert("1")
# img2.show()

In [20]:
# # path2img = r"C:\Users\Tim\OneDrive\Documents\Artificial Intelligence\INM363 Individual Project\GitHub\INM363-Project\Scripts"
# img = Image.open(r"C:\Users\Tim\OneDrive\Documents\Artificial Intelligence\INM363 Individual Project\GitHub\INM363-Project\Scripts\screenshot.png")
# print(img.size)
# # print(img)
# img2 = img.resize((80,60))
# # print(img2)
# # img2.size
# # img2.show()

(800, 600)


### Example custom wrapper from ray documentation

In [None]:
class SimpleCorridor(gym.Env):
    """Corridor-walking example environment.

    The agent starts at position 0 and the episode ends once it reaches
    ``end_pos``, which is taken from ``config["corridor_length"]``.
    """

    def __init__(self, config: EnvContext):
        corridor_length = config["corridor_length"]
        self.end_pos = corridor_length
        self.cur_pos = 0
        # Two actions: 0 steps back (when not at the start), 1 steps forward.
        self.action_space = Discrete(2)
        self.observation_space = Box(
            low=0.0, high=corridor_length, shape=(1,), dtype=np.float32
        )
        # Set the seed. This is only used for the final (reach goal) reward.
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        """Move back to the start and return the one-element observation."""
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        """Move one cell and return ``(obs, reward, done, info)``."""
        assert action in [0, 1], action
        if action == 1:
            self.cur_pos += 1
        elif action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1

        done = self.cur_pos >= self.end_pos
        # Produce a random reward when we reach the goal, a small penalty
        # for every other step.
        if done:
            reward = random.random() * 2
        else:
            reward = -0.1
        return [self.cur_pos], reward, done, {}

    def seed(self, seed=None):
        """Seed Python's ``random`` module."""
        random.seed(seed)


I will need to install EGL and arcade 2.6.13 (presumably for arcade's headless mode, as tried in the `ARCADE_HEADLESS` cell above).