In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import collections
import datetime
import gc
import json
import os
import pickle
import threading
import time

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras

from tqdm.notebook import trange

In [3]:
# Let TensorFlow grow GPU memory on demand instead of reserving each device's
# whole memory up front.
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [4]:
import solution.constants as const
import solution.model as model
import solution.utils as utils
from solution.replay_buffer import ReplayBuffer
from solution.rl import n_step_return

In [5]:
def get_replay_data(delete_processed_files=False):
    """Load and concatenate every pickled replay file in ``const.REPLAY_PATH``.

    Each file is expected to unpickle to an iterable (presumably a list of
    replay sequences — confirm against the writer); the items of all files are
    gathered into one list, in ``os.listdir`` order.

    Args:
        delete_processed_files: when True, each file is deleted right after it
            has been read and closed.

    Returns:
        A list with the items from all files (empty if the directory is empty).

    Note: ``pickle.load`` can execute arbitrary code — only point
    ``const.REPLAY_PATH`` at files produced by trusted processes.
    """
    collected = []
    with tf.device('CPU:0'):
        for entry in os.listdir(const.REPLAY_PATH):
            path = os.path.join(const.REPLAY_PATH, entry)
            with open(path, 'rb') as stream:
                collected.extend(pickle.load(stream))
            if delete_processed_files:
                os.unlink(path)
    return collected


# Capacity passed to the shared ReplayBuffer; the buffer is filled by the
# background replay thread (unit presumably = sequences — confirm in ReplayBuffer).
MAX_REPLAY_BUFFER_SIZE = 5_000
replay_buffer = ReplayBuffer(MAX_REPLAY_BUFFER_SIZE)


def replay_buffer_job():
    """Background worker: move replay files from disk into ``replay_buffer``.

    Clears ``const.REPLAY_PATH`` once, then repeatedly drains it (deleting the
    files it reads) until the global ``replay_data_needed`` flag is cleared.
    When no data is available it sleeps for one second. For each non-empty
    chunk it logs buffer size, time spent in ``replay_buffer.add`` and overall
    sequences/second under the ``replay_buffer`` TensorBoard scope.
    """
    clear_dir(const.REPLAY_PATH)
    step = 0
    total_sequences = 0
    start_time = None

    while replay_data_needed:
        chunk = get_replay_data(True)
        if not chunk:
            time.sleep(1.0)
            continue

        if start_time is None:
            # Throughput is measured from the arrival of the first chunk.
            start_time = time.monotonic()

        total_sequences += len(chunk)

        with tf.device('CPU:0'):
            add_start_time = time.monotonic()
            replay_buffer.add(chunk)
            # Measure before taking the writer lock so lock contention with the
            # training loop is not reported as "add chunk time".
            add_time = time.monotonic() - add_start_time
            # On the first chunk the elapsed time can be 0 with a coarse clock;
            # clamp it to avoid ZeroDivisionError killing this thread.
            elapsed = max(time.monotonic() - start_time, 1e-6)
            with tf_writer_lock, tf_writer.as_default(), tf.name_scope('replay_buffer'):
                tf.summary.scalar('buffer size', len(replay_buffer), step)
                tf.summary.scalar('add chunk time', add_time, step)
                tf.summary.scalar('sequences per second', total_sequences / elapsed, step)

        step += 1

In [6]:
def games_data_job():
    """Background worker: forward per-game statistics files to TensorBoard.

    Clears ``const.GAMES_DATA_PATH`` once, then — until the global
    ``games_data_needed`` flag is cleared — consumes every JSON file in it
    roughly once a second. Each non-null field other than ``agent_id`` is
    logged twice: under ``game_data/total`` and under ``game_data/<agent_id>``,
    each with its own step counter. Processed files are deleted.
    """
    clear_dir(const.GAMES_DATA_PATH)
    counters = collections.defaultdict(int)

    while games_data_needed:
        for entry in os.listdir(const.GAMES_DATA_PATH):
            path = os.path.join(const.GAMES_DATA_PATH, entry)
            with open(path) as stream:
                record = json.load(stream)
                agent_id = record['agent_id']

                metrics = [
                    (key, value) for key, value in record.items()
                    if key != 'agent_id' and value is not None
                ]
                for key, value in metrics:
                    with tf_writer_lock, tf_writer.as_default():
                        with tf.name_scope('game_data/total'):
                            tf.summary.scalar(key, value, counters['total'])
                        with tf.name_scope('game_data/%s' % agent_id):
                            tf.summary.scalar(key, value, counters[agent_id])

                counters['total'] += 1
                counters[agent_id] += 1

            os.unlink(path)

        time.sleep(1)

In [7]:
def soft_update(source_variables, target_variables, tau):
    """Blend source variables into target variables in place (Polyak averaging).

    Each target ``v_t`` is assigned ``(1 - tau) * v_t + tau * v_s`` with its
    positionally matching source ``v_s``; ``tau=1`` copies the source and
    ``tau=0`` leaves the target unchanged.

    Args:
        source_variables: iterable of source values/variables.
        target_variables: iterable of objects with an ``assign`` method, in the
            same order as ``source_variables``.
        tau: interpolation factor.

    Raises:
        ValueError: if the two sequences differ in length. Plain ``zip`` would
            silently skip the unmatched tail and leave the target partially updated.
    """
    source_variables = list(source_variables)
    target_variables = list(target_variables)
    if len(source_variables) != len(target_variables):
        raise ValueError(
            'soft_update: got %d source variables but %d target variables'
            % (len(source_variables), len(target_variables)))

    for v_s, v_t in zip(source_variables, target_variables):
        v_t.assign((1 - tau) * v_t + tau * v_s)



def save_model(name, epoch, model, link_path=None):
    """Save ``model`` weights as ``<const.MODELS_PATH>/<name>_<current_time>_<epoch>``.

    Args:
        name: prefix for the weights file (e.g. 'online' / 'target').
        epoch: appended to the file name.
        model: object with a ``save_weights(path)`` method.
        link_path: optional symlink to repoint at the new weights. It is first
            created under ``<link_path>_tmp`` and then moved over ``link_path``
            with ``os.replace``, which is atomic on POSIX.

    Relies on the module-level ``current_time`` and ``const.MODELS_PATH``.
    """
    os.makedirs(const.MODELS_PATH, exist_ok=True)

    file_path = os.path.join(const.MODELS_PATH, '%s_%s_%s' % (name, current_time, epoch))
    model.save_weights(file_path)

    if link_path is not None:
        tmp_link_path = '%s_tmp' % link_path
        # A tmp link left behind by an interrupted save would make os.symlink fail.
        if os.path.lexists(tmp_link_path):
            os.unlink(tmp_link_path)
        # A relative symlink target is resolved relative to the link's own
        # directory, not the CWD, so store an absolute target.
        os.symlink(os.path.abspath(file_path), tmp_link_path)
        # Unlike os.rename, os.replace also overwrites an existing link on Windows.
        os.replace(tmp_link_path, link_path)


def clear_dir(path):
    """Ensure ``path`` exists as a directory and remove everything inside it.

    Regular files and symlinks are unlinked; symlink targets are left untouched.
    Real sub-directories are removed recursively. The directory itself is kept.
    """
    import shutil

    os.makedirs(path, exist_ok=True)

    for file_name in os.listdir(path):
        file_path = os.path.join(path, file_name)
        # os.unlink raises on directories, which would kill the worker thread
        # that calls this at startup. Remove real sub-directories recursively instead.
        if os.path.isdir(file_path) and not os.path.islink(file_path):
            shutil.rmtree(file_path)
        else:
            os.unlink(file_path)

In [8]:
# Fix TensorFlow's global seed before the networks are built so their random
# ops are repeatable. NOTE(review): numpy/random are not seeded here.
GLOBAL_SEED = 42
tf.random.set_seed(GLOBAL_SEED)

In [9]:
BATCH_SIZE = 64
# Set to True to warm-start the online network from the newest checkpoint in
# const.MODELS_PATH instead of its fresh initialization.
RESUME_FROM_LATEST = False

input_shape = [(BATCH_SIZE,) + tuple(spec.shape) for spec in utils.STATE_SPEC]

network = model.Model()
network.build(input_shape)
network.summary()
if RESUME_FROM_LATEST:
    latest_path = tf.train.latest_checkpoint(const.MODELS_PATH)
    # latest_checkpoint returns None when nothing has been saved yet.
    if latest_path is None:
        raise FileNotFoundError('No checkpoint found in %s' % const.MODELS_PATH)
    print(latest_path)
    network.load_weights(latest_path)

# The target network starts as an exact copy of the online network and is not
# trained directly; the training loop refreshes it with set_weights.
target_network = model.Model()
target_network.build(input_shape)
target_network.trainable = False
target_network.set_weights(network.get_weights())


optimizer = keras.optimizers.Adam(learning_rate=5e-4, clipnorm=40)
# NOTE(review): loss_fn appears unused; learn_from_batch computes a weighted MSE itself.
loss_fn = keras.losses.MeanSquaredError()
loss_fn = keras.losses.MeanSquaredError()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
sequential (Sequential)      (64, 256)                 215176    
_________________________________________________________________
sequential_1 (Sequential)    (64, 64)                  11584     
_________________________________________________________________
sequential_2 (Sequential)    (2048, 256)               777456    
_________________________________________________________________
sequential_3 (Sequential)    (64, 32, 32)              1312      
_________________________________________________________________
sequential_4 (Sequential)    (64, 32, 256)             205568    
_________________________________________________________________
sequential_5 (Sequential)    (64, 32, 512)             558080    
_________________________________________________________________
sequential_6 (Sequential)    (64, 32, 7)               133127

In [10]:
@tf.function
def learn_from_batch(states, actions, rewards, is_not_done, *, gamma, weights, online_network, target_network):
    """Run one weighted n-step Q-learning gradient step on ``online_network``.

    Args:
        states: tuple of state tensors, each assumed to be shaped (B, N, *) with
            axis 1 indexing the steps of a sequence (TODO confirm against
            ReplayBuffer). ``states[4]`` is used as the per-unit mask.
        actions: per-step actions; ``actions[:, 0]`` indexes the last axis of
            the network output.
        rewards, is_not_done: per-step tensors forwarded to ``n_step_return``.
        gamma: discount factor forwarded to ``n_step_return``.
        weights: per-sample multipliers for the squared TD error (in this
            notebook, the weights returned by ``replay_buffer.get_prioritized``).
        online_network: network being trained; updated via the global ``optimizer``.
        target_network: forwarded to ``n_step_return``.

    Returns:
        (loss, |td_error|, grad_min, grad_max, grad_l2): the weighted mean squared
        TD error, the absolute TD errors, the smallest and largest gradient element
        over all trainable weights, and the global L2 norm of the gradients.
    """
    target = n_step_return(
        states, actions, rewards, is_not_done,
        gamma=gamma, n=const.N_STEPS, online_network=online_network, target_network=target_network
    )

    # The loss uses only step 0 of each sequence; the full sequences go to n_step_return.
    first_state = tuple([state[:, 0] for state in states])
    first_action = actions[:, 0]
    first_units_mask = tf.cast(states[4][:, 0], dtype=tf.float32)

    with tf.GradientTape() as tape:
        q_values = online_network(first_state, training=True)
        # Select the value of the taken action for every (sample, unit) pair,
        # then sum over units weighted by the unit mask.
        prediction = tf.gather(q_values, first_action, axis=-1, batch_dims=2)
        prediction = tf.reduce_sum(prediction * first_units_mask, axis=-1)

        td_error = target - prediction
        loss = tf.reduce_mean(tf.math.pow(td_error, 2) * weights)

    gradients = tape.gradient(loss, online_network.trainable_weights)
    # Global extremes over every gradient element. grad_min previously averaged
    # the per-variable minima, so the logged "min" was not a minimum.
    grad_min = tf.reduce_min([tf.reduce_min(g) for g in gradients])
    grad_max = tf.reduce_max([tf.reduce_max(g) for g in gradients])
    grad_l2 = tf.norm([tf.norm(g) for g in gradients])
    optimizer.apply_gradients(zip(gradients, online_network.trainable_weights))

    return loss, tf.math.abs(td_error), grad_min, grad_max, grad_l2

In [11]:
# Setup tensorboard
TB_LOGS_DIR = '/tmp/tb_logs/'

if not os.path.exists(TB_LOGS_DIR):
    os.mkdir(TB_LOGS_DIR)

current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
current_logs_dir = os.path.join(TB_LOGS_DIR, current_time)
tf_writer = tf.summary.create_file_writer(current_logs_dir)
tf_writer_lock = threading.Lock()

steps = 0

save_model('online', 0, network, const.LATEST_MODEL_SYMLINK_PATH)
save_model('target', 0, target_network, const.TARGET_MODEL_SYMLINK_PATH)

PermissionDeniedError: /tmp/tb_logs/20211001-120436; Permission denied [Op:CreateSummaryFileWriter]

In [None]:
def stop_threads():
    """Ask the background data threads to finish and wait for them.

    Only threads that already exist as module globals are handled. For each
    one, its ``*_needed`` flag is set to False, which ends the worker loop, and
    the thread is joined if it is still running.
    """
    workers = (
        ('replay_buffer_thread', 'replay_data_needed', 'Stopping replay thread'),
        ('games_data_thread', 'games_data_needed', 'Stopping games data thread'),
    )
    namespace = globals()
    for thread_name, flag_name, message in workers:
        if thread_name not in namespace:
            continue
        print(message)
        namespace[flag_name] = False
        worker = namespace[thread_name]
        if worker.is_alive():
            worker.join()


stop_threads()


replay_data_needed = True
replay_buffer_thread = threading.Thread(target=replay_buffer_job)
replay_buffer_thread.start()

games_data_needed = True
games_data_thread = threading.Thread(target=games_data_job)
games_data_thread.start()


EPOCHS = 10000000
# Both periods below are counted in training steps (`steps`), not game episodes.
TARGET_NETWORK_UPDATE_EPISODES = 1000
MODEL_SAVE_PERIOD = 100
INITIAL_DATA_SIZE = BATCH_SIZE * 10

# Everything after the threads start runs under try/finally, so the background
# threads are stopped on normal completion, on errors, and on KeyboardInterrupt.
# Interrupting the kernel is the usual way to end this loop.
try:
    while len(replay_buffer) < INITIAL_DATA_SIZE:
        # The replay thread is the only producer. If it died (e.g. on an
        # exception), the buffer will never fill, so fail instead of spinning.
        if not replay_buffer_thread.is_alive():
            raise RuntimeError(
                'Replay buffer thread stopped before %s samples were collected' % INITIAL_DATA_SIZE)
        print('Waiting for data: %s / %s' % (len(replay_buffer), INITIAL_DATA_SIZE))
        time.sleep(1)

    for epoch in trange(EPOCHS):
        with tf.device('CPU:0'):
            start_time = time.time()
            ts, ind, (batch_states, batch_actions, batch_rewards, batch_is_not_done), td, weights = replay_buffer.get_prioritized(BATCH_SIZE)
            get_time = time.time() - start_time

        if steps % TARGET_NETWORK_UPDATE_EPISODES == 0:
            print(epoch, 'Updating network')
            # Hard update: copy the online weights into the target network.
            target_network.set_weights(network.get_weights())
            save_model('target', epoch, target_network, const.TARGET_MODEL_SYMLINK_PATH)
            gc.collect()
            # NOTE(review): clear_session() resets global Keras state while the
            # models and the traced learn_from_batch are still in use — confirm intent.
            tf.keras.backend.clear_session()

        if steps % MODEL_SAVE_PERIOD == 0:
            print(epoch, 'Saving model')
            save_model('online', epoch, network, const.LATEST_MODEL_SYMLINK_PATH)

        start_time = time.time()
        loss, new_td_error, grad_min, grad_max, grad_l2 = learn_from_batch(
            batch_states, batch_actions, batch_rewards, batch_is_not_done,
            gamma=const.GAMMA, weights=weights, online_network=network, target_network=target_network)
        learn_time = time.time() - start_time

        start_time = time.time()
        # Hand the new TD errors of the sampled items back to the buffer
        # (presumably to refresh their priorities).
        replay_buffer.update(ts, ind, new_td_error)
        update_time = time.time() - start_time

        with tf_writer_lock, tf_writer.as_default():
            tf.summary.scalar('loss', loss, step=steps)
            with tf.name_scope('td_error'):
                tf.summary.scalar('before', tf.reduce_mean(td), step=steps)
                tf.summary.scalar('after', tf.reduce_mean(new_td_error), step=steps)
            with tf.name_scope('timing'):
                tf.summary.scalar('get', get_time, step=steps)
                tf.summary.scalar('update', update_time, step=steps)
                tf.summary.scalar('learn', learn_time, step=steps)
            with tf.name_scope('gradients'):
                tf.summary.scalar('min', grad_min, step=steps)
                tf.summary.scalar('max', grad_max, step=steps)
                tf.summary.scalar('l2 norm', grad_l2, step=steps)

        steps += 1
finally:
    stop_threads()

In [None]:
# Stop the background data threads. This is safe to run repeatedly: only threads
# that were created are signalled, and only live ones are joined.
stop_threads()