Check Colab settings.

In [46]:
from psutil import virtual_memory

gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

/bin/bash: nvidia-smi: command not found
Your runtime has 13.6 gigabytes of available RAM

Not using a high-RAM runtime


Mount Google Drive.

In [47]:
from google.colab import drive

drive.mount("/content/gdrive")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


Install code repository

In [48]:
!git clone --depth 1 https://github.com/pezon/msca-race-simulation 
!cp -R msca-race-simulation/* .

fatal: destination path 'msca-race-simulation' already exists and is not an empty directory.


Install dependencies.

In [49]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Import libraries

In [51]:
import os
import pickle
import shutil
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

import numpy as np
import tensorflow as tf
from tf_agents.environments import tf_py_environment
from tf_agents.agents.dqn import dqn_agent
from tf_agents.networks import q_network
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.utils import common
try:
  from google.colab import files
except ImportError:
  files = None

from racesim.src.import_pars import import_pars


# Set working director
workspace_dir = Path("/content/gdrive/MyDrive/RLF1/vse-1")

# Set up output paths 
checkpoint_dir = os.path.join(workspace_dir, "checkpoint")
policy_dir = os.path.join(workspace_dir, "policy")

checkpoint_export_dir = workspace_dir / "exports/checkpoints"
checkpoint_export_dir.mkdir(exist_ok=True, parents=True)

policy_export_dir = workspace_dir / "exports/policy-tflite"
policy_export_dir.mkdir(exist_ok=True, parents=True)

policy_tflite_export_dir = workspace_dir / "exports/policy-tflite"
policy_tflite_export_dir.mkdir(exist_ok=True, parents=True)


def export_archive(dirname, base_filename):
  return shutil.make_archive(base_filename, "zip", dirname)


def import_archive(dirname):
  if files is None:
    return
  uploaded = files.upload()
  for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(uploaded[fn])))
    shutil.rmtree(dirname)
    zip_files = ZipFile(BytesIO(uploaded[fn]), 'r')
    zip_files.extractall(dirname)
    zip_files.close()

Configure training

In [53]:
# environment parameters
race = "Shanghai_2019"  # set race (see racesim/input/parameters for possible races)
# VSE type for other drivers: 'basestrategy', 'realstrategy', 'supervised', 'reinforcement' (if already available),
# 'multi_agent' (if VSE should learn for all drivers at once)
vse_others = "basestrategy"
mcs_pars_file = "pars_mcs.ini"  # parameter file for Monte Carlo parameters

# hyperparameters
num_iterations = 1
replay_buffer_max_length = 200_000
initial_collect_steps = 200
collect_steps_per_iteration = 1

fc_layer_params = (64, 64,)
batch_size = 64
learning_rate = 1e-3
gamma = 1.0  # discount rate
n_step_update = 1
target_update_period = 1
dueling_q_net = False

# training options
log_interval = 100_000
eval_interval = 50_000
num_eval_episodes = 100

# postprocessing (currently not implemented for multi-agent environment)
calculate_final_positions = False  # activate or deactivate evaluation after training
num_races_postproc = 10_000
# VSE type for other drivers: 'basestrategy', 'realstrategy', 'supervised', 'reinforcement' (if already available)
vse_others_postproc = "basestrategy"


# ----------------------------------------------------------------------------------------------------------------------
# CHECK USER INPUT -----------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------

if vse_others == "multi_agent" and calculate_final_positions:
    print("WARNING: Evaluation of trained strategy is currently not implemented for the multi-agent environment!"
          " Setting calculate_final_positions = False!")
    calculate_final_positions = False

# ----------------------------------------------------------------------------------------------------------------------
# CHECK FOR WET RACE ---------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------

# create race parameter file name
race_pars_file = 'pars_%s.ini' % race

# load parameter file
pars_in = import_pars(
    use_print=False,
    use_vse=False,
    race_pars_file=race_pars_file,
    mcs_pars_file=mcs_pars_file)[0]

# loop through drivers and check for intermediate or wet tire compounds in real race
for driver in pars_in["driver_pars"]:
    if any([True if strat[1] in ["I", "W"] else False for strat in pars_in["driver_pars"][driver]["strategy_info"]]):
        raise RuntimeError("Cannot train for current race %s because it was a (partly) wet race!" % race)

Setup environment

In [54]:
from tf_agents.environments.tf_py_environment import TFPyEnvironment

from machine_learning_rl_training.src.rl_environment_multi_agent import RaceSimulation as MultiAgentRaceSimulation
from machine_learning_rl_training.src.rl_environment_single_agent import RaceSimulation as SingleAgentRaceSimulation
        
if vse_others == 'multi_agent':
    train_py_env = MultiAgentRaceSimulation(
        race_pars_file=race_pars_file,
        mcs_pars_file=mcs_pars_file,
        use_prob_infl=True,
        create_rand_events=True)
    eval_py_env = MultiAgentRaceSimulation(
        race_pars_file=race_pars_file,
        mcs_pars_file=mcs_pars_file,
        use_prob_infl=True,
        create_rand_events=True)
else:
    train_py_env = SingleAgentRaceSimulation(
        race_pars_file=race_pars_file,
        mcs_pars_file=mcs_pars_file,
        vse_type=vse_others,
        use_prob_infl=True,
        create_rand_events=True)
    eval_py_env = SingleAgentRaceSimulation(
        race_pars_file=race_pars_file,
        mcs_pars_file=mcs_pars_file,
        vse_type=vse_others,
        use_prob_infl=True,
        create_rand_events=True)

train_tf_env = TFPyEnvironment(environment=train_py_env)
eval_tf_env = TFPyEnvironment(environment=eval_py_env)

print('INFO: Race: %s, strategy of other drivers: %s' % (race, vse_others))

if train_py_env.batched:
    print('INFO: Batched environment:', train_py_env.batched(), 'batch size:', train_py_env.batch_size)

print('INFO: Observation spec:', train_py_env.time_step_spec().observation)
print('INFO: Action spec:', train_py_env.action_spec())


INFO: Race: Shanghai_2019, strategy of other drivers: basestrategy
INFO: Observation spec: BoundedArraySpec(shape=(40,), dtype=dtype('float32'), name='observation', minimum=0.0, maximum=1.0)
INFO: Action spec: BoundedArraySpec(shape=(), dtype=dtype('int32'), name='action', minimum=0, maximum=3)


Setup DQN Agent

In [55]:
from tensorflow.compat.v1.train import AdamOptimizer
from tensorflow.compat.v1.train import get_or_create_global_step
from tensorflow.keras.optimizers.schedules import PolynomialDecay
from tf_agents.agents.dqn import dqn_agent
from tf_agents.networks.q_network import QNetwork

q_net = QNetwork(
    input_tensor_spec=train_tf_env.observation_spec(),
    action_spec=train_tf_env.action_spec(),
    fc_layer_params=fc_layer_params)

optimizer = AdamOptimizer(learning_rate=learning_rate)
global_step = get_or_create_global_step()

boltzmann_fn = PolynomialDecay(
    initial_learning_rate=1.0,
    decay_steps=num_iterations,
    end_learning_rate=0.01)

agent = dqn_agent.DqnAgent(
    time_step_spec=train_tf_env.time_step_spec(),
    action_spec=train_tf_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    n_step_update=n_step_update,
    target_update_period=target_update_period,
    td_errors_loss_fn=common.element_wise_squared_loss,
    gamma=gamma,
    train_step_counter=global_step)

agent.initialize()


Setup policy.

In [59]:
from tf_agents.policies.random_tf_policy import RandomTFPolicy

eval_policy = agent.policy
collect_policy = agent.collect_policy

policy = RandomTFPolicy(
    time_step_spec=train_tf_env.time_step_spec(),
    action_spec=train_tf_env.action_spec())

#@TODO: do we need to inject this into data collection?

Data Collection

Source: [DynamicStepDriver | TensorFlow Documentation](https://www.tensorflow.org/agents/api_docs/python/tf_agents/drivers/dynamic_step_driver/DynamicStepDriver)

In [57]:
from tf_agents.drivers.dynamic_step_driver import DynamicStepDriver
from tf_agents.metrics.tf_metrics import AverageReturnMetric, EnvironmentSteps, NumberOfEpisodes
from tf_agents.policies.random_tf_policy import RandomTFPolicy
from tf_agents.replay_buffers.tf_uniform_replay_buffer import TFUniformReplayBuffer

replay_buffer = TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_tf_env.batch_size,
    max_length=replay_buffer_max_length)

metric = AverageReturnMetric()

driver = DynamicStepDriver(
    train_tf_env,
    agent.collect_policy,
    observers=[
        replay_buffer.add_batch,
        metric,
    ],
    num_steps=collect_steps_per_iteration)

# Initial data collection:
# initial driver.run will reset the environment and initialize the policy
final_time_step, policy_state = driver.run(maximum_iterations=initial_collect_steps)

# Dataset generates trajectories with shape [BxTx...] where
# T = n_step_update + 1.
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=batch_size,
    num_steps=2
).prefetch(3)

iterator = iter(dataset)

Evaluate agent policy

In [58]:
def compute_average_return(
    env: TFPyEnvironment,
    policy,
    num_episodes: int = 1
) -> float:
    total_return = 0.0
    for _ in range(num_episodes):
        time_step_ = env.reset()
        episode_return = 0.0
        while not any(time_step_.is_last()):
            action_step = policy.action(time_step_)
            time_step_ = env.step(action=action_step.action)
            episode_return += np.mean(time_step_.reward)
        total_return += episode_return
    average_return = total_return / num_episodes
    return average_return

# evaluate the agent's policy once before training
avg_return = compute_average_return(
    env=eval_tf_env,
    policy=agent.policy,
    num_episodes=num_eval_episodes)
print(f"INFO: Evaluated the agent's policy once before the training, average return: {avg_return:.3f}")

KeyboardInterrupt: ignored

Train the agent

In [18]:
# (Optional) Optimize by wrapping some of the code in a graph using TF function.
agent.train = common.function(agent.train)

# reset training step
agent.train_step_counter.assign(0)

def train(num_iterations: int = 1):
  # Collect a few steps using collect_policy and save to the replay buffer.
  collect_driver.run()

  # Sample a batch of data from the buffer and update the agent's network.
  experience, _ = next(iterator)
  train_loss = agent.train(experience)

  iteration = agent.train_step_counter.numpy()
  step = int(agent.train_step_counter.numpy())

  if step % log_interval == 0:
      print(f"INFO: Step: {step}, loss: {train_loss:.3f}")

  if step % eval_interval == 0:
      avg_return = compute_average_return(env=eval_tf_env, policy=eval_policy, num_episodes=num_eval_episodes)
      print(f"INFO: Step: {step}, average return: {avg_return:.3f}")

  if (10 * step) % num_iterations == 0:
      # print every 10%
      print(f"INFO: Training progress: {step / num_iterations * 100.0:0f}%...")


# train one iteration
train(num_iterations=num_iterations)

INFO: Training progress: 100%...


Setup checkpointing and saving.

In [19]:
from tf_agents.policies.policy_saver import PolicySaver
from tf_agents.utils.common import Checkpointer

checkpoint_dir = os.path.join(output_path, "checkpoint")
policy_dir = os.path.join(output_path, "policy")

train_checkpointer = Checkpointer(
    ckpt_dir=checkpoint_dir,
    max_to_keep=1,
    agent=agent,
    policy=agent.policy,
    replay_buffer=replay_buffer,
    global_step=global_step
)
policy_saver = PolicySaver(agent.policy)

# train_checkpointer.save(global_step)
# policy_saver.save(policy_dir)



Train one iteration

In [20]:
train(num_iterations=1)

INFO: Training progress: 200%...


Save to checkpoint and save policy

In [22]:
train_checkpointer.save(global_step)
policy_saver.save(policy_dir)



Restore checkpoint and load the saved policy.

In [24]:
train_checkpointer.initialize_or_restore()
global_step = tf.compat.v1.train.get_global_step()
saved_policy = tf.saved_model.load(policy_dir)

Create a zipped file from the checkpoint directory.

In [38]:
train_checkpointer.save(global_step)
exported_checkpoint = export_archive(checkpoint_dir, checkpoint_export_dir)
print(f"Exported checkpoint: {exported_checkpoint}")

Exported checkpoint: /content/gdrive/MyDrive/RLF1/exports/checkpoints.zip


Download the zip file.

In [27]:
if files is not None:
    files.download(exported_checkpoint) # try again if this fails: https://github.com/googlecolab/colabtools/issues/469

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

...Come back later and resume training.

In [31]:
import_archive(checkpoint_dir)
train_checkpointer.initialize_or_restore()
global_step = tf.compat.v1.train.get_global_step()

At this point, you can either (1) continue training iterations, or (2) generate an artifact to check the performance of the loaded policy, or (3) save the policy.

When you save the policy and restore it, you cannot continue with the training, but you can deploy the model.

Save the policy.

In [40]:
policy_saver.save(policy_dir)
print(f"Saved policy: f{policy_dir}")

exported_policy = export_archive(policy_dir, export_policy_path)
print(f"Exported policy: {exported_policy}")



Saved policy: f/content/gdrive/MyDrive/RLF1/policy
Exported policy: /content/gdrive/MyDrive/RLF1/exports/policies.zip


Download policy.

In [41]:
if files is not None:
    files.download(exported_policy) # try again if this fails: https://github.com/googlecolab/colabtools/issues/469

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Upload the policy and check how saved policy performs.

In [60]:
import_archive(policy_dir)
saved_policy = tf.saved_model.load(policy_dir)
# run_episodes(saved_policy, eval_env, eval_py_env)

Convert policy to TFLite

See [TensorFlow Lite converter](https://www.tensorflow.org/lite/convert) for more details.

In [44]:
converter = tf.lite.TFLiteConverter.from_saved_model(policy_dir, signature_keys=["action"])
tflite_policy = converter.convert()

export_policy_tflite_file = policy_tflite_export_dir / "policy.tflite"
with open(export_policy_tflite_file, "wb") as f:
    f.write(tflite_policy)

Load TFLite model and run inference.

In [45]:
import numpy as np

interpreter = tf.lite.Interpreter(export_policy_tflite_file)

policy_runner = interpreter.get_signature_runner()
print(policy_runner._inputs)

policy_runner(**{
    "0/discount": tf.constant(0.0),
    "0/observation": tf.zeros([1,4]),
    "0/reward": tf.constant(0.0),
    "0/step_type": tf.constant(0),
})

TypeError: ignored