In [1]:
import json
import operator
import os
import random
import timeit
import warnings
from collections import deque, namedtuple
from copy import deepcopy
from tempfile import mkdtemp

import numpy as np
from keras import initializers, regularizers, activations, constraints
from keras.callbacks import History
from keras.callbacks import Callback as KerasCallback, CallbackList as KerasCallbackList
from keras.layers import Dense, Lambda
# from rl.core import Agent
from rl.policy import EpsGreedyQPolicy, GreedyQPolicy
from rl.util import *
from rl.memory import Memory, zeroed_observation
# from rl.layers import Layer
from rl.agents.dqn import AbstractDQNAgent
# from rl.callbacks import (
#     TrainIntervalLogger,
#     Visualizer,
# )

Using TensorFlow backend.


In [2]:
"""
修改Memory成PrioritizedMemory
#根據TD-error作為被選取到機率的依據 Reference:https://arxiv.org/abs/1511.05952
argument:
    alpha:priority 
    beta:importance-sampling 
    steps_annealed:Simulate Anneal
    根據Reference的實驗結果 alpha=.6, start_beta=.4, end_beta=1. 或alpha=.7, start_beta=.5, end_beta=1.
    這兩種為最佳組合
"""
Experience = namedtuple('Experience', 'state0, action, reward, state1, terminal1')


class PrioritizedMemory(Memory):
    """Replay memory that samples transitions in proportion to their priority.

    Priorities are stored in a sum tree (used for proportional sampling) and a
    min tree (used to normalise the importance-sampling weights), following
    prioritized experience replay (https://arxiv.org/abs/1511.05952).

    # Arguments
        limit (int): Capacity of the replay buffer.
        alpha (float): Exponent applied to every priority; must be >= 0.
        start_beta (float): Importance-sampling exponent at step 0.
        end_beta (float): Importance-sampling exponent after annealing.
        steps_annealed (int): Number of steps over which `calculate_beta`
            moves linearly from `start_beta` to `end_beta`.
        **kwargs: Forwarded unchanged to `Memory.__init__`.
    """

    def __init__(self, limit, alpha=.6, start_beta=.4, end_beta=1., steps_annealed=10000000, **kwargs):
        super(PrioritizedMemory, self).__init__(**kwargs)

        # The capacity of the replay buffer.
        self.limit = limit

        # Transitions are stored field by field in individual RingBuffers.
        self.actions = RingBuffer(limit)
        self.rewards = RingBuffer(limit)
        self.terminals = RingBuffer(limit)
        self.observations = RingBuffer(limit)

        assert alpha >= 0
        # How aggressively to sample based on TD error.
        self.alpha = alpha
        # How aggressively to compensate for that sampling; see `calculate_beta`.
        self.start_beta = start_beta
        self.end_beta = end_beta
        self.steps_annealed = steps_annealed

        # SegmentTrees need a leaf count that is a power of 2.
        tree_capacity = 1
        while tree_capacity < self.limit:
            tree_capacity *= 2

        self.sum_tree = SumSegmentTree(tree_capacity)
        self.min_tree = MinSegmentTree(tree_capacity)
        # Largest raw (not yet exponentiated) priority seen so far.
        self.max_priority = 1.

        # Wrapping index of the next tree leaf to write.
        self.next_index = 0

    def append(self, observation, action, reward, terminal, training=True):
        """Record one step and give it the current maximum priority.

        The base-class `append` is always called; the buffers and trees are
        only updated when `training` is true.
        """
        super(PrioritizedMemory, self).append(observation, action, reward, terminal, training=training)
        if not training:
            return

        self.observations.append(observation)
        self.actions.append(action)
        self.rewards.append(reward)
        self.terminals.append(terminal)

        # New transitions start at the maximum priority.
        priority = self.max_priority ** self.alpha
        self.sum_tree[self.next_index] = priority
        self.min_tree[self.next_index] = priority

        # NOTE(review): `next_index` is an absolute slot (mod `limit`) while
        # RingBuffer indices are relative to its moving `start`; once the
        # buffer wraps the two numberings appear to diverge. Confirm whether
        # tree indices need remapping.
        self.next_index = (self.next_index + 1) % self.limit

    def _sample_proportional(self, batch_size):
        """Return `batch_size` tree indices drawn in proportion to priority."""
        # The tree is not modified while sampling, so the total is computed once.
        # NOTE(review): the `limit - 1` bound excludes the last slot from the
        # sampled mass (the `end` argument of `sum` is exclusive); kept as-is.
        total = self.sum_tree.sum(0, self.limit - 1)
        return [self.sum_tree.find_prefixsum_idx(random.random() * total)
                for _ in range(batch_size)]

    def sample(self, batch_size, beta=1.):
        """Draw a prioritized batch of transitions.

        # Arguments
            batch_size (int): Number of transitions to draw.
            beta (float): Importance-sampling exponent.

        # Returns
            A tuple whose first `batch_size` items are `Experience` tuples,
            whose item -2 is the list of normalised importance weights and
            whose item -1 is the list of indices actually used for those
            transitions (to be passed to `update_priorities`).
        """
        idxs = self._sample_proportional(batch_size)

        # Priorities do not change during sampling, so the total is hoisted.
        total_priority = self.sum_tree.sum()

        # The lowest-priority experience defines the maximum importance weight.
        prob_min = self.min_tree.min() / total_priority
        max_importance_weight = (prob_min * self.nb_entries) ** (-beta)

        importance_weights = []
        experiences = []
        used_idxs = []
        for idx in idxs:
            # Indices too close to the start cannot provide a full window.
            idx = max(idx, self.window_length + 1)

            terminal0 = self.terminals[idx - 2]
            while terminal0:
                # Skip this transition because the environment was reset here. Select a new,
                # random transition and use this instead. This may cause the batch to contain
                # the same transition twice.
                idx = sample_batch_indexes(self.window_length + 1, self.nb_entries, size=1)[0]
                terminal0 = self.terminals[idx - 2]

            assert self.window_length + 1 <= idx < self.nb_entries
            # Remember the index that is really used, so that priority
            # updates are applied to the transition that was returned.
            used_idxs.append(idx)

            # Sampling probability is the transition's priority over the sum of all priorities.
            prob_sample = self.sum_tree[idx] / total_priority
            importance_weight = (prob_sample * self.nb_entries) ** (-beta)
            # Normalize weights according to the maximum value.
            importance_weights.append(importance_weight / max_importance_weight)

            # Assemble the stacked observations, stopping at episode boundaries.
            state0 = [self.observations[idx - 1]]
            for offset in range(0, self.window_length - 1):
                current_idx = idx - 2 - offset
                assert current_idx >= 1
                current_terminal = self.terminals[current_idx - 1]
                if current_terminal and not self.ignore_episode_boundaries:
                    # The previously handled observation was terminal, don't add the current one.
                    # Otherwise we would leak into a different episode.
                    break
                state0.insert(0, self.observations[current_idx])
            while len(state0) < self.window_length:
                state0.insert(0, zeroed_observation(state0[0]))
            action = self.actions[idx - 1]
            reward = self.rewards[idx - 1]
            terminal1 = self.terminals[idx - 1]
            state1 = [np.copy(x) for x in state0[1:]]
            state1.append(self.observations[idx])

            assert len(state0) == self.window_length
            assert len(state1) == len(state0)
            experiences.append(Experience(state0=state0, action=action, reward=reward,
                                          state1=state1, terminal1=terminal1))
        assert len(experiences) == batch_size

        return tuple(experiences + [importance_weights, used_idxs])

    def update_priorities(self, idxs, priorities):
        """Assign new priorities (e.g. fresh TD errors) to the given indices.

        # Arguments
            idxs: Indices as returned in the last item of `sample`.
            priorities: Raw priorities, one per index.
        """
        for idx, raw_priority in zip(idxs, priorities):
            assert 0 <= idx < self.limit
            priority = raw_priority ** self.alpha
            self.sum_tree[idx] = priority
            self.min_tree[idx] = priority
            # Track the raw value: `append` applies `alpha` itself, so storing
            # the exponentiated priority here would apply the exponent twice.
            self.max_priority = max(self.max_priority, raw_priority)

    def calculate_beta(self, current_step):
        """Return beta for `current_step`, linearly annealed and capped at `end_beta`."""
        a = float(self.end_beta - self.start_beta) / float(self.steps_annealed)
        b = float(self.start_beta)
        current_beta = min(self.end_beta, a * float(current_step) + b)
        return current_beta

    def get_config(self):
        """Return the base-class configuration extended with the prioritization settings."""
        config = super(PrioritizedMemory, self).get_config()
        config['alpha'] = self.alpha
        config['start_beta'] = self.start_beta
        config['end_beta'] = self.end_beta
        config['beta_steps_annealed'] = self.steps_annealed
        return config

    @property
    def nb_entries(self):
        """Return number of observations
        # Returns
            Number of observations
        """
        return len(self.observations)
    
# Helper function used internally by PrioritizedMemory
def sample_batch_indexes(low, high, size):
    """Return `size` indexes drawn from the half-open range [low, high).

    The indexes are unique when the range holds at least `size` values;
    otherwise a warning is issued and they are drawn with replacement.

    # Argument
        low (int): The minimum value for our samples (inclusive)
        high (int): The maximum value for our samples (exclusive)
        size (int): The number of samples to pick
    # Returns
        A list (unique draw) or numpy array (draw with replacement) of
        length `size`, with values between low and high - 1
    """
    if high - low >= size:
        # We have enough data. Draw without replacement, that is each index is unique in the
        # batch. We cannot use `np.random.choice` here because it is horribly inefficient as
        # the memory grows. See https://github.com/numpy/numpy/issues/2764 for a discussion.
        # `random.sample` does the same thing (drawing without replacement) and is way faster.
        try:
            candidates = xrange(low, high)
        except NameError:
            candidates = range(low, high)
        batch_idxs = random.sample(candidates, size)
    else:
        # Not enough data. Help ourselves with sampling from the range, but the same index
        # can occur multiple times. This is not good and should be avoided by picking a
        # large enough warm-up phase.
        warnings.warn('Not enough entries to sample without replacement. Consider increasing your warm-up phase to avoid oversampling!')
        # `randint` excludes `high`, matching the deprecated
        # `random_integers(low, high - 1)` it replaces.
        batch_idxs = np.random.randint(low, high, size=size)
    assert len(batch_idxs) == size
    return batch_idxs
class RingBuffer(object):
    """Fixed-capacity buffer that overwrites its oldest element when full.

    Index 0 always refers to the oldest element still stored.
    """

    def __init__(self, maxlen):
        self.maxlen = maxlen
        self.start = 0
        self.length = 0
        self.data = [None] * maxlen

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        """Return element of buffer at specific index
        # Argument
            idx (int): Index wanted, counted from the oldest stored element
        # Returns
            The element of buffer at given index
        # Raises
            KeyError: If `idx` is negative or not below the current length
        """
        if not 0 <= idx < self.length:
            raise KeyError()
        slot = (self.start + idx) % self.maxlen
        return self.data[slot]

    def append(self, v):
        """Append an element to the buffer
        # Argument
            v (object): Element to append
        """
        if self.length > self.maxlen:
            # This should never happen.
            raise RuntimeError()
        if self.length == self.maxlen:
            # No space: advance the start so the oldest item is dropped.
            self.start = (self.start + 1) % self.maxlen
        else:
            # We have space, simply increase the length.
            self.length += 1
        newest_slot = (self.start + self.length - 1) % self.maxlen
        self.data[newest_slot] = v

class SegmentTree(object):
    """
    Abstract SegmentTree data structure used to create PrioritizedMemory.
    https://github.com/openai/baselines/blob/master/baselines/common/segment_tree.py

    Leaves live at positions `capacity .. 2 * capacity - 1` of the backing
    list; position 1 is the root and position 0 is unused.
    """

    def __init__(self, capacity, operation, neutral_element):
        # A power of two shares no set bit with its predecessor.
        assert capacity > 0 and capacity & (capacity - 1) == 0, "Capacity must be positive and a power of 2"
        self._capacity = capacity
        self._operation = operation

        # Every node starts out holding the neutral element.
        self._value = [neutral_element] * (2 * capacity)

        self.next_index = 0

    def _reduce_helper(self, start, end, node, node_start, node_end):
        """Combine leaves `start..end` (inclusive) inside the subtree rooted at `node`."""
        if end == node_end and start == node_start:
            return self._value[node]

        mid = (node_start + node_end) // 2
        left_child = 2 * node
        right_child = left_child + 1

        if end <= mid:
            # The query lies completely in the left half.
            return self._reduce_helper(start, end, left_child, node_start, mid)
        if start >= mid + 1:
            # The query lies completely in the right half.
            return self._reduce_helper(start, end, right_child, mid + 1, node_end)

        # The query straddles the midpoint: combine both halves.
        left_part = self._reduce_helper(start, mid, left_child, node_start, mid)
        right_part = self._reduce_helper(mid + 1, end, right_child, mid + 1, node_end)
        return self._operation(left_part, right_part)

    def reduce(self, start=0, end=None):
        """Apply the operation over leaves `start` (inclusive) to `end` (exclusive).

        `end=None` means up to the capacity; a negative `end` is offset by
        the capacity.
        """
        stop = self._capacity if end is None else end
        if stop < 0:
            stop += self._capacity
        return self._reduce_helper(start, stop - 1, 1, 0, self._capacity - 1)

    def __setitem__(self, idx, val):
        """Store `val` in leaf `idx` and recompute every ancestor up to the root."""
        values = self._value
        combine = self._operation

        leaf = idx + self._capacity
        values[leaf] = val

        parent = leaf // 2
        while parent >= 1:
            values[parent] = combine(values[2 * parent], values[2 * parent + 1])
            parent //= 2

    def __getitem__(self, idx):
        """Return the value stored in leaf `idx`."""
        assert 0 <= idx < self._capacity
        return self._value[idx + self._capacity]

class SumSegmentTree(SegmentTree):
    """
    Segment tree whose operation is addition, so every inner node holds the
    total of the leaves below it. Used to turn priorities into sampling
    probabilities.
    """

    def __init__(self, capacity):
        SegmentTree.__init__(self, capacity=capacity, operation=operator.add, neutral_element=0.0)

    def sum(self, start=0, end=None):
        """Returns arr[start] + ... + arr[end - 1] (`end` is exclusive)"""
        return SegmentTree.reduce(self, start, end)

    def find_prefixsum_idx(self, prefixsum):
        """Find the highest index `i` in the array such that
            arr[0] + arr[1] + ... + arr[i - 1] <= prefixsum
        If the array values are probabilities, this function
        allows sampling indexes according to the discrete
        probability efficiently.
        Parameters
        ----------
        prefixsum: float
            upper bound on the sum of the array prefix
        Returns
        -------
        idx: int
            highest index satisfying the prefixsum constraint
        """
        assert 0 <= prefixsum <= self.sum() + 1e-5
        node = 1
        # Walk down from the root until a leaf is reached.
        while node < self._capacity:
            left_child = 2 * node
            left_mass = self._value[left_child]
            if left_mass > prefixsum:
                node = left_child
            else:
                # Skip the whole left subtree and continue on the right.
                prefixsum -= left_mass
                node = left_child + 1
        return node - self._capacity

class MinSegmentTree(SegmentTree):
    """
    Segment tree whose operation is `min`, so every inner node holds the
    smallest leaf below it. PrioritizedMemory uses it to look up the minimum
    transition priority, which determines the maximum importance weight.
    """

    def __init__(self, capacity):
        SegmentTree.__init__(self, capacity=capacity, operation=min, neutral_element=float('inf'))

    def min(self, start=0, end=None):
        """Returns min(arr[start], ..., arr[end - 1]) (`end` is exclusive)"""
        return SegmentTree.reduce(self, start, end)

In [3]:
"""
跟原本Dense唯一的不同是增加normal distribution的Noisy，使agent有機會選擇不同action，增加Exploration。

Reference: https://arxiv.org/abs/1706.10295
"""
# class NoisyNetDense(Layer):
#     def __init__(self,
#                 units,
#                 activation=None,
#                 kernel_constraint=None,
#                 bias_constraint=None,
#                 kernel_regularizer=None,
#                 bias_regularizer=None,
#                 mu_initializer=None,
#                 sigma_initializer=None,
#                 **kwargs):

#         super(NoisyNetDense, self).__init__(**kwargs)
#         self.units = units
#         self.activation = activations.get(activation)
#         self.kernel_constraint = constraints.get(kernel_constraint) if kernel_constraint is not None else None
#         self.bias_constraint = constriants.get(bias_constraint)if kernel_constraint is not None else None
#         self.kernel_regularizer = regularizers.get(kernel_regularizer)if kernel_constraint is not None else None
#         self.bias_regularizer = regularizers.get(bias_regularizer) if kernel_constraint is not None else None

#     def build(self, input_shape):
#         self.input_dim = input_shape[-1]

#         #See section 3.2 of Fortunato et al.
#         sqr_inputs = self.input_dim**(1/2)
#         self.sigma_initializer = initializers.Constant(value=.5/sqr_inputs)
#         self.mu_initializer = initializers.RandomUniform(minval=(-1/sqr_inputs), maxval=(1/sqr_inputs))


#         self.mu_weight = self.add_weight(shape=(self.input_dim, self.units),
#                                         initializer=self.mu_initializer,
#                                         name='mu_weights',
#                                         constraint=self.kernel_constraint,
#                                         regularizer=self.kernel_regularizer)

#         self.sigma_weight = self.add_weight(shape=(self.input_dim, self.units),
#                                         initializer=self.sigma_initializer,
#                                         name='sigma_weights',
#                                         constraint=self.kernel_constraint,
#                                         regularizer=self.kernel_regularizer)

#         self.mu_bias = self.add_weight(shape=(self.units,),
#                                         initializer=self.mu_initializer,
#                                         name='mu_bias',
#                                         constraint=self.bias_constraint,
#                                         regularizer=self.bias_regularizer)

#         self.sigma_bias = self.add_weight(shape=(self.units,),
#                                         initializer=self.sigma_initializer,
#                                         name='sigma_bias',
#                                         constraint=self.bias_constraint,
#                                         regularizer=self.bias_regularizer)

#         super(NoisyNetDense, self).build(input_shape=input_shape)

#     def call(self, x):
#         #sample from noise distribution
#         e_i = K.random_normal((self.input_dim, self.units))
#         e_j = K.random_normal((self.units,))

#         #We use the factorized Gaussian noise variant from Section 3 of Fortunato et al.
#         eW = K.sign(e_i)*(K.sqrt(K.abs(e_i))) * K.sign(e_j)*(K.sqrt(K.abs(e_j)))
#         eB = K.sign(e_j)*(K.abs(e_j)**(1/2))

#         #See section 3 of Fortunato et al.
#         noise_injected_weights = K.dot(x, self.mu_weight + (self.sigma_weight * eW))
#         noise_injected_bias = self.mu_bias + (self.sigma_bias * eB)
#         output = K.bias_add(noise_injected_weights, noise_injected_bias)
#         if self.activation != None:
#             output = self.activation(output)
#         return output

#     def compute_output_shape(self, input_shape):
#         output_shape = list(input_shape)
#         output_shape[-1] = self.units
#         return tuple(output_shape)

#     def get_config(self):
#         config = {
#             'units': self.units,
#             'activation': activations.serialize(self.activation),
#             'mu_initializer': initializers.serialize(self.mu_initializer),
#             'sigma_initializer': initializers.serialize(self.sigma_initializer),
#             'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
#             'bias_regularizer': regularizers.serialize(self.bias_regularizer),
#             'kernel_constraint': constraints.serialize(self.kernel_constraint),
#             'bias_constraint': constraints.serialize(self.bias_constraint)
#         }
#         base_config = super(NoisyNetDense, self).get_config()
#         return dict(list(base_config.items()) + list(config.items()))



In [4]:
"""
繼承DQN 
"""
def mean_q(y_true, y_pred):
    """Return the mean over the maximum of `y_pred` along its last axis.

    `y_true` is accepted but not used.
    """
    max_along_last_axis = K.max(y_pred, axis=-1)
    return K.mean(max_along_last_axis)
class DQNAgent(AbstractDQNAgent):
    """
    # Arguments
        model__: A Keras model.
        policy__: A Keras-rl policy that are defined in [policy](https://github.com/keras-rl/keras-rl/blob/master/rl/policy.py).
        test_policy__: A Keras-rl policy.
        enable_double_dqn__: A boolean which enables target network as a second network proposed by van Hasselt et al. to decrease overfitting.
        enable_dueling_network__: A boolean which enables dueling architecture proposed by Wang et al.
        dueling_type__: If `enable_dueling_network` is set to `True`, a type of dueling architecture must be chosen which calculates Q(s,a) from V(s) and A(s,a) differently. Note that `avg` is recommended in the [paper](https://arxiv.org/abs/1511.06581).
            `avg`: Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-Avg_a(A(s,a;theta)))
            `max`: Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-max_a(A(s,a;theta)))
            `naive`: Q(s,a;theta) = V(s;theta) + A(s,a;theta)
        nb_actions__: The total number of actions the agent can take. Dependent on the environment.
        processor__: A Keras-rl processor. An intermediary between the environment and the agent. Resizes the input, clips rewards etc. Similar to gym env wrappers.
        nb_steps_warmup__: An integer number of random steps to take before learning begins. This puts experience into the memory.
        gamma__: The discount factor of future rewards in the Q function.
        target_model_update__: How often to update the target model. Longer intervals stabilize training.
        train_interval__: The integer number of steps between each learning process.
        delta_clip__: A component of the huber loss.
    """
    def __init__(self, model, policy=None, test_policy=None, enable_double_dqn=True, enable_dueling_network=False,
                 dueling_type='avg', *args, **kwargs):
        super(DQNAgent, self).__init__(*args, **kwargs)
#         self.custom_model_objects={"NoisyNetDense":NoisyNetDense}
        self.history_q_values = []
        # Validate (important) input.
        if hasattr(model.output, '__len__') and len(model.output) > 1:
            raise ValueError('Model "{}" has more than one output. DQN expects a model that has a single output.'.format(model))
        if model.output._keras_shape != (None, self.nb_actions):
            raise ValueError('Model output "{}" has invalid shape. DQN expects a model that has one dimension for each action, in this case {}.'.format(model.output, self.nb_actions))

        # Parameters.
        self.enable_double_dqn = enable_double_dqn
        self.enable_dueling_network = enable_dueling_network
        self.dueling_type = dueling_type
        if self.enable_dueling_network:
            # get the second last layer of the model, abandon the last layer
            layer = model.layers[-2]
            nb_action = model.output._keras_shape[-1]
            # layer y has a shape (nb_action+1,)
            # y[:,0] represents V(s;theta)
            # y[:,1:] represents A(s,a;theta)
            y = Dense(nb_action + 1, activation='linear')(layer.output)
#             if type(layer) == NoisyNetDense:
#                 y = NoisyNetDense(nb_action + 1, activation='linear')(layer.output)
            # caculate the Q(s,a;theta)
            # dueling_type == 'avg'
            # Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-Avg_a(A(s,a;theta)))
            # dueling_type == 'max'
            # Q(s,a;theta) = V(s;theta) + (A(s,a;theta)-max_a(A(s,a;theta)))
            # dueling_type == 'naive'
            # Q(s,a;theta) = V(s;theta) + A(s,a;theta)
            if self.dueling_type == 'avg':
                outputlayer = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:] - K.mean(a[:, 1:], keepdims=True), output_shape=(nb_action,))(y)
            elif self.dueling_type == 'max':
                outputlayer = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:] - K.max(a[:, 1:], keepdims=True), output_shape=(nb_action,))(y)
            elif self.dueling_type == 'naive':
                outputlayer = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:], output_shape=(nb_action,))(y)
            else:
                assert False, "dueling_type must be one of {'avg','max','naive'}"

            model = Model(inputs=model.input, outputs=outputlayer)

        # Related objects.
        self.model = model
        if policy is None:
            policy = EpsGreedyQPolicy()
        if test_policy is None:
            test_policy = GreedyQPolicy()
        self.policy = policy
        self.test_policy = test_policy
        self.wait = 0
        self.monitor_max_value = 0
        self.reset_states()

        #flag for changes to algorithm that come from dealing with importance sampling weights and priorities
        self.prioritized = True if type(self.memory) == PrioritizedMemory else False

    def fit(self, env, check_env, file_nm, patience, check_interval, nb_steps, action_repetition=1, \
            callbacks=None, verbose=1,\
            visualize=False, nb_max_start_steps=0, start_step_policy=None, log_interval=10000,
            nb_max_episode_steps=None):
        """Trains the agent on the given environment.

        # Arguments
            env: (`Env` instance): Environment that the agent interacts with. See [Env](#env) for details.
            nb_steps (integer): Number of training steps to be performed.
            action_repetition (integer): Number of times the agent repeats the same action without
                observing the environment again. Setting this to a value > 1 can be useful
                if a single action only has a very small effect on the environment.
            callbacks (list of `keras.callbacks.Callback` or `rl.callbacks.Callback` instances):
                List of callbacks to apply during training. See [callbacks](/callbacks) for details.
            verbose (integer): 0 for no logging, 1 for interval logging (compare `log_interval`), 2 for episode logging
            visualize (boolean): If `True`, the environment is visualized during training. However,
                this is likely going to slow down training significantly and is thus intended to be
                a debugging instrument.
            nb_max_start_steps (integer): Number of maximum steps that the agent performs at the beginning
                of each episode using `start_step_policy`. Notice that this is an upper limit since
                the exact number of steps to be performed is sampled uniformly from [0, max_start_steps]
                at the beginning of each episode.
            start_step_policy (`lambda observation: action`): The policy
                to follow if `nb_max_start_steps` > 0. If set to `None`, a random action is performed.
            log_interval (integer): If `verbose` = 1, the number of steps that are considered to be an interval.
            nb_max_episode_steps (integer): Number of steps per episode that the agent performs before
                automatically resetting the environment. Set to `None` if each episode should run
                (potentially indefinitely) until the environment signals a terminal state.

        # Returns
            A `keras.callbacks.History` instance that recorded the entire training process.
        """
        if not self.compiled:
            raise RuntimeError('Your tried to fit your agent but it hasn\'t been compiled yet. Please call `compile()` before `fit()`.')
        if action_repetition < 1:
            raise ValueError('action_repetition must be >= 1, is {}'.format(action_repetition))
        self.training = True
        self.file_nm = file_nm
        self._patience = patience

        callbacks = [] if not callbacks else callbacks[:]
        
#         if verbose == 1:
#             callbacks += [TrainIntervalLogger(interval=log_interval)]
#         elif verbose > 1:
#             callbacks += [TrainEpisodeLogger()]   
        if visualize:
            callbacks += [Visualizer()]
        history = History()
        callbacks += [history]
        callbacks = CallbackList(callbacks)
        if hasattr(callbacks, 'set_model'):
            callbacks.set_model(self)
        else:
            callbacks._set_model(self)
        callbacks._set_env(env)
        params = {
            'nb_steps': nb_steps,
        }
        if hasattr(callbacks, 'set_params'):
            callbacks.set_params(params)
        else:
            callbacks._set_params(params)
        self._on_train_begin()
        callbacks.on_train_begin()

        episode = np.int16(0)
        self.step = np.int16(0)
        observation = None
        episode_reward = None
        episode_step = None
        did_abort = False
        try:
            while self.step < nb_steps:
                if observation is None:  # start of a new episode
                    callbacks.on_episode_begin(episode)
                    episode_step = np.int16(0)
                    episode_reward = np.float32(0)

                    # Obtain the initial observation by resetting the environment.
                    self.reset_states()
                    observation = deepcopy(env.reset())
                    if self.processor is not None:
                        observation = self.processor.process_observation(observation)
                    assert observation is not None

                    # Perform random starts at beginning of episode and do not record them into the experience.
                    # This slightly changes the start position between games.
                    nb_random_start_steps = 0 if nb_max_start_steps == 0 else np.random.randint(nb_max_start_steps)
                    for _ in range(nb_random_start_steps):
                        if start_step_policy is None:
                            action = env.action_space.sample()
                        else:
                            action = start_step_policy(observation)
                        if self.processor is not None:
                            action = self.processor.process_action(action)
                        callbacks.on_action_begin(action)
                        observation, reward, done, info = env.step(action)
                        observation = deepcopy(observation)
                        if self.processor is not None:
                            observation, reward, done, info = self.processor.process_step(observation, reward, done, info)
                        callbacks.on_action_end(action)
                        if done:
                            warnings.warn('Env ended before {} random steps could be performed at the start. You should probably lower the `nb_max_start_steps` parameter.'.format(nb_random_start_steps))
                            observation = deepcopy(env.reset())
                            if self.processor is not None:
                                observation = self.processor.process_observation(observation)
                            break

                # At this point, we expect to be fully initialized.
                assert episode_reward is not None
                assert episode_step is not None
                assert observation is not None

                # Run a single step.
                callbacks.on_step_begin(episode_step)
                # This is were all of the work happens. We first perceive and compute the action
                # (forward step) and then use the reward to improve (backward step).
                action = self.forward(observation)
                if self.processor is not None:
                    action = self.processor.process_action(action)
                reward = np.float32(0)
                accumulated_info = {}
                done = False
                for _ in range(action_repetition):
                    callbacks.on_action_begin(action)
                    observation, r, done, info = env.step(action)
                    observation = deepcopy(observation)
                    if self.processor is not None:
                        observation, r, done, info = self.processor.process_step(observation, r, done, info)
                    for key, value in info.items():
                        if not np.isreal(value):
                            continue
                        if key not in accumulated_info:
                            accumulated_info[key] = np.zeros_like(value)
                        accumulated_info[key] += value
                    callbacks.on_action_end(action)
                    reward += r
                    if done:
                        break
                if nb_max_episode_steps and episode_step >= nb_max_episode_steps - 1:
                    # Force a terminal state.
                    done = True
                metrics = self.backward(reward, terminal=done)
                episode_reward += reward

                step_logs = {
                    'action': action,
                    'observation': observation,
                    'reward': reward,
                    'metrics': metrics,
                    'episode': episode,
                    'info': accumulated_info,
                }
                callbacks.on_step_end(episode_step, step_logs)
                episode_step += 1
                self.step += 1

                if done:
                    # We are in a terminal state but the agent hasn't yet seen it. We therefore
                    # perform one more forward-backward call and simply ignore the action before
                    # resetting the environment. We need to pass in `terminal=False` here since
                    # the *next* state, that is the state of the newly reset environment, is
                    # always non-terminal by convention.
                    self.forward(observation)
                    self.backward(0., terminal=False)

                    # This episode is finished, report and reset.
                    episode_logs = {
                        'episode_reward': episode_reward,
                        'nb_episode_steps': episode_step,
                        'nb_steps': self.step,
                        'accumulate_return': np.float32(env._params['accumulate_return']),
                        'benchmark_accumulate_return': np.float32(env._params['benchmark_accumulate_return']),
                        'index_nm': env._params['price_col'],
                    }
                    callbacks.on_episode_end(episode, episode_logs)
                    episode += 1                    
                    if (episode % check_interval ==0) and (episode>=4000):
                        _, monitor_list = self.test(env=check_env,nb_episodes=len(check_env._params['data_list']), visualize=False)
                        did_abort = self.check_model(file_nm = self.file_nm, patience = self._patience, monitor_list = monitor_list)
#                     print(did_abort)
                    if did_abort:
                        nb_steps = self.step
                    observation = None
                    episode_step = None
                    episode_reward = None
        except KeyboardInterrupt:
            # We catch keyboard interrupts here so that training can be be safely aborted.
            # This is so common that we've built this right into this function, which ensures that
            # the `on_train_end` method is properly called.
            did_abort = True
        callbacks.on_train_end(logs={'did_abort': did_abort})

        self._on_train_end()

        return history
    def test(self, env, filepath='', nb_episodes=1, action_repetition=1, callbacks=None, visualize=True,
             nb_max_episode_steps=None, nb_max_start_steps=0, start_step_policy=None, verbose=1):
        """Run the agent on `env` for `nb_episodes` episodes without training it.

        The agent is switched to test mode (`self.training = False`) for the
        duration of the call, `self.step` and `self.history_q_values` are reset,
        and `self.training` is set back to `True` before returning. No call to
        `backward` is made, so nothing is learned or stored during testing.

        # Arguments
            env: (`Env` instance): Environment that the agent interacts with. Its
                `_params` dict is read for `price_col`, `data_list`,
                `accumulate_return` and `benchmark_accumulate_return` at the end
                of every episode.
            filepath (string): If non-empty, a `ToCsvLogger(filepath=filepath)` callback is added.
            nb_episodes (integer): Number of episodes to perform.
            action_repetition (integer): Number of times the agent repeats the same action without
                observing the environment again. Must be >= 1.
            callbacks (list of `keras.callbacks.Callback` or `rl.callbacks.Callback` instances):
                List of callbacks to apply during testing. The list is copied, not modified.
            visualize (boolean): If `True`, a `Visualizer` callback is added.
            nb_max_episode_steps (integer): Number of steps per episode that the agent performs before
                the episode is forcibly ended. Set to `None` if each episode should run
                (potentially indefinitely) until the environment signals a terminal state.
            nb_max_start_steps (integer): Upper bound for the number of steps performed at the beginning
                of each episode using `start_step_policy`. The actual number is drawn with
                `np.random.randint(nb_max_start_steps)`; 0 disables random starts.
            start_step_policy (`lambda observation: action`): The policy
                to follow if `nb_max_start_steps` > 0. If set to `None`, a random action is performed.
            verbose (integer): If >= 1, a `TestLogger` callback is added.

        # Returns
            A tuple `(history, monitor_list)`: the `keras.callbacks.History` instance that
            was attached to the run, and `self.monitor_list`, which holds one
            `accumulate_return` value (as `np.float32`) per finished episode.

        # Raises
            RuntimeError: If the agent has not been compiled.
            ValueError: If `action_repetition` is smaller than 1.
        """
        print('test!')
        if not self.compiled:
            raise RuntimeError('Your tried to test your agent but it hasn\'t been compiled yet. Please call `compile()` before `test()`.')
        if action_repetition < 1:
            raise ValueError('action_repetition must be >= 1, is {}'.format(action_repetition))
        # Reset the per-run bookkeeping and switch to the test policy.
        self.history_q_values = []
        self.training = False
        self.step = 0
        self.monitor_list = []
        # Work on a copy so the caller's callback list is not extended.
        callbacks = [] if not callbacks else callbacks[:]
        if filepath:
            callbacks += [ToCsvLogger(filepath=filepath)]
        
        if verbose >= 1:
            callbacks += [TestLogger()]
#             callbacks += [Earlystop()]
        if visualize:
            callbacks += [Visualizer()]
        history = History()
        callbacks += [history]
        callbacks = CallbackList(callbacks)
        # Use the public setter when the callback list offers one, otherwise the private one.
        if hasattr(callbacks, 'set_model'):
            callbacks.set_model(self)
        else:
            callbacks._set_model(self)
        callbacks._set_env(env)
        params = {
            'nb_episodes': nb_episodes,
        }
        if hasattr(callbacks, 'set_params'):
            callbacks.set_params(params)
        else:
            callbacks._set_params(params)

        self._on_test_begin()
        callbacks.on_train_begin()
        for episode in range(nb_episodes):
            callbacks.on_episode_begin(episode)
            episode_reward = 0.
            episode_step = 0

            # Obtain the initial observation by resetting the environment.
            self.reset_states()
            observation = deepcopy(env.reset())
            if self.processor is not None:
                observation = self.processor.process_observation(observation)
            assert observation is not None
            # Perform random starts at beginning of episode and do not record them into the experience.
            # This slightly changes the start position between games.
            nb_random_start_steps = 0 if nb_max_start_steps == 0 else np.random.randint(nb_max_start_steps)
            for _ in range(nb_random_start_steps):
                if start_step_policy is None:
                    action = env.action_space.sample()
                else:
                    action = start_step_policy(observation)
                if self.processor is not None:
                    action = self.processor.process_action(action)
                callbacks.on_action_begin(action)
                observation, r, done, info = env.step(action)
                observation = deepcopy(observation)
                if self.processor is not None:
                    observation, r, done, info = self.processor.process_step(observation, r, done, info)
                callbacks.on_action_end(action)
                if done:
                    # The environment ended during the random start: reset it and stop the warm-up.
                    warnings.warn('Env ended before {} random steps could be performed at the start. You should probably lower the `nb_max_start_steps` parameter.'.format(nb_random_start_steps))
                    observation = deepcopy(env.reset())
                    if self.processor is not None:
                        observation = self.processor.process_observation(observation)
                    break

            # Run the episode until we're done.
            done = False
            while not done:
                callbacks.on_step_begin(episode_step)

                action = self.forward(observation)
                if self.processor is not None:
                    action = self.processor.process_action(action)
                reward = 0.
                accumulated_info = {}
                # Repeat the chosen action, summing the rewards and the real-valued info entries.
                for _ in range(action_repetition):
                    callbacks.on_action_begin(action)
                    observation, r, d, info = env.step(action)
                    observation = deepcopy(observation)
                    if self.processor is not None:
                        observation, r, d, info = self.processor.process_step(observation, r, d, info)
                    callbacks.on_action_end(action)
                    reward += r
                    for key, value in info.items():
                        # Entries for which `np.isreal` is falsy are not accumulated.
                        if not np.isreal(value):
                            continue
                        if key not in accumulated_info:
                            accumulated_info[key] = np.zeros_like(value)
                        accumulated_info[key] += value
                    if d:
                        done = True
                        break
                if nb_max_episode_steps and episode_step >= nb_max_episode_steps - 1:
                    # Force the episode to end once the step limit is reached.
                    done = True
#                 self.backward(reward, terminal=done)
                episode_reward += reward

                step_logs = {
                    'action': action,
                    'observation': observation,
                    'reward': reward,
                    'episode': episode,
                    'info': accumulated_info,
                }
                callbacks.on_step_end(episode_step, step_logs)
                episode_step += 1
                self.step += 1

            # We are in a terminal state but the agent hasn't yet seen it. We therefore
            # perform one more forward call and simply ignore the action before
            # resetting the environment. The matching backward call is disabled
            # here, so nothing is stored or learned during testing.
            self.forward(observation)
#             self.backward(0., terminal=False)

            # Report end of episode.
            episode_logs = {
                'episode_reward': episode_reward,
                'nb_steps': episode_step,
                'index_nm': env._params['price_col'],
                'test_length' : len(env._params['data_list']),
                'accumulate_return': np.float32(env._params['accumulate_return']),
                'benchmark_accumulate_return': np.float32(env._params['benchmark_accumulate_return']),
                
            }
            # Collect the per-episode return so the caller can aggregate it.
            self.monitor_list.append(episode_logs['accumulate_return'])
            callbacks.on_episode_end(episode, episode_logs)
            
        callbacks.on_train_end()
        self._on_test_end()
        # Switch back to training mode unconditionally.
        self.training = True

        return history , self.monitor_list
                                 
    def get_config(self):
        """Return the agent configuration as a dictionary.

        Starts from the parent class configuration and adds the DQN flags plus
        the serialized model and policies. The target model is only included
        once the agent has been compiled.
        """
        config = super(DQNAgent, self).get_config()

        # Plain flags are copied verbatim.
        for flag in ('enable_double_dqn', 'dueling_type', 'enable_dueling_network'):
            config[flag] = getattr(self, flag)

        # Objects are stored through their serialized configuration.
        for key in ('model', 'policy', 'test_policy'):
            config[key] = get_object_config(getattr(self, key))

        if self.compiled:
            config['target_model'] = get_object_config(self.target_model)
        return config

    def compile(self, optimizer, metrics=None):
        """Build the target network and the trainable model used by `backward`.

        # Arguments
            optimizer: Optimizer used to train the online network. When
                `self.target_model_update < 1` it is wrapped so that the soft
                target-network update runs with every optimizer step.
            metrics (list): Extra metrics to report. `mean_q` is always added.
                The list is copied, so neither the caller's list nor a shared
                default grows when `compile` is called more than once.

        Sets `self.target_model`, `self.trainable_model` and `self.compiled`.
        """
        # Copy before appending: mutating the argument would leak `mean_q` into
        # the caller's list and accumulate duplicates across calls.
        metrics = list(metrics) if metrics is not None else []
        metrics.append(mean_q)  # register default metrics

        # We never train the target model, hence we can set the optimizer and loss arbitrarily.
        self.target_model = clone_model(self.model, self.custom_model_objects)
        self.target_model.compile(optimizer='sgd', loss='mse')
        self.model.compile(optimizer='sgd', loss='mse')

        # Compile model.
        if self.target_model_update < 1.:
            # We use the `AdditionalUpdatesOptimizer` to efficiently soft-update the target model.
            updates = get_soft_target_model_updates(self.target_model, self.model, self.target_model_update)
            optimizer = AdditionalUpdatesOptimizer(optimizer, updates)

        def clipped_masked_error(args):
            y_true, y_pred, importance_weights, mask = args
            loss = huber_loss(y_true, y_pred, self.delta_clip)
            loss *= mask  # apply element-wise mask
            # Adjust updates by importance weights. Note that importance weights are just 1.0
            # (and have no effect) if not using a prioritized memory.
            return K.sum(loss * importance_weights, axis=-1)

        # Create trainable model. The problem is that we need to mask the output since we only
        # ever want to update the Q values for a certain action. The way we achieve this is by
        # using a custom Lambda layer that computes the loss. This gives us the necessary flexibility
        # to mask out certain parameters by passing in multiple inputs to the Lambda layer.
        y_pred = self.model.output
        y_true = Input(name='y_true', shape=(self.nb_actions,))
        mask = Input(name='mask', shape=(self.nb_actions,))
        importance_weights = Input(name='importance_weights',shape=(self.nb_actions,))
        loss_out = Lambda(clipped_masked_error, output_shape=(1,), name='loss')([y_true, y_pred, importance_weights, mask])
        ins = [self.model.input] if type(self.model.input) is not list else self.model.input
        trainable_model = Model(inputs=ins + [y_true, importance_weights, mask], outputs=[loss_out, y_pred])
        assert len(trainable_model.output_names) == 2
        combined_metrics = {trainable_model.output_names[1]: metrics}
        losses = [
            lambda y_true, y_pred: y_pred,  # loss is computed in Lambda layer
            lambda y_true, y_pred: K.zeros_like(y_pred),  # we only include this for the metrics
        ]
        trainable_model.compile(optimizer=optimizer, loss=losses, metrics=combined_metrics)
        self.trainable_model = trainable_model

        self.compiled = True
    
    def check_model(self, file_nm, patience, monitor_list):
        """Save the weights when the monitored mean improves, else count towards early stopping.

        # Arguments
            file_nm (string): Format string for the weight file; it is formatted
                with the mean of `monitor_list`. An empty result skips the check.
            patience (integer): Number of consecutive non-improving checks after
                which training should stop.
            monitor_list: Values whose mean is compared against `self.monitor_max_value`.

        # Returns
            `True` if `self.wait` reached `patience`, `False` otherwise.
        """
        current = np.mean(monitor_list)
        self.monitor_value_now = current
        target_path = file_nm.format(current)
        print('max:',self.monitor_max_value)
        print('now:',self.monitor_value_now)

        should_stop = False
        if target_path :
            print('check!')
            improved = self.monitor_value_now > self.monitor_max_value
            if not improved:
                # One more check without improvement.
                self.wait += 1
                if self.wait>=patience:
                    print('accumulate_return did not improve and stop training')
                    should_stop = True
            else:
                print('accumulate_return improved from %0.5f to %0.5f, saving model to %s'
                  % (self.monitor_max_value,self.monitor_value_now, target_path))
                self.model.save_weights(target_path, overwrite=True)
                # New best: remember it and restart the patience counter.
                self.monitor_max_value = self.monitor_value_now
                self.wait = 0
                self.monitor_list = []

        print('wait:',self.wait)
        print('patience:',patience)
        return should_stop
                
    def load_weights(self, filepath):
        """Load the online network's weights from `filepath` and copy them to the target network."""
        online_net = self.model
        online_net.load_weights(filepath)
        # Keep the target network in sync with the freshly loaded weights.
        self.update_target_model_hard()

    def save_weights(self, filepath, overwrite=False):
        """Write the online network's weights to `filepath`.

        `overwrite` is forwarded unchanged to the model's `save_weights`.
        """
        online_net = self.model
        online_net.save_weights(filepath, overwrite=overwrite)

    def reset_states(self):
        """Forget the most recent action/observation and reset both networks' states.

        The networks are only touched once the agent has been compiled.
        """
        self.recent_action = None
        self.recent_observation = None
        if not self.compiled:
            return
        self.model.reset_states()
        self.target_model.reset_states()

    def update_target_model_hard(self):
        """Overwrite the target network's weights with the online network's current weights."""
        target_net = self.target_model
        target_net.set_weights(self.model.get_weights())

    def forward(self, observation):
        """Choose an action for `observation` and remember both for the next `backward` call.

        The Q values computed for the recent state are appended to
        `self.history_q_values`. The training policy is used while
        `self.training` is true, the test policy otherwise.

        # Returns
            The action selected by the active policy.
        """
        recent_state = self.memory.get_recent_state(observation)
        q_values = self.compute_q_values(recent_state)
        self.history_q_values.append(q_values)

        # Pick the policy that matches the current mode.
        active_policy = self.policy if self.training else self.test_policy
        chosen = active_policy.select_action(q_values=q_values)

        # Book-keeping.
        self.recent_observation = observation
        self.recent_action = chosen
        return chosen

    def backward(self, reward, terminal):
        """Store the latest transition and, when due, train on one replay batch.

        # Arguments
            reward: Reward obtained for `self.recent_action`.
            terminal (boolean): Whether the episode ended with that action.

        # Returns
            A list of metric values. It holds `np.nan` for every entry of
            `self.metrics_names` when no training step was performed.

        When `self.prioritized` is set, the memory's `sample` result is expected
        to hold the experiences followed by the importance weights and the
        priority indices; the priorities of the sampled transitions are updated
        after the training step.
        """
        # Store most recent experience in memory.
        if self.step % self.memory_interval == 0:
            self.memory.append(self.recent_observation, self.recent_action, reward, terminal,
                               training=self.training)

        metrics = [np.nan for _ in self.metrics_names]
        if not self.training:
            # We're done here. No need to update the experience memory since we only use the working
            # memory to obtain the state over the most recent observations.
            return metrics

        # Train the network on a single stochastic batch.
        if self.step > self.nb_steps_warmup and self.step % self.train_interval == 0:

            if self.prioritized:
                # Calculations for current beta value based on a linear schedule.
                current_beta = self.memory.calculate_beta(self.step)
                experiences = self.memory.sample(self.batch_size, current_beta)
                # Prioritized replay returns the Experience tuples + weights and idxs.
                transitions = experiences[:-2]
                importance_weights = experiences[-2]
                # Indices of the priority tree entries that get new priorities below.
                pr_idxs = experiences[-1]
            else:
                # SequentialMemory: every sample counts equally.
                transitions = self.memory.sample(self.batch_size)
                importance_weights = [1. for _ in range(self.batch_size)]
                pr_idxs = []

            # Start by extracting the necessary parameters (we use a vectorized implementation).
            state0_batch = [e.state0 for e in transitions]
            state1_batch = [e.state1 for e in transitions]
            reward_batch = [e.reward for e in transitions]
            action_batch = [e.action for e in transitions]
            # 0 for terminal transitions so their bootstrap term is dropped below.
            terminal1_batch = [0. if e.terminal1 else 1. for e in transitions]

            # Prepare and validate parameters.
            state0_batch = self.process_state_batch(state0_batch)
            state1_batch = self.process_state_batch(state1_batch)
            terminal1_batch = np.array(terminal1_batch)
            reward_batch = np.array(reward_batch)
            assert reward_batch.shape == (self.batch_size,)
            assert terminal1_batch.shape == reward_batch.shape
            assert len(action_batch) == len(reward_batch)

            # Compute Q values for mini-batch update.
            if self.enable_double_dqn:
                # According to the paper "Deep Reinforcement Learning with Double Q-learning"
                # (van Hasselt et al., 2015), in Double DQN, the online network predicts the actions
                # while the target network is used to estimate the Q value.
                q_values = self.model.predict_on_batch(state1_batch)
                assert q_values.shape == (self.batch_size, self.nb_actions)
                actions = np.argmax(q_values, axis=1)
                assert actions.shape == (self.batch_size,)

                # Now, estimate Q values using the target network but select the values with the
                # highest Q value wrt to the online model (as computed above).
                target_q_values = self.target_model.predict_on_batch(state1_batch)
                assert target_q_values.shape == (self.batch_size, self.nb_actions)
                q_batch = target_q_values[range(self.batch_size), actions]
            else:
                # Compute the q_values given state1, and extract the maximum for each sample in the batch.
                # We perform this prediction on the target_model instead of the model for reasons
                # outlined in Mnih (2015). In short: it makes the algorithm more stable.
                target_q_values = self.target_model.predict_on_batch(state1_batch)
                assert target_q_values.shape == (self.batch_size, self.nb_actions)
                q_batch = np.max(target_q_values, axis=1).flatten()
            assert q_batch.shape == (self.batch_size,)

            targets = np.zeros((self.batch_size, self.nb_actions))
            dummy_targets = np.zeros((self.batch_size,))
            masks = np.zeros((self.batch_size, self.nb_actions))

            # Compute r_t + gamma * max_a Q(s_t+1, a) and update the target targets accordingly,
            # but only for the affected output units (as given by action_batch).
            discounted_reward_batch = self.gamma * q_batch
            # Set discounted reward to zero for all states that were terminal.
            discounted_reward_batch *= terminal1_batch
            assert discounted_reward_batch.shape == reward_batch.shape
            Rs = reward_batch + discounted_reward_batch

            for idx, (target, mask, R, action) in enumerate(zip(targets, masks, Rs, action_batch)):
                target[action] = R  # update action with estimated accumulated reward
                dummy_targets[idx] = R
                mask[action] = 1.  # enable loss for this specific action
            targets = np.array(targets).astype('float32')
            masks = np.array(masks).astype('float32')

            # Give importance_weights the shape of the other tensors passed to the trainable
            # model: row i holds the weight of sample i repeated for every action. Stacking
            # the vector `nb_actions` times and reshaping would instead interleave the
            # weights of different samples.
            assert len(importance_weights) == self.batch_size
            importance_weights = np.asarray(importance_weights).reshape(self.batch_size, 1)
            importance_weights = np.repeat(importance_weights, self.nb_actions, axis=1)
            # Perform a single update on the entire batch. We use a dummy target since
            # the actual loss is computed in a Lambda layer that needs more complex input. However,
            # it is still useful to know the actual target to compute metrics properly.
            ins = [state0_batch] if type(self.model.input) is not list else state0_batch
            metrics = self.trainable_model.train_on_batch(ins + [targets, importance_weights, masks], [dummy_targets, targets])

            if self.prioritized:
                assert len(pr_idxs) == self.batch_size
                # Calculate new priorities.
                y_true = targets
                y_pred = self.model.predict_on_batch(ins)
                # Proportional method: the priority is the absolute TD error of the action that
                # was actually taken plus a small positive constant to keep it from being 0.
                # The mask drops the other actions, whose target entries are 0 and would
                # otherwise add their predicted Q values to the error.
                td_errors = np.sum((y_true - y_pred) * masks, axis=-1)
                new_priorities = np.abs(td_errors) + 1e-5
                assert len(new_priorities) == self.batch_size
                # update priorities
                self.memory.update_priorities(pr_idxs, new_priorities)

            metrics = [metric for idx, metric in enumerate(metrics) if idx not in (1, 2)]  # throw away individual losses
            metrics += self.policy.metrics
            if self.processor is not None:
                metrics += self.processor.metrics

        if self.target_model_update >= 1 and self.step % self.target_model_update == 0:
            self.update_target_model_hard()

        return metrics

    @property
    def layers(self):
        """A shallow copy of the online model's layers."""
        model_layers = self.model.layers
        return model_layers[:]

    @property
    def metrics_names(self):
        """Names of the metrics reported by `backward`.

        The trainable model's entries at positions 1 and 2 (the individual
        losses) are dropped and the hidden output name prefix is stripped;
        the policy's and, if present, the processor's metric names follow.
        """
        output_names = self.trainable_model.output_names
        assert len(output_names) == 2
        hidden_prefix = output_names[1] + '_'

        kept = []
        for position, name in enumerate(self.trainable_model.metrics_names):
            if position in (1, 2):
                # Individual losses are not exposed to the user.
                continue
            kept.append(name.replace(hidden_prefix, ''))

        names = kept + self.policy.metrics_names[:]
        if self.processor is not None:
            names += self.processor.metrics_names[:]
        return names

    @property
    def policy(self):
        """The policy used to select actions while training."""
        return self.__policy

    @policy.setter
    def policy(self, policy):
        # Store the policy first, then tell it which agent it belongs to.
        self.__policy = policy
        policy._set_agent(self)

    @property
    def test_policy(self):
        """The policy used to select actions while testing."""
        return self.__test_policy

    @test_policy.setter
    def test_policy(self, policy):
        # Store the policy first, then tell it which agent it belongs to.
        self.__test_policy = policy
        policy._set_agent(self)


In [5]:
class Callback(KerasCallback):
    """Keras callback extended with the episode, step and action hooks used by the agent loops.

    Every hook is a no-op here; subclasses override the ones they need.
    """

    def _set_env(self, env):
        """Keep a reference to the environment as `self.env`."""
        self.env = env

    def on_episode_begin(self, episode, logs={}):
        """Called at beginning of each episode"""

    def on_episode_end(self, episode, logs={}):
        """Called at end of each episode"""

    def on_step_begin(self, step, logs={}):
        """Called at beginning of each step"""

    def on_step_end(self, step, logs={}):
        """Called at end of each step"""

    def on_action_begin(self, action, logs={}):
        """Called at beginning of each action"""

    def on_action_end(self, action, logs={}):
        """Called at end of each action"""

    def accumulate_return_to_month(self, step, accumulate_return):
        """Convert a return accumulated over `step` periods into the equivalent per-period compound rate.

        Computes `(1 + accumulate_return) ** (1 / step) - 1`.
        NOTE(review): the name suggests one step is one month; confirm against the environment.
        """
        growth_factor = accumulate_return + 1.
        per_step_exponent = 1. / step
        return growth_factor ** per_step_exponent - 1.


class CallbackList(KerasCallbackList):
    """Callback container that dispatches the episode, step and action hooks.

    Callbacks lacking an episode/step hook receive the matching Keras
    epoch/batch hook instead, so built-in Keras callbacks keep working.
    """

    def _set_env(self, env):
        """ Set environment for each callback in callbackList """
        for cb in self.callbacks:
            setter = getattr(cb, '_set_env', None)
            if callable(setter):
                setter(env)

    def on_episode_begin(self, episode, logs={}):
        """ Called at beginning of each episode for each callback in callbackList"""
        for cb in self.callbacks:
            hook = getattr(cb, 'on_episode_begin', None)
            if not callable(hook):
                # Built-in Keras callbacks only know about epochs.
                hook = cb.on_epoch_begin
            hook(episode, logs=logs)

    def on_episode_end(self, episode, logs={}):
        """ Called at end of each episode for each callback in callbackList"""
        for cb in self.callbacks:
            hook = getattr(cb, 'on_episode_end', None)
            if not callable(hook):
                # Built-in Keras callbacks only know about epochs.
                hook = cb.on_epoch_end
            hook(episode, logs=logs)

    def on_step_begin(self, step, logs={}):
        """ Called at beginning of each step for each callback in callbackList"""
        for cb in self.callbacks:
            hook = getattr(cb, 'on_step_begin', None)
            if not callable(hook):
                # Built-in Keras callbacks only know about batches.
                hook = cb.on_batch_begin
            hook(step, logs=logs)

    def on_step_end(self, step, logs={}):
        """ Called at end of each step for each callback in callbackList"""
        for cb in self.callbacks:
            hook = getattr(cb, 'on_step_end', None)
            if not callable(hook):
                # Built-in Keras callbacks only know about batches.
                hook = cb.on_batch_end
            hook(step, logs=logs)

    def on_action_begin(self, action, logs={}):
        """ Called at beginning of each action for each callback in callbackList"""
        for cb in self.callbacks:
            hook = getattr(cb, 'on_action_begin', None)
            if callable(hook):
                hook(action, logs=logs)

    def on_action_end(self, action, logs={}):
        """ Called at end of each action for each callback in callbackList"""
        for cb in self.callbacks:
            hook = getattr(cb, 'on_action_end', None)
            if callable(hook):
                hook(action, logs=logs)

In [6]:
"""
新增Callback

argument:
    monitor:用於監控的數值，把相較於上次更好的weight存下來
"""
class ModelCheckByTrainAccRe(Callback):
    """Save the model at the end of training episodes, based on a monitored return.

    At the end of every episode past `min_save_length` the callback adds
    `month_return`, `benchmark_month_return` and `month_return_diff` to `logs`
    (derived from the accumulated returns via `accumulate_return_to_month`)
    and then saves the model.

    # Arguments
        filepath (string): Format string for the output file; it is formatted
            with `epoch` and every entry of `logs`.
        monitor (string): Key of `logs` that decides whether the model improved.
        verbose (integer): Values > 0 print a message for every decision.
        save_best_only (boolean): If `True`, only save when `monitor` improves
            and both `monitor` and `month_return` are positive. If `False`,
            save after every eligible episode.
        save_weights_only (boolean): Save only the weights instead of the full model.
        min_save_length (integer): Episodes up to and including this index are ignored.
        mode (string): `'min'`, `'max'` or `'auto'`. In `'auto'` mode monitors
            whose name contains `'return'` or starts with `'fmeasure'` are
            maximized, everything else is minimized.
    """

    def __init__(self, filepath, monitor='month_return', verbose=2,
                 save_best_only=True, save_weights_only=True, min_save_length=3500,
                 mode='auto'):
        super(ModelCheckByTrainAccRe, self).__init__()
        self.monitor = monitor
        self.verbose = verbose
        self.filepath = filepath
        self.save_best_only = save_best_only
        self.save_weights_only = save_weights_only
        self.min_save_length = min_save_length

        if mode == 'min':
            maximize = False
        elif mode == 'max':
            maximize = True
        else:
            # Returns are better when larger. Matching on 'return' covers the default
            # 'month_return' as well as 'month_return_diff' and 'accumulate_return';
            # matching only 'accumulate_return' made the default monitor minimize.
            maximize = 'return' in self.monitor or self.monitor.startswith('fmeasure')

        # `np.inf` instead of the `np.Inf` alias, which NumPy 2.0 removed.
        if maximize:
            self.monitor_op = np.greater
            self.best = -np.inf
        else:
            self.monitor_op = np.less
            self.best = np.inf

    def _save(self, filepath):
        """Write the weights or the whole model to `filepath`, per `save_weights_only`."""
        if self.save_weights_only:
            self.model.save_weights(filepath, overwrite=True)
        else:
            self.model.save(filepath, overwrite=True)

    def on_episode_end(self, epoch, logs=None):
        """Derive the per-step returns for the finished episode and save the model if warranted."""
        if epoch <= self.min_save_length:
            return

        logs = logs or {}
        steps = logs['nb_episode_steps']
        # The conversion helper is a method inherited from `Callback`.
        logs['month_return'] = self.accumulate_return_to_month(steps, logs['accumulate_return'])
        logs['benchmark_month_return'] = self.accumulate_return_to_month(steps, logs['benchmark_accumulate_return'])
        logs['month_return_diff'] = logs['month_return'] - logs['benchmark_month_return']
        filepath = self.filepath.format(epoch=epoch + 1, **logs)

        if not self.save_best_only:
            if self.verbose > 0:
                print('\nEpoch %05d: saving model to %s' % (epoch + 1, filepath))
            self._save(filepath)
            return

        current_train_acc_re = logs.get(self.monitor)
        if current_train_acc_re is None:
            # Without the monitored value there is nothing to compare against.
            warnings.warn('Can save best model only with %s available, skipping.' % self.monitor,
                          RuntimeWarning)
            return

        # Only profitable episodes are candidates for a checkpoint.
        if not (current_train_acc_re > 0 and logs['month_return'] > 0):
            return

        if self.monitor_op(current_train_acc_re, self.best):
            if self.verbose > 0:
                print('\nEpoch %05d: %s improved from %0.5f to %0.5f,'
                      ' saving model to %s'
                      % (epoch + 1, self.monitor, self.best,
                         current_train_acc_re, filepath))
            self.best = current_train_acc_re
            self._save(filepath)
        elif self.verbose > 0:
            print('\nEpoch %05d: %s did not improve' %
                  (epoch + 1, self.monitor))

In [1]:
"""
新增Callback

於Training時輸出各項自訂的變數
"""
class TrainEpisodeLogger(Callback):
    """Training callback that prints one summary line per finished episode.

    Per-step observations, rewards, actions and metrics are buffered per
    episode and reduced to summary statistics in ``on_episode_end``. When
    ``filepath`` is given, the same statistics are appended to that file as
    a CSV row (the header is written only when the file does not exist yet),
    and the total wall-clock time is appended at the end of training.

    ``self.params`` and ``self.model`` are read but not set here; they are
    presumably injected by the training loop before the hooks fire.

    Args:
        filepath: Path of the CSV file to append to, or ``None`` to only
            print.
    """

    def __init__(self, filepath=None):
        # Some algorithms compute multiple episodes at once since they are
        # multi-threaded. We therefore use dictionaries indexed by the
        # episode to separate episodes from each other.
        self.episode_start = {}
        self.observations = {}
        self.rewards = {}
        self.actions = {}
        self.metrics = {}
        self.step = 0
        self.filepath = filepath

    def on_train_begin(self, logs):
        """Record the training start time and print the planned step count."""
        self.train_start = timeit.default_timer()
        self.metrics_names = self.model.metrics_names
        print('Training for {} steps ...'.format(self.params['nb_steps']))

    def on_train_end(self, logs):
        """Print the total training time and append it to ``filepath``."""
        duration = timeit.default_timer() - self.train_start
        print('done, took {:.3f} seconds'.format(duration))
        if self.filepath is not None:
            with open(self.filepath, 'a') as f:
                f.write("WALL CLOCK TIME: " + str(duration))

    def on_episode_begin(self, episode, logs):
        """Start the timer and create empty buffers for ``episode``."""
        self.episode_start[episode] = timeit.default_timer()
        self.observations[episode] = []
        self.rewards[episode] = []
        self.actions[episode] = []
        self.metrics[episode] = []

    def on_episode_end(self, episode, logs):
        """Print (and optionally persist) the statistics of one episode.

        Args:
            episode: Key of the episode whose buffers are summarised.
            logs: Dict that must contain ``nb_episode_steps``,
                ``accumulate_return`` and ``benchmark_accumulate_return``.
        """
        # Local import: csv is not imported in the import cell at the top
        # of this file.
        import csv

        duration = timeit.default_timer() - self.episode_start[episode]
        episode_steps = len(self.observations[episode])

        # Format all metrics. Warnings are promoted to errors so that
        # np.nanmean over an all-NaN column is reported as '--'.
        metrics = np.array(self.metrics[episode])
        metrics_template = ''
        metrics_variables = []
        with warnings.catch_warnings():
            warnings.filterwarnings('error')
            for idx, name in enumerate(self.metrics_names):
                if idx > 0:
                    metrics_template += ', '
                try:
                    value = np.nanmean(metrics[:, idx])
                    metrics_template += '{}: {:f}'
                except Warning:
                    value = '--'
                    metrics_template += '{}: {}'
                metrics_variables += [name, value]
        metrics_text = metrics_template.format(*metrics_variables)

        nb_step_digits = str(int(np.ceil(np.log10(self.params['nb_steps']))) + 1)
        template = '{step: ' + nb_step_digits + (
            'd}/{nb_steps}: episode: {episode}, '
            'duration: {duration:.3f}s, episode steps: {episode_steps}, '
            'steps per second: {sps:.0f}, '
            'mean reward: {reward_mean:.3f} [{reward_min:.3f}, {reward_max:.3f}], '
            'mean action: {action_mean:.3f} [{action_min:.3f}, {action_max:.3f}], '
            'mean observation: {obs_mean:.3f} [{obs_min:.3f}, {obs_max:.3f}], '
            'month_return: {month_return:.5f}, accumulate_return: {accumulate_return:.5f}, '
            'benchmark_month_return: {benchmark_month_return:.5f}, '
            'benchmark_accumulate_return: {benchmark_accumulate_return: .5f}, '
            '{metrics},nb_episode_steps: {nb_episode_steps}'
        )
        # NOTE: the month figures use the number of buffered observations
        # (episode_steps), while nb_episode_steps is reported from logs.
        variables = {
            'step': self.step,
            'nb_steps': self.params['nb_steps'],
            'episode': episode + 1,
            'duration': duration,
            'episode_steps': episode_steps,
            'nb_episode_steps': logs['nb_episode_steps'],
            'sps': float(episode_steps) / duration,
            'episode_reward': np.sum(self.rewards[episode]),
            'reward_mean': np.mean(self.rewards[episode]),
            'reward_min': np.min(self.rewards[episode]),
            'reward_max': np.max(self.rewards[episode]),
            'action_mean': np.mean(self.actions[episode]),
            'action_min': np.min(self.actions[episode]),
            'action_max': np.max(self.actions[episode]),
            'obs_mean': np.mean(self.observations[episode]),
            'obs_min': np.min(self.observations[episode]),
            'obs_max': np.max(self.observations[episode]),
            'accumulate_return': logs.get('accumulate_return'),
            'month_return': accumulate_return_to_month(episode_steps, logs['accumulate_return']),
            'benchmark_accumulate_return': logs.get('benchmark_accumulate_return'),
            'benchmark_month_return': accumulate_return_to_month(episode_steps, logs['benchmark_accumulate_return']),
            'metrics': metrics_text,
        }
        print(template.format(**variables))

        # Add one CSV column per metric. The values are taken from the
        # (name, value) pairs collected above rather than by re-splitting
        # metrics_text, which failed with IndexError when there were no
        # metrics and mis-split names containing ': ' or ', '. The stored
        # text is the same as in the printed line.
        for name, value in zip(metrics_variables[::2], metrics_variables[1::2]):
            if isinstance(value, str):
                variables[name] = value
            else:
                variables[name] = '{:f}'.format(value)

        try:
            if self.filepath is not None:
                # Decide before opening: mode 'a' creates the file.
                write_header = not os.path.isfile(self.filepath)
                with open(self.filepath, 'a') as f:
                    w = csv.DictWriter(f, variables.keys())
                    if write_header:
                        w.writeheader()
                    w.writerow(variables)
        finally:
            # Free up resources even when writing the file fails.
            del self.episode_start[episode]
            del self.observations[episode]
            del self.rewards[episode]
            del self.actions[episode]
            del self.metrics[episode]

    def on_step_end(self, step, logs):
        """Buffer the observation, reward, action and metrics of one step."""
        episode = logs['episode']
        self.observations[episode].append(logs['observation'])
        self.rewards[episode].append(logs['reward'])
        self.actions[episode].append(logs['action'])
        self.metrics[episode].append(logs['metrics'])
        self.step += 1


NameError: name 'Callback' is not defined

In [8]:
"""
新增Callback

於Training計算自定義的評估函數
"""
class TestLogger(Callback):
    """Callback used during testing; its console output is currently disabled.

    ``on_episode_end`` still derives the per-episode figures from ``logs``
    (so missing keys raise ``KeyError``), but nothing is printed or stored.
    """

    def on_train_begin(self, logs):
        """Hook called at the start of the run; intentionally does nothing."""

    def on_episode_end(self, episode, logs):
        """Compute the summary values of a finished episode.

        Args:
            episode: Zero-based episode index (reported as ``episode + 1``).
            logs: Dict providing ``episode_reward``, ``nb_steps``,
                ``accumulate_return`` and ``benchmark_accumulate_return``.
        """
        template = 'Episode {episode}: reward_mean: {reward_mean:.3f}, accumulate_return: {accumulate_return:.5f}, benchmark_accumulate_return: {benchmark_accumulate_return:.5f}, nb_steps: {nb_steps}'
        # Entries are filled one by one, in the same order the values were
        # evaluated before, so lookups and helper calls happen identically.
        variables = {}
        variables['episode'] = episode + 1
        variables['reward_mean'] = float(logs['episode_reward']) / logs['nb_steps']
        variables['month_return'] = accumulate_return_to_month(
            logs['nb_steps'], logs['accumulate_return'])
        variables['accumulate_return'] = logs['accumulate_return']
        variables['benchmark_accumulate_return'] = logs['benchmark_accumulate_return']
        variables['benchmark_month_return'] = accumulate_return_to_month(
            logs['nb_steps'], logs['benchmark_accumulate_return'])
        variables['nb_steps'] = logs['nb_steps']
        # Printing of template.format(**variables) is switched off.
class ToCsvLogger(Callback):
    """Callback that appends one CSV row of statistics per finished episode.

    Args:
        filepath: Path of the CSV file to append to. When ``None`` the
            statistics are still computed but nothing is written.
    """

    def __init__(self, filepath):
        self.filepath = filepath

    def on_episode_end(self, episode, logs):
        """Append the statistics of a finished episode to ``self.filepath``.

        The header row is written only when the file does not exist yet.

        Args:
            episode: Zero-based episode index (stored as ``episode + 1``).
            logs: Dict providing ``episode_reward``, ``nb_steps``,
                ``accumulate_return``, ``benchmark_accumulate_return`` and
                ``index_nm``.
        """
        # Local import: csv is not imported in the import cell at the top
        # of this file.
        import csv

        variables = {
            'episode': episode + 1,
            'reward_mean': float(logs['episode_reward'])/logs['nb_steps'],
            'accumulate_return': logs['accumulate_return'],
            'index_nm': logs['index_nm'],
            'benchmark_accumulate_return': logs['benchmark_accumulate_return'],
            'month_return': accumulate_return_to_month(logs['nb_steps'], logs['accumulate_return']),
            'benchmark_month_return': accumulate_return_to_month(logs['nb_steps'], logs['benchmark_accumulate_return']),
            'nb_steps': logs['nb_steps'],
        }
        if self.filepath is None:
            return

        # Decide before opening: mode 'a' creates the file.
        write_header = not os.path.isfile(self.filepath)
        with open(self.filepath, 'a') as f:
            w = csv.DictWriter(f, variables.keys())
            if write_header:
                w.writeheader()
            w.writerow(variables)

In [9]:
"""
繼承Env
"""

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from tgym.core import Env, DataGenerator 
import pandas as pd
from sklearn.preprocessing import StandardScaler
import random

# Override matplotlib's global defaults (font size, axis-label size, line
# width and marker size) at import time, before any figure is drawn.
mpl.rcParams.update(
    {
        "font.size": 15,
        "axes.labelsize": 15,
        "lines.linewidth": 1,
        "lines.markersize": 8
    }
)
class AbstractEnv(Env):
    
    _actions = {
        'hold': 0,
        'buy': 1,
        'sell': 2
    }
    
    def __init__(self, params):
        self._params=params
        assert self._params['history_length'] > 0
        self._params['price_history'] = []
        self._params['accumulate_return_list'] = []
        self._params['data_index'] =-1
        self._params['rolling_start_len'] = -1
        
    def reset(self):
        self._params['cash']=self._params['cash_start']
        self._params['total_cash']=0
        self._params['data_index'] +=1
        self._params['rolling_start_len'] += 1
        self._params['data_index'] %=len(self._params['data_list'])
        self._params['data']=self._params['data_list'][self._params['data_index']]
        if self._params.get('StandardScaler_list'):
            self._params['SS']=self._params['StandardScaler_list'][self._params['data_index']]
        else:
            SS=StandardScaler()
            SS.fit(self._params['data'].loc[:,self._params['nn_input_col']])
            self._params['SS']=SS
        self._params['benchmark_position'] = 0
        self._params['benchmark_history_trade'] = []
        self._params['first_buy_price'] = 0
        self._params['position'] = 0
        self._params['sell_price'] = 0
        self._params['buy_price'] = 0
        self._params['action'] = self._actions['buy']
        self._params['history_action'] = []
        self._params['history_trade'] = []
        self._params['accumulate_return_list'] = []
        self._params['reward_list'] = []
        self._params['hold_times'] = 0
        self._params['accumulate_return'] = 0
        self._params['total_buy_price'] = 0
        self._params['benchmark_total_cash'] = 0
        self._params['iteration'] = self._params['history_length']
        self._params['trade_limit'] = len(self._params['data'])
        self._params['hold_sell_list'] = []
        self._params['benchmark_hold_sell_list'] = []
        self._params['over_buy_price_list'] = []
        self._params['benchmark_over_buy_price_list'] = []
        self._params['enter_times'] = 0
#         if self._params['train_lab'] :
#             if self._params['reward_step'] :
#                 while self._params['trade_limit'] + max(self._params['reward_step'])  >= len(self._params['data']):
#                     self._params['iteration'] = self._params['history_length'] + random.randrange(int(len(self._params['data'])))
#                     self._params['trade_limit'] = self._params['iteration'] + random.randrange(24,len(self._params['data']))
#             else:
#                 while self._params['trade_limit'] >= len(self._params['data']) :
#                     self._params['iteration'] = self._params['history_length'] + random.randrange(int(len(self._params['data'])))
#                     self._params['trade_limit'] = self._params['iteration'] + random.randrange(24,len(self._params['data']))
        self._params['iteration'] += self._params['rolling_start_len']/len(self._params['data_list'])
        if self.stop_condition()[0]:
            self._params['rolling_start_len'] = 0
            self._params['iteration'] = self._params['history_length']
        self._params['trade_limit'] = len(self._params['data'])
        self._params['history_data'] = self.gen_data ( self._params['data'], self._params['iteration'] )
        self._params['price_history'] = self._params['history_data'][self._params['price_col']].tolist()     
        self._params['benchmark_buy_price'] = self._params['price_history'][-1]
        self._params['observation'] = self._get_observation()
        return self._params['observation'][0]
      
    def step(self, action):
        """Placeholder for taking an action (buy/sell/hold).

        This base implementation does nothing and returns ``None``. The
        contract described below is the one an overriding implementation is
        expected to follow.

        Args:
            action: Action to be taken. The rest of this class compares the
                stored action with the integer codes in ``_actions``, so it
                is presumably one of those codes -- confirm in the subclass.

        Returns:
            tuple:
                - observation: Agent's observation of the current environment.
                - reward (float): Reward obtained after the previous action.
                - done (bool): Whether the episode has ended, in which case
                  further ``step()`` calls return undefined results.
                - info (dict): Auxiliary diagnostic information.
        """
        return None
    
    def stop_condition(self):
        info = {}
        if self._params['iteration'] >= self._params['episode_length']:
            done = True
            info['status'] = 'Time out.' 
        elif self._params['reward_step'] :
            if self._params['iteration'] + max(self._params['reward_step'])  >= len(self._params['data']):
                done = True
                info['status'] = 'No reward data.'
            else :
                done = False
        elif self._params['iteration'] >= len(self._params['data']):
            done = True
            info['status'] = 'No more data.' 
        elif (self._params['train_lab']) and (self._params['iteration'] >= self._params['trade_limit']):
            done = True
            info['status'] = 'Stop trading.' 
        else :
            done = False
        return done, info
    
    def done_true_to_do(self):
               
        self._params['volatility'] = self.volatility(self._params['hold_sell_list'])
        self._params['benchmark_volatility'] = self.volatility(self._params['benchmark_hold_sell_list'])
        self._params['lower_buy_price_num'] = np.sum([i == -1 for i in self._params['over_buy_price_list']])
        self._params['over_buy_price_num'] = np.sum([i == 1 for i in self._params['over_buy_price_list']])
        self._params['benchmark_lower_buy_price_num'] = np.sum([i == -1 for i in self._params['benchmark_over_buy_price_list']])
        self._params['benchmark_over_buy_price_num'] = np.sum([i == 1 for i in self._params['benchmark_over_buy_price_list']])
        
        try:
            self._params['over_buy_price_prob'] = float(self._params['over_buy_price_num'])/\
            (self._params['lower_buy_price_num']+self._params['over_buy_price_num'])
        except ZeroDivisionError:
            self._params['over_buy_price_prob'] = 0
            
        try:
            self._params['benchmark_over_buy_price_prob'] = float(self._params['benchmark_over_buy_price_num'])/\
            (self._params['benchmark_lower_buy_price_num']+self._params['benchmark_over_buy_price_num'])     
        except ZeroDivisionError:
            self._params['benchmark_over_buy_price_prob'] = 0   
        
        try:
            self._params['sharpe'] = self._params['accumulate_return']/self._params['volatility']
        except ZeroDivisionError:
            self._params['sharpe'] = 0  
            
        try:
            self._params['benchmark_sharpe'] = self._params['benchmark_accumulate_return']/self._params['benchmark_volatility']
        except ZeroDivisionError:
            self._params['benchmark_sharpe'] = 0   
    
    def volatility(self,hold_sell_list):
        """Return the sample standard deviation of step-to-step returns.

        For each pair of consecutive entries the relative change
        ``(current - previous) / previous`` is used; a pair in which either
        entry is ``0`` contributes a return of ``0`` instead.

        Args:
            hold_sell_list: Indexable sequence of prices, with ``0`` used as
                a "no price" marker.

        Returns:
            ``np.std`` of the returns with ``ddof=1``.
        """
        step_returns = []
        for idx in range(1, len(hold_sell_list)):
            previous = hold_sell_list[idx - 1]
            current = hold_sell_list[idx]
            if previous == 0 or current == 0:
                step_returns.append(0)
            else:
                step_returns.append((current - previous) / float(previous))
        return np.std(step_returns, ddof=1)
    
    def gen_volatility(self,hold_sell_list):
        """Return the sample standard deviation of step-to-step returns.

        NOTE: this performs the same computation as ``volatility``.

        Args:
            hold_sell_list: Indexable sequence of prices, with ``0`` used as
                a "no price" marker.

        Returns:
            ``np.std`` (``ddof=1``) of the relative changes between
            consecutive entries; a pair containing a ``0`` counts as ``0``.
        """
        def relative_change(previous, current):
            # A zero on either side marks a missing price: no return then.
            if previous != 0 and current != 0:
                return (current - previous) / float(previous)
            return 0

        hold_sell_return_list = [
            relative_change(hold_sell_list[pos - 1], hold_sell_list[pos])
            for pos in range(1, len(hold_sell_list))
        ]
        return np.std(hold_sell_return_list, ddof=1)
    
    def gen_enter_times(self):
        if (self._params['position'] < self._params['max_position']) and (self._params['action'] == self._actions['buy']):
            self._params['enter_times'] += 1
            
    def gen_hold_sell_list(self):
        if self._params['position'] < self._params['max_position']:
            if (self._params['action'] == self._actions['sell']) or (self._params['action'] == self._actions['hold']):
                self._params['hold_sell_list'].append(0)
            else:
                self._params['hold_sell_list'].append(self._params['price_history'][-2])
        else:
            self._params['hold_sell_list'].append(self._params['price_history'][-2])
        self._params['benchmark_hold_sell_list'].append(self._params['price_history'][-2])
                
    def gen_over_buy_price_list(self,over_buy_price_list,buy_price):
        #1:持有 ， -1:價錢比買價低 ， 0:沒有單位
        if self._params['position'] == 0:
            over_buy_price_list.append(0)
        else:
            if self._params['price_history'][-2] > buy_price:
                over_buy_price_list.append(1) 
            else :
                over_buy_price_list.append(-1) 
    def _get_observation(self):
        """Concatenate all necessary elements to create the observation.
        Returns:
            numpy.array: observation array. 
        """
        if self._params['data_nor']:
            ob_data = pd.DataFrame(self._params['SS'].transform(self._params['history_data'].loc[:,self._params['nn_input_col']]))
        else :
            ob_data = self._params['history_data'].loc[:,self._params['nn_input_col']]
        if self._params['position']>0 :
            position_lab = [1,-1]
        else :
            position_lab = [-1,1]
        ob = []
        for i in ob_data.as_matrix().T:
            ob += i.tolist()
        if self._params.get('trend_months'): 
            predict_months_price = []
            for i in self._params['trend_months']:
                #因為index從0開始的關係，所以需要-1
                predict_months_price = predict_months_price + [self._params['data'].loc[self._params['iteration'] + i - 1,self._params['reward_col']]]
            trend = self.trend(self._params['price_history'][-1], predict_months_price, self._params['threshold'], self._params['trend_noise'])
            state = ob + position_lab + \
            [self._params['history_data'].tail(1).loc[:,i].values[0] for i in self._params['no_time_var_nm']]  + trend

        else :
            state = ob + position_lab + \
            [self._params['history_data'].tail(1).loc[:,i].values[0] for i in self._params['no_time_var_nm']]
        future_months_price = []
        if self._params.get('reward_step') :
            for i in self._params['reward_step']:
                future_months_price = future_months_price + [self._params['data'].loc[self._params['iteration'] + i - 1, self._params['reward_col']]]
        else :
            future_months_price = []
        return [state,future_months_price]    
#     def _get_observation(self):
#         """Concatenate all necessary elements to create the observation.
#         Returns:
#             numpy.array: observation array. 
#         """
#         if self._params['data_nor']:
#             ob_data = pd.DataFrame(self._params['SS'].transform(self._params['history_data'].loc[:,self._params['nn_input_col']]))
#         else :
#             ob_data = self._params['history_data'].loc[:,self._params['nn_input_col']]
#         if self._params['position']>0 :
#             position_lab = 1
#         else :
#             position_lab = -1
#         ob = []
#         for i in ob_data.as_matrix().T:
#             ob += i.tolist()
#         state = ob + [position_lab] + \
#         [self._params['history_data'].tail(1).loc[:,i].values[0] for i in self._params['no_time_var_nm']]
#         future_months_price = []
#         if self._params.get('reward_step') :
#             for i in self._params['reward_step']:
#                 future_months_price = future_months_price + [self._params['data'].loc[self._params['iteration'] + i - 1, self._params['reward_col']]]
#         else :
#             future_months_price = []
#         return [state,future_months_price]
    
    def gen_data(self, data, time):
        key = data.columns.tolist()
        D = pd.DataFrame()
        for key_ in key :
            history_key = [] 
            for j in range(self._params['history_length'],0,-1): 
                history_key = history_key + [data[key_][time-j]]
            D[key_]=history_key
        return D
    
    def reward_function(self, return_list, gamma):
        """Combine a list of returns into one discounted reward.

        With ``n = len(return_list)`` the result is::

            sum(gamma**(i+1) * (1-gamma) * return_list[i] for i in range(n-1))
                + gamma**n * return_list[n-1]

        Args:
            return_list: Returns to combine; an empty (or otherwise falsy)
                value yields ``0``.
            gamma: Discount factor.

        Returns:
            The combined reward.
        """
        if not return_list:
            return 0

        # Index of the final element, computed directly. The previous code
        # reused the loop variable, which is unbound for a one-element list
        # and raised UnboundLocalError.
        last = len(return_list) - 1
        reward = 0
        for i in range(last):
            reward += gamma**(i+1)*(1-gamma)*return_list[i]
        reward += gamma**(last+1)*return_list[last]
        return reward