https://stackoverflow.com/questions/42605769/openai-gym-atari-on-windows

TODO: Create a new notebook for this model.

In [1]:
import pygame
import time
import random

import argparse
from pygame.locals import *
from PIL import Image
# from Logger import Logger

import sys
import os

from keras.models import Sequential
from keras.layers import *
from keras.optimizers import *
import numpy as np

from IPython.core.debugger import set_trace

Using TensorFlow backend.


In [2]:
class DQNetwork:
    def __init__(self, actions, input_shape, alpha=0.1, gamma=0.99,
                 dropout_prob=0.1, load_path='', logger=None):
        self.model = Sequential()
        self.actions = actions  # Size of the network output
        self.gamma = gamma
        self.alpha = alpha
        self.dropout_prob = dropout_prob

        # Define neural network
        self.model.add(BatchNormalization(axis=1, input_shape=input_shape))
        self.model.add(Convolution2D(32, 2, 2, border_mode='valid',
                                     subsample=(2, 2), dim_ordering='th'))
        self.model.add(Activation('relu'))

        self.model.add(BatchNormalization(axis=1))
        self.model.add(Convolution2D(64, 2, 2, border_mode='valid',
                                     subsample=(2, 2), dim_ordering='th'))
        self.model.add(Activation('relu'))

        self.model.add(BatchNormalization(axis=1))
        self.model.add(Convolution2D(64, 3, 3, border_mode='valid',
                                     subsample=(2, 2), dim_ordering='th'))
        self.model.add(Activation('relu'))

        self.model.add(Flatten())

        self.model.add(Dropout(self.dropout_prob))
        self.model.add(Dense(512))
        self.model.add(Activation('relu'))

        self.model.add(Dense(self.actions))

        self.optimizer = Adam()
        self.logger = logger

        # Load the network from saved model
        if load_path != '':
            self.load(load_path)

        self.model.compile(loss='mean_squared_error', optimizer=self.optimizer,
                           metrics=['accuracy'])

    def train(self, batch):
        """
        Generates inputs and targets from the given batch, trains the model on
        them.
        :param batch: iterable of dictionaries with keys 'source', 'action',
        'dest', 'reward'
        """
        x_train = []
        t_train = []

        # Generate training set and targets
        for datapoint in batch:
            x_train.append(datapoint['source'].astype(np.float64))

            # Get the current Q-values for the next state and select the best
            next_state_pred = self.predict(datapoint['dest'].astype(np.float64)).ravel()
            next_q_value = np.max(next_state_pred)

            # The error must be 0 on all actions except the one taken
            t = list(self.predict(datapoint['source'])[0])
            if datapoint['final']:
                t[datapoint['action']] = datapoint['reward']
            else:
                t[datapoint['action']] = datapoint['reward'] + \
                                         self.gamma * next_q_value

            t_train.append(t)

        # Prepare inputs and targets
        x_train = np.asarray(x_train).squeeze()
        t_train = np.asarray(t_train).squeeze()

        # Train the model for one epoch
        h = self.model.fit(x_train,
                           t_train,
                           batch_size=32,
                           nb_epoch=1)

        # Log loss and accuracy
        if self.logger is not None:
            self.logger.to_csv('loss_history.csv',
                               [h.history['loss'][0], h.history['acc'][0]])

    def predict(self, state):
        """
        Feeds state into the model, returns predicted Q-values.
        :param state: a numpy.array with same shape as the network's input
        :return: numpy.array with predicted Q-values
        """
        state = state.astype(np.float64)
        return self.model.predict(state, batch_size=1)

    def save(self, filename=None):
        """
        Saves the model weights to disk.
        :param filename: file to which save the weights (must end with ".h5")
        """
        f = ('model.h5' if filename is None else filename)
        if self.logger is not None:
            self.logger.log('Saving model as %s' % f)
        self.model.save_weights(self.logger.path + f)

    def load(self, path):
        """
        Loads the model's weights from path.
        :param path: h5 file from which to load teh weights
        """
        if self.logger is not None:
            self.logger.log('Loading weights from file...')
        self.model.load_weights(path)

In [3]:
class DQAgent:
    def __init__(self,
                 actions,
                 batch_size=1024,
                 alpha=0.01,
                 gamma=0.9,
                 dropout_prob=0.1,
                 epsilon=1,
                 epsilon_rate=0.99,
                 network_input_shape=(2, 84, 84),
                 load_path='',
                 logger=None):

        # Parameters
        self.actions = actions  # Size of the discreet action space
        self.batch_size = batch_size  # Size of the batch to train the network
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Coefficient for epsilon-greedy exploration
        self.epsilon_rate = epsilon_rate  # Rate at which to make epsilon smaller, as training improves the agent's performance; epsilon = epsilon * rate
        self.min_epsilon = 0.3  # Minimum epsilon value
        # Experience variables
        self.experiences = []
        self.training_count = 0

        # Instantiate the deep Q-network
        self.DQN = DQNetwork(
            self.actions,
            network_input_shape,
            alpha=alpha,
            gamma=self.gamma,
            dropout_prob=dropout_prob,
            load_path=load_path,
            logger=logger
        )

        if logger is not None:
            logger.log({
                'Learning rate': alpha,
                'Discount factor': self.gamma,
                'Starting epsilon': self.epsilon,
                'Epsilon decrease rate': self.epsilon_rate,
                'Batch size': self.batch_size
            })

    def get_action(self, state, testing=False):
        """
        Poll DCN for Q-values, return greedy action with probability 1-epsilon
        :param state: a state of the MDP with the same size as the DQN input
        :param testing: whether to force a greedy action
        :return: the selected action
        """
        q_values = self.DQN.predict(state)
        if (random.random() < self.epsilon) and not testing:
            return random.randint(0, self.actions - 1)
        else:
            return np.argmax(q_values)

    def add_experience(self, source, action, reward, dest, final):
        """
        Add a tuple (source, action, reward, dest, final) to experiences.
        :param source: a state of the MDP
        :param action: the action associated to the transition
        :param reward: the reward associated to the transition
        :param dest: a state of the MDP
        :param final: whether the destination state is an absorbing state
        """
        self.experiences.append({'source': source,
                                 'action': action,
                                 'reward': reward,
                                 'dest': dest,
                                 'final': final})

    def sample_batch(self):
        """
        Pops self.batch_size random samples from experiences and return them as
        a batch.
        """
        out = [self.experiences.pop(random.randrange(0, len(self.experiences)))
               for _ in range(self.batch_size)]
        return np.asarray(out)

    def must_train(self):
        """"
        Returns true if the number of samples in experiences is greater than the
        batch size.
        """
        return len(self.experiences) >= self.batch_size

    def train(self, update_epsilon=True):
        """
        Samples a batch from experiences, trains the DQN on it, and updates the
        epsilon-greedy coefficient.
        """
        self.training_count += 1
        print ('Training session #', self.training_count, ' - epsilon:', self.epsilon)
        batch = self.sample_batch()
        self.DQN.train(batch)  # Train the DQN
        if update_epsilon:
            self.epsilon = self.epsilon * self.epsilon_rate if self.epsilon > self.min_epsilon else self.min_epsilon  # Decrease the probability of picking a random action to improve exploitation

    def quit(self):
        """
        Saves the DQN to disk.
        """
        self.DQN.save()

In [4]:
pygame.init()

display_width = 600
display_height = 400

fps = 30
white = (255,255,255)
black = (0,0,0)
red = (255,0,0)
green = (0,155,0)

direction = "right"
actions = 4

icon = pygame.image.load('Assets/appleImg.png')
img = pygame.image.load('Assets/snakehead.png')
appleimg = pygame.image.load('Assets/apple.png')

gameDisplay = pygame.display.set_mode((display_width,display_height))
pygame.display.set_caption('Snake')
pygame.display.set_icon(icon)

block_size = 10
AppleThickness = 20
snake_speed = 5
apple_reward = 1
life_reward = 0
death_reward = -1

clock = pygame.time.Clock()

smallfont = pygame.font.SysFont('Trebuchet MS', 14)
medfont = pygame.font.SysFont('Trebuchet MS', 24)
largefont = pygame.font.SysFont('Trebuchet MS', 52)

# CONSTANTS
MAX_EPISODE_LENGTH_FACTOR = 100
MAX_EPISODES_BETWEEN_TRAININGS = 1500
SCREENSHOT_DIMS = (84, 84)

global_episode_counter = 0
exp_backup_counter = 0
experience_buffer = []

In [5]:
def init_game():
    global direction, \
            episode_length, \
            episode_reward, \
            action, state, next_state, must_die, \
            lead_x, lead_y, lead_x_change, lead_y_change, snakeList, snakeLength, \
            randAppleX, randAppleY
    
    direction = random.choice(["right", "left", "up", "down"])
    action = random.randint(0, actions - 1) # 0-left 1-right 2-up 3-down
    
    must_die = False
    
    # Stats
    episode_length = 0
    episode_reward = 0
#     episode_nb = 0
#     exp_backup_counter = 0
#     global_episode_counter = 0  # Keeps track of how many episodes there were between traning iterations
    
    gameExit = False
    gameOver = False
    
    # Start position and movement
    lead_x = display_width / 2
    lead_y = display_height / 2
    lead_x_change, lead_y_change = init_movement(direction)

    snakeList = []
    snakeLength = 1

    randAppleX, randAppleY = randAppleGen()
    
    # Initialize the states
    state = [screenshot(), screenshot()]
    next_state = [screenshot(), screenshot()]
    
    gameDisplay.fill(white)
    pygame.display.update()

In [6]:
def game_intro():
    intro = True

    while intro:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                quit()
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_SPACE:
                    intro = False
                if event.key == pygame.K_q:
                    pygame.quit()
                    quit()


        gameDisplay.fill(white)
        message_to_screen("Snake", green, -100, "large")
        message_to_screen("Eat the Red apples to grow as much as you can!", black, -30)
        message_to_screen("Make sure you do not to run into yourself, or the edges.", black, 10)
        message_to_screen("Press 'Space' to play/pause or 'Q' to quit", red, 50)

        pygame.display.update()
        clock.tick(30)

In [7]:
def snake(block_size, snakeList):
    if direction == "right":
        head = pygame.transform.rotate(img, 270)
    if direction == "left":
        head = pygame.transform.rotate(img, 90)
    if direction == "up":
        head = pygame.transform.rotate(img, 0)
    if direction == "down":
        head = pygame.transform.rotate(img, 180)

    gameDisplay.blit(head, (snakeList[-1][0], snakeList[-1][1]))
    for XnY in snakeList[:-1]:
        pygame.draw.rect(gameDisplay, green, [XnY[0],XnY[1],block_size,block_size])

def randAppleGen():
    randAppleX = round(random.randrange(0, display_width - AppleThickness) / 10.0) * 10.0
    randAppleY = round(random.randrange(0, display_height - AppleThickness) / 10.0) * 10.0

    return randAppleX, randAppleY

def init_movement(direction):
    if direction == "right":
        lead_x_change = 5
        lead_y_change = 0
    elif direction == "left":
        lead_x_change = -5
        lead_y_change = 0
    elif direction == "up":
        lead_x_change = 0
        lead_y_change = -5
    elif direction == "down":
        lead_x_change = 0
        lead_y_change = 5
        
    return lead_x_change, lead_y_change

def Score(score):
    text = smallfont.render("Score: "+str(score), True, black)
    gameDisplay.blit(text, [0,0])

def text_objects(text, color, size):
    if size == "small":
        textSurface = smallfont.render(text, True, color)
    if size == "medium":
        textSurface = medfont.render(text, True, color)
    if size == "large":
        textSurface = largefont.render(text, True, color)
    return textSurface, textSurface.get_rect()
    
def message_to_screen(msg, color, y_displace=0, size = "small"):
    textSurf, textRect = text_objects(msg, color, size)
    textRect.center = (display_width / 2), (display_height / 2)+y_displace
    gameDisplay.blit(textSurf, textRect)

In [8]:
def die(DQA):
    global logger, remaining_iters, episode_length, episode_reward, \
        must_test, experience_buffer, exp_backup_counter, global_episode_counter

#     set_trace()
    global_episode_counter += 1

    # If agent is stuck, kill the process
    if global_episode_counter > MAX_EPISODES_BETWEEN_TRAININGS:
        print('Shutting process down because something seems to have gone '
                   'wrong during training. Please manually check that '
                   'all is OK and restart the training with the -l flag.')
        DQA.quit()
        sys.exit(0)

    # Before resetting must_test, save info about the test episode
#     if must_test:
#         logger.to_csv('test_data.csv', [score, episode_length, episode_reward])
#         logger.log('Test episode - Score: %s; Steps: %s'
#                    % (score, episode_length))

    # Reset this every time (only one testing episode per training session)
#     must_test = False

    # Add the episode to the experience buffer
    if snakeLength >= 1 and episode_length >= 10:
        exp_backup_counter += len(experience_buffer)
        print ('Adding episode to experiences - Score: %s; Episode length: %s' \
              % (snakeLength, episode_length))
#         logger.to_csv('train_data.csv', [snakeLength, episode_length, episode_reward])
        print ('Got %s samples of %s' % (exp_backup_counter, DQA.batch_size))
        for exp in experience_buffer:
            DQA.add_experience(*exp)

    # Train the network
#     if DQA.must_train() and args.train:
    if DQA.must_train():
        exp_backup_counter = 0
        print('Episodes elapsed: %d' % global_episode_counter)
        global_episode_counter = 0
        # Quit at the last iteration
        if remaining_iters == 0:
            DQA.quit()
            sys.exit(0)

        # Train the DQN
        DQA.train()

        remaining_iters -= 1 if remaining_iters != -1 else 0
        # After training, the next episode will be a test one
#         must_test = True
#         logger.log('Test episode')

    experience_buffer = []

    # Update graphics and restart episode
    pygame.display.update()
    init_game()

In [9]:
def screenshot():
    """
    Takes a screenshot of the game, converts it to greyscale, resizes it to
    60x60 and returns it as np.array
    :return:
    """
    global gameDisplay
    data = pygame.image.tostring(gameDisplay, 'RGB')  # Take screenshot
    image = Image.frombytes('RGB', (display_width, display_height), data)
    image = image.convert('L')  # Convert to greyscale
    image = image.resize(SCREENSHOT_DIMS)  # Resize
    image = image.convert('1')
    matrix = np.asarray(image.getdata(), dtype=np.float64)
    return matrix.reshape(image.size[0], image.size[1])

In [10]:
def gameLoop():
    global direction, snakeLength, \
            episode_length, \
            episode_reward, \
            action, state, next_state, must_die
    
    direction = random.choice(["right", "left", "up", "down"])
    action = random.randint(0, actions - 1) # 0-left 1-right 2-up 3-down
    
    must_die = False
    
    # Stats
    episode_length = 0
    episode_reward = 0
    episode_nb = 0
    exp_backup_counter = 0
    global_episode_counter = 0  # Keeps track of how many episodes there were between traning iterations
    
    gameExit = False
    gameOver = False
    
    # Start position and movement
    lead_x = display_width / 2
    lead_y = display_height / 2
    lead_x_change, lead_y_change = init_movement(direction)

    snakeList = []
    snakeLength = 1

    randAppleX, randAppleY = randAppleGen()
    
    # Agent
    DQA = DQAgent(
        actions,
        gamma=0.95,
        dropout_prob=0.1,
        load_path=''
    )
    experience_buffer = []  # This will store the SARS tuples at each episode
    
    # Initialize the states
    state = [screenshot(), screenshot()]
    next_state = [screenshot(), screenshot()]
    
#     gameDisplay.fill(white)
#     pygame.display.update()
    
    while not gameExit:
        episode_length += 1
        reward = life_reward # Reward for not dying and not eating
        next_state[0] = state[1]
        
        clock.tick(fps)
        
        while gameOver == True:
            gameDisplay.fill(white)
            message_to_screen('Game Over!', red, -50, "medium")
            message_to_screen('Press C to play again or Q to quit', black, 50, "small")
            pygame.display.update()

            for event in pygame.event.get():
                if event.type == QUIT:
                    gameExit = True
                    gameOver = False
                    DQA.quit()
                    sys.exit(0)
        
        if action == 0 and direction != "right":
            direction = "left"
            lead_x_change = -snake_speed
            lead_y_change = 0
        elif action == 1 and direction != "left":
            direction = "right"
            lead_x_change = snake_speed
            lead_y_change = 0
        elif action == 2 and direction != "down":
            direction = "up"
            lead_y_change = -snake_speed
            lead_x_change = 0
        elif action == 3 and direction != "up":
            direction = "down"
            lead_y_change = snake_speed
            lead_x_change = 0         

        # Hits walls
        if lead_x >= display_width - block_size or lead_x < 0 or lead_y >= display_height - block_size or lead_y < 0:
            must_die = True
            reward = death_reward
#             gameOver = True

        lead_x += lead_x_change
        lead_y += lead_y_change

        gameDisplay.fill(white)

        
        gameDisplay.blit(appleimg, (randAppleX, randAppleY))
        
        snakeHead = []
        snakeHead.append(lead_x)
        snakeHead.append(lead_y)
        snakeList.append(snakeHead)

        if len(snakeList) > snakeLength:
            del snakeList[0]

        # Hits itself
        for eachSegment in snakeList[:-1]:
            if eachSegment == snakeHead:
#                 gameOver = True
                must_die = True
                reward = death_reward

        snake(block_size, snakeList)

        Score(snakeLength-1)
        
        pygame.display.update()

        # Eats apple
        if lead_x > randAppleX and lead_x < randAppleX + AppleThickness or lead_x + block_size > randAppleX and lead_x + block_size < randAppleX + AppleThickness:
            if lead_y > randAppleY and lead_y < randAppleY + AppleThickness:
                randAppleX, randAppleY = randAppleGen()
                snakeLength += 1
                reward = apple_reward
            elif lead_y + block_size > randAppleY and lead_y + block_size < randAppleY + AppleThickness:
                randAppleX, randAppleY = randAppleGen()
                snakeLength += 1
                reward = apple_reward

        # Update next state
        next_state[1] = screenshot()

        # Add SARS tuple to experience_buffer
        experience_buffer.append((np.asarray([state]), action, reward,
                                  np.asarray([next_state]),
                                  True if must_die else False))
        episode_reward += reward

        # Change current state
        state = list(next_state)

        # Poll the DQAgent to get the next action
        action = DQA.get_action(np.asarray([state]), testing=False)

        # Stopping condition
        if must_die or episode_length > 5 * MAX_EPISODE_LENGTH_FACTOR: # need to change 5 to some generic number
            die(DQA)
            
#         clock.tick(fps)  #setting the fps

    pygame.quit()
    quit()

In [11]:
# Run the game    
# game_intro()
gameLoop()

  del sys.path[0]


Adding episode to experiences - Score: 2; Episode length: 501
Got 0 samples of 1024
Adding episode to experiences - Score: 1; Episode length: 184
Got 0 samples of 1024


KeyboardInterrupt: 