In [6]:
import numpy as np
import gym
from gym import wrappers
import time
import cv2
#from skimage.color import rgb2gray

# Imports specifically so we can render outputs in Jupyter.
import matplotlib.pyplot as plt
#%matplotlib inline
#from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display
from collections import deque

In [7]:
save_path = 'pong_policy.npy'
### Hyperparams

# In how many slices discretize the continuous space, the bigger, the smoother. but it increases a lot the time to converge !
# Try and check to see how they work !
numStates_x = 60
numStates_y = 80
numStates = np.array([numStates_x, numStates_y, numStates_x, numStates_y, 3, 30])

# The environment low, high and interval mapped per state
env_low = None
env_high = None
env_dx = None

# Number of episodes
numEpisodes = 100

# Tweaking params
initial_lr = 1.0 # Initial Learning Rate
min_lr = 0.001 # Minimum Learning Rate
lr_decay = 0.999996
gamma = 1.0 # Discount factor
epsilon_start = 1.0 # Allow the model to do a lot of trial and error on the beggining
epsilon_decay = 0.999 # Decay per episode.
epsilon_end = 0.01 # The end point / min of the epsilon

In [8]:
def obs_to_state(obs):
    """ Maps an observation to state """

    p_x = int(obs[0] / env_dx[0])
    p_y = int(obs[1] / env_dx[1])
    ball_x = int(obs[2] / env_dx[0])
    ball_y = int(obs[3] / env_dx[1])
    ball_dir_x = int(obs[4] + 1)
    ball_dir_y = int((obs[5] + 1.5) * 10)
    
    return p_x, p_y, ball_x, ball_y, ball_dir_x, ball_dir_y

# get epsilon by Episode
def get_epsilon(n_episode):
    epsilon = max(epsilon_start * (epsilon_decay ** n_episode), epsilon_end)
    return (epsilon)


def run_episode(env, policy=None, render=False):
    obs = env.reset()
    total_reward = 0
    step_idx = 0
    images = []
    while True:
        if policy is None:
            action = env.action_space.sample()
        else:
            p_x, p_y, ball_x, ball_y, ball_dir_x, ball_dir_y = obs_to_state(obs)
            action = policy[p_x][p_y][ball_x][ball_y][ball_dir_x][ball_dir_y]
        obs, reward, done, _ = env.step(action)
        total_reward += gamma ** step_idx * reward
        step_idx += 1
        if render:
            images.append(env.render())
        if done:
            break

    return total_reward, np.array(images, dtype=np.uint8)

In [9]:
# select action based on epsilon greedy
def select_action(env, q_table, state, epsilon):
    p_x, p_y, ball_x, ball_y, ball_dir_x, ball_dir_y = state
    # implicit policy; if we have action values for that state, choose the largest one, else random
    if np.random.rand() > epsilon:
#         logits = q_table[position][speed]
#         logits_exp = np.exp(logits)
#         probs = logits_exp / np.sum(logits_exp)
#         action = np.random.choice(env.action_space.n, p=probs)  # asa era la prof
        action = np.argmax(q_table[p_x][p_y][ball_x][ball_y][ball_dir_x][ball_dir_y])
    else:
        action = env.action_space.sample()
    return action

# Given (state, action, reward, next_state) pair after a transition made in the e nvironment and the episode index
def updateExperience(env, q_table, state, action, reward, next_state, lr):
#     next_action = select_action(env, q_table, next_state, -1)  # -1 so the algorithm NEVER chooses random on second action
    # Q(s t+1, a t+1)
    p_x, p_y, ball_x, ball_y, ball_dir_x, ball_dir_y = state
    p_x_n, p_y_n, ball_x_n, ball_y_n, ball_dir_x_n, ball_dir_y_n = next_state
    next_q = np.max(q_table[p_x_n][p_y_n][ball_x_n][ball_y_n][ball_dir_x_n][ball_dir_y_n])
    q_table[p_x][p_y][ball_x][ball_y][ball_dir_x][ball_dir_y][action] += lr * (reward + gamma * next_q - q_table[p_x][p_y][ball_x][ball_y][ball_dir_x][ball_dir_y][action])

def train_q_learning(env):
    print('Start Q-Learning training:')
    display_freq = min(numEpisodes // 10, 1000)

    # Initialize Q-Table
    q_table = np.random.uniform(-1, 1, (numStates[0], numStates[1], numStates[2], numStates[3], numStates[4], numStates[5], 3))  # [number_of_positions x number_of_speeds x number_of_actions]
    last100_moving_total = 0
    last100_rewards = deque()
    SOLVED = False
    last_total_rewards = [] # For stat purposes, accumultates some episode rewards
    
    for episode in range(numEpisodes):
        eps = get_epsilon(episode)
        lr = max(min_lr, initial_lr * (lr_decay ** episode))
        
        obs = env.reset()
        total_reward = 0
        
        while True:
            state = obs_to_state(obs)
            action = select_action(env, q_table, state, eps)
            
            # step environment
            obs, reward, done, info = env.step(action)
            next_state = obs_to_state(obs)
            total_reward += reward
            
            updateExperience(env, q_table, state, action, reward, next_state, lr)
            
            if done:
                break
                
        last100_rewards.append(total_reward)
        last100_moving_total += total_reward
        
        while len(last100_rewards) > 100:
            removedItem = last100_rewards.popleft()
            last100_moving_total -= removedItem
                        
        last100_moving_avg = last100_moving_total / len(last100_rewards)

        if episode % display_freq == 0:  # Write out partial results
            print(f'At episode: {episode+1} - Reward mean from last 100 episodes: {last100_moving_avg}. - LR:{lr:0.4f} - eps:{eps:0.4f}')
            last_total_rewards.clear()
        
    print('Training finished!')
    solution_policy = np.argmax(q_table, axis=4)
    solution_policy_scores = [run_episode(env, solution_policy, False)[0] for _ in range(1000)]
    print("Average score of solution on a dry run= ", np.mean(solution_policy_scores))

    return solution_policy

In [11]:
import pong_env_pos as pong_env
env = pong_env.env()

env_dx = np.array([pong_env.DISPLAY_WIDTH / numStates[0], pong_env.DISPLAY_HEIGHT / numStates[1]])

# Train a policy. TODO: save it
sol_policy = train_q_learning(env)

with open(save_path, 'wb') as f:
    np.save(f, sol_policy)

Start Q-Learning training:


MemoryError: Unable to allocate 46.3 GiB for an array with shape (60, 80, 60, 80, 3, 30, 3) and data type float64

In [None]:
# Play  simulation with the learned policy
reward, images = run_episode(env, sol_policy, True)
print(reward)

frame = images[0]
height, width, layers = frame.shape

video = cv2.VideoWriter('demo.avi', 0, 30, (width,height))

for image in images:
    video.write(image)

cv2.destroyAllWindows()
video.release()