In [1]:
#Demo2
#Pong & Tetris Game 

In [5]:
# Bare-bones skeleton of a Pong game using pygame.

import pygame
from pygame.locals import *

# Module-level score mutated by run(): decremented when the ball gets past the bar on the left,
# incremented when the ball bounces off the right wall.
score = 0

def run(screen_width=400., screen_height=400.):
    """
    Run a one-bar ("half") pong game until the window is closed.

    The bar sits near the left edge and is moved with the up/down arrow keys. The module-level ``score`` is
    decremented every time the ball gets past the bar and incremented every time the ball bounces off the
    right wall.

    :param screen_width: width of the game window in pixels
    :type screen_width: float
    :param screen_height: height of the game window in pixels
    :type screen_height: float
    """
    global score
    pygame.init()

    try:
        # All sizes are derived from the screen size so the game scales with the window.
        bar_width, bar_height = screen_width / 32., screen_height / 6.
        bar_dist_from_edge = screen_width / 64.
        circle_diameter = screen_height / 16.
        circle_radius = circle_diameter / 2.
        bar1_x = bar_dist_from_edge
        bar_start_y = (screen_height - bar_height) / 2.
        bar_max_y = screen_height - bar_height - bar_dist_from_edge
        circle_start_x = screen_width - circle_diameter
        # vertical start position must be derived from the screen height, not its width
        circle_start_y = (screen_height - circle_diameter) / 2.

        screen = pygame.display.set_mode((int(screen_width), int(screen_height)), 0, 32)

        # Creating the bar, the ball and the background.
        background = pygame.Surface((int(screen_width), int(screen_height))).convert()
        background.fill((0, 0, 0))
        bar1 = pygame.Surface((int(bar_width), int(bar_height))).convert()
        bar1.fill((255, 255, 255))
        circle_surface = pygame.Surface((int(circle_diameter), int(circle_diameter)))
        pygame.draw.circle(circle_surface, (255, 255, 255), (int(circle_radius), int(circle_radius)),
                           int(circle_radius))
        circle = circle_surface.convert()
        circle.set_colorkey((0, 0, 0))  # black pixels around the ball are transparent

        # initial game state
        bar1_y = bar_start_y
        circle_x, circle_y = circle_start_x, circle_start_y
        bar1_direction = 0  # -1 = moving up, 1 = moving down, 0 = still
        # Distance the bar travels per frame. Starts at 0 so a key press that arrives before the first
        # clock tick cannot hit an undefined value; refreshed from the frame time on every iteration.
        bar_step = 0.
        # speeds are in pixels per second
        speed_x, speed_y, speed_bar = -screen_width / 1.28, screen_height / 1.92, screen_height * 1.2

        clock = pygame.time.Clock()

        done = False
        while not done:
            for event in pygame.event.get():  # User did something
                if event.type == pygame.QUIT:  # If user clicked close
                    done = True  # Flag that we are done so we exit this loop
                if event.type == KEYDOWN:
                    if event.key == K_UP:
                        bar1_direction = -1
                    elif event.key == K_DOWN:
                        bar1_direction = 1
                elif event.type == KEYUP:
                    if event.key == K_UP or event.key == K_DOWN:
                        bar1_direction = 0

            screen.blit(background, (0, 0))
            screen.blit(bar1, (bar1_x, bar1_y))
            screen.blit(circle, (circle_x, circle_y))

            bar1_y += bar1_direction * bar_step

            # movement of circle
            time_passed = clock.tick(30)
            time_sec = time_passed / 1000.0

            circle_x += speed_x * time_sec
            circle_y += speed_y * time_sec
            bar_step = speed_bar * time_sec

            # keep the bar inside the playing field
            if bar1_y >= bar_max_y:
                bar1_y = bar_max_y
            elif bar1_y <= bar_dist_from_edge:
                bar1_y = bar_dist_from_edge

            # ball reached the bar's column: bounce if the bar is there
            if circle_x < bar_dist_from_edge + bar_width:
                if bar1_y - circle_radius <= circle_y <= bar1_y + bar_height + circle_radius:
                    circle_x = bar_dist_from_edge + bar_width
                    speed_x = -speed_x
            if circle_x < -circle_radius:
                # ball got past the bar: lose a point and reset positions
                score -= 1
                circle_x, circle_y = circle_start_x, circle_start_y
                bar1_y = bar_start_y
            elif circle_x > screen_width - circle_diameter:
                # ball hit the right wall: gain a point and bounce back
                score += 1
                speed_x = -speed_x
            # bounce off the top and bottom edges
            if circle_y <= bar_dist_from_edge:
                speed_y = -speed_y
                circle_y = bar_dist_from_edge
            elif circle_y >= screen_height - circle_diameter - circle_radius:
                speed_y = -speed_y
                circle_y = screen_height - circle_diameter - circle_radius

            pygame.display.update()
    finally:
        # always release the display, even if the loop (or an intercepting agent) raised
        pygame.quit()


In [7]:
# Run pong game
# Uses the default 400x400 window; the call returns once the window has been closed.
run()

In [9]:
# Function interceptor used to hook an agent into our game code...

import pygame
import numpy  # import is unused but required or we fail later
from pygame.constants import K_DOWN, KEYDOWN, KEYUP, QUIT
import pygame.surfarray
import pygame.key

def function_intercept(intercepted_func, intercepting_func):
    """
    Build a replacement for ``intercepted_func`` that post-processes every call with ``intercepting_func``.

    The returned callable first invokes ``intercepted_func`` with the caller's arguments, then invokes
    ``intercepting_func`` with that result as the first argument followed by the same arguments, and finally
    returns whatever ``intercepting_func`` returned.

    Example:
        def get_event(result_of_real_event_get, *args, **kwargs):
            # do work
            return result_of_real_event_get
        pygame.event.get = function_intercept(pygame.event.get, get_event)

    :param intercepted_func: the function being intercepted
    :param intercepting_func: called after ``intercepted_func``; receives its return value as the first argument,
                              followed by the original args and kwargs
    :return: the combined function, normally assigned back to the location of ``intercepted_func``
    """
    def wrap(*args, **kwargs):
        # run the real function first, then let the interceptor decide what the caller actually sees
        return intercepting_func(intercepted_func(*args, **kwargs), *args, **kwargs)

    return wrap

class PyGamePlayer(object):
    """
    Abstract base class for agents that play PyGame games by intercepting PyGame calls.

    While playing, screen updates are routed to the agent, the event queue is replaced by the agent's key
    presses, and the PyGame clock functions are replaced by a fixed-step game clock.
    """

    def __init__(self, force_game_fps=10, run_real_time=False, pass_quit_event=True):
        """
        Abstract class for learning agents, such as running reinforcement learning neural nets against PyGame games.
        The get_keys_pressed and get_feedback methods must be overridden by a subclass to use.
        Call the start method to start intercepting PyGame and training our machine.

        :param force_game_fps: Fixes the pygame timer functions so the ai will get input as if it were running at this
                               fps
        :type force_game_fps: int
        :param run_real_time: If True the game will actually run at the force_game_fps speed
        :type run_real_time: bool
        :param pass_quit_event: If True, real QUIT events are passed on to the game
        :type pass_quit_event: bool
        """
        # Fixes the pygame timer functions so the ai will get input as if it were running at this fps
        self.force_game_fps = force_game_fps
        # If True the game will actually run at the force_game_fps speed
        self.run_real_time = run_real_time
        # Decides whether the quit event should be passed on to the game
        self.pass_quit_event = pass_quit_event
        self._keys_pressed = []
        self._last_keys_pressed = []
        self._playing = False
        # originals captured now so stop() can put them back
        self._default_flip = pygame.display.flip
        self._default_update = pygame.display.update
        self._default_event_get = pygame.event.get
        self._default_time_clock = pygame.time.Clock
        self._default_get_ticks = pygame.time.get_ticks
        # simulated game time in milliseconds, advanced by one frame on every screen update
        self._game_time = 0.0

    def get_keys_pressed(self, screen_array, feedback, terminal):
        """
        Called whenever the screen buffer is refreshed. Returns the keys we want pressed until the next
        screen refresh.

        :param screen_array: 3d numpy.array as returned by pygame.surfarray.array3d for the display surface
        :param feedback: first element of the result of the call to get_feedback
        :param terminal: boolean, True if we have reached a terminal state, meaning the next frame will be a restart
        :return: a list of the integer values of the keys we want pressed. See pygame.constants for values
        """
        raise NotImplementedError("Please override this method")

    def get_feedback(self):
        """
        Overridden method should hook into game events to give feedback to the learning agent.

        :return: First = value we want to give as reward/punishment to our learning agent
                 Second = Boolean true if we have reached a terminal state
        :rtype: tuple (float, boolean)
        """
        raise NotImplementedError("Please override this method")

    def start(self):
        """
        Start playing the game. We will now start listening for screen updates, calling our play and reward
        functions and returning our intercepted key presses.

        :raises Exception: if we are already playing
        """
        if self._playing:
            raise Exception("Already playing")

        pygame.display.flip = function_intercept(pygame.display.flip, self._on_screen_update)
        pygame.display.update = function_intercept(pygame.display.update, self._on_screen_update)
        pygame.event.get = function_intercept(pygame.event.get, self._on_event_get)
        pygame.time.Clock = function_intercept(pygame.time.Clock, self._on_time_clock)
        # function_intercept passes the real result as first argument, so a dedicated handler that accepts
        # (and discards) it is required here; get_game_time_ms itself takes no arguments.
        pygame.time.get_ticks = function_intercept(pygame.time.get_ticks, self._on_get_ticks)
        # TODO: handle pygame.time.set_timer...

        self._playing = True

    def stop(self):
        """
        Stop playing the game. Will try and return PyGame to the state it was in when this object was created.

        :raises Exception: if we are not currently playing
        """
        if not self._playing:
            raise Exception("Already stopped")

        pygame.display.flip = self._default_flip
        pygame.display.update = self._default_update
        pygame.event.get = self._default_event_get
        pygame.time.Clock = self._default_time_clock
        pygame.time.get_ticks = self._default_get_ticks

        self._playing = False

    @property
    def playing(self):
        """
        Returns if we are in a state where we are playing/intercepting PyGame calls.

        :return: boolean
        """
        return self._playing

    @playing.setter
    def playing(self, value):
        if self._playing == value:
            return
        if self._playing:
            self.stop()
        else:
            self.start()

    def get_ms_per_frame(self):
        """
        :return: duration of one simulated frame in milliseconds (1000 / force_game_fps)
        """
        return 1000.0 / self.force_game_fps

    def get_game_time_ms(self):
        """
        :return: the simulated game time in milliseconds accumulated over the screen updates seen so far
        """
        return self._game_time

    def _on_get_ticks(self, _real_ticks, *args, **kwargs):
        # replacement for pygame.time.get_ticks: report simulated game time instead of wall-clock time
        return self.get_game_time_ms()

    def _on_time_clock(self, real_clock, *args, **kwargs):
        # replacement for pygame.time.Clock(): wrap the real clock in our fixed-step clock
        return self._FixedFPSClock(self, real_clock)

    def _on_screen_update(self, _, *args, **kwargs):
        # Called after every pygame.display.flip/update: show the frame to the agent and record its key choice.
        surface_array = pygame.surfarray.array3d(pygame.display.get_surface())
        reward, terminal = self.get_feedback()
        keys = self.get_keys_pressed(surface_array, reward, terminal)
        self._last_keys_pressed = self._keys_pressed
        self._keys_pressed = keys

        # now we have processed a frame increment the game timer
        self._game_time += self.get_ms_per_frame()

    def _on_event_get(self, real_events, *args, **kwargs):
        """
        Replacement for pygame.event.get: returns synthetic key events for the agent's key presses and,
        if pass_quit_event is set, the real QUIT events. All other real events are dropped.

        :param real_events: the events returned by the real pygame.event.get
        :param args: positional arguments of the intercepted call; the first may be an event type or a
                     sequence of event types to filter on
        :param kwargs: keyword arguments of the intercepted call; ``eventtype`` is honoured as a filter
        :return: list of events visible to the game
        """
        # a key that is pressed now but was not on the previous frame produces a KEYDOWN, and vice versa
        key_down_events = [pygame.event.Event(KEYDOWN, {"key": x})
                           for x in self._keys_pressed if x not in self._last_keys_pressed]
        key_up_events = [pygame.event.Event(KEYUP, {"key": x})
                         for x in self._last_keys_pressed if x not in self._keys_pressed]
        if self.pass_quit_event:
            quit_events = [e for e in real_events if e.type == QUIT]
        else:
            quit_events = []  # never quit

        # have to deal with arg type filters
        if args:
            type_filters = args[0] if hasattr(args[0], "__iter__") else args
        elif kwargs.get("eventtype") is not None:
            requested = kwargs["eventtype"]
            type_filters = requested if hasattr(requested, "__iter__") else (requested,)
        else:
            return key_down_events + key_up_events + quit_events

        result = []
        for type_filter in type_filters:
            if type_filter == QUIT:
                result.extend(quit_events)
            elif type_filter == KEYUP:
                result.extend(key_up_events)
            elif type_filter == KEYDOWN:
                result.extend(key_down_events)
        return result

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    class _FixedFPSClock(object):
        """
        Stand-in for pygame.time.Clock that reports a fixed frame duration taken from the owning player,
        or delegates to the real clock when the player runs in real time.
        """

        def __init__(self, pygame_player, real_clock):
            self._pygame_player = pygame_player
            self._real_clock = real_clock

        def tick(self, _=None):
            # the frame rate requested by the game is ignored; the player's forced fps always wins
            if self._pygame_player.run_real_time:
                return self._real_clock.tick(self._pygame_player.force_game_fps)
            else:
                return self._pygame_player.get_ms_per_frame()

        def tick_busy_loop(self, _=None):
            if self._pygame_player.run_real_time:
                return self._real_clock.tick_busy_loop(self._pygame_player.force_game_fps)
            else:
                return self._pygame_player.get_ms_per_frame()

        def get_time(self):
            # pygame's Clock.get_time is the time between the previous two ticks, not the total elapsed time
            if self._pygame_player.run_real_time:
                return self._real_clock.get_time()
            return self._pygame_player.get_ms_per_frame()

        def get_raw_time(self):
            if self._pygame_player.run_real_time:
                return self._real_clock.get_raw_time()
            return self._pygame_player.get_ms_per_frame()

        def get_fps(self):
            # frames per second the game is forced to; float to match pygame's Clock.get_fps
            return float(self._pygame_player.force_game_fps)
        

In [14]:
'''
Created on Jun 2, 2016

@author: rbhat
'''
# This is heavily based on https://github.com/asrivat1/DeepLearningVideoGames
# Deep Q-learning agent that runs against Half-Pong. Runs on a much smaller screen and with fewer layers.
# Performs significantly above random, but still has some way to go to match Google DeepMind performance...
# To see a trained version of this network, start it with the kwargs
# checkpoint_path="deep_q_half_pong_networks_40x40_8" and playback_mode=True

import os
import random
from collections import deque

import tensorflow as tf
import numpy as np
import cv2
from pygame.constants import K_DOWN, K_UP


class DeepQHalfPongPlayer(PyGamePlayer):
    """
    Deep Q-learning agent for the half-pong game defined by ``run`` in this notebook.

    The state fed to the network is the last STATE_FRAMES binarised screens stacked along the channel axis.
    """
    ACTIONS_COUNT = 3  # number of valid actions. In this case up, still and down
    FUTURE_REWARD_DISCOUNT = 0.99  # discount applied to the estimated future reward
    OBSERVATION_STEPS = 50000.  # time steps to observe before training
    EXPLORE_STEPS = 500000.  # frames over which to anneal epsilon
    INITIAL_RANDOM_ACTION_PROB = 1.0  # starting chance of an action being random
    FINAL_RANDOM_ACTION_PROB = 0.05  # final chance of an action being random
    MEMORY_SIZE = 500000  # number of observations to remember
    MINI_BATCH_SIZE = 200  # size of mini batches
    STATE_FRAMES = 4  # number of frames to store in the state
    # positions of the fields inside each stored observation tuple
    OBS_LAST_STATE_INDEX, OBS_ACTION_INDEX, OBS_REWARD_INDEX, OBS_CURRENT_STATE_INDEX, OBS_TERMINAL_INDEX = range(5)
    SAVE_EVERY_X_STEPS = 10000  # a checkpoint is written whenever the training step count is a multiple of this
    LEARN_RATE = 1e-6
    STORE_SCORES_LEN = 200.  # number of most recent non-zero rewards kept for the logged score differential
    SCREEN_WIDTH = 40
    SCREEN_HEIGHT = 40

    def __init__(self,
                 # to see a trained network change checkpoint_path="deep_q_half_pong_networks_40x40_8" and
                 # playback_mode=True
                 checkpoint_path="deep_q_half_pong_networks",
                 playback_mode=True,
                 verbose_logging=True):
        """
        Example of deep q network for pong

        :param checkpoint_path: directory to store checkpoints in
        :type checkpoint_path: str
        :param playback_mode: if true the game runs in real time mode and demos itself running; no training
                              or random exploration happens
        :type playback_mode: bool
        :param verbose_logging: If true then extra log information is printed to std out
        :type verbose_logging: bool
        :raises Exception: if playback_mode is true and no checkpoint could be loaded from checkpoint_path
        """
        self._playback_mode = playback_mode
        self.last_score = 0
        super(DeepQHalfPongPlayer, self).__init__(force_game_fps=8, run_real_time=playback_mode)
        self.verbose_logging = verbose_logging
        self._checkpoint_path = checkpoint_path
        # NOTE(review): tf.Session / tf.mul / tf.initialize_all_variables are pre-1.0 TensorFlow names; this
        # class needs a matching TensorFlow version - confirm before upgrading.
        self._session = tf.Session()
        self._input_layer, self._output_layer = self._create_network()

        # one-hot encoding of the action taken, and the reward target for that action
        self._action = tf.placeholder("float", [None, self.ACTIONS_COUNT])
        self._target = tf.placeholder("float", [None])

        # Q-value the network predicts for the action that was actually taken
        readout_action = tf.reduce_sum(tf.mul(self._output_layer, self._action), reduction_indices=1)

        cost = tf.reduce_mean(tf.square(self._target - readout_action))
        self._train_operation = tf.train.AdamOptimizer(self.LEARN_RATE).minimize(cost)

        self._observations = deque()
        self._last_scores = deque()

        # set the first action to do nothing
        self._last_action = np.zeros(self.ACTIONS_COUNT)
        self._last_action[1] = 1

        self._last_state = None
        self._probability_of_random_action = self.INITIAL_RANDOM_ACTION_PROB
        self._time = 0

        self._session.run(tf.initialize_all_variables())

        if not os.path.exists(self._checkpoint_path):
            # makedirs so a nested checkpoint path also works
            os.makedirs(self._checkpoint_path)
        self._saver = tf.train.Saver()
        checkpoint = tf.train.get_checkpoint_state(self._checkpoint_path)

        if checkpoint and checkpoint.model_checkpoint_path:
            self._saver.restore(self._session, checkpoint.model_checkpoint_path)
            print("Loaded checkpoints %s" % checkpoint.model_checkpoint_path)
        elif playback_mode:
            raise Exception("Could not load checkpoints for playback")

    def get_keys_pressed(self, screen_array, reward, terminal):
        """
        Record the new frame, train when enough observations are stored, and pick the next action.

        :param screen_array: current screen as a 3d array (see PyGamePlayer.get_keys_pressed)
        :param reward: reward reported by get_feedback for this frame
        :param terminal: True if this frame ended an episode
        :return: list of pygame key constants to hold down until the next frame
        """
        # images will be black or white
        _, screen_binary = cv2.threshold(cv2.cvtColor(screen_array, cv2.COLOR_BGR2GRAY), 1, 255,
                                         cv2.THRESH_BINARY)

        if reward != 0.0:
            self._last_scores.append(reward)
            if len(self._last_scores) > self.STORE_SCORES_LEN:
                self._last_scores.popleft()

        # first frame must be handled differently
        if self._last_state is None:
            # the _last_state will contain the image data from the last self.STATE_FRAMES frames
            self._last_state = np.stack(tuple(screen_binary for _ in range(self.STATE_FRAMES)), axis=2)
            return DeepQHalfPongPlayer._key_presses_from_action(self._last_action)

        screen_binary = np.reshape(screen_binary,
                                   (self.SCREEN_WIDTH, self.SCREEN_HEIGHT, 1))
        # drop the oldest frame and append the newest one
        current_state = np.append(self._last_state[:, :, 1:], screen_binary, axis=2)

        if not self._playback_mode:
            # store the transition in previous_observations
            self._observations.append((self._last_state, self._last_action, reward, current_state, terminal))

            if len(self._observations) > self.MEMORY_SIZE:
                self._observations.popleft()

            # only train if done observing
            if len(self._observations) > self.OBSERVATION_STEPS:
                self._train()
                self._time += 1

        # update the old values
        self._last_state = current_state

        self._last_action = self._choose_next_action()

        if not self._playback_mode:
            # gradually reduce the probability of a random action
            if self._probability_of_random_action > self.FINAL_RANDOM_ACTION_PROB \
                    and len(self._observations) > self.OBSERVATION_STEPS:
                self._probability_of_random_action -= \
                    (self.INITIAL_RANDOM_ACTION_PROB - self.FINAL_RANDOM_ACTION_PROB) / self.EXPLORE_STEPS

            print("Time: %s random_action_prob: %s reward %s scores differential %s" %
                  (self._time, self._probability_of_random_action, reward,
                   sum(self._last_scores) / self.STORE_SCORES_LEN))

        return DeepQHalfPongPlayer._key_presses_from_action(self._last_action)

    def _choose_next_action(self):
        """
        Epsilon-greedy action choice: random with the current exploration probability (never in playback
        mode), otherwise the action with the highest predicted Q-value for the last state.

        :return: one-hot numpy array of length ACTIONS_COUNT
        """
        new_action = np.zeros([self.ACTIONS_COUNT])

        if (not self._playback_mode) and (random.random() <= self._probability_of_random_action):
            # choose an action randomly
            action_index = random.randrange(self.ACTIONS_COUNT)
        else:
            # choose an action given our last state
            readout_t = self._session.run(self._output_layer, feed_dict={self._input_layer: [self._last_state]})[0]
            if self.verbose_logging:
                print("Action Q-Values are %s" % readout_t)
            action_index = np.argmax(readout_t)

        new_action[action_index] = 1
        return new_action

    def _train(self):
        """
        Run one training step on a random mini batch of stored observations and periodically save a checkpoint.
        """
        # sample a mini_batch to train on
        mini_batch = random.sample(self._observations, self.MINI_BATCH_SIZE)
        # get the batch variables
        previous_states = [d[self.OBS_LAST_STATE_INDEX] for d in mini_batch]
        actions = [d[self.OBS_ACTION_INDEX] for d in mini_batch]
        rewards = [d[self.OBS_REWARD_INDEX] for d in mini_batch]
        current_states = [d[self.OBS_CURRENT_STATE_INDEX] for d in mini_batch]
        agents_expected_reward = []
        # this gives us the agents expected reward for each action we might take
        agents_reward_per_action = self._session.run(self._output_layer, feed_dict={self._input_layer: current_states})
        for i in range(len(mini_batch)):
            if mini_batch[i][self.OBS_TERMINAL_INDEX]:
                # this was a terminal frame so there is no future reward...
                agents_expected_reward.append(rewards[i])
            else:
                agents_expected_reward.append(
                    rewards[i] + self.FUTURE_REWARD_DISCOUNT * np.max(agents_reward_per_action[i]))

        # learn that these actions in these states lead to this reward
        self._session.run(self._train_operation, feed_dict={
            self._input_layer: previous_states,
            self._action: actions,
            self._target: agents_expected_reward})

        # save checkpoints for later
        if self._time % self.SAVE_EVERY_X_STEPS == 0:
            self._saver.save(self._session, self._checkpoint_path + '/network', global_step=self._time)

    def _create_network(self):
        """
        Build the convolutional Q-network.

        :return: tuple (input placeholder, output layer with one value per action)
        """
        # network weights
        convolution_weights_1 = tf.Variable(tf.truncated_normal([8, 8, self.STATE_FRAMES, 32], stddev=0.01))
        convolution_bias_1 = tf.Variable(tf.constant(0.01, shape=[32]))

        convolution_weights_2 = tf.Variable(tf.truncated_normal([4, 4, 32, 64], stddev=0.01))
        convolution_bias_2 = tf.Variable(tf.constant(0.01, shape=[64]))

        feed_forward_weights_1 = tf.Variable(tf.truncated_normal([256, 256], stddev=0.01))
        feed_forward_bias_1 = tf.Variable(tf.constant(0.01, shape=[256]))

        feed_forward_weights_2 = tf.Variable(tf.truncated_normal([256, self.ACTIONS_COUNT], stddev=0.01))
        feed_forward_bias_2 = tf.Variable(tf.constant(0.01, shape=[self.ACTIONS_COUNT]))

        input_layer = tf.placeholder("float", [None, self.SCREEN_WIDTH, self.SCREEN_HEIGHT,
                                               self.STATE_FRAMES])

        hidden_convolutional_layer_1 = tf.nn.relu(
            tf.nn.conv2d(input_layer, convolution_weights_1, strides=[1, 4, 4, 1], padding="SAME") + convolution_bias_1)

        hidden_max_pooling_layer_1 = tf.nn.max_pool(hidden_convolutional_layer_1, ksize=[1, 2, 2, 1],
                                                    strides=[1, 2, 2, 1], padding="SAME")

        hidden_convolutional_layer_2 = tf.nn.relu(
            tf.nn.conv2d(hidden_max_pooling_layer_1, convolution_weights_2, strides=[1, 2, 2, 1],
                         padding="SAME") + convolution_bias_2)

        hidden_max_pooling_layer_2 = tf.nn.max_pool(hidden_convolutional_layer_2, ksize=[1, 2, 2, 1],
                                                    strides=[1, 2, 2, 1], padding="SAME")

        # with a 40x40 input the strides above reduce the map to 2x2 with 64 channels, i.e. 256 values;
        # a different SCREEN_WIDTH/SCREEN_HEIGHT requires changing this size and feed_forward_weights_1
        hidden_convolutional_layer_3_flat = tf.reshape(hidden_max_pooling_layer_2, [-1, 256])

        final_hidden_activations = tf.nn.relu(
            tf.matmul(hidden_convolutional_layer_3_flat, feed_forward_weights_1) + feed_forward_bias_1)

        output_layer = tf.matmul(final_hidden_activations, feed_forward_weights_2) + feed_forward_bias_2

        return input_layer, output_layer

    @staticmethod
    def _key_presses_from_action(action_set):
        """
        Translate a one-hot action array into pygame key constants.

        :param action_set: one-hot array; index 0 = down, 1 = no key, 2 = up
        :return: list of keys to press
        :raises Exception: if none of the three entries equals 1
        """
        if action_set[0] == 1:
            return [K_DOWN]
        elif action_set[1] == 1:
            return []
        elif action_set[2] == 1:
            return [K_UP]
        raise Exception("Unexpected action")

    def get_feedback(self):
        """
        Report the change in the game score since the previous call.

        :return: (score change as float, True if a point was lost, which is treated as the terminal state)
        :rtype: tuple (float, boolean)
        """
        # ``score`` is the notebook-level global that run() updates; it has to be the same variable the
        # running game writes to, otherwise the reward would always be zero
        current_score = score

        # get the difference in score between this and the last run
        score_change = (current_score - self.last_score)
        self.last_score = current_score

        return float(score_change), score_change == -1

    def start(self):
        """
        Install the PyGame intercepts and run the game at the network's screen size. Blocks until the game
        loop exits.
        """
        super(DeepQHalfPongPlayer, self).start()
        # run() is the half-pong game loop defined earlier in this notebook
        run(screen_width=self.SCREEN_WIDTH, screen_height=self.SCREEN_HEIGHT)

In [None]:
if __name__ == '__main__':
    # Default arguments put the agent in playback mode, which requires an existing checkpoint in
    # "deep_q_half_pong_networks"; pass playback_mode=False to train instead.
    player = DeepQHalfPongPlayer()
    player.start()