In [None]:
import pygame # import needed to run next few cells
# !pip install pygame

In [None]:
# import other required packages
import random
import time
import warnings
from collections import deque
from pathlib import Path
from random import choice
from random import randint

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from IPython import display
from tensorflow import keras

warnings.filterwarnings('ignore')

In [None]:
# class to create paddle and update position

class Player(pygame.sprite.Sprite):
    """Rectangular Pong paddle that can be moved vertically.

    Extends ``pygame.sprite.Sprite`` so it can live in a sprite group and be
    used with the sprite collision helpers.

    Args:
        color: RGB tuple used to draw the paddle.
        width: Paddle width in pixels.
        height: Paddle height in pixels.
        screen_height: Height of the playfield in pixels. The default of 500
            together with an 80 px paddle gives the lower limit of 420 that
            used to be hard-coded.
    """

    def __init__(self, color, width, height, screen_height=500):
        # parent class Sprite constructor
        super().__init__()

        # Black is filled in and then declared the colour key of the surface.
        self.image = pygame.Surface([width, height])
        self.image.fill((0, 0, 0))
        self.image.set_colorkey((0, 0, 0))

        # Draw the paddle
        pygame.draw.rect(self.image, color, [0, 0, width, height])

        # Fetch the rectangle object
        self.rect = self.image.get_rect()

        # Largest allowed value of rect.y so the whole paddle stays on screen.
        self.max_y = max(0, screen_height - height)

    def moveUp(self, pixels):
        """Move the paddle up by ``pixels``, stopping at the top edge (y = 0)."""
        self.rect.y = max(0, self.rect.y - pixels)

    def moveDown(self, pixels):
        """Move the paddle down by ``pixels``, stopping at ``self.max_y``."""
        self.rect.y = min(self.max_y, self.rect.y + pixels)

In [None]:
# extend Player class and create update function to follow ball

class CPU(Player):
    """Scripted opponent paddle that follows the ball vertically."""

    def update(self, ball):
        """Chase the ball with moves of 7 pixels.

        The ball is stored on ``self.ball``. The paddle centre is compared
        with the ball's ``rect.y`` twice in a row; the second comparison sees
        the position produced by the first move, so a paddle that overshoots
        on the way down is moved straight back up within the same call.

        Args:
            ball: Sprite whose ``rect.y`` is tracked.
        """
        self.ball = ball
        ball_top = self.ball.rect.y
        stride = 7

        if ball_top > self.rect.centery:
            self.moveDown(stride)

        # Independent second check (not an elif): centery may have changed above.
        if ball_top < self.rect.centery:
            self.moveUp(stride)

In [None]:
# extend Player class and create new function to execute action 0, 1 or 2

class AI(Player):
    """Paddle driven by a discrete action chosen by the agent."""

    def action_update(self, action):
        """Apply one action to the paddle and remember it on ``self.action``.

        Args:
            action: 1 moves the paddle down by 7 pixels, 2 moves it up by
                7 pixels, 0 leaves it where it is.
        """
        self.action = action

        if action == 1:
            self.moveDown(7)
        elif action == 2:
            self.moveUp(7)
        # action 0: the paddle keeps its current position

In [None]:
# class to create and update ball object

class Ball(pygame.sprite.Sprite):
    """Pong ball sprite with a random serve and a pause after every reset.

    Args:
        color: RGB tuple used to draw the ball.
        width: Ball width in pixels.
        height: Ball height in pixels.
    """

    # Pause (in milliseconds of pygame ticks) between a reset and the ball moving.
    SERVE_DELAY_MS = 1000

    def __init__(self, color, width, height):
        # parent class Sprite constructor
        super().__init__()

        # Black is filled in and then declared the colour key of the surface.
        self.image = pygame.Surface([width, height])
        self.image.fill((0, 0, 0))
        self.image.set_colorkey((0, 0, 0))

        # Draw the ball
        pygame.draw.rect(self.image, color, [0, 0, width, height])

        # [x velocity, y velocity] in pixels per update
        self.velocity = [0, 0]
        self._randomize_serve()

        # Fetch the rectangle object that has the dimensions of the image.
        self.rect = self.image.get_rect()

        self.active = False  # the ball only moves while active
        self.score_time = 0  # pygame tick count recorded by the last reset()

    def _randomize_serve(self):
        """Pick a fresh serve: |vx| is 7 or 8, |vy| is 3, both with random sign."""
        self.velocity[0] = randint(7, 8) * random.choice([-1, 1])
        self.velocity[1] = random.choice([-3, 3])

    def counter(self):
        """Activate the ball once the serve delay since ``score_time`` has elapsed."""
        if pygame.time.get_ticks() - self.score_time >= self.SERVE_DELAY_MS:
            self.active = True

    def update(self):
        """Advance the ball by one frame.

        Returns:
            bool: True if the ball moved this frame, False if it was still
            waiting for the serve delay (previously this case returned None).
        """
        if not self.active:
            self.counter()
            return False

        self.rect.x += self.velocity[0]
        self.rect.y += self.velocity[1]
        return True

    def reset(self):
        """Put the ball back at (345, 195), re-roll its velocity and restart the delay."""
        self.active = False
        self.rect.x = 345
        self.rect.y = 195
        self._randomize_serve()
        self.score_time = pygame.time.get_ticks()

    def bounce(self, side):
        """Reflect the ball off a paddle.

        Args:
            side: 'cpu' sends the ball towards positive x; any other value
                sends it towards negative x. The vertical speed is re-rolled
                to 4..8 pixels with a random sign.
        """
        speed_x = abs(self.velocity[0])
        self.velocity[0] = speed_x if side == 'cpu' else -speed_x
        self.velocity[1] = randint(4, 8) * random.choice([-1, 1])

In [None]:
# main Pong Environment class
# partly built with the help of: https://www.101computing.net/pong-tutorial-using-pygame-getting-started/

class PongEnv():
    """Pong environment with a gym-like ``reset`` / ``step`` / ``render`` / ``close`` API.

    The scripted ``CPU`` paddle is placed at x = 20 and the agent's ``AI``
    paddle at x = 670 by ``reset()``. An observation is a length-5 vector:
    ball x velocity, ball y velocity, ball x / 500, ball y / 500 and
    AI paddle y / 500.
    """

    def __init__(self):

        self.black = (0, 0, 0)
        self.white = (255, 255, 255)
        self.red = (255, 0, 0)

        self.height = 500
        self.width = 700
        self.fps = 60  # set FPS to 80+ for faster training
        self.win_score = 3  # first side to reach this score ends the episode

        self.scoreCPU = 0
        self.scoreAI = 0
        self.reward = 0

        self.hit = False   # True once the AI paddle has touched the ball in the current rally
        self.wait = False  # True when the ball moved during the latest step

        self.action_space = [0, 1, 2]
        self.observation_space = np.zeros(5)  # for now only for obs count

        self.screen = None  # created lazily by render()
        self.font = None    # created lazily by render()
        self.clock = pygame.time.Clock()
        self.state = None
        self.is_open = True

        self.paddleCPU = CPU(self.white, 10, 80)
        self.paddleAI = AI(self.white, 10, 80)
        self.ball = Ball(self.red, 10, 10)

        # list of all the sprites in the game: 2 paddles and the ball
        self.all_sprites_list = pygame.sprite.Group()
        self.all_sprites_list.add(self.paddleCPU)
        self.all_sprites_list.add(self.paddleAI)
        self.all_sprites_list.add(self.ball)

    def render(self):
        """Draw the current frame, opening the window on first use.

        Returns:
            The value of ``self.is_open``.
        """
        if self.screen is None:
            # set new window
            pygame.init()
            pygame.display.set_caption("Pong")
            self.screen = pygame.display.set_mode((self.width, self.height))

        if self.font is None:
            # built once and reused instead of being recreated every frame
            self.font = pygame.font.Font(None, 74)

        # set screen to black
        self.screen.fill(self.black)

        # draw line for net (x = 349 for the default 700 px width)
        net_x = self.width // 2 - 1
        pygame.draw.line(self.screen, self.white, [net_x, 0], [net_x, self.height], 5)

        # draw all the sprites
        self.all_sprites_list.draw(self.screen)

        # display all scores
        self.screen.blit(self.font.render(str(self.scoreCPU), 1, self.white), (250, 10))
        self.screen.blit(self.font.render(str(self.scoreAI), 1, self.white), (420, 10))

        # update events and screen
        pygame.event.pump()
        pygame.display.flip()

        return self.is_open

    def step(self, action):
        """Advance the game by one frame.

        Args:
            action: 0 (stay), 1 (down) or 2 (up) for the AI paddle.

        Returns:
            list: ``[reward, state, done]`` where ``state`` is the normalised
            observation and ``done`` is True once either side has reached
            ``self.win_score``.
        """
        ball = self.ball

        # update ball and CPU paddle; remember whether the ball actually moved
        ball_was_active = bool(ball.update())
        self.wait = ball_was_active
        self.paddleCPU.update(ball)

        # 690 / 490 for the default 700x500 field and 10x10 ball
        right_limit = self.width - ball.rect.width
        bottom_limit = self.height - ball.rect.height

        # Scoring. The outcome is captured in flags *before* the ball is reset,
        # because reset() moves the ball back to the centre and the reward logic
        # can no longer see where it left the field.
        cpu_scored = ball.rect.x >= right_limit
        if cpu_scored:
            self.scoreCPU += 1
            ball.reset()
        ai_scored = ball.rect.x <= 0
        if ai_scored:
            self.scoreAI += 1
            ball.reset()

        # Top / bottom walls: always send the ball back into the field, even if
        # its vertical direction was re-rolled by a paddle bounce while outside.
        if ball.rect.y > bottom_limit:
            ball.velocity[1] = -abs(ball.velocity[1])
        if ball.rect.y < 0:
            ball.velocity[1] = abs(ball.velocity[1])

        # Detect collisions between the ball and the paddles
        if pygame.sprite.collide_mask(ball, self.paddleCPU):
            ball.bounce('cpu')

        paddle_hit = bool(pygame.sprite.collide_mask(ball, self.paddleAI))
        if paddle_hit:
            ball.bounce(None)

        # update ai paddle using random or predicted action
        self.paddleAI.action_update(action)

        self.reward = self._compute_reward(ball_was_active, cpu_scored, ai_scored, paddle_hit)

        # reaching win_score ends the game
        done = bool(self.scoreCPU >= self.win_score or self.scoreAI >= self.win_score)

        self.state = self._observe()

        # tick / frames per second
        self.clock.tick(self.fps)

        return [self.reward, self.state, done]

    def _compute_reward(self, ball_was_active, cpu_scored, ai_scored, paddle_hit):
        """Reward for one frame; updates ``self.hit`` as a side effect.

        Args:
            ball_was_active: Whether the ball moved this frame. If not, the
                reward is 0.
            cpu_scored: The ball left the field on the AI side this frame.
            ai_scored: The ball left the field on the CPU side this frame.
            paddle_hit: The ball collided with the AI paddle this frame.

        Returns:
            0 while the ball is waiting, otherwise the reward rounded to two
            decimals: a tracking term in [0, 0.5] that shrinks with the
            vertical distance between ball and AI paddle centres, replaced by
            -0.5 on a miss, plus 2 for a point scored after a hit, plus 1 for
            a hit, plus 5 for the point that wins the game.
        """
        if not ball_was_active:
            return 0

        # absolute difference reward system
        y_difference = abs(self.ball.rect.centery - self.paddleAI.rect.centery)
        reward = 0.5 - (y_difference / self.height) / 2

        if cpu_scored:  # ai paddle did not hit the ball
            reward = -0.5
            self.hit = False

        if ai_scored:
            if self.hit:  # ai paddle scores point after hit
                reward += 2
            # a new rally starts, so an old hit must not earn credit again
            self.hit = False

        if paddle_hit:  # ai paddle hits ball
            reward += 1
            self.hit = True

        if ai_scored and self.scoreAI >= self.win_score:  # ai wins game
            reward += 5

        return np.around(reward, 2)

    def _observe(self):
        """Build the normalised observation from the current ball and AI paddle."""
        raw = (
            self.ball.velocity[0],
            self.ball.velocity[1],
            self.ball.rect.x,
            self.ball.rect.y,
            self.paddleAI.rect.y,
        )
        return self.norm_state(raw)

    # normalise state values for ann training
    def norm_state(self, arr):
        """Scale a raw 5-tuple into the network input.

        Velocities are passed through unchanged; ball x, ball y and paddle y
        are each divided by 500.
        NOTE(review): ball x is divided by 500 although the field is 700 wide,
        so it can exceed 1. Left unchanged so that previously trained models
        keep receiving the same inputs.
        """
        state = np.zeros(len(arr))
        state[0] = arr[0] / 1    # velocity 0
        state[1] = arr[1] / 1    # velocity 1
        state[2] = arr[2] / 500  # ball x
        state[3] = arr[3] / 500  # ball y
        state[4] = arr[4] / 500  # paddle y
        return state

    # reset ball and paddle positions and reset scores
    def reset(self):
        """Start a new game and return the initial observation.

        The observation is built from the ball's ``rect.x`` / ``rect.y``,
        the same coordinates ``step()`` reports.
        """
        self.paddleCPU.rect.x = 20
        self.paddleCPU.rect.y = 200

        self.paddleAI.rect.x = 670
        self.paddleAI.rect.y = 200

        self.ball.reset()

        self.scoreAI = 0
        self.scoreCPU = 0

        self.hit = False

        self.state = self._observe()
        return self.state

    # close pong environment
    def close(self):
        """Shut pygame down and forget the window so render() can reopen it."""
        pygame.display.quit()
        pygame.quit()
        self.screen = None
        self.font = None

In [None]:
def experience_replay(model, batch_size, gamma, memory, observation_count, action_count, epoch_count):
    """Fit the Q-network on one random minibatch drawn from the replay memory.

    Args:
        model: Callable Keras model; ``model(x)`` must return a tensor with a
            ``.numpy()`` method holding one row of action values per input row.
        batch_size: Number of transitions to sample.
        gamma: Discount factor applied to the best next-state action value.
        memory: Sequence of ``(observation, action, reward, next_observation,
            done)`` tuples. Each observation must hold ``observation_count``
            values (any shape that reshapes to that length).
        observation_count: Length of one observation vector.
        action_count: Number of actions (unused; kept for the call signature).
        epoch_count: Number of epochs passed to ``model.fit``.
    """
    batch = random.sample(memory, batch_size)  # randomly sample a batch

    # Unpack the transitions column by column into plain numeric arrays.
    states = np.zeros((batch_size, observation_count))       # observation at time t
    next_states = np.zeros((batch_size, observation_count))  # observation at time t+1
    actions = np.zeros(batch_size, dtype=int)
    rewards = np.zeros(batch_size)
    terminal = np.zeros(batch_size, dtype=bool)
    for row, (state, action, reward, next_state, done) in enumerate(batch):
        states[row] = np.reshape(state, observation_count)
        next_states[row] = np.reshape(next_state, observation_count)
        actions[row] = action
        rewards[row] = reward
        terminal[row] = done

    with tf.device("gpu:0"):
        # current action-value estimates for both ends of every transition
        q_values = model(states).numpy()
        q_values_next = model(next_states).numpy()

    # Terminal transitions: the target is the reward alone.
    # Otherwise: Bellman optimality target, reward + gamma * max_a Q(s', a).
    bootstrapped = rewards + gamma * np.max(q_values_next, axis=1)
    targets = np.where(terminal, rewards, bootstrapped)

    # Only the value of the action actually taken is replaced; the other
    # outputs keep the network's own prediction and so contribute no error.
    q_values[np.arange(batch_size), actions] = targets

    with tf.device("gpu:0"):  # using GPU
        model.fit(states, q_values, epochs=epoch_count, verbose=0)  # fit the ann model

In [None]:
# plot function from
# https://github.com/python-engineer/snake-ai-pytorch/blob/main/helper.py

plt.ion()

def plot(rewards):
    """Redraw the live reward curve in the notebook output.

    Args:
        rewards: Non-empty sequence of per-game rewards; the last value is
            also written next to the final point of the curve.
    """
    # swap the previously displayed figure for the current one, then wipe it
    display.clear_output(wait=True)
    display.display(plt.gcf())
    plt.clf()

    axes = plt.gca()
    axes.set_title('Training...')
    axes.set_xlabel('Number of Games')
    axes.set_ylabel('Rewards')
    axes.plot(rewards)
    axes.set_ylim(ymin=0)

    latest = rewards[-1]
    axes.text(len(rewards) - 1, latest, str(latest))

    plt.show(block=False)
    plt.pause(.1)

In [None]:
# create the environment and read the network input / output sizes from it
env = PongEnv()
observation_count = len(env.observation_space)
action_count = len(env.action_space)

In [None]:
alpha = 0.001  # learning rate

# Q-network: observation -> 32 -> 64 -> one linear output per action
model = keras.Sequential([
    keras.layers.Dense(32, input_dim=observation_count, activation='relu'),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(action_count, activation='linear'),
])
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=alpha),
    loss='mean_squared_error',
)

In [None]:
rewards = []        # total reward of every finished episode
record = 0          # keep track of best score
episodes = 1000
gamma = 0.9         # discount factor
beta = 0.05         # epsilon decay rate
batch_size = 128
memory = deque([], maxlen=25000)  # memory replay buffer set to 25000

to_render = True  # set to false to not render environment
render_freq = 5   # render every 5 episodes

try:
    for episode in range(episodes):
        observation_t = np.reshape(env.reset(), [1, observation_count])

        total_reward = 0
        epsilon = 0.5 / (1 + beta * (episode / action_count))

        # note: window will be open but not respond when not rendering, do not exit
        render_episode = to_render and episode % render_freq == 0

        done = False
        while not done:
            # Render every frame of a rendered episode, not only the frames on
            # which the greedy action was chosen.
            if render_episode:
                env.render()

            # epsilon-greedy action selection
            if np.random.random() <= epsilon:
                action = random.choice(env.action_space)
            else:
                with tf.device("gpu:0"):  # execute with gpu
                    action_values = model(observation_t)
                action = int(np.argmax(action_values[0]))

            reward, observation_t_next, done = env.step(action)
            observation_t_next = np.reshape(observation_t_next, [1, observation_count])
            total_reward += reward
            memory.append((observation_t, action, reward, observation_t_next, done))
            observation_t = observation_t_next

            if done:  # if cpu or ai wins
                rewards.append(total_reward)
                if total_reward > record:
                    # new best episode: checkpoint the model
                    record = total_reward
                    model.save(f'pong_model_eps{episode}')

                plot(rewards)

                print(f'\nepisode: {episode}/{episodes}, score: {total_reward}, epsilon: {epsilon}\n')

            # if enough experiences start function approximation
            if len(memory) > batch_size:
                experience_replay(model, batch_size, gamma, memory, observation_count, action_count, 2)
finally:
    # close environment after training completes, or when it is interrupted
    env.close()

In [None]:
# plot cumulative_rewards

# Running average of the episode rewards: entry k is the mean of rewards[0..k].
# (The previous loop summed rewards[:k] but divided by k + 1, so every point
# left out the current episode and the first point was always 0.)
# Only the episodes that actually finished are plotted.
episode_rewards = np.asarray(rewards, dtype=float)
episodes_so_far = np.arange(1, len(episode_rewards) + 1)
cumulative_rewards = (np.cumsum(episode_rewards) / episodes_so_far).tolist()

plt.figure(figsize=(15, 5))
plt.title('Cummulative Reward over episodes')
plt.xlabel('Episodes')
plt.ylabel('Cummulative Reward')
plt.plot(range(len(cumulative_rewards)), cumulative_rewards)
plt.show()

In [None]:
# play the game with trained model
env = PongEnv()
observation_count = env.observation_space.shape[0]
action_count = len(env.action_space)

# Checkpoints are written as 'pong_model_eps<episode>' whenever an episode sets
# a new reward record, so which ones exist differs from run to run. Use the
# original default if it is present, otherwise the most recently written one.
model_path = Path('pong_model_eps739')
if not model_path.exists():
    checkpoints = sorted(Path('.').glob('pong_model_eps*'), key=lambda path: path.stat().st_mtime)
    if not checkpoints:
        raise FileNotFoundError("no 'pong_model_eps*' checkpoint found - run the training cell first")
    model_path = checkpoints[-1]

model_trained = keras.models.load_model(str(model_path))

try:
    observation = env.reset()  # reset the environment to the initial state
    observation = np.reshape(observation, [1, observation_count])

    done = False
    while not done:
        env.render()
        action_values = model_trained(observation)  # run observation through the ANN Q(s,a)
        action = np.argmax(action_values[0])  # get the best action
        reward, observation, done = env.step(action)  # execute action
        observation = np.reshape(observation, [1, observation_count])
finally:
    env.close()  # close the env even if the game is interrupted

In [None]:
# generate plot for rewards over episodes

# one point per finished episode, in the order the episodes were played
fig, ax = plt.subplots(figsize=(15, 5))
ax.set_title('Rewards Graph')
ax.set_xlabel('Episodes')
ax.set_ylabel('Rewards')
ax.plot(range(len(rewards)), rewards)
plt.show()