<a href="https://colab.research.google.com/github/prevelat/Machine_Learning/blob/master/BQ_Training_documented.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup

### Installation

In [0]:
# Note: If you haven't installed the following dependencies, run:
!apt-get install -y xvfb
!pip install 'gym==0.10.11'
!pip install 'imageio==2.4.0'
!pip install PILLOW
!pip install 'pyglet==1.3.2'
!pip install pyvirtualdisplay
!pip install tf-agents
!pip install tensorflow==2.0.0
!pip install tensorflow-probability==0.8
try:
  %%tensorflow_version 2.x
except:
  pass

In [0]:
from __future__ import absolute_import, division, print_function

import base64
import imageio
import IPython
import matplotlib
import matplotlib.pyplot as plt
import PIL.Image
import pyvirtualdisplay
import numpy as np
import datetime
import pandas as pd
import time
import tensorflow as tf

from tf_agents.agents.dqn import dqn_agent
from tf_agents.drivers import dynamic_step_driver
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.networks import q_network
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.utils import common
from tf_agents.policies import policy_saver

# Load the TensorBoard notebook extension so the `%tensorboard` magic used in
# the Visualization section is available.
%load_ext tensorboard

# Switch on TensorFlow 2.x behavior through the v1 compatibility API.
tf.compat.v1.enable_v2_behavior()

In [0]:
# Display the TensorFlow version that was actually imported.
tf.version.VERSION

### Cloud Management

In [0]:
# Google Cloud Setup

import os
from google.colab import drive
# Mount Google Drive under /content/drive.
drive.mount('/content/drive')
try:
  os.chdir("drive/My Drive/Colab Notebooks")
except OSError as exc:
  # Best effort: the path is relative, so it stops resolving once the working
  # directory has already been changed (e.g. when this cell is run again).
  # Report the reason instead of hiding every possible error.
  print('Working directory left unchanged:', exc)
from google.colab import auth
# Authenticate this Colab session so the BigQuery / gcloud / gsutil calls below
# can use the user's credentials.
auth.authenticate_user()
print('Authenticated')

from google.cloud import bigquery

In [0]:
#Google Cloud Variables

project_id = 'ml-piscine-262622'
dataset_id = 'ml_piscine_bq'

environment_table_id = dataset_id + '.environment'
agent_policy_table_id = dataset_id + '.agent_policy_v2'
observation_table_id = dataset_id + '.observation'
episodes_table_id = dataset_id + '.episodes'
steps_table_id = dataset_id + '.steps'
traj_raw_table_id = dataset_id + '.traj_raw'

client = bigquery.Client(project=project_id)
try:
  dataset = bigquery.Dataset(dataset_id)
  dataset.location = "US"
  dataset = client.create_dataset(dataset)
except:
  pass

!gcloud config set project {project_id}
policy_bucket_folder = 'gs://ml-piscine-bucket/policy/'

### Hyperparameters

In [0]:
# Training iterations per round of the training loop (the loop runs
# `num_iterations + 1` steps per round).
num_iterations = 200 # @param {type:"integer"}

# Maximum number of items kept in the replay buffer.
replay_buffer_max_length = 100000  # @param {type:"integer"}

# Number of samples drawn from the replay buffer per training step.
batch_size = 64  # @param {type:"integer"}
# Learning rate handed to the Adam optimizer.
learning_rate = 1e-3  # @param {type:"number"}

# Episodes averaged for every policy evaluation.
num_eval_episodes = 100  # @param {type:"integer"}
# Evaluate (and save/upload the policy) every `eval_interval` training steps.
eval_interval =   50# @param {type:"integer"}

# NOTE(review): not referenced by the cells visible in this notebook section;
# presumably consumed by the data-collection side — confirm before removing.
collect_steps = 1000  # @param {type:"integer"}

# Hidden-layer sizes of the Q-network (passed as `fc_layer_params`).
dqn_layer_params = (100, )  # @param

### Environment

Load the CartPole environment from the OpenAI Gym suite. <br/>
Usually two environments are instantiated: one for training and one for evaluation.

In [0]:
env_name = 'CartPole-v0'
# NOTE(review): `env` is not used by any other cell visible here.
env = suite_gym.load(env_name)
# Separate Python environments for training and for evaluation.
train_py_env = suite_gym.load(env_name)
eval_py_env = suite_gym.load(env_name)
# Wrap the Python environments so they can be driven with TensorFlow tensors.
train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)

### Agent

The DQN agent can be used in any environment which has a discrete action space.

At the heart of a DQN Agent is a QNetwork, a neural network model that can learn to predict QValues (expected returns) for all actions, given an observation from the environment.

Use tf_agents.networks.q_network to create a QNetwork, passing in the observation_spec, action_spec, and a tuple describing the number and size of the model's hidden layers.

In [0]:
# Hidden-layer sizes come from the Hyperparameters cell.
fc_layer_params = dqn_layer_params

# Q-network built from the training environment's observation and action specs.
q_net = q_network.QNetwork(
    train_env.observation_spec(),
    train_env.action_spec(),
    fc_layer_params=fc_layer_params)

Now use `tf_agents.agents.dqn.dqn_agent` to instantiate a `DqnAgent`. In addition to the `time_step_spec`, `action_spec` and the QNetwork, the agent constructor also requires an optimizer (in this case, `AdamOptimizer`), a loss function, and an integer step counter.

In [0]:
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)

# Step counter owned by the agent; the training loop resets it to 0 at the
# start of every round and reads it to decide when to log and evaluate.
train_step_counter = tf.Variable(0)

# Loss applied to the TD errors.
loss_fn = common.element_wise_squared_loss

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=loss_fn,
    train_step_counter=train_step_counter)

agent.initialize()

BigQuery agent-policy table schema (one row is appended for every policy saved by the training loop):

In [0]:
# Column definitions passed as `table_schema` when a policy row is appended to
# the agent-policy table; every column is REQUIRED.
agent_policy_schema = [
  {'name': 'ID', 'type': 'INTEGER', 'mode': 'REQUIRED'},
  {'name': 'source', 'type': 'STRING', 'mode': 'REQUIRED'},
  {'name': 'avg_return', 'type': 'FLOAT', 'mode': 'REQUIRED'},
  {'name': 'eval_episodes', 'type': 'INTEGER', 'mode': 'REQUIRED'},
  {'name': 'training_steps', 'type': 'INTEGER', 'mode': 'REQUIRED'},
  {'name': 'optimizer', 'type': 'STRING', 'mode': 'REQUIRED'},
  {'name': 'loss_fn', 'type': 'STRING', 'mode': 'REQUIRED'},
  {'name': 'learning_rate', 'type': 'FLOAT', 'mode': 'REQUIRED'},
  {'name': 'dqn_layer_params', 'type': 'STRING', 'mode': 'REQUIRED'}
]
# Scratch dict reused by the training loop: key 0 holds the row being uploaded.
ag_pol_d = dict()

### Replay Buffer

The replay buffer keeps track of data collected from the environment. This tutorial uses `tf_agents.replay_buffers.tf_uniform_replay_buffer.TFUniformReplayBuffer`, as it is the most common. 

The constructor requires the specs for the data it will be collecting. This is available from the agent through its `collect_data_spec` attribute. The batch size and maximum buffer length are also required.

In [0]:
# Replay buffer whose items follow the agent's collect data spec. Its batch
# size is taken from the training environment and its capacity from the
# Hyperparameters cell.
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=replay_buffer_max_length)

For most agents, collect_data_spec is a named tuple called Trajectory, containing the specs for observations, actions, rewards, and other items.

In [0]:
# Display the full spec of the items the replay buffer stores.
agent.collect_data_spec

In [0]:
# Display only the field names of the spec's named tuple.
agent.collect_data_spec._fields

Convert data back to Trajectory and add to buffer:

In [0]:
def add_data_to_replay_buffer(buffer, data):

  """
  Convert queried steps into Trajectories and append them to the replay buffer
  Arguments:
      buffer : replay buffer that the new data should be added to; every step
          is written as a batch holding one trajectory, so the buffer is
          expected to have been created with a batch size of 1
      data : table iterated by column label, where data[i] is one step exposing
          the keys 'obs0'..'obs3', 'step_type', 'action', 'next_step_type',
          'reward' and 'discount' (the training loop passes a transposed
          DataFrame)
  """

  def _batched(value, dtype):
    # add_batch expects every tensor to carry a leading batch dimension in
    # front of the shape given by the data spec, so add it explicitly.
    return tf.expand_dims(tf.convert_to_tensor(value, dtype), axis=0)

  for i in data:
    row = data[i]

    obs = [row['obs0'], row['obs1'], row['obs2'], row['obs3']]

    step_type = _batched(row['step_type'], tf.int32)
    observation = _batched(obs, tf.float32)
    action = _batched(row['action'], tf.int64)
    # The stored policy_info column is not restored; an empty tuple is used.
    policy_info = ()
    next_step_type = _batched(row['next_step_type'], tf.int32)
    reward = _batched(row['reward'], tf.float32)
    discount = _batched(row['discount'], tf.float32)

    traj = trajectory.Trajectory(step_type, observation, action, policy_info, next_step_type, reward, discount)
    buffer.add_batch(traj)
  print('Replay buffer ready')

The replay buffer, after the function call, is a collection of Trajectories.

The agent needs access to the replay buffer. This is provided by creating an iterable `tf.data.Dataset` pipeline which will feed data to the agent.

Each row of the replay buffer only stores a single observation step. But since the DQN Agent needs both the current and next observation to compute the loss, the dataset pipeline will sample two adjacent rows for each item in the batch (`num_steps=2`).

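This dataset is also optimized by running parallel calls and prefetching data. The pipeline itself is created in the Training Loop section below (`replay_buffer.as_dataset(...)`).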

### Metrics for Evaluation

The most common metric used to evaluate a policy is the average return. The return is the sum of rewards obtained while running a policy in an environment for an episode. Several episodes are run, creating an average return.

The following function computes the average return of a policy, given the policy, environment, and a number of episodes.

In [0]:
#@test {"skip": true}
def compute_avg_return(environment, policy, num_episodes=10):

  """
  Average the per-episode return of a policy over several episodes
  Arguments:
      environment : environment the policy is run in
      policy : policy whose actions drive the environment
      num_episodes : number of episodes to run and average over
  Returns:
      First element of the averaged return, converted with .numpy()
  """

  def _play_one_episode():
    # Start from a fresh reset and accumulate rewards until the last step.
    current_step = environment.reset()
    collected = 0.0
    while True:
      if current_step.is_last():
        return collected
      chosen = policy.action(current_step)
      current_step = environment.step(chosen.action)
      collected += current_step.reward

  return_sum = 0.0
  for _ in range(num_episodes):
    return_sum += _play_one_episode()

  mean_return = return_sum / num_episodes
  return mean_return.numpy()[0]


# See also the metrics module for standard implementations of different metrics.
# https://github.com/tensorflow/agents/tree/master/tf_agents/metrics

A new policy is saved after each evaluation; the helper below builds the name of the folder it is saved under.

In [0]:
def generate_policy_folder_name(avg, ID):

  """
  Builds the name of the folder a policy is saved under
  Arguments:
      avg : average return of the policy
      ID : ID of the policy
  Returns:
      '<ID>.<avg>.policy.<seconds>' where <seconds> is the time elapsed
      between 1970-01-01 and datetime.now()
  """

  epoch_start = datetime.datetime(1970, 1, 1)
  elapsed_seconds = (datetime.datetime.now() - epoch_start).total_seconds()
  # str() is applied to each part individually, exactly as the values print.
  name_parts = (str(ID), str(avg), 'policy', str(elapsed_seconds))
  return '.'.join(name_parts)

### Big Query

In [0]:
def _get_last_value(table_id, column):

  """
  Query for the largest value of an integer column in a table
  Arguments:
      table_id : BigQuery table id in `dataset.table` form
      column : name of the column to read
  Returns:
      The largest value found in the column, as an int
  Raises:
      Whatever pd.read_gbq raises when the query fails; a lookup error when
      the query returns no row (callers rely on an exception in that case)
  """

  path = project_id + "." + table_id
  # Only the needed column is selected, so the other columns are not read.
  query = ("SELECT " + column + " FROM `" + path + "` "
           "ORDER BY " + column + " DESC LIMIT 1")
  row = pd.read_gbq(query=query, project_id=project_id)
  return int(row[column][0])

def get_last_ID(table_id):

  """
  Query for the last ID in table
  Arguments:
      table_id : BigQuery table id used as reference
  Returns:
      The last integer ID found in the table
  """

  return _get_last_value(table_id, 'ID')

def get_last_env_ID():

  """
  Query for the last environment ID
  Returns:
      The last integer environment ID found in the episodes table
  """

  return _get_last_value(episodes_table_id, 'env_ID')

# One past the largest env_ID currently stored in the episodes table.
# Note that this runs a BigQuery query as soon as the cell is executed.
# NOTE(review): presumably the ID the next data-collection run will write its
# episodes under — confirm against the collector.
new_env_ID = 1 + get_last_env_ID()

# SQL queries

## query for randomly collected trajs
# Joins observation -> steps -> episodes, keeps the episodes whose env_ID is
# `new_env_ID`, orders by step id and returns at most 1000 steps. The value of
# `new_env_ID` is baked into the string when this cell runs.
query_first_run = "SELECT \
                    `ml_piscine_bq.steps`.id, obs0, obs1, obs2, obs3, action, reward, \
                    discount, step_type, next_step_type, policy_info \
                    FROM `ml_piscine_bq.observation` \
                    INNER JOIN `ml_piscine_bq.steps` \
                    ON `ml_piscine_bq.observation`.ID = `ml_piscine_bq.steps`.obs_ID \
                    INNER JOIN `ml_piscine_bq.episodes` \
                    ON `ml_piscine_bq.steps`.epi_ID = `ml_piscine_bq.episodes`.ID \
                    WHERE `ml_piscine_bq.episodes`.env_ID = " + str(new_env_ID) + " \
                    ORDER BY `ml_piscine_bq.steps`.id \
                    LIMIT 1000"

## query for collect steps generated by last policy ID
def query_from_last_pol(pol_ID):

  """
  Build the SQL that selects the steps recorded under a given policy ID
  Arguments:
      pol_ID : ID of the agent policy whose steps are wanted
  Returns:
      SQL query string joining observation, steps, episodes and
      agent_policy_v2, filtered on the policy ID and ordered by step ID
  """

  # Rendered once up front, then spliced into the WHERE clause.
  policy_filter = str(pol_ID)
  return "SELECT \
            `ml_piscine_bq.steps`.id, obs0, obs1, obs2, obs3, action, reward, \
            discount, step_type, next_step_type, policy_info \
            FROM `ml_piscine_bq.observation` \
            INNER JOIN `ml_piscine_bq.steps` \
            ON `ml_piscine_bq.observation`.ID = `ml_piscine_bq.steps`.obs_ID \
            INNER JOIN `ml_piscine_bq.episodes` \
            ON `ml_piscine_bq.steps`.epi_ID = `ml_piscine_bq.episodes`.ID \
            INNER JOIN `ml_piscine_bq.agent_policy_v2` \
            ON `ml_piscine_bq.episodes`.ag_pol_ID = `ml_piscine_bq.agent_policy_v2`.ID \
            WHERE `ml_piscine_bq.episodes`.ag_pol_ID = " + policy_filter + " \
            ORDER BY `ml_piscine_bq.steps`.ID"

## Visualization

In [0]:
#-------------------- Run TensorBoard --------------------

# clear tensorboard cache
tf.compat.v1.summary.FileWriterCache.clear()

# Each run of this cell logs to its own timestamped directory under
# logs/cartpole/, the directory the TensorBoard UI is pointed at.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/cartpole/' + current_time + '/train'
# Writer used by the training loop to record the average return.
train_summary_writer = tf.summary.create_file_writer(train_log_dir)

In [0]:
# run tensorboard ui (reads every run stored under logs/cartpole/)
%tensorboard --logdir logs/cartpole/

## Training Loop

In [0]:
# Step index used for the TensorBoard scalar; incremented once per evaluation.
tensorboard_ID = 0
# Latest known policy ID; stays 0 until one can be read from BigQuery.
ag_pol_ID = 0
# Start from an empty buffer every time this cell is run.
replay_buffer.clear()
# True until the first-run query has succeeded once.
first_run = True
step = 0
# Sampling pipeline over the replay buffer: batches of `batch_size` items, each
# made of 2 adjacent steps. Note that this rebinds the name `dataset`, which
# the Cloud Management cell used for the BigQuery dataset.
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3, 
    sample_batch_size=batch_size, 
    num_steps=2).prefetch(3)
iterator = iter(dataset)

In [0]:
while True:

  # Fetch last policy ID
  try:
    ag_pol_ID = get_last_ID(agent_policy_table_id)
  except:
    pass
  print("Policy ID = ", ag_pol_ID)

  # Fetch new data
  if first_run and ag_pol_ID == 0:
    print("First Training")
    query = query_first_run
    while True:
      try:
        data_read = pd.read_gbq(query=query, project_id=project_id).transpose()
        first_run = False
        if not data_read.empty:
          break
      except:
        print("Still no Data, waiting...")
        time.sleep(5)
        pass
  else:
    query = query_from_last_pol(ag_pol_ID)
    while True:
      try:
        data_read = pd.read_gbq(query=query, project_id=project_id).transpose()
        if not data_read.empty:
          break
      except:
        print("Awaiting for new Data")
        time.sleep(5)
        pass

  # Feed replay buffer and ready dataset for training
  add_data_to_replay_buffer(replay_buffer, data_read)

  # Train and Eval

  ## (Optional) Optimize by wrapping some of the code in a graph using TF function.
  agent.train = common.function(agent.train)

  ## Reset the train step
  agent.train_step_counter.assign(0)

  ## Evaluate the agent's policy once before training.
  avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
  print ('Evaluate the agent\'s policy once before training - ' ,avg_return)
  returns = [avg_return]
  try:
    print('Last best average return = ', best_avg)
  except:
    pass

  print('--------------------------------------')
  for _ in range(num_iterations + 1):

    step = agent.train_step_counter.numpy()

    # Sample a batch of data from the buffer and update the agent's network.
    experience, unused_info = next(iterator)
    train_loss = agent.train(experience).loss

    if step % 5 == 0:
      print('step = {0}: loss = {1}'.format(step, train_loss))

    # Evaluate
    if step % eval_interval == 0 and step != 0:
      avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
      print('\nstep = {0}: Average Return = {1}'.format(step, avg_return))

      # Output to Tensorboard
      with train_summary_writer.as_default():
        tf.summary.scalar('avg_return', avg_return, step=tensorboard_ID)
        tensorboard_ID += 1

      # Save policy to drive
      ag_pol_ID += 1
      saver = policy_saver.PolicySaver(agent.collect_policy, batch_size=None)
      policy_folder_name = generate_policy_folder_name(avg_return, ag_pol_ID)
      saver.save(policy_folder_name)
      
      # Upload Policy
      path = policy_bucket_folder + policy_folder_name
      ag_pol_d[0] = {
          'ID': ag_pol_ID,
          'source': path,
          'avg_return': avg_return,
          'eval_episodes': num_eval_episodes,
          'training_steps': num_iterations,
          'optimizer': str(optimizer).split(' ')[0][1:],
          'loss_fn': str(loss_fn).split(' ')[1],
          'learning_rate': learning_rate,
          'dqn_layer_params': str(dqn_layer_params)
      }
      ## To GQB
      df_ag_pol = pd.DataFrame(ag_pol_d).transpose()
      df_ag_pol.to_gbq(agent_policy_table_id, if_exists='append', project_id=project_id, table_schema=agent_policy_schema)
      print()
      ## To Bucket
      !gsutil -q cp -r $policy_folder_name $path
      !rm -r $policy_folder_name

  print('--------------------------------------')