In [None]:
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import time
import datetime
import torch
from torch import optim
from torch import nn
from torch.distributions import MultivariateNormal
import tensorflow as tf
from tensorflow import keras
from tensorflow import nn
import tensorflow_probability as tfp
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense
from tensorflow.python.keras.layers import deserialize, serialize
from tensorflow.python.keras.saving import saving_utils


from copy import deepcopy
import dask
from dask import delayed
from dask.distributed import Client
import pickle



In [None]:
# Save/Load Settings
load_model = None  # None to train a new model; filename to load and plot an old model

# RL Hyperparameters
LAMBDA_actor = 0.8
LAMBDA_critic = 0.8
lr_actor = 0.001
lr_critic = 0.001
P = 1
Xt_sigma_base = tf.Variable([1.0])
Xt_sigma_scale = tf.Variable([0.0])

# Iterations
MAX_ITER = 20000
BATCH_SIZE = 1
N_models = 1
N_simulations = 100

# Parallel settings
dask_scheduler = 'single-threaded'
N_workers = N_models
if dask_scheduler == 'local':
    client = Client(num_workers=N_workers)
elif dask_scheduler not in ['single-threaded', 'processes', 'threads']:
    client = Client(dask_scheduler)

# Diagnostic output settings
print_window = 100
movavg_window = 1000
window_startfrom = 100
verbose = 1  # (None = no printing; 1 = minimal printing; 2 = full printing)

# Model Parameters
S = 2
K = 2
gamma = 0.5
rho = 0.05
rf = 1/(1-rho)-1
R = 0.1
phi = tf.Variable([[0.1, 0], [0, 0.4]])
sigma = tf.eye(S)
omega = tf.eye(S)
B = tf.eye(S)
w = 0.5
Atilde = 2*tf.eye(S)
A = w*Atilde
C = (1-w)*Atilde
X0 = tf.zeros([S,1])
f0 = tf.ones([S,1])
D0 = tf.zeros([S,1])

In [None]:
class stockEnv():
    """
    An environment for agents to trade on assets

    """
    def __init__(self):
        '''
        Pass in the dataset to self.df
        '''
        self.t = 0
        self.X_last = X0
        self.ft = f0
        self.Dt = D0

    def reset(self):
        '''
        Reset env for new episode and return H0
        '''
        self.t = 0
        self.X_last = X0
        self.ft = f0
        self.Dt = D0
        return self.X_last, self.ft, self.Dt

    def step(self, Xt, random=True):
        X_last = self.X_last
        self.X_last = Xt
        TR = TC(Xt, X_last) + (1-rho)*U(Xt, X_last, self.ft, self.Dt)
        if random:
            self.ft = (tf.eye(S) - phi) @ tf.reshape(self.ft, [2, 1]) + tf.reshape(
                tfp.distributions.MultivariateNormalFullCovariance(loc=tf.Variable([0.0, 0.0]),
                                                                   covariance_matrix=omega).sample(), [2, 1])
        else:
            self.ft = (tf.eye(S) - phi) @ self.ft
        self.Dt = (1 - R) * (self.Dt + C @ tf.reshape((Xt-X_last),[2,1]))
        self.t += 1
        return self.X_last, self.ft, self.Dt, TR


class Critic(keras.Model):
    '''
    Only consider SGD cases now. Batch with memory buffer will be considerd in future.
    In dim: N*H  (St)
    Out dim: scalar (value function)
    '''

    def __init__(self):
        super(Critic, self).__init__()

        self.L1 = tf.keras.layers.Dense(20, input_shape=(2*S+K,1), activation=None)
        self.A1 = tf.keras.layers.ReLU()
        self.L2 = tf.keras.layers.Dense(1, activation=None)

    def forward(self, X_last, ft, Dt):
        #ft = tf.reshape(ft,[-1])

        x = tf.concat([X_last,ft, Dt], 0)
        y = self.L2(self.A1(self.L1(tf.transpose(x))))
        return y



class Actor(keras.Model):
    '''
    Only consider SGD cases now. Batch with memory buffer will be considerd in future.
    In dim: N*H  (St)
    Out dim: N*1 (mu)
    '''

    def __init__(self):

        super(Actor, self).__init__()
        self.L1 = tf.keras.layers.Dense(S, input_shape=(2*S+K,1), activation=None)
        # self.A1 = tf.keras.layers.ReLU()
        # self.L2 = tf.keras.layers.Dense(S, input_shape=(100,), activation=None)

    def forward(self, X_last, ft, Dt):
        #ft = tf.reshape(ft, [-1])

        x=tf.concat([X_last,ft, Dt],0)
        y = self.L1(tf.transpose(x))
        return tf.transpose(y)

        # return self.L2(self.A1(self.L1(torch.cat((Xt_1, ft, Dt)))))

class ACE:

    def __init__(self, new_env=stockEnv()):
        self.env = new_env

        self.actor_lambda = LAMBDA_actor
        self.actor_eligibilities = []

        self.critic_lambda = LAMBDA_critic
        self.critic_eligibilities = []
        self.needreset = True

        self.env.reset()

    def update_grads_with_eligibility(self, is_critic, delta,grads, P, need_reset):
    # Important tip: all list operations will pass by ref.
    # So changes to eligibilities will also affect self.critic_eligibilities/self.actor_eligibilities
        if is_critic:
            params = list(self.critic.trainable_variables)
            lamb = self.critic_lambda
            eligibilities = self.critic_eligibilities
            P = 1
        else:
            params = list(self.actor.trainable_variables)
            lamb = self.actor_lambda
            eligibilities = self.actor_eligibilities


    # Reset and populate by zeros

        if self.needreset:
            eligibilities.clear()
            for i, p in enumerate(grads):
                eligibilities.append(tf.zeros_like(grads[i]))
    # update eligibility traces and nn.module weight grad
        for i, p in enumerate(grads):

             eligibilities[i] = ((1-rho) * lamb * eligibilities[i]) + (P * grads[i])
             grads[i] = tf.squeeze(delta) * eligibilities[i]
        return grads

    def step(self, copied_actor, copied_critic, Xt_sigma_base, Xt_sigma_scale):
        self.actor = copied_actor
        self.critic = copied_critic

        # Action and current value
        with tf.GradientTape() as tape:
            tape.watch(self.critic.trainable_variables)
            tape.watch(self.actor.trainable_variables)

        Xt_mu = self.actor.forward(self.env.X_last, self.env.ft, self.env.Dt)  # Xt_mu
        value = self.critic.forward(self.env.X_last, self.env.ft, self.env.Dt)
        # value_pred = torch.cat((value_pred, value.reshape(1,1)), dim=0)
        Xt_sigma = Xt_sigma_base + Xt_sigma_scale * np.mean(np.absolute(
              tf.stop_gradient(Xt_mu)))
        # print(Xt_sigma)
        Xt = tf.stop_gradient(tf.random.normal([1], Xt_mu, Xt_sigma, tf.float32, seed=1))
        log_prob = tfp.distributions.MultivariateNormalFullCovariance(loc=tf.reshape(Xt_mu,[2,]),covariance_matrix=Xt_sigma ** 2 * tf.eye(Xt_mu.shape[0])).log_prob(tf.reshape(Xt,[2,]))
        X_last = self.env.X_last
        ft = self.env.ft
        Dt = self.env.Dt

        if np.mean(np.absolute(tf.stop_gradient(Xt_mu))) >20:
            print('stop')

        # Reward, state transition, and TD error
        _, _, _, TR = self.env.step(Xt)
        value_next = self.critic.forward(self.env.X_last, self.env.ft, self.env.Dt)
        delta = (TR + (1 - rho) * tf.stop_gradient(value_next) - tf.stop_gradient(value))
        with tf.GradientTape() as tape:

          l = -value

          grads = tape.gradient(l,self.critic.trainable_variables)
          critic_grads = self.update_grads_with_eligibility(is_critic=True, delta=delta, P=P, grads=grads, need_reset=self.needreset)
        with tf.GradientTape() as tape:
          l = -log_prob

          grads = tape.gradient(l, self.actor.trainable_variables)
          actor_grads = self.update_grads_with_eligibility(is_critic=False, delta=delta, P=P,grads=grads, need_reset=self.needreset)
          self.needreset = False

        return value, log_prob, TR, delta, critic_grads, actor_grads, Xt, Xt_mu, X_last, ft, Dt




In [None]:
class GlobalHead:

    def __init__(self, worker_id, random_seed):
        self.global_actor = Actor()
        self.global_critic = Critic()
        self.global_actor_optimizer = tf.keras.optimizers.Adam(learning_rate=lr_actor)
        self.global_critic_optimizer = tf.keras.optimizers.Adam(learning_rate=lr_critic)
        self.reward_history = []
        self.reward_movavg = []
        self.delta_history = []
        self.delta_movavg = []
        self.worker_id = worker_id
        self.random_seed = random_seed

    def train(self):
        with tf.GradientTape(persistent = True) as tape:
            tape.watch(self.global_critic.trainable_variables)
            tape.watch(self.global_actor.trainable_variables)
            t0 = time.time()
            tf.random.set_seed(self.random_seed)
            self.ACEs = [ACE(new_env=stockEnv()) for i in range(BATCH_SIZE)]
            w_U_env = deepcopy(self.ACEs[-1].env)
            iteration = 0

            while self.ACEs[0].env.t <= MAX_ITER:
                iteration += 1
                critic_grads_list = []
                actor_grads_list = []
                TR_list = []
                delta_list = []

                # Batches
                for ACE_i in self.ACEs:
                    value, log_prob, TR, delta, critic_grads, actor_grads, Xt, Xt_mu, X_last, ft, Dt = ACE_i.step(
                        copied_actor=deepcopy(self.global_actor), copied_critic=deepcopy(self.global_critic),
                        Xt_sigma_base=Xt_sigma_base, Xt_sigma_scale=Xt_sigma_scale)
                    critic_grads_list.append(critic_grads)
                    actor_grads_list.append(actor_grads)
                    TR_list.append(TR)
                    delta_list.append(delta)

                # update global critic
                c = tf.math.reduce_sum(self.global_critic.forward(w_U_env.X_last, w_U_env.ft, w_U_env.Dt)) * 0
                global_critic_grads = tape.gradient(c,self.global_critic.trainable_variables)
                for i, p in enumerate(list(global_critic_grads)):
                        #if not p.requires_grad:
                        # continue
                    for c_i in critic_grads_list:
                        global_critic_grads[i] += (c_i[i] / BATCH_SIZE)
                self.global_critic_optimizer.apply_gradients(zip(global_critic_grads,self.global_critic.trainable_variables))

                # update global actor
                c = tf.math.reduce_sum(self.global_actor.forward(w_U_env.X_last, w_U_env.ft, w_U_env.Dt)) * 0
                global_actor_grads = tape.gradient(c, self.global_actor.trainable_variables)
                for i, p in enumerate(list(global_actor_grads)):
                        #if not p.requires_grad:
                            #continue
                    for c_i in actor_grads_list:
                        global_actor_grads[i] += (c_i[i]/ BATCH_SIZE)
                self.global_actor_optimizer.apply_gradients(zip(global_actor_grads,self.global_actor.trainable_variables))

                # store diagnostic output (means within the batch)
                self.reward_history.append(np.mean(TR_list))
                self.delta_history.append(np.mean(delta_list))

                # Compute moving averages for printing
                window_start = max(iteration - movavg_window + 1, 0)
                self.reward_movavg.append(np.array(self.reward_history[window_start:]).mean())
                self.delta_movavg.append(np.array(self.delta_history[window_start:]).mean())

                # Diagnostic output
                if iteration % print_window == 0 and verbose:
                    time_elapsed = (time.time() - t0) / 60
                    print(f'Worker {self.worker_id} - Timestep: {iteration}')
                    print(f'Time elapsed: {time_elapsed:.3f} minutes')
                    print('Moving Avg Reward: ', self.reward_movavg[-1])
                    print('Moving Avg Delta: ', self.delta_movavg[-1])
                    if verbose == 2:
                        print('---')
                        print('X_last: ', X_last)
                        print('ft: ', ft)
                        print('Dt: ', Dt)
                        print('Action Xt: ', Xt)
                        print('Mu Xt: ', Xt_mu)
                        print('##############################################################')


def TC(Xt, X_last):
    deltaX = Xt - X_last
    return -0.5 * tf.transpose(deltaX) @ A @ deltaX + tf.transpose(X_last) @ C @ deltaX + 0.5 * tf.transpose(
       deltaX) @ C @ deltaX


def U(Xt, X_last, ft, Dt):
    deltaX = Xt - X_last
    return tf.transpose(Xt) @ (B @ ft - (R + rf) * (Dt + C @ deltaX)) - gamma / 2 * tf.transpose(Xt) @ sigma @ Xt



def init_weights(m):
    if type(m) == tf.keras.Model:
        tf.keras.initializers.HeUniform(m.weight)
        m.bias.data.fill_(0.00)


def simulate(models):
    # Plot training statistics for model average
    reward_movavg = np.mean([model.reward_movavg for model in models], axis=0)
    delta_movavg = np.mean([model.delta_movavg for model in models], axis=0)
    plt.figure(figsize=(8, 12))
    plt.subplot(3, 1, 1)
    plt.title("Reward (Moving Avg)")
    plt.plot(reward_movavg[window_startfrom:])
    plt.subplot(3, 1, 2)
    plt.title("Delta (Moving Avg)")
    plt.plot(delta_movavg[window_startfrom:])
    plt.subplot(3, 1, 3)
    plt.title("Delta Squared (Moving Avg)")
    plt.plot(np.array(delta_movavg[window_startfrom:]) ** 2)
    plt.savefig("./plots/Reward_Delta_" + str(datetime.datetime.now()).replace(':', '-') + ".png")
    # Simulate model average
    env = stockEnv()
    env.reset()
    Mt1s = []
    Xt1s = []
    Mt2s = []
    Xt2s = []
    while env.t <= N_simulations:
        Xt_all = [model.global_actor.forward(env.X_last, env.ft, env.Dt).T for model in models]
        Xt = tf.stack(Xt_all).mean(axis=0)
        Mt = (gamma*sigma).inverse() @ (B @ env.ft)
        Mt1s.append(Mt[0])
        Xt1s.append(Xt[0].item())
        Mt2s.append(Mt[1])
        Xt2s.append(Xt[1].item())
        env.step(Xt, random=False)
    Mt1s = np.array(Mt1s)
    Mt2s = np.array(Mt2s)
    Xt1s = np.array(Xt1s)
    Xt2s = np.array(Xt2s)
    # Plot simulation
    plt.figure()
    plt.quiver(Mt1s[:-1], Mt2s[:-1], Mt1s[1:] - Mt1s[:-1], Mt2s[1:] - Mt2s[:-1], scale_units='xy', angles='xy',
               scale=1, label='Markowitz_t')
    plt.quiver(Xt1s[:-1], Xt2s[:-1], Xt1s[1:] - Xt1s[:-1], Xt2s[1:] - Xt2s[:-1], scale_units='xy', angles='xy',
               scale=1, label='X_t', color='r')
    plt.legend()
    plt.savefig('./plots/sim_' + str(datetime.datetime.now()).replace(':', '-') + ".png")


def train_model(model):
    model.train()
    return model


def save(models):
    with open('/content/drive/My Drive/Plots/' + str(datetime.datetime.now()).replace(':', '-') + ".pkl", 'wb') as pickle_file:
        pickle.dump(models, pickle_file)


def load(filename):
    with open('/content/drive/My Drive/Plots/' + filename, 'rb') as pickle_file:
        return pickle.load(models, pickle_file)


if __name__ == '__main__':
    t0 = time.time()
    if not load_model:
        models = [GlobalHead(x, tf.random.uniform([1],0,1000000000000000000)) for x in range(N_models)]  # assign id number and random seed to each worker
        dask_collect = []
        for model in models:
            dask_collect.append(delayed(train_model)(model))
        if dask_scheduler in ['single-threaded', 'processes', 'threads']:
            result, = dask.compute(dask_collect, scheduler=dask_scheduler, n_workers=N_workers)
        else:
            result, = dask.compute(dask_collect)
       # save(result)
    else:
        result = load(load_model)
    simulate(result)
    print(f'FINISHED! Total time taken: {time.time() - t0:.3f} seconds')