## Import Libraries

In [1]:
import numpy as np
import cv2
import time
import random
from tqdm.notebook import tqdm_notebook
import os
from PIL import Image

import gym
from gym import spaces
from stable_baselines3 import DQN

In [2]:
# ---------------------------------------------------------------------------
# Notebook configuration: every tunable value lives in this cell.
# ---------------------------------------------------------------------------

# Checkpointing
LOAD_MODEL = None  # path of a saved model to load, or None
MODEL_NAME = '2x256'

# Episode limits
EPISODES = 5000  # alternative value: 20_000
MAX_TRY_IN_EPISODE = 100  # alternative value: 200
MIN_REWARD = -MAX_TRY_IN_EPISODE  # reward threshold for saving a model (alternative value: 0)

# Learning hyper-parameters
DISCOUNT = 0.99
MINIBATCH_SIZE = 64  # number of steps (samples) used per training batch
REPLAY_MEMORY_SIZE = 50_000  # number of most recent steps kept for training
MIN_REPLAY_MEMORY_SIZE = 1_000  # steps that must be in memory before training starts
UPDATE_TARGET_EVERY = 10  # counted in terminal states (ends of episodes); alternative value: 5
# MEMORY_FRACTION = 0.20  # for GPU

# Exploration schedule
MIN_EPSILON = 0.001
EPSILON_DECAY = 0.9  # alternative value: 0.99975
epsilon = 1  # deliberately lower-case: not a constant, it is decayed over time

# Statistics and preview
AGGREGATE_STATS_EVERY = 50  # episodes
SHOW_PREVIEW = True

# Seed both random number generators so runs are more reproducible
np.random.seed(1)
random.seed(1)

In [3]:
class Blob:
    """One occupant of a square ``size`` x ``size`` grid.

    A blob is placed on a random cell when it is created (using NumPy's
    global random generator) and can then be moved one cell at a time.
    Coordinates are always kept inside ``[0, size - 1]``.
    """

    def __init__(self, size):
        """Create a blob on a random cell.

        Args:
            size: Side length of the grid the blob lives on.
        """
        self.size = size
        self.x = np.random.randint(0, size)
        self.y = np.random.randint(0, size)

    def __str__(self):
        return f"Blob ({self.x}, {self.y})"

    def __sub__(self, other):
        """Return the ``(dx, dy)`` offset of this blob relative to ``other``."""
        return (self.x-other.x, self.y-other.y)

    def __eq__(self, other):
        """Blobs compare equal when they occupy the same cell."""
        if not isinstance(other, Blob):
            return NotImplemented
        return self.x == other.x and self.y == other.y

    def action(self, choice):
        '''
        Gives us 4 total movement options. (0,1,2,3)

        0: x + 1, 1: x - 1, 2: y + 1, 3: y - 1.
        Any other value leaves the blob where it is.
        '''
        if choice == 0:
            self.move(x=1, y=0)
        elif choice == 1:
            self.move(x=-1, y=0)
        elif choice == 2:
            self.move(x=0, y=1)
        elif choice == 3:
            self.move(x=0, y=-1)

    def move(self, x=False, y=False):
        """Shift the blob by ``(x, y)`` and clamp it to the grid.

        Args:
            x: Step along the x axis. When omitted (``False`` or ``None``) a
                random step from ``{-1, 0, 1}`` is taken instead.
            y: Step along the y axis, with the same rules as ``x``.

        An explicit ``0`` means "do not move on this axis". The sentinel is
        therefore tested by identity, not truthiness: with a truthiness test
        every call made by ``action`` would move randomly on the axis it
        intends to keep fixed.
        """
        # If no value for x, move randomly
        if x is False or x is None:
            self.x += np.random.randint(-1, 2)
        else:
            self.x += x

        # If no value for y, move randomly
        if y is False or y is None:
            self.y += np.random.randint(-1, 2)
        else:
            self.y += y

        # If we are out of bounds, snap back to the nearest edge cell
        upper = self.size - 1
        self.x = min(max(self.x, 0), upper)
        self.y = min(max(self.y, 0), upper)


# Custom Environment

In [4]:
class GameEnv(gym.Env):
    """Grid world with a player, a food blob and an enemy blob (gym interface).

    Each step the player takes one of four moves. An episode ends when the
    player reaches the food, reaches the enemy, or has taken
    ``MAX_TRY_IN_EPISODE`` steps (a notebook-level constant).

    Observations are either the rendered board as an image
    (``RETURN_IMAGES = True``) or the four integer offsets
    ``(player - food) + (player - enemy)``.
    """
    metadata = {'render.modes': ['human']}
    SIZE = 10
    RESIZE_IMG_SIZE = 50
    RETURN_IMAGES = True
    MOVE_PENALTY = 1
    ENEMY_PENALTY = 300
    FOOD_REWARD = 25
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 3)  # 4
    ACTION_SPACE_SIZE = 4
    PLAYER_N = 1  # player key in dict
    FOOD_N = 2  # food key in dict
    ENEMY_N = 3  # enemy key in dict
    # the dict! (colors)
    d = {1: (255, 175, 0),
         2: (0, 255, 0),
         3: (0, 0, 255)}

    def __init__(self):
        super(GameEnv, self).__init__()

        # Define action and observation space
        # They must be gym.spaces objects
        self.action_space = spaces.Discrete(self.ACTION_SPACE_SIZE)
        if self.RETURN_IMAGES:
            # Board image resized to RESIZE_IMG_SIZE x RESIZE_IMG_SIZE, 3 channels
            self.observation_space = spaces.Box(low=0, high=255,
                                                shape=(self.RESIZE_IMG_SIZE, self.RESIZE_IMG_SIZE, 3), dtype=np.uint8)
        else:
            # Four coordinate offsets, each within [-(SIZE - 1), SIZE - 1]
            self.observation_space = spaces.Box(low=-(self.SIZE - 1), high=self.SIZE - 1,
                                                shape=(4,), dtype=np.int32)

    def reset(self):
        """Start a new episode and return the first observation.

        Player, food and enemy are re-drawn until they occupy three
        different cells.
        """
        self.player = Blob(self.SIZE)
        self.food = Blob(self.SIZE)
        while self.food == self.player:
            self.food = Blob(self.SIZE)
        self.enemy = Blob(self.SIZE)
        while self.enemy == self.player or self.enemy == self.food:
            self.enemy = Blob(self.SIZE)

        self.episode_step = 0

        return self._get_observation()

    def _get_observation(self):
        """Build the observation for the current board, matching ``observation_space``."""
        if self.RETURN_IMAGES:
            return np.array(self.get_image())
        offsets = (self.player-self.food) + (self.player-self.enemy)
        return np.array(offsets, dtype=np.int32)

    def step(self, action):
        """Apply ``action`` to the player.

        Returns:
            ``(observation, reward, done, info)`` where ``reward`` is
            ``-ENEMY_PENALTY`` on the enemy cell, ``FOOD_REWARD`` on the food
            cell and ``-MOVE_PENALTY`` otherwise; ``info`` is always empty.
        """
        self.episode_step += 1
        self.player.action(action)

        #### MAYBE ###
        #enemy.move()
        #food.move()
        ##############

        new_observation = self._get_observation()

        hit_enemy = self.player == self.enemy
        got_food = self.player == self.food

        if hit_enemy:
            reward = -self.ENEMY_PENALTY
        elif got_food:
            reward = self.FOOD_REWARD
        else:
            reward = -self.MOVE_PENALTY

        # Terminal outcome, or the per-episode step budget is used up
        done = bool(hit_enemy or got_food or self.episode_step >= MAX_TRY_IN_EPISODE)

        # Drawing a window on every step is slow; it is controlled by the
        # notebook-level SHOW_PREVIEW flag.
        if SHOW_PREVIEW:
            self.render()

        return new_observation, reward, done, {}

    def render(self, mode='human'):
        """Show the current board in an OpenCV window named "image"."""
        img = self.get_image()
        img = img.resize((300, 300))  # resizing so we can see our agent in all its glory.
        cv2.imshow("image", np.array(img))  # show it!
        cv2.waitKey(1)

    # FOR CNN #
    def get_image(self):
        """Return the board as a PIL image of RESIZE_IMG_SIZE x RESIZE_IMG_SIZE pixels.

        Blobs are drawn in the order food, enemy, player, so the player's
        colour wins if it shares a cell with another blob.
        """
        env = np.zeros((self.SIZE, self.SIZE, 3), dtype=np.uint8)  # blank board, 3 colour channels
        env[self.food.x][self.food.y] = self.d[self.FOOD_N]  # food tile
        env[self.enemy.x][self.enemy.y] = self.d[self.ENEMY_N]  # enemy tile
        env[self.player.x][self.player.y] = self.d[self.PLAYER_N]  # player tile
        # NOTE(review): the colour tuples look like BGR values (the array is later
        # passed to cv2.imshow unchanged); PIL is only used here for resizing - confirm.
        img = Image.fromarray(env, 'RGB')
        img = img.resize((self.RESIZE_IMG_SIZE, self.RESIZE_IMG_SIZE))
        return img

    def close(self):
        """Destroy every OpenCV window."""
        cv2.destroyAllWindows()

    def __del__(self):
        # Best effort only: during interpreter shutdown cv2 may already be
        # torn down, and a finalizer must not raise.
        try:
            self.close()
        except Exception:
            pass

## Check the env

In [5]:
from stable_baselines3.common.env_checker import check_env

# env = GameEnv()
# It will check your custom environment and output additional warnings if needed
# check_env(env)

# Main Loop

In [None]:

# One timestamp for the whole run, so the model and log directories always match
# (two separate time.time() calls can straddle a second boundary).
run_id = int(time.time())
models_dir = f"models/dqn/{run_id}/"
logdir = f"logs/dqn/{run_id}/"

os.makedirs(models_dir, exist_ok=True)
os.makedirs(logdir, exist_ok=True)

env = GameEnv()
env.reset()
# NOTE(review): the observations are images (SB3 reports wrapping the env in
# VecTransposeImage), so 'CnnPolicy' may be a better fit than 'MlpPolicy' - confirm.
model = DQN('MlpPolicy', env=env, buffer_size = REPLAY_MEMORY_SIZE, learning_starts = MIN_REPLAY_MEMORY_SIZE, verbose=1, target_update_interval = 50, tensorboard_log=logdir)

TIMESTEPS = 10000  # environment steps per learn() call / checkpoint
MAX_ITERS = 20 # 2000

# Train in chunks of TIMESTEPS steps and checkpoint after each chunk.
# reset_num_timesteps=False keeps one continuous step counter across chunks.
iters = 0
try:
    for iters in range(1, MAX_ITERS + 1):
        model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name="dqn")
        model.save(os.path.join(models_dir, str(TIMESTEPS*iters)))
except KeyboardInterrupt:
    # Interrupting the kernel stops training early; checkpoints saved so far are kept.
    print("Exit!")
finally:
    # Always release the preview window, not only when interrupted.
    env.close()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Here


2022-09-06 13:38:02.127612: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
