In [1]:
import pandas as pd
import pandas_ta
import numpy as np
 
# from gym_anytrading.datasets import FOREX_EURUSD_1H_ASK, STOCKS_GOOGL
import matplotlib.pyplot as plt

In [2]:
import os
import time
import datetime
from six.moves import range

import tensorflow as tf

In [3]:
PRICE_COLUMN = 'close'
USE_PRICE_RANGE_COLUMNS = False

freq = 'h'
start_date='2016-01-02'
end_date='2019-03-28'

ranges_dict_path = 'data\\ranges_dict.pickle'
save_path = f'.\\data\\featured_prices_{freq}_start_{start_date}.csv'

# prices_path = '.\\data\\prices_freq-min_2019-01-01_2019-03-28.csv'
prices_path = '.\\data\\sources\\coinbaseUSD_1-min_data_2014-12-01_to_2019-01-09.csv'

# Scrapped from twitters from 2016-01-01 to 2019-03-29, Collecting Tweets containing Bitcoin or BTC
tweets_path = 'data/sources/tweets_historical.csv'

In [4]:
import price_features

prices_df, ranges_dict = price_features.main(
    prices_path=prices_path,
    ranges_dict_path=ranges_dict_path,
    save_path=save_path,
    onlyRead=True,
    freq=freq,
    timestamp_col='Timestamp',
    cleanNans=True,
    start_date=start_date,
)

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Lluis\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\Lluis\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
Loading data...
Filling All Time data
Filling NA data
Aggregating from min to h level
Generating TA features...
83 out of 84 featuresThe following columns have been removed: ['FISHERT_120', 'KAMA_240_48_720']
Dropping 2159 rows because of NaN values


In [5]:
from tweets_preprocess import (
    tweetsPreprocess,
    VADER_COLUMNS,
    TEXTBLOB_COLUMNS,
)

# TODO: Save tweets sentiment independent of prices and one file per date range and frequency

sentiment_cols = VADER_COLUMNS + TEXTBLOB_COLUMNS

save_path='data/preprocess/twitter.csv'

partial_file = os.path.splitext(save_path)
save_final_path = f'{partial_file[0]}_{start_date}_-_{end_date}{partial_file[1]}'

if os.path.exists(save_final_path):
    tweets_df = pd.read_csv(save_final_path, sep='\t', index_col='timestamp')

    tweets_df = tweets_df.set_index(
        pd.to_datetime(tweets_df.index)
    )

else:
    print("Start tweetsPreprocess")
    tweets_df = tweetsPreprocess(
        tweets_path,
        freq=freq,
        sentiment_cols=sentiment_cols,
        # sentiment_cols=['Compound', 'Polarity'],
        aggregate_cols=['replies', 'likes', 'retweets'], # TODO: Also by volume of tweets??
        start_date=start_date,
        end_date=end_date,
        nrows=None,
        chunksize=5e5,
        save_path='data/preprocess/twitter.csv',
        write_files=False
    )

remove_cols = [
    'replies_sum',
    'replies_mean',
    'likes_sum',
    'likes_mean',
    'retweets_sum',
    'retweets_mean',
]
tweets_df = tweets_df.drop(remove_cols, axis=1)

In [6]:
data = prices_df.merge(tweets_df, how='left', left_index=True, right_index=True)
data = data.reset_index(drop=True)

In [7]:
FEATURE_COLUMNS = []
for key in ranges_dict:
    FEATURE_COLUMNS += ranges_dict[key]['cols'] if ranges_dict[key]['normalize'] else []

FEATURE_COLUMNS += list(tweets_df.columns)

In [8]:
if USE_PRICE_RANGE_COLUMNS:

    diff_cols = len(ranges_dict['prices']['cols']) - len(FEATURE_COLUMNS) - int(POSITION_AS_OBSERVATION)
    print(f'Difference of {diff_cols} columns between prices cols and normalized cols')
    print('In order to use Group Normalization Layer with 2 groups, both groups should be equal and sorted to be one first and then the other.')

    if diff_cols > 0:
        remove_cols = ['LR_14']
        print(f'The following columns are going to be removed: {remove_cols}')
        prices_cols = [col for col in ranges_dict['prices']['cols'] if col not in remove_cols]
    else:
        prices_cols = ranges_dict['prices']['cols']

    # Add prices cols into the FEATURE_COLUMNS
    FEATURE_COLUMNS = prices_cols + FEATURE_COLUMNS

# Make sure that PRICE_COL is in data
ALL_COLS = [PRICE_COLUMN] if PRICE_COLUMN not in FEATURE_COLUMNS else []
ALL_COLS += FEATURE_COLUMNS

# Set the columns used in data PRICE_COL + FEATURE_COLS
data = data[ALL_COLS]

In [9]:
assert not np.isinf(data).any(1).any(), data[np.isinf(data).any(1)]
assert not data.isnull().any().any()

In [10]:
# unit_factor = 60*24*30 # months 
unit_factor = 24*30*12 # years 
print(f'Data for {len(data.index) / unit_factor:.3f} units')

Data for 3.050 units


In [11]:
train_time = 2
gap_time = 1/12
valid_time = (len(data.index) / unit_factor - train_time - 2 * gap_time) / 2
test_time = (len(data.index) / unit_factor - train_time - 2 * gap_time) / 2

train_end = int(train_time * unit_factor)
valid_start = train_end + int(gap_time * unit_factor)
valid_end = valid_start + int(valid_time * unit_factor)
test_start = valid_end + int(gap_time * unit_factor)
test_end = test_start + int(test_time * unit_factor)

train = data.iloc[0:train_end, :]
valid = data.iloc[valid_start:valid_end, :]
test = data.iloc[test_start:test_end, :]

In [12]:
from own_stock_env import OwnStocksEnv, REVENUE_REWARD, PRICE_REWARD

# TODO: Steps scheduling, starting from low number of steps to high
steps_schedule = [5, 10, 15, 20]
steps_per_episode = steps_schedule[0] # 20
window_size = 1
POSITION_AS_OBSERVATION = True

num_parallel_environments = 1

reward_type = PRICE_REWARD
max_step_reward = 0
max_final_reward = 1

SEED = 12345

#### ONLY FOR TESTING OVERFITING

# steps_per_episode = 5
# factor = 2
# # factor = 20
# train = train[0:steps_per_episode*factor]
# valid = valid[0:steps_per_episode*factor]
# test = test[0:steps_per_episode*factor]

##############################################

# Transform Gym Environment to TFPyEnvironment
from gym import spaces
from tf_agents.environments import tf_py_environment, parallel_py_environment
from tf_agents.environments.gym_wrapper import GymWrapper

def generateSplitEnvs(
    train_df,
    valid_df,
    test_df,
    window_size,
    steps_per_episode,
    feature_columns,
    reward_type=reward_type,
    max_final_reward=max_final_reward,
    max_step_reward=max_step_reward,
    num_parallel_environments=1,
    position_as_observation=True,
    constant_step=False,
    is_training=True,
    seed=12345,
):

    train_env = OwnStocksEnv(
        df=train_df,
        window_size=window_size,
        frame_bound=(window_size, len(train_df)),
        steps_per_episode=steps_per_episode,
        is_training=True,
        constant_step=constant_step,
        feature_columns=feature_columns,
        position_as_observation=position_as_observation,
        reward_type=reward_type,
        max_final_reward=max_final_reward,
        max_step_reward=max_step_reward,
    )
    train_env.seed(seed);

    eval_env = OwnStocksEnv(
        df=valid_df,
        window_size=window_size,
        frame_bound=(window_size, len(valid_df)),
        steps_per_episode=steps_per_episode,
        is_training=is_training,
        constant_step=constant_step,
        feature_columns=feature_columns,
        position_as_observation=position_as_observation,
        reward_type=reward_type,
        max_final_reward=max_final_reward,
        max_step_reward=max_step_reward,
    )
    eval_env.seed(seed);

    test_env = OwnStocksEnv(
        df=test_df,
        window_size=window_size,
        frame_bound=(window_size, len(test_df)),
        steps_per_episode=steps_per_episode,
        is_training=is_training,
        constant_step=constant_step,
        feature_columns=feature_columns,
        position_as_observation=position_as_observation,
        reward_type=reward_type,
        max_final_reward=max_final_reward,
        max_step_reward=max_step_reward,
    )
    test_env.seed(seed);

    # Otherwise raise error on evaluating ChosenActionHistogram metric
    spec_dtype_map = {spaces.Discrete: np.int32}

    # TODO: Implement Parallel Environment (need tf_agents.system.multiprocessing.enable_interactive_mode() added in github last updates)
    if num_parallel_environments != 1:
        parallel_envs = []
        tf_parallel_envs = []
        for i in range(num_parallel_environments):
            train_env = OwnStocksEnv(
                df=train,
                window_size=window_size,
                frame_bound=(window_size, len(train)),
                steps_per_episode=steps_per_episode,
                is_training=True,
                constant_step=constant_step,
                feature_columns=feature_columns,
                position_as_observation=position_as_observation,
                reward_type=reward_type,
                max_final_reward=max_final_reward,
                max_step_reward=max_step_reward,
            )
            train_env.seed(SEED + i);
            parallel_envs.append(train_env)
            tf_parallel_envs.append(
                GymWrapper(train_env, spec_dtype_map=spec_dtype_map)
            )
        
        tf_env = tf_py_environment.TFPyEnvironment(parallel_py_environment.ParallelPyEnvironment(tf_parallel_envs))

        train_env = parallel_envs[0]
    else:
        tf_env = tf_py_environment.TFPyEnvironment(GymWrapper(train_env, spec_dtype_map=spec_dtype_map))

    eval_tf_env = tf_py_environment.TFPyEnvironment(GymWrapper(eval_env, spec_dtype_map=spec_dtype_map))
    test_tf_env = tf_py_environment.TFPyEnvironment(GymWrapper(test_env, spec_dtype_map=spec_dtype_map))

    return tf_env, eval_tf_env, test_tf_env

In [13]:
tf_env, eval_tf_env, test_tf_env = generateSplitEnvs(
    train,
    valid,
    test,
    window_size,
    steps_per_episode,
    FEATURE_COLUMNS,
    reward_type=reward_type,
    max_final_reward=max_final_reward,
    max_step_reward=max_step_reward,
    num_parallel_environments=num_parallel_environments,
    position_as_observation=POSITION_AS_OBSERVATION,
    constant_step=False,
    is_training=True,
    seed=SEED,
)

In [14]:
from absl import logging
# Added in last versions
# import tf_agents.system import multiprocessing

logging.set_verbosity(logging.INFO)
# tf.logging.set_verbosity(tf.logging.INFO)
tf.compat.v1.enable_v2_behavior()

# Added in last versions
# multiprocessing.enable_interactive_mode()

In [15]:
agent = 'PPO'

STEP = 'step'
EPISODE = 'episode'
agent_unit = {
    'DQN': STEP,
    'PPO': EPISODE,
    'REINFORCE': EPISODE,
}
unit = agent_unit[agent]

In [16]:
from tensorflow.keras.optimizers import Adam, SGD
from tf_agents.utils import common

# Params for train
num_iterations = 1000000

summary_frequency = 10000

train_steps_per_iteration = 5
collect_per_iteration = 20 * num_parallel_environments
replay_buffer_capacity = steps_per_episode * collect_per_iteration // num_parallel_environments + 1

# TODO: Improve learning rate with schedule and on e-greedy too
batch_size = 32
learning_rate = 6e-5 # 3e-4
optimizer = Adam(learning_rate=learning_rate) # SGD(learning_rate=learning_rate) # Adam(learning_rate=learning_rate)
gradient_clipping = 0

if agent == 'DQN':
    # TODO: Use other kind of policy like Boltzam?
    epsilon_greedy = 0.1

    target_update_tau = 0.05
    target_update_period = 5

    initial_collect_steps = num_iterations // 1000 # 1000

    n_step_update = 1

    td_errors_loss_fn = common.element_wise_huber_loss # common.element_wise_squared_loss # common.element_wise_huber_loss

    gamma = 0.99
    reward_scale_factor = 1.0

elif agent == 'PPO':
    
    importance_ratio_clipping = 0.2
    
    kl_cutoff_factor = 0 # 2.0
    kl_cutoff_coef = 1000.0
    initial_adaptive_kl_beta = 0 # 1.0
    adaptive_kl_target = 0.01
    adaptive_kl_tolerance = 0.3

    normalize_observations=True
    normalize_rewards=True
    reward_norm_clipping=10.0 # Not used if normalize_rewards=False
    use_gae=True
    lambda_value=1 # 0.95 
    discount_factor=1 # TODO: Rethink on how to implement discount factor because reward by prices is accumulative

    entropy_regularization = 0
    policy_l2_reg = 0
    value_function_l2_reg = 0
    shared_vars_l2_reg = 0
    value_pred_loss_coef = 0.5
    use_td_lambda_return = False
    log_prob_clipping = 0.0
    value_clipping = None
    num_epochs = 25

use_tf_functions = True

# Params for summaries and logging
log_interval = num_iterations // summary_frequency
log_interval = max(log_interval, steps_per_episode)
summaries_flush_secs = 10
summary_interval = num_iterations // summary_frequency
summary_interval = max(summary_interval, steps_per_episode)
debug_summaries = True
summarize_grads_and_vars = True
check_numerics = True

# Params for eval
num_eval_episodes = eval_tf_env.envs[0].frame_bound[-1] // eval_tf_env.envs[0].steps_per_episode
num_eval_seeds = 1
eval_interval = summary_interval * 4
eval_interval = max(eval_interval, steps_per_episode)
eval_metrics_callback = None

# Params for checkpoints
train_checkpoint_interval = eval_interval * 4
policy_checkpoint_interval = eval_interval * 2
rb_checkpoint_interval = eval_interval * 8

In [17]:
# !rmdir /s /q .\\logs\\dqn

In [18]:
TRAIN_MODEL = True

root_dir = 'logs\\' + agent

if TRAIN_MODEL:
    root_dir = os.path.join(root_dir, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
else:
    root_dir = os.path.join(root_dir, '20200503-095638')

In [19]:
class AgentEarlyStopping():
  def __init__(self,
               monitor='AverageReturn',
               min_delta=0,
               patience=0,
               patience_after_change=0,
               verbose=0,
               mode='max',
               baseline=None):
    """Initialize an AgentEarlyStopping.
    Arguments:
        monitor: Quantity to be monitored.
        min_delta: Minimum change in the monitored quantity
            to qualify as an improvement, i.e. an absolute
            change of less than min_delta, will count as no
            improvement.
        patience: Number of iterations with no improvement
            after which training will be stopped.
        patience_after_change: Number of iterations after change
             on monitor with no improvement after which training
             will be stopped.
        verbose: verbosity mode.
        mode: One of `{"auto", "min", "max"}`. In `min` mode,
            training will stop when the quantity
            monitored has stopped decreasing; in `max`
            mode it will stop when the quantity
            monitored has stopped increasing; in `auto`
            mode, the direction is automatically inferred
            from the name of the monitored quantity.
        baseline: Baseline value for the monitored quantity.
            Training will stop if the model doesn't show improvement over the
            baseline.
    """
    # super(AgentEarlyStopping, self).__init__()

    self.monitor = monitor
    self.patience = patience
    if patience_after_change < patience:
        self.patience_after_change = patience
    else:
        self.patience_after_change = patience_after_change
    self.verbose = verbose
    self.baseline = baseline
    self.min_delta = abs(min_delta)

    if mode not in ['auto', 'min', 'max']:
      logging.warning('EarlyStopping mode %s is unknown, '
                      'fallback to auto mode.', mode)
      mode = 'auto'

    if mode == 'min':
      self.monitor_op = np.less
    elif mode == 'max':
      self.monitor_op = np.greater
    else:
      if 'acc' in self.monitor:
        self.monitor_op = np.greater
      elif 'return' in self.monitor.lower():
        self.monitor_op = np.less
      else:
        self.monitor_op = np.less

    if self.monitor_op == np.greater:
      self.min_delta *= 1
    else:
      self.min_delta *= -1

    self.reset()

  def reset(self):
    # Allow instances to be re-used
    self.wait = 0
    self.wait_after_change = 0
    self.stopped_step = 0
    self.stop_training = False
    self.monitor_changed = False
    if self.baseline is not None:
      self.best = self.baseline
    else:
      self.best = np.Inf if self.monitor_op == np.less else -np.Inf

  def __call__(self, computed_metrics, global_step):
    current = self.get_monitor_value(computed_metrics)
    if current is None:
      return
    if not tf.equal(current, self.best) and not np.isinf(self.best):
        self.monitor_changed = True
    if self.monitor_op(current - self.min_delta, self.best):
      self.best = current
      self.wait = 0
      self.wait_after_change = 0
    else:
      self.wait += 1
      if self.monitor_changed:
          self.wait_after_change += 1
          print(f'self.wait_after_change: {self.wait_after_change}')
      if self.wait >= self.patience or self.wait_after_change >= self.patience_after_change:
        self.stopped_step = global_step
        self.stop_training = True
        logging.info('Global step %05d: early stopping' % (self.stopped_step + 1))

  def get_monitor_value(self, computed_metrics):
    computed_metrics = computed_metrics or {}
    monitor_value = computed_metrics.get(self.monitor).numpy()
    if monitor_value is None:
      logging.warning('Agent early stopping conditioned on metric `%s` '
                      'which is not available. Available metrics are: %s',
                      self.monitor, ','.join(list(computed_metrics.keys())))
    return monitor_value

In [20]:
from tf_agents.metrics import tf_metrics

root_dir = os.path.expanduser(root_dir)
train_dir = os.path.join(root_dir, 'train')
eval_dir = os.path.join(root_dir, 'eval')
saved_model_dir = os.path.join(root_dir, 'policy_saved_model')

train_summary_writer = tf.summary.create_file_writer(
    train_dir, flush_millis=summaries_flush_secs * 1000)
train_summary_writer.set_as_default()

step_metrics = []
train_metrics = step_metrics + [
    # tf_metrics.NumberOfEpisodes(),
    # tf_metrics.EnvironmentSteps(),
    tf_metrics.AverageReturnMetric(batch_size=num_parallel_environments),
    # tf_metrics.AverageEpisodeLengthMetric(),
    # tf_metrics.ChosenActionHistogram(dtype=tf.int32),
]

eval_summary_writer = tf.summary.create_file_writer(
    eval_dir, flush_millis=summaries_flush_secs * 1000)
eval_metrics = [
    tf_metrics.AverageReturnMetric(buffer_size=1),
    # tf_metrics.AverageEpisodeLengthMetric(buffer_size=num_eval_episodes)
]
eval_metrics_callback = AgentEarlyStopping(
    monitor='AverageReturn', min_delta=0.0001, patience=np.inf, patience_after_change=5, verbose=1, mode='max'
)

global_step = tf.compat.v1.train.get_or_create_global_step()

In [21]:
import collections

def evaluate(eval_metrics, eval_tf_env, eval_policy, num_eval_episodes, num_eval_seeds, global_step=None, eval_summary_writer=None, summary_prefix='Metrics', seed=12345):
    all_results = []
    for i in range(num_eval_seeds):
        for env in eval_tf_env.envs:
            env.seed(seed + i)
        # One final eval before exiting.
        results = metric_utils.eager_compute(
            eval_metrics,
            eval_tf_env,
            eval_policy,
            num_episodes=num_eval_episodes,
            train_step=global_step,
        )
        all_results.append(results)

    mean_results = collections.OrderedDict(results)
    if num_eval_seeds > 1:
        for metric in mean_results:
            metric_sum = 0
            for result in all_results:
                metric_sum = tf.add(metric_sum, result[metric])
            mean_results[metric] = metric_sum / len(all_results)
    if global_step and eval_summary_writer:
        with eval_summary_writer.as_default():
            for metric, value in mean_results.items():
                tag = common.join_scope(summary_prefix, metric)
                tf.compat.v2.summary.scalar(name=tag, data=value, step=global_step)

    log = ['{0} = {1}'.format(metric, value) for metric, value in mean_results.items()]
    logging.info('%s \n\t\t %s','', '\n\t\t '.join(log))

    return mean_results

In [22]:
# Define Q-network

train_sequence_length = window_size

# dropout_layer = (0.2,0.2,0.2,0.2,0.2)
dropout_layer = None
activation_fn = tf.nn.leaky_relu # tf.keras.activations.relu # tf.keras.activations.tanh

if agent == 'DQN':
    if train_sequence_length > 1:
        input_fc_layer_params = (8,)
        lstm_size = (16,)
        output_fc_layer_params = (8,)
        
    else:
        fc_layer_params = (100,)

elif agent == 'PPO':
    if train_sequence_length > 1:
        actor_fc_layers = (8,)
        actor_lstm_size = (16,)
        actor_output_fc_layer = (8,)
        
        value_fc_layers = (8,)
        value_lstm_size = (16,)
        value_output_fc_layers = (8,)
    else:
        actor_fc_layers = (512,1024,2048,1024,512,)
        
        value_fc_layers = (512,1024,2048,1024,512,)


if agent == 'DQN':
    from tf_agents.networks import q_network
    from tf_agents.networks import q_rnn_network

    if train_sequence_length > 1:
        q_net = q_rnn_network.QRnnNetwork(
            tf_env.observation_spec(),
            tf_env.action_spec(),
            input_fc_layer_params=input_fc_layer_params,
            lstm_size=lstm_size,
            output_fc_layer_params=output_fc_layer_params
        )
    else:
        q_net = q_network.QNetwork(
            tf_env.observation_spec(),
            tf_env.action_spec(),
            fc_layer_params=fc_layer_params,
            dropout_layer_params=dropout_layer,
        )
        train_sequence_length = n_step_update

    if train_sequence_length != 1 and n_step_update != 1:
        raise NotImplementedError(
            'Currently not supporting n-step updates with stateful networks (i.e., RNNs)')

elif agent == 'PPO':
    from tf_agents.networks import actor_distribution_network
    from tf_agents.networks import actor_distribution_rnn_network
    from tf_agents.networks import value_network
    from tf_agents.networks import value_rnn_network

    if train_sequence_length > 1:
        actor_net = actor_distribution_rnn_network.ActorDistributionRnnNetwork(
            tf_env.observation_spec(),
            tf_env.action_spec(),
            input_fc_layer_params=actor_fc_layers,
            input_dropout_layer_params=dropout_layer,
            lstm_size=actor_lstm_size,
            activation_fn=activation_fn,
            output_fc_layer_params=actor_output_fc_layer)
        value_net = value_rnn_network.ValueRnnNetwork(
            tf_env.observation_spec(),
            input_fc_layer_params=value_fc_layers,
            input_dropout_layer_params=dropout_layer,
            lstm_size=value_lstm_size,
            activation_fn=activation_fn, # alredy relu on source code
            output_fc_layer_params=actor_output_fc_layer)
    else:
        actor_net = actor_distribution_network.ActorDistributionNetwork(
            tf_env.observation_spec(),
            tf_env.action_spec(),
            fc_layer_params=actor_fc_layers,
            dropout_layer_params=dropout_layer,
            activation_fn=activation_fn)
        value_net = value_network.ValueNetwork(
            tf_env.observation_spec(),
            fc_layer_params=value_fc_layers,
            dropout_layer_params=dropout_layer,
            activation_fn=activation_fn)

In [23]:
def train_eval(tf_agent, num_iterations, batch_size, tf_env, eval_tf_env, train_metrics, step_metrics, eval_metrics, global_step, replay_buffer_capacity, num_parallel_environments, collect_per_iteration, train_steps_per_iteration, train_dir, saved_model_dir, eval_summary_writer, num_eval_episodes, num_eval_seeds=1, eval_metrics_callback=None, train_sequence_length=1, initial_collect_steps=1000, train_model=True, use_tf_functions=True, eval_early_stopping=False):

    tf_agent.initialize()
    agent_name = tf_agent.__dict__['_name']

    eval_policy = tf_agent.policy
    collect_policy = tf_agent.collect_policy

    replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
        data_spec=tf_agent.collect_data_spec,
        batch_size=num_parallel_environments, # batch_size=tf_env.batch_size,
        max_length=replay_buffer_capacity)

    if train_model:
      if agent_name in ['dqn_agent']:
        collect_driver = dynamic_step_driver.DynamicStepDriver(
            tf_env,
            collect_policy,
            observers=[replay_buffer.add_batch] + train_metrics,
            num_steps=collect_per_iteration)
      elif agent_name in ['ppo_agent']:
        collect_driver = dynamic_episode_driver.DynamicEpisodeDriver(
            tf_env,
            collect_policy,
            observers=[replay_buffer.add_batch] + train_metrics,
            num_episodes=collect_per_iteration)
      else:
          raise NotImplementedError(f'{agent_name} agent not yet implemented')

    train_checkpointer = common.Checkpointer(
        ckpt_dir=train_dir,
        agent=tf_agent,
        global_step=global_step,
        metrics=metric_utils.MetricsGroup(train_metrics, 'train_metrics'))
    policy_checkpointer = common.Checkpointer(
        ckpt_dir=os.path.join(train_dir, 'policy'),
        policy=eval_policy,
        global_step=global_step)
    saved_model = policy_saver.PolicySaver(eval_policy, train_step=global_step)
    rb_checkpointer = common.Checkpointer(
        ckpt_dir=os.path.join(train_dir, 'replay_buffer'),
        max_to_keep=1,
        replay_buffer=replay_buffer)

    policy_checkpointer.initialize_or_restore() # TODO: To be tested
    train_checkpointer.initialize_or_restore()
    rb_checkpointer.initialize_or_restore()

    if train_model:

      # TODO: should they use autograph=False?? as in tf_agents/agents/ppo/examples/v2/train_eval_clip_agent.py
      if use_tf_functions:
        # To speed up collect use common.function.
        collect_driver.run = common.function(collect_driver.run) 
        tf_agent.train = common.function(tf_agent.train)

      # Only run Replay buffer initialization if using one of the following agents
      if agent_name in ['dqn_agent']:
        initial_collect_policy = random_tf_policy.RandomTFPolicy(
            tf_env.time_step_spec(), tf_env.action_spec())

        # Collect initial replay data.
        logging.info(
            'Initializing replay buffer by collecting experience for %d steps with '
            'a random policy.', initial_collect_steps)
        dynamic_step_driver.DynamicStepDriver(
            tf_env,
            initial_collect_policy,
            observers=[replay_buffer.add_batch] + train_metrics,
            num_steps=initial_collect_steps).run()

      logging.info(
          f'Initial eval metric'
      )
      results = evaluate(eval_metrics, eval_tf_env, eval_policy, num_eval_episodes, num_eval_seeds, global_step, eval_summary_writer, summary_prefix='Metrics', seed=SEED)

      if eval_early_stopping and not isinstance(eval_metrics_callback, AgentEarlyStopping):
          raise ValueError('Cannot set eval_early_stopping without eval_metric_callback being Agent Early Stopping instance')

      if eval_metrics_callback is not None:
        eval_metrics_callback(results, global_step.numpy())

      time_step = None
      policy_state = collect_policy.get_initial_state(tf_env.batch_size)

      timed_at_step = global_step.numpy()
      collect_time = 0
      train_time = 0
      summary_time = 0

      if agent_name in ['dqn_agent']:
        # Dataset generates trajectories with shape [Bx2x...]
        logging.info(
            f'Dataset generates trajectories'
        )
        dataset = replay_buffer.as_dataset(
            num_parallel_calls=3,
            sample_batch_size=batch_size,
            # single_deterministic_pass=True,
            num_steps=train_sequence_length + 1).prefetch(3)
        iterator = iter(dataset)

        def train_step():
          experience, _ = next(iterator)
          return tf_agent.train(experience)
      elif agent_name in ['ppo_agent']:
        def train_step():
          trajectories = replay_buffer.gather_all()
          return tf_agent.train(experience=trajectories)
      else:
        raise NotImplementedError(f'{agent_name} agent not yet implemented')

      if use_tf_functions:
        train_step = common.function(train_step)

      logging.info(
            f'Starting training...'
      )
      for _ in range(num_iterations):
        start_time = time.time()
        if agent_name in ['dqn_agent']:
          time_step, policy_state = collect_driver.run(
              time_step=time_step,
              policy_state=policy_state,
          )
        elif agent_name in ['ppo_agent']:
          collect_driver.run()
        else:
          raise NotImplementedError(f'{agent_name} agent not yet implemented')
        
        collect_time += time.time() - start_time

        start_time = time.time()
        for _ in range(train_steps_per_iteration):
          train_loss = train_step()
        train_time += time.time() - start_time

        start_time = time.time()
        for train_metric in train_metrics:
          train_metric.tf_summaries(
              train_step=global_step, step_metrics=step_metrics)
        summary_time += time.time() - start_time

        if global_step.numpy() % log_interval == 0:
          logging.info('step = %d, loss = %f', global_step.numpy(),
                      train_loss.loss)
          steps_per_sec = (global_step.numpy() - timed_at_step) / (train_time + collect_time + summary_time)
          logging.info('%.3f steps/sec', steps_per_sec)
          logging.info('collect_time = %.3f, train_time = %.3f, summary_time = %.3f', collect_time,
                     train_time, summary_time)
          tf.compat.v2.summary.scalar(
              name='global_steps_per_sec', data=steps_per_sec, step=global_step)
          timed_at_step = global_step.numpy()
          collect_time = 0
          train_time = 0
          summary_time = 0

        if global_step.numpy() % train_checkpoint_interval == 0:
          start_time = time.time()
          train_checkpointer.save(global_step=global_step.numpy())
          logging.info(
            f'Saving Train lasts: {time.time() - start_time:.3f} s'
          )

        if global_step.numpy() % policy_checkpoint_interval == 0:
          start_time = time.time()
          policy_checkpointer.save(global_step=global_step.numpy())
          saved_model_path = os.path.join(
              saved_model_dir, 'policy_' + ('%d' % global_step.numpy()).zfill(9))
          saved_model.save(saved_model_path)
          logging.info(
            f'Saving Policy lasts: {time.time() - start_time:.3f} s'
          )

        if global_step.numpy() % rb_checkpoint_interval == 0:
          start_time = time.time()
          rb_checkpointer.save(global_step=global_step.numpy())
          logging.info(
            f'Saving Replay Buffer lasts: {time.time() - start_time:.3f} s'
          )

        if global_step.numpy() % eval_interval == 0:
          start_time = time.time()
          results = evaluate(eval_metrics, eval_tf_env, eval_policy, num_eval_episodes, num_eval_seeds, global_step, eval_summary_writer, summary_prefix='Metrics', seed=SEED)
          if eval_metrics_callback is not None:
            eval_metrics_callback(results, global_step.numpy())
          logging.info(
            f'Calculate Evaluation lasts {time.time() - start_time:.3f} s'
          )

          if eval_early_stopping and eval_metrics_callback.stop_training:
              logging.info(
                  f'Training stopped due to Agent Early Stopping at step: {global_step.numpy()}'
              )
              logging.info(
                  f'Best {eval_metrics_callback.monitor} was {eval_metrics_callback.best:.5f} at step {eval_metrics_callback.stopped_step}'
              )
              break

In [24]:
# TODO: Adapt for using step or episodes as unit to then can switch easily between TF-Agents
# Compare here: https://github.com/tensorflow/agents/blob/master/tf_agents/agents/ppo/examples/v2/train_eval_clip_agent.py
from tf_agents.agents.dqn import dqn_agent
from tf_agents.agents.ppo import ppo_agent # TODO: Use ppo_clip_agent which is the proposed above
from tf_agents.drivers import dynamic_step_driver, dynamic_episode_driver
from tf_agents.eval import metric_utils
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.utils import common

from tf_agents.policies import policy_saver

with tf.summary.record_if(
    lambda: tf.math.equal(global_step % summary_interval, 0)):

    if agent == 'DQN':
      # TODO(b/127301657): Decay epsilon based on global step, cf. cl/188907839
      tf_agent = dqn_agent.DqnAgent(
          tf_env.time_step_spec(),
          tf_env.action_spec(),
          q_network=q_net,
          epsilon_greedy=epsilon_greedy,
          n_step_update=n_step_update,
          target_update_tau=target_update_tau,
          target_update_period=target_update_period,
          optimizer=optimizer,
          td_errors_loss_fn=td_errors_loss_fn,
          gamma=gamma,
          reward_scale_factor=reward_scale_factor,
          gradient_clipping=gradient_clipping,
          debug_summaries=debug_summaries,
          summarize_grads_and_vars=summarize_grads_and_vars,
          check_numerics=check_numerics,
          train_step_counter=global_step)
    elif agent == 'PPO':
      # TODO: Use ppo_clip_agent which is the proposed above
      # tf_agent = ppo_clip_agent.PPOClipAgent(
      tf_agent = ppo_agent.PPOAgent(
        tf_env.time_step_spec(),
        tf_env.action_spec(),
        optimizer=optimizer,
        actor_net=actor_net,
        value_net=value_net,
        importance_ratio_clipping=importance_ratio_clipping,
        kl_cutoff_factor=kl_cutoff_factor,
        kl_cutoff_coef=kl_cutoff_coef,
        initial_adaptive_kl_beta=initial_adaptive_kl_beta,
        adaptive_kl_target=adaptive_kl_target,
        adaptive_kl_tolerance=adaptive_kl_tolerance,
        lambda_value=lambda_value,
        discount_factor=discount_factor,
        entropy_regularization=entropy_regularization,
        policy_l2_reg=policy_l2_reg,
        value_function_l2_reg=value_function_l2_reg,
        # shared_vars_l2_reg=shared_vars_l2_reg,
        value_pred_loss_coef=value_pred_loss_coef,
        normalize_observations=normalize_observations,
        use_gae=use_gae,
        use_td_lambda_return=use_td_lambda_return,
        normalize_rewards=normalize_rewards,
        reward_norm_clipping=reward_norm_clipping,
        log_prob_clipping=log_prob_clipping,
        gradient_clipping=gradient_clipping,
        # value_clipping=value_clipping,
        num_epochs=num_epochs,
        debug_summaries=debug_summaries,
        summarize_grads_and_vars=summarize_grads_and_vars,
        check_numerics=check_numerics,
        train_step_counter=global_step)
    else:
      raise NotImplementedError('Other agents than DQN and PPO are not yet implemented')

    for steps_per_episode in steps_schedule:

      logging.info(
        f'Steps per episode equal to {steps_per_episode}'
      )

      tf_env, eval_tf_env, test_tf_env = generateSplitEnvs(
        train,
        valid,
        test,
        window_size,
        steps_per_episode,
        FEATURE_COLUMNS,
        reward_type=reward_type,
        max_final_reward=max_final_reward,
        max_step_reward=max_step_reward,
        num_parallel_environments=num_parallel_environments,
        position_as_observation=POSITION_AS_OBSERVATION,
        constant_step=False,
        is_training=True,
        seed=SEED,
      )

      tf.compat.v2.summary.scalar(
        name='step_scheduling', data=steps_per_episode, step=global_step)

      eval_metrics_callback.reset()

      train_eval(
        tf_agent,
        num_iterations,
        batch_size,
        tf_env,
        eval_tf_env,
        train_metrics,
        step_metrics,
        eval_metrics,
        global_step,
        replay_buffer_capacity,
        num_parallel_environments,
        collect_per_iteration,
        train_steps_per_iteration,
        train_dir,
        saved_model_dir,
        eval_summary_writer,
        num_eval_episodes,
        num_eval_seeds=num_eval_seeds,
        eval_metrics_callback=eval_metrics_callback,
        train_sequence_length=train_sequence_length,
        initial_collect_steps=initial_collect_steps if agent=='DQN' else None,
        train_model=TRAIN_MODEL,
        use_tf_functions=use_tf_functions,
        eval_early_stopping=True,
      )

      tf.compat.v2.summary.scalar(
        name='step_scheduling', data=steps_per_episode, step=global_step)

INFO:absl:Steps per episode equal to 5
INFO:absl:No checkpoint available at logs\PPO\20200518-173612\train
INFO:absl:No checkpoint available at logs\PPO\20200518-173612\train\policy
INFO:absl:No checkpoint available at logs\PPO\20200518-173612\train\replay_buffer
INFO:absl:Initial eval metric
INFO:absl: 
		 AverageReturn = 0.0
INFO:absl:Starting training...
INFO:absl:step = 500, loss = 0.035012
INFO:absl:4.609 steps/sec
INFO:absl:collect_time = 1.684, train_time = 106.728, summary_time = 0.075
INFO:absl:step = 1000, loss = -0.045781
INFO:absl:19.369 steps/sec
INFO:absl:collect_time = 1.106, train_time = 24.701, summary_time = 0.007
INFO:absl:step = 1500, loss = -0.113828
INFO:absl:19.443 steps/sec
INFO:absl:collect_time = 1.086, train_time = 24.618, summary_time = 0.012
INFO:absl:step = 2000, loss = 0.050401
INFO:absl:17.271 steps/sec
INFO:absl:collect_time = 1.054, train_time = 27.891, summary_time = 0.005
INFO:absl: 
		 AverageReturn = 0.0
INFO:absl:Calculate Evaluation lasts 10.178 

KeyboardInterrupt: 

In [25]:
# One last evaluation
results = evaluate(eval_metrics, eval_tf_env, tf_agent.policy, num_eval_episodes, num_eval_seeds, global_step, eval_summary_writer, summary_prefix='Metrics')

INFO:absl: 
		 AverageReturn = 0.0


In [26]:
env_data = train # valid

all_envs = {}

full_env = OwnStocksEnv(
    df=env_data,
    window_size=window_size,
    frame_bound=(window_size, len(env_data)),
    steps_per_episode=len(env_data) - window_size, # steps_per_episode,
    constant_step=True,
    is_training=False,
    feature_columns=FEATURE_COLUMNS,
    position_as_observation=POSITION_AS_OBSERVATION,
    reward_type=reward_type,
    max_final_reward=max_final_reward,
    max_step_reward=max_step_reward,
)
all_envs['Full eval'] = full_env

#TODO: For the is_training=True we have to make that all executions are using same cases
step_env = OwnStocksEnv(
    df=env_data,
    window_size=window_size,
    frame_bound=(window_size, len(env_data)),
    steps_per_episode=steps_per_episode,
    constant_step=True,
    is_training=True,
    feature_columns=FEATURE_COLUMNS,
    position_as_observation=POSITION_AS_OBSERVATION,
    reward_type=reward_type,
    max_final_reward=max_final_reward,
    max_step_reward=max_step_reward,
)
all_envs[f'Eval step of {steps_per_episode}'] = step_env

large_step_env = OwnStocksEnv(
    df=env_data,
    window_size=window_size,
    frame_bound=(window_size, len(env_data)),
    steps_per_episode=10 * steps_per_episode,
    constant_step=True,
    is_training=True,
    feature_columns=FEATURE_COLUMNS,
    position_as_observation=POSITION_AS_OBSERVATION,
    reward_type=reward_type,
    max_final_reward=max_final_reward,
    max_step_reward=max_step_reward,
)
all_envs[f'Eval step of {10*steps_per_episode}'] = large_step_env

if int(0.1 * steps_per_episode) > 1:
    small_step_env = OwnStocksEnv(
        df=env_data,
        window_size=window_size,
        frame_bound=(window_size, len(env_data)),
        steps_per_episode=int(0.1 * steps_per_episode),
        constant_step=True,
        is_training=True,
        feature_columns=FEATURE_COLUMNS,
        position_as_observation=POSITION_AS_OBSERVATION,
        reward_type=reward_type,
        max_final_reward=max_final_reward,
        max_step_reward=max_step_reward,
    )
    all_envs[f'Eval step of {int(0.1 * steps_per_episode)}'] = small_step_env

In [27]:
from own_stock_env import runAllTestEnv

In [33]:
# Apply random policy on env
runAllTestEnv(all_envs, select_action_func=full_env.action_space.sample);

Testing enviorment Full eval:
Total rewards: 7783.67 ± 0.000 (mean ± std. dev. of 1 iterations)
Total profits: 608.61% ± 0.000% (mean ± std. dev. of 1 iterations)
Total revenue ratio: 0.00% ± 0.000% (mean ± std. dev. of 1 iterations)
--------------------------------------------------
Testing enviorment Eval step of 5:
Total rewards: 2.14 ± 82.127 (mean ± std. dev. of 3455 iterations)
Total profits: 0.05% ± 1.189% (mean ± std. dev. of 3455 iterations)
Total revenue ratio: 20.44% ± 28.138% (mean ± std. dev. of 3455 iterations)
--------------------------------------------------
Testing enviorment Eval step of 50:
Total rewards: 25.30 ± 261.359 (mean ± std. dev. of 345 iterations)
Total profits: 0.89% ± 4.038% (mean ± std. dev. of 345 iterations)
Total revenue ratio: 12.52% ± 14.422% (mean ± std. dev. of 345 iterations)
--------------------------------------------------


In [34]:
# Applying long term policy (buy at initial and do not sell) on env
from gym_anytrading.envs import Actions 

def always_buy_func():
    return  Actions.Buy.value

runAllTestEnv(all_envs, select_action_func=always_buy_func);

Testing enviorment Full eval:
Total rewards: 13743.53 ± 0.000 (mean ± std. dev. of 1 iterations)
Total profits: 3169.19% ± 0.000% (mean ± std. dev. of 1 iterations)
Total revenue ratio: 0.00% ± 0.000% (mean ± std. dev. of 1 iterations)
--------------------------------------------------
Testing enviorment Eval step of 5:
Total rewards: 2.15 ± 110.181 (mean ± std. dev. of 3455 iterations)
Total profits: 0.08% ± 1.538% (mean ± std. dev. of 3455 iterations)
Total revenue ratio: 26.94% ± 32.416% (mean ± std. dev. of 3455 iterations)
--------------------------------------------------
Testing enviorment Eval step of 50:
Total rewards: 53.09 ± 321.614 (mean ± std. dev. of 345 iterations)
Total profits: 1.25% ± 4.849% (mean ± std. dev. of 345 iterations)
Total revenue ratio: 16.59% ± 19.129% (mean ± std. dev. of 345 iterations)
--------------------------------------------------


In [35]:
# Applying baseline policy on env
# Manual policy used as baseline
from gym_anytrading.envs import Positions, Actions

rsi_col = 'RSI_14'
# rsi_col = 'Close_rsi'
rsi_index = full_env.feature_columns.index(rsi_col)

# RSI usually is between 0 and 100, here is normalized between -1 and 1
# The baseline strategy is buy at 30 and sell at 70 otherwise hold
def select_baseline_action(observation, rsi_thresh_buy=-0.6, rsi_thresh_sell=0.4, rsi_index=rsi_index):
    # Use only last observation
    obs = observation[-1]

    position_value = int(obs[-1])
    rsi = obs[rsi_index]

    if position_value == Positions.Short.value and rsi <= rsi_thresh_buy:
        action = Actions.Buy.value
    elif position_value == Positions.Long.value and rsi >= rsi_thresh_sell:
        action = Actions.Sell.value
    else:
        # Hold
        # if it was in short remain in short because is selling
        # if it was in long remain in long because is buying
        action = position_value
    
    return action

runAllTestEnv(all_envs, select_action_func=select_baseline_action, use_observation=True, rsi_thresh_buy=0.2, rsi_thresh_sell=0.8);

Testing enviorment Full eval:
Total rewards: 1071.96 ± 0.000 (mean ± std. dev. of 1 iterations)
Total profits: 62.15% ± 0.000% (mean ± std. dev. of 1 iterations)
Total revenue ratio: 0.00% ± 0.000% (mean ± std. dev. of 1 iterations)
--------------------------------------------------
Testing enviorment Eval step of 5:
Total rewards: 0.03 ± 2.367 (mean ± std. dev. of 3455 iterations)
Total profits: 0.00% ± 0.145% (mean ± std. dev. of 3455 iterations)
Total revenue ratio: 1.12% ± 10.059% (mean ± std. dev. of 3455 iterations)
--------------------------------------------------
Testing enviorment Eval step of 50:
Total rewards: -0.47 ± 11.744 (mean ± std. dev. of 345 iterations)
Total profits: -0.04% ± 1.093% (mean ± std. dev. of 345 iterations)
Total revenue ratio: 0.91% ± 4.896% (mean ± std. dev. of 345 iterations)
--------------------------------------------------


In [28]:
def select_TFEnv_action(TFEnv, policy, done, time_step=None, policy_state=None):
    
    action_step = policy.action(time_step, policy_state)
    # distribution_step = policy._distribution(  # pylint: disable=protected-access
    #     time_step, policy_state)
    # if distribution_step.action.log_prob(0) > distribution_step.action.log_prob(1):
    #     print(distribution_step)
    #     print(distribution_step.action.log_prob(0), distribution_step.action.log_prob(1))

    # TODO(b/134487572): TF2 while_loop seems to either ignore
    # parallel_iterations or doesn't properly propagate control dependencies
    # from one step to the next. Without this dep, self.env.step() is called
    # in parallel.
    with tf.control_dependencies(tf.nest.flatten([time_step])):
        next_time_step = TFEnv.step(action_step.action)

    policy_state = action_step.state

    action = action_step.action.numpy()[0]
    # print(action)

    done = next_time_step.discount.numpy()[0] == 0
    # if done:
    #     display(TFEnv.envs[0].max_possible_profit_df.iloc[-1,0])
    #     print(TFEnv.envs[0]._total_profit)
    #     print(TFEnv.envs[0].calculate_revenue_ratio())

    return action, done, next_time_step, policy_state

In [29]:
all_tf_envs = {}

for key, value in all_envs.items():
    all_tf_envs[key] = tf_py_environment.TFPyEnvironment(GymWrapper(value))

In [30]:
runAllTestEnv(all_tf_envs, select_action_func=select_TFEnv_action, use_model=True, isTFEnv=True, policy=tf_agent.collect_policy)

Testing enviorment Full eval:
Total rewards: 6688.14 ± 0.000 (mean ± std. dev. of 1 iterations)
Total profits: 118.80% ± 0.000% (mean ± std. dev. of 1 iterations)
Total revenue ratio: 0.00% ± 0.000% (mean ± std. dev. of 1 iterations)
--------------------------------------------------
Testing enviorment Eval step of 5:


KeyboardInterrupt: 