<a href="https://colab.research.google.com/github/sachinbluechip/Make-pong-game-with-RL/blob/main/Pong_game_with_Reinforcement_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# --- Runtime setup (shell commands; written for a Colab-style environment) ---
# System packages (names suggest a virtual X display, OpenGL bindings, X11 tools and the ffmpeg binary).
!apt-get install -y xvfb python-opengl x11-utils
!apt-get install -y --no-install-recommends ffmpeg
# Python packages.
# NOTE(review): nothing here is version-pinned, yet later cells depend on gym.make("Pong-v0", ...),
# env.seed(...) and a 4-tuple return from env.step(...) -- confirm the installed gym release supports these.
!pip install ffmpeg
!pip install gym pyvirtualdisplay scikit-video #> /dev/null 2>&1
#!pip install 'gym[box2d]'
!pip install atari_py

#%tensorflow_version 2.x
import tensorflow as tf

import numpy as np
import base64, io, time, gym
import IPython, functools
import matplotlib.pyplot as plt
from tqdm import tqdm

# mitdeeplearning supplies the mdl.lab3 / mdl.util helpers used in later cells.
!pip install mitdeeplearning
import mitdeeplearning as mdl

# Pong




Define and inspect the Pong environment



In [None]:
# Create the Pong environment. frameskip=5 is forwarded to the environment
# constructor (presumably each env.step() repeats the action for 5 emulator
# frames -- TODO confirm against the installed gym/Atari version).
env = gym.make("Pong-v0", frameskip=5)
env.seed(1); # for reproducibility

In [None]:
# Report the observation space exposed by the environment
print(f"Environment has observation space = {env.observation_space}")

In [None]:
# Size of the discrete action set; create_pong_model() also uses it as the
# width of the network's final Dense layer.
n_actions = env.action_space.n
print(f"Number of possible actions that the agent can choose from = {n_actions}")

Define the Pong agent



In [None]:
### Define the Pong agent ###

# Layer shorthands used by create_pong_model() below.
Dense = tf.keras.layers.Dense
Flatten = tf.keras.layers.Flatten

# Every convolution built through this partial defaults to 'same' padding
# and a ReLU activation; callers only supply filters / kernel_size / strides.
Conv2D = functools.partial(
    tf.keras.layers.Conv2D,
    activation='relu',
    padding='same',
)

# Builds the CNN used by the Pong agent
def create_pong_model():
  """Assemble the convolutional network for the Pong agent.

  Layers, in order:
    * Conv2D: 16 filters, 7x7 kernel, stride 4
    * Conv2D: 32 filters, 5x5 kernel, stride 2
    * Conv2D: 48 filters, 3x3 kernel, stride 2
    * Flatten
    * Dense: 64 units, ReLU
    * Dense: n_actions units, no activation

  The convolutions are created through the module-level `Conv2D` partial, so
  each one uses 'same' padding and a ReLU activation. The output width is read
  from the global `n_actions`, which must be defined before this is called.

  Returns:
    A newly constructed `tf.keras.models.Sequential`; no compile step is
    performed here.
  """
  # (filters, kernel_size, strides) for each convolutional layer, in order
  conv_specs = ((16, 7, 4), (32, 5, 2), (48, 3, 2))
  feature_layers = [
    Conv2D(filters=n_filters, kernel_size=kernel, strides=stride)
    for n_filters, kernel, stride in conv_specs
  ]

  # Fully connected part: one hidden layer, then one output unit per action
  head_layers = [
    Flatten(),
    Dense(units=64, activation='relu'),
    Dense(units=n_actions, activation=None),
  ]

  return tf.keras.models.Sequential(feature_layers + head_layers)

# Build an initial model instance. Note: the "Training Pong" cell further down
# calls create_pong_model() again, so this instance is replaced before training.
pong_model = create_pong_model()

Pong-specific functions




In [None]:
### Pong reward function ###

def discount_rewards(rewards, gamma=0.99):
  """Compute normalized, discounted rewards (returns) for one Pong episode.

  Walking backwards through the episode, each timestep receives
  R_t = r_t + gamma * R_{t+1}. The running sum is reset whenever a non-zero
  reward is seen (per the original note: a non-zero reward means the game has
  ended), so returns never leak across that boundary.

  The result is then standardized to zero mean and unit standard deviation.
  This is done inline so the function does not rely on a separate `normalize`
  helper being defined elsewhere in the notebook.

  Arguments:
    rewards: sequence of per-timestep rewards for the episode (list or array;
      integer or float entries are both accepted).
    gamma: discounting factor. 0.99 makes the rate of depreciation slow.

  Returns:
    np.ndarray of dtype float32, same length as `rewards`. An empty input
    yields an empty array; if every discounted return is identical (zero
    standard deviation) the mean-centred values (all zeros) are returned
    instead of dividing by zero.
  """
  # Work in floating point regardless of the input dtype: np.zeros_like on
  # integer rewards would give an integer buffer and truncate every
  # fractional discounted value.
  rewards = np.asarray(rewards, dtype=np.float64)
  discounted_rewards = np.zeros_like(rewards)

  R = 0.0
  for t in reversed(range(len(rewards))):
    # Reset the sum if the reward is not 0 (the game has ended!)
    if rewards[t] != 0:
      R = 0.0
    # Update the total discounted reward
    R = R * gamma + rewards[t]
    discounted_rewards[t] = R

  # Nothing to standardize for an empty episode (mean/std would be NaN)
  if discounted_rewards.size == 0:
    return discounted_rewards.astype(np.float32)

  discounted_rewards -= discounted_rewards.mean()
  spread = discounted_rewards.std()
  if spread > 0:  # avoid NaN/inf when all returns are equal
    discounted_rewards /= spread
  return discounted_rewards.astype(np.float32)

In [None]:
# Reset the game and take 30 steps with action 0, keeping the last frame
observation = env.reset()
for i in range(30):
  observation, _, _, _ = env.step(0)

# Run the same preprocessing that the training loop applies to each frame
observation_pp = mdl.lab3.preprocess_pong(observation)

# One figure, two panels: raw frame (left) and preprocessed frame (right)
f = plt.figure(figsize=(10, 3))
ax = f.add_subplot(1, 2, 1)
ax2 = f.add_subplot(1, 2, 2)

ax.imshow(observation)
ax.grid(False)

ax2.imshow(np.squeeze(observation_pp))
ax2.grid(False)
# ax2 is the most recently added (current) axes, so the title goes there
ax2.set_title('Preprocessed Observation');

Training Pong



In [None]:
### Training Pong ###

# NOTE(review): this cell uses Memory, choose_action and train_step, none of
# which are defined in the cells shown above -- they must already exist in the
# kernel for this cell to run.

# --- Hyperparameters ---
learning_rate = 1e-4
MAX_ITERS = 500  # maximum number of episodes (Pong is more complex, so use many)

# --- Fresh model and optimizer ---
pong_model = create_pong_model()
optimizer = tf.keras.optimizers.Adam(learning_rate)

# --- Live reward plot and per-episode experience buffer ---
smoothed_reward = mdl.util.LossHistory(smoothing_factor=0.9)
plotter = mdl.util.PeriodicPlotter(sec=5, xlabel='Iterations', ylabel='Rewards')
memory = Memory()

for i_episode in range(MAX_ITERS):

  plotter.plot(smoothed_reward.get())

  # Restart the environment. previous_frame is seeded from the same observation
  # that the first loop iteration preprocesses, so the first frame difference
  # of an episode compares the initial frame with itself.
  observation = env.reset()
  previous_frame = mdl.lab3.preprocess_pong(observation)

  while True:
    # Pre-process the newest frame
    current_frame = mdl.lab3.preprocess_pong(observation)

    # The model's input is the change between the last two frames
    obs_change = current_frame - previous_frame

    # Pick an action from the frame difference and apply it
    action = choose_action(pong_model, obs_change)
    next_observation, reward, done, info = env.step(action)

    # Record the frame difference, the action taken, and the resulting reward
    memory.add_to_memory(obs_change, action, reward)

    if not done:
      # Episode still running: roll the frames forward and keep playing
      observation = next_observation
      previous_frame = current_frame
      continue

    # Episode over: log the total reward for the plot
    total_reward = sum(memory.rewards)
    smoothed_reward.append(total_reward)

    # One policy update on everything collected during this episode
    train_step(
      pong_model,
      optimizer,
      observations=np.stack(memory.observations, 0),
      actions=np.array(memory.actions),
      discounted_rewards=discount_rewards(memory.rewards),
    )

    # Start the next episode with an empty buffer
    memory.clear()
    break

In [None]:
# Run the trained model through mdl.lab3.save_video_of_model and play the result.
# obs_diff / pp_fn are passed so the helper can (presumably) feed the model
# preprocessed frame differences, as in training.
# NOTE(review): the environment is given by name only ("Pong-v0"), whereas the
# training env was created with frameskip=5 -- confirm the helper matches.
saved_pong = mdl.lab3.save_video_of_model(
    pong_model,
    "Pong-v0",
    obs_diff=True,
    pp_fn=mdl.lab3.preprocess_pong,
)
mdl.lab3.play_video(saved_pong)