In [None]:
!pip install pygame
!pip install stable_baselines3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pygame
import random
import numpy as np
#np.set_printoptions(threshold=np.inf)
from pygame.surfarray import array3d
import torch
import cv2
import gym 
from gym import spaces 
from stable_baselines3 import DQN
import os


In [None]:
# Ask SDL (the library underneath pygame) to use its "dummy" video driver.
# NOTE(review): presumably needed because the notebook runs on a machine without a screen - confirm.
os.environ.update({"SDL_VIDEODRIVER": "dummy"})

In [None]:
# Drone class
class Drone:
    """A drone sprite that moves in fixed-size steps on a pygame surface.

    Movement is two-phase: the ``move_*`` methods only record a pending
    displacement, and ``update`` applies it to the position and clears it.

    Note that the collision box (``drone_width`` x ``drone_height``, 70 x 70)
    is set independently of the sprite, which is scaled to 10% x 12% of the
    display size.
    """

    def __init__(self, gameDisplay, display_width=800, display_height=600, *args, **kwargs):
        """Create a drone at (0, 0) and load its sprite.

        Args:
            gameDisplay: Surface the drone is blitted onto by ``draw``.
            display_width: Display width in pixels; used to scale the sprite.
            display_height: Display height in pixels; used to scale the sprite.
            *args, **kwargs: Accepted and ignored.
        """
        # Where and onto what the drone is drawn.
        self.gameDisplay = gameDisplay
        self.display_width = display_width
        self.display_height = display_height

        # Current position and the displacement queued for the next update().
        self.x = 0
        self.y = 0
        self.x_change = 0
        self.y_change = 0
        self.drone_speed = 20 # Pixels moved per queued step

        # Size of the box used by the environment's bounds / collision checks.
        self.drone_width = 70 # 35
        self.drone_height = 70 # 35

        # To fix up png files use: pngcrush -ow -rem allb -reduce file.png
        sprite_size = (int(self.display_width*0.1), int(self.display_height*0.12))
        self.img = pygame.transform.scale(
            pygame.image.load('images/drone1.png').convert(),
            sprite_size,
        )

    def move_left(self):
        """Queue one step towards smaller x."""
        step = self.drone_speed
        self.x_change = -step

    def move_right(self):
        """Queue one step towards larger x."""
        step = self.drone_speed
        self.x_change = step

    def move_up(self):
        """Queue one step towards smaller y (up the screen)."""
        step = self.drone_speed
        self.y_change = -step

    def move_down(self):
        """Queue one step towards larger y (down the screen)."""
        step = self.drone_speed
        self.y_change = +step

    def update(self):
        """Apply the queued displacement to the position, then clear it."""
        self.x, self.y = self.x + self.x_change, self.y + self.y_change
        self.x_change = self.y_change = 0

    def draw(self):
        """Blit the sprite at the current position."""
        top_left = (self.x, self.y)
        self.gameDisplay.blit(self.img, top_left)


In [None]:
# Obstacle class
class Obstacle:
    """A falling obstacle sprite with a random horizontal start position.

    The collision box (``width`` x ``height``, 100 x 100) is set independently
    of the sprite, which is scaled to 16% x 20% of the display size.
    """

    def __init__(self, gameDisplay, display_width=800, display_height=600, *args, **kwargs):
        """Create an obstacle just above the top edge and load its sprite.

        Args:
            gameDisplay: Surface the obstacle is blitted onto by ``draw``.
            display_width: Display width in pixels; bounds the random x and scales the sprite.
            display_height: Display height in pixels; scales the sprite.
            *args, **kwargs: Accepted and ignored.
        """
        self.gameDisplay = gameDisplay
        self.display_width = display_width
        self.display_height = display_height

        self.speed = 40   # Pixels the obstacle falls per update()
        self.width = 100  # self.display_width / 6
        self.height = 100 # self.display_width / 8

        # Start above the visible area so the obstacle scrolls into view.
        self.x = random.randrange(0, display_width)
        self.y = -100

        sprite_size = (int(self.display_width*0.16), int(self.display_height*0.2))
        self.img = pygame.transform.scale(
            pygame.image.load('images/asteroid.png').convert(),
            sprite_size,
        )

    def reset(self):
        """Move the obstacle back above the top edge at a new random x."""
        self.x, self.y = random.randrange(0, self.display_width), -self.height

    def update(self):
        """Advance the obstacle one step down the screen."""
        self.y = self.y + self.speed

    def draw(self):
        """Blit the sprite at the current position."""
        top_left = (self.x, self.y)
        self.gameDisplay.blit(self.img, top_left)

In [None]:
# Environment class

def pre_processing(image, w=84, h=84):
    """Turn a rendered RGB frame into a binarised single-channel observation.

    The callers in this file pass the output of ``pygame.surfarray.array3d``,
    which pygame documents as an RGB array indexed ``[x, y, channel]``. The
    grayscale conversion therefore uses ``COLOR_RGB2GRAY``; ``COLOR_BGR2GRAY``
    would apply the red and blue luma weights to the wrong channels.

    Args:
        image: 3-channel uint8 frame, indexed ``[x, y, channel]`` (see above).
        w: Width passed to ``cv2.resize``.
        h: Height passed to ``cv2.resize``.

    Returns:
        ``np.uint8`` array with a leading singleton axis (``(1, 84, 84)`` for
        the default size) whose values are 0 or 255.
    """
    # Keep at most the first 800 entries of axis 0 and drop the first 20
    # entries of axis 1 - the strip where the score text is drawn - so the
    # score is not part of the observation.
    image = image[:800, 20:, :]

    # Note: the frame is resized without transposing, so the observation keeps
    # the [x, y] axis order of the input.
    gray = cv2.cvtColor(cv2.resize(image, (w, h)), cv2.COLOR_RGB2GRAY)

    # Pixels brighter than 127 become 255, everything else 0.
    _, binary = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)

    # uint8 with a leading channel axis, matching the environment's declared
    # observation space. Values are deliberately left un-normalised.
    return binary[None, :, :].astype(np.uint8)


class DroneWars(gym.Env):
    """Two-drone obstacle-dodging game exposed as a Gym environment.

    Two drones sit near the bottom of the display and are steered left/right
    by a single discrete action; obstacles fall from the top. An episode ends
    when a drone leaves the display or overlaps an obstacle.

    Observations are the rendered frame passed through ``pre_processing``
    (declared as a ``(1, 84, 84)`` uint8 box). Rewards: 0.1 per step (0.11 for
    the "do nothing" action), 1 when an obstacle passes the bottom edge, and
    -1 on the terminal step.
    """

    # Horizontal move per action as (drone1, drone2): -1 left, 0 stay, +1 right.
    # Three choices per drone over two drones gives 3^2 = 9 actions.
    _ACTION_MOVES = (
        (0, 0),    # 0: both stay
        (0, -1),   # 1: drone2 left
        (0, 1),    # 2: drone2 right
        (-1, -1),  # 3: both left
        (-1, 0),   # 4: drone1 left
        (-1, 1),   # 5: drone1 left, drone2 right
        (1, 1),    # 6: both right
        (1, 0),    # 7: drone1 right
        (1, -1),   # 8: drone1 right, drone2 left
    )
    # For a single-drone variant, map actions 1/2 to drone1 left/right only.

    def __init__(self, gameDisplay, display_width=800, display_height=600, clock=None, fps=30, *args, **kwargs):
        """Build the drones and obstacles and declare the Gym spaces.

        Args:
            gameDisplay: pygame surface everything is drawn onto.
            display_width: Display width in pixels.
            display_height: Display height in pixels.
            clock: Optional ``pygame.time.Clock`` used to cap the frame rate;
                when ``None`` the frame rate is not limited.
            fps: Frame-rate cap passed to ``clock.tick``.
            *args, **kwargs: Accepted and ignored.
        """
        super().__init__()
        self.gameDisplay = gameDisplay
        self.display_width = display_width
        self.display_height = display_height
        self.clock = clock
        self.fps = fps

        self.score = 0
        self.gameExit = False

        self.black = (0,0,0)
        self.white = (255,255,255)
        self.dark_red = (150,0,0)
        self.green = (0,255,0)
        self.dark_green = (0,150,0)
        self.red = (255,0,0)

        # Sprites are sized from the environment's own display dimensions.
        self.my_drone1 = Drone(gameDisplay, display_width, display_height)
        self.my_drone2 = Drone(gameDisplay, display_width, display_height)
        self._place_drones()

        self.n_actions = len(self._ACTION_MOVES)
        self.action_space = spaces.Discrete(self.n_actions)
        # uint8 image box, needed for the CNN policy.
        self.observation_space = spaces.Box(low=0, high=255, shape=(1,84,84), dtype=np.uint8)

        self.num_of_obstacles = 1 # number of obstacles
        self.obstacle_list = [
            Obstacle(gameDisplay, display_width, display_height)
            for _ in range(self.num_of_obstacles)
        ]

        pygame.display.set_caption('Drone Wars')

    def _place_drones(self):
        """Put both drones at their start positions with no pending movement."""
        for drone, x_fraction in ((self.my_drone1, 0.8), (self.my_drone2, 0.2)):
            drone.x = self.display_width * x_fraction
            drone.y = self.display_height * 0.85
            drone.x_change = 0
            drone.y_change = 0

    def _reset_state(self):
        """Return score, terminal flag, drones and obstacles to their start state."""
        self.score = 0
        self.gameExit = False
        self._place_drones()
        for obs in self.obstacle_list:
            obs.reset()

    @staticmethod
    def _steer(drone, direction):
        """Queue a left (-1) or right (+1) move on ``drone``; 0 does nothing."""
        if direction < 0:
            drone.move_left()
        elif direction > 0:
            drone.move_right()

    def close(self):
        """Nothing to release; the caller owns the pygame display."""
        pass

    def reset(self):
        """Start a new episode and return its first observation.

        Returns:
            The pre-processed frame of the freshly reset game.
        """
        self._reset_state()
        self.render()
        return pre_processing(array3d(pygame.display.get_surface()))

    def render(self, mode="human"):
        """Draw the current game state and flip the display.

        Args:
            mode: Accepted for compatibility with callers that pass a render
                mode; it is not used.
        """
        self.gameDisplay.fill(self.white) # Comment this out if using scrolBackground
        for obs in self.obstacle_list:
            obs.draw()

        self.my_drone1.draw()
        self.my_drone2.draw()

        self.scoreboard(self.score)
        pygame.display.update()

    def scoreboard(self, count):
        """Draw ``"Score: <count>"`` in the top-left corner."""
        font = pygame.font.SysFont(None, 25)
        text = font.render("Score: "+str(count), True, self.black)
        self.gameDisplay.blit(text,(0,0))

    def out_of_bounds(self, drone, display_width, display_height):
        """Return True when the drone's box is not fully inside the display."""
        outside_x = drone.x < 0 or drone.x > display_width - drone.drone_width
        outside_y = drone.y < 0 or drone.y > display_height - drone.drone_height
        return outside_x or outside_y

    def collision(self, drone, obstacle):
        """Return True when ``drone`` overlaps ``obstacle``.

        Vertically, only the drone's top edge is tested against the obstacle's
        bottom edge. Horizontally the two spans must strictly overlap, which
        also covers a drone that is at least as wide as the obstacle.
        """
        if not drone.y < obstacle.y + obstacle.height:
            return False
        return (drone.x < obstacle.x + obstacle.width
                and drone.x + drone.drone_width > obstacle.x)

    def collision_multi(self, drone, obstacle_list):
        """Return True when ``drone`` overlaps any obstacle in ``obstacle_list``."""
        return any(self.collision(drone, obs) for obs in obstacle_list)

    def step(self, action, record=False):
        """Advance the game by one frame.

        Args:
            action: Index into ``_ACTION_MOVES`` (a Python int or a one-element
                numpy value). Indices outside the table move nothing.
            record: When True, additionally return the raw frame converted to
                BGR and transposed to (row, column, channel) order.

        Returns:
            ``(observation, reward, done, info)``, or
            ``(observation, raw_frame, reward, done, info)`` when ``record`` is
            True. On the terminal step the observation shows the final frame
            and the game state has already been reset for the next episode.
        """
        reward = 0.1

        # Policies may hand over a numpy value rather than a plain int.
        action_id = int(np.asarray(action).item())
        if action_id == 0:
            reward += 0.01  # small bonus for staying put

        if 0 <= action_id < len(self._ACTION_MOVES):
            drone1_move, drone2_move = self._ACTION_MOVES[action_id]
            self._steer(self.my_drone1, drone1_move)
            self._steer(self.my_drone2, drone2_move)

        # Update drone 1 & 2 position
        self.my_drone1.update()
        self.my_drone2.update()

        # Move every obstacle down; one that has left the bottom of the screen
        # restarts from the top at a random x and scores a point.
        for obs in self.obstacle_list:
            obs.update()
            if obs.y > self.display_height:
                obs.reset()
                reward = 1
                self.score += 1

        drones = (self.my_drone1, self.my_drone2)

        # A drone leaving the display ends the game.
        for drone in drones:
            if self.out_of_bounds(drone, self.display_width, self.display_height):
                reward = -1
                self.gameExit = True

        # A drone hitting an obstacle costs a point and ends the game.
        for drone in drones:
            if self.collision_multi(drone, self.obstacle_list):
                self.score -= 1
                reward = -1
                self.gameExit = True

        self.render()
        if self.clock is not None:
            self.clock.tick(self.fps)

        # Capture the frame that was just drawn before any reset happens.
        state = array3d(pygame.display.get_surface())
        done = self.gameExit
        info = {}

        if done:
            # Restore the start state in place; sprites are not reloaded.
            self._reset_state()

        if record:
            raw_frame = np.transpose(cv2.cvtColor(state, cv2.COLOR_RGB2BGR), (1, 0, 2))
            return pre_processing(state), raw_frame, reward, done, info
        return pre_processing(state), reward, done, info

In [None]:
# Train with Stable-Baselines3, then play back the trained agent.

RECORD_GAMEPLAY = False  # Set to True to write the playback to a video file

pygame.init()
clock = pygame.time.Clock()
#flags = pygame.SHOWN
flags = pygame.HIDDEN
width = 800
height = 600
gameDisplay = pygame.display.set_mode((width,height), flags)

env = DroneWars(gameDisplay, display_width=width, display_height=height, clock=clock, fps=200) # 200

#model = DQN("MlpPolicy", env, buffer_size=10000, verbose=1) # Use either mlp or cnn policy
model = DQN("CnnPolicy", env, buffer_size=30000, verbose=2)

# Train and persist the model and its replay buffer.
# Comment these three lines out to reuse a previously saved model instead.
model.learn(total_timesteps=300000, log_interval=10)
model.save("dqn_dronewars")
model.save_replay_buffer("dqn_replay_buffer")

# Load model
model = DQN.load("dqn_dronewars")
model.load_replay_buffer("dqn_replay_buffer")
print(f"The loaded_model has {model.replay_buffer.size()} transitions in its buffer")

obs = env.reset()

episodes = 1 # Episodes to play after training

path = "output/"

if not os.path.exists(path):
    os.makedirs(path, exist_ok=True)
    print("Output dir created")

# Only open the video file when a recording was requested, so no empty file is left behind.
out = None
if RECORD_GAMEPLAY:
    # 'mp4v' is the FourCC conventionally paired with the .mp4 container.
    out = cv2.VideoWriter(os.path.join(path, "drone_wars.mp4"), cv2.VideoWriter_fourcc(*"mp4v"), 60, (width, height))

print("Playing")
try:
    while episodes > 0:
        action, _states = model.predict(obs, deterministic=True)

        # env.step already renders the frame, so no separate render call is needed.
        if out is not None:
            obs, raw_next_state, reward, done, info = env.step(action, record=True)
            out.write(np.ascontiguousarray(raw_next_state))
        else:
            obs, reward, done, info = env.step(action)

        if done:
            obs = env.reset()
            episodes -= 1
finally:
    # Finalise the video file even if playback is interrupted.
    if out is not None:
        out.release()
    


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




The loaded_model has 30000 transitions in its buffer
Playing
Playing
Playing
Playing
Playing
Playing
Playing
Playing
Playing
Playing
Playing
Playing
Playing
