# CloudEx Algo Main Script

#### Default Imports

In [None]:
import sys
sys.path

# Extra directories appended to the module search path (system Python 3.7
# locations plus the /root project directories).
new_paths = [
    '/usr/lib/python37.zip',
    '/usr/lib/python3.7',
    '/usr/lib/python3.7/lib-dynload',
    '/usr/local/lib/python3.7/dist-packages',
    '/usr/lib/python3/dist-packages',
    '/root/CloudExchange',
    '/root/',
]
sys.path.extend(new_paths)

In [None]:
# Import packages.
import datetime
import json
import os
import sys
import time

import pandas as pd
import numpy as np
import redis
from pandas.core.common import SettingWithCopyWarning

import warnings
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

# CloudEx imports.
import cloud_ex

# Import AlgorithmicTrader helper class.
from algorithmic_trader_shay import AlgorithmicTrader
from algorithmic_trader_shay import summarize_historical_trades_df

# Launch a local Redis server in the background (daemonized) through the shell.
os.system("redis-server --daemonize yes")
# Pause for one second -- presumably to give the server time to come up
# before anything connects to it (TODO confirm the delay is sufficient).
time.sleep(1)

# Get CloudEX and VM-specific config. 
# NOTE: gateway_ip will be null when the exchange is not online 
def get_vm_config(path="/root/vm_config.json"):
    """
    Load the CloudEx / VM-specific configuration from a JSON file.

    :param path: Location of the JSON config file. Defaults to the
                 VM-provisioned "/root/vm_config.json".
    :return: The parsed JSON document (the rest of this notebook indexes it
             like a dict, e.g. config["gateway_ip"]).
    :raises OSError: If the file cannot be opened.
    :raises json.JSONDecodeError: If the file is not valid JSON.
    """
    with open(path, "r", encoding="utf-8") as read_file:
        return json.load(read_file)

config = get_vm_config()


# utilities
# Column names, in order, of the '|'-separated record returned by SerializeOrder().
ORDER_FIELDS_LIST = [
    'Symbol', 'OrderID', 'CancelID', 'ClientID', 'OrderType', 'OrderAction',
    'SubmitTimestamp', 'GatewayTimestamp', 'EnqueueTimestamp',
    'DequeueTimestamp', 'OrderSerialNum', 'LimitPrice', 'ResultType','NumShares'
]

# Column names, in order, of the '|'-separated record returned by SerializeTrade().
TRADE_FIELDS_LIST = [
    "Symbol", "BuyerSerialNum", "SellerSerialNum", "BuyerOrderID",
    "SellerOrderID", "BuyerClientID", "SellerClientID", "ExecPrice",
    "CashTraded", "SharesTraded", "CreationTimestamp", "ReleaseTimestamp",
    "TradeSerialNum"
]

# Columns that are converted from text to numbers after parsing.
_ORDER_NUMERIC_FIELDS = [
    'SubmitTimestamp', 'GatewayTimestamp', 'EnqueueTimestamp',
    'DequeueTimestamp', 'OrderSerialNum', 'LimitPrice', 'NumShares'
]
_TRADE_NUMERIC_FIELDS = [
    "ExecPrice", "CashTraded", "SharesTraded",
    "CreationTimestamp", "ReleaseTimestamp", "TradeSerialNum"
]


def _records_to_df(serialized, columns, numeric_columns):
    """Split '|'-delimited records into named columns and coerce the numeric ones."""
    df = pd.Series(serialized).str.split('|', expand=True)
    df.columns = columns
    for label in numeric_columns:
        # Plain column assignment replaces the column (and its dtype);
        # `df.loc[:, label] = ...` may keep the original text dtype on
        # recent pandas versions. Unparseable values become NaN.
        df[label] = pd.to_numeric(df[label], errors='coerce')
    return df


def OrderDF(order_vec):
    """
    Convert a cloud_ex.VectorOrder into a DataFrame.

    :param order_vec: Sized iterable of objects exposing SerializeOrder().
    :return: DataFrame with the ORDER_FIELDS_LIST columns, one row per order;
             an empty DataFrame with those columns when order_vec is empty.
    """
    if not len(order_vec):
        return pd.DataFrame(columns=ORDER_FIELDS_LIST)
    serialized = [order.SerializeOrder() for order in order_vec]
    return _records_to_df(serialized, ORDER_FIELDS_LIST, _ORDER_NUMERIC_FIELDS)


def TradeDF(trade_vec):
    """
    Convert a cloud_ex.VectorTrade into a DataFrame.

    :param trade_vec: Sized iterable of objects exposing SerializeTrade().
    :return: DataFrame with the TRADE_FIELDS_LIST columns, one row per trade;
             an empty DataFrame with those columns when trade_vec is empty.
    """
    if not len(trade_vec):
        return pd.DataFrame(columns=TRADE_FIELDS_LIST)
    serialized = [trade.SerializeTrade() for trade in trade_vec]
    return _records_to_df(serialized, TRADE_FIELDS_LIST, _TRADE_NUMERIC_FIELDS)


def OutstandingOrderDF(outstanding_orders):
    """
    Convert a cloud_ex.MapStringOrder (order ID string -> outstanding order)
    into a DataFrame.

    :param outstanding_orders: Sized mapping whose values expose SerializeOrder().
    :return: DataFrame with the ORDER_FIELDS_LIST columns, one row per
             outstanding order; an empty DataFrame with those columns when
             the mapping is empty.
    """
    if not len(outstanding_orders):
        return pd.DataFrame(columns=ORDER_FIELDS_LIST)
    serialized = [order.SerializeOrder() for _, order in outstanding_orders.items()]
    return _records_to_df(serialized, ORDER_FIELDS_LIST, _ORDER_NUMERIC_FIELDS)

In [None]:
config

#### Custom Imports

In [None]:
import importlib 
import sys 
#importlib.reload(sys.modules['mean_reversion_shay'])
def reload(r):
    """
    Re-execute the already-imported module registered under the name `r`.

    :param r: Module name as it appears in sys.modules (str).
    :raises KeyError: If no module with that name has been imported yet.
    """
    loaded_module = sys.modules[r]
    importlib.reload(loaded_module)

In [None]:
import strategies_shay   
import threading
import utilities as u 
# The 100 two-letter tickers 'AA', 'AB', ..., 'AZ', 'BA', ..., 'DV' in alphabetical order.
_LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
default_symbols = [first + second for first in 'ABCD' for second in _LETTERS][:100]
# Hand-picked subset of ten symbols.
default_top_symbols = [
    'DV', 'CQ', 'DN', 'CL', 'CT',
    'BF', 'CW', 'AW', 'DS', 'CS',
]

In [None]:
trader = None

#### Get the trader object

In [None]:
def getTrader():
    """
    Return the shared cloud_ex.Trader, creating it on first use.

    When no trader has been created yet, this reads the connection settings
    from `config`, calls flushall() on the Redis instance reached by
    redis.Redis(), builds the trader and stores it in the module-level
    `trader` variable. Later calls return that stored object.
    """
    global trader
    if not trader:
        # Connection settings from the VM-specific config. The token is
        # private to this client, so don't make it public.
        ip = config["gateway_ip"]
        cid = config["client_id"]
        token = config["client_token"]

        # Wipe whatever is currently stored in the local Redis.
        redis.Redis().flushall()

        # Build the CloudEx base trader object and remember it.
        trader = cloud_ex.Trader(ip, cid, token)
        return trader

    u.debug("Returning saved trader")
    return trader

def getSymbols():
    """Return the result of GetSymbols() on the shared trader (created on demand)."""
    return getTrader().GetSymbols()


def getPortfolio():
    """
    Fetch the current portfolio from the exchange.

    Goes through getTrader() rather than reading the module-level `trader`
    directly, so it also works before the trader has been created (the
    global starts out as None), matching what getSymbols() does.

    :return: A cloud_ex.MapStringInt filled in by Trader.GetPortfolioMatrix.
    """
    portfolio_mat = cloud_ex.MapStringInt()
    getTrader().GetPortfolioMatrix(portfolio_mat)
    return portfolio_mat

In [None]:
T = getTrader()

In [None]:
config['gateway_ip']

#### Main Todos



### Selecting symbols to trade on based on aggregate volume over last n seconds  

We need to decide which symbols to trade before trading starts. A natural approach is to rank the symbols by some metric (for example, traded volume) and take the top N.

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt

def periodic_trading_decision(signal_y, timestep_lag = 10):
    """
    Trade a periodic signal by detecting sign changes of its first difference.

    A buy is emitted where the difference turns from negative to
    non-negative (the signal has just passed a local minimum) and a sell
    where it turns from non-negative to negative (just past a local maximum).
    Samples are assumed to be one time step apart.

    :param signal_y: Sequence of prices (list or 1-D array).
    :param timestep_lag: Index of the first sample that is evaluated (int).
    :return: (buys, sells), each a list of (index, price) tuples.
    """
    buys, sells = [], []
    # Differentiate the signal that was passed in (not a notebook global).
    # A leading 0.0 is inserted so that dydx[i] lines up with signal_y[i].
    dydx = np.insert(np.diff(np.asarray(signal_y, dtype=float)), 0, 0.0, axis=0)

    # Start no earlier than index 1 so that dydx[idx - 1] never wraps around
    # to the last element.
    for idx in range(max(timestep_lag, 1), len(signal_y)):
        sign_dy_current = math.copysign(1, dydx[idx])
        sign_dy_prev = math.copysign(1, dydx[idx - 1])

        if sign_dy_current > sign_dy_prev:
            buys.append((idx, signal_y[idx]))
        elif sign_dy_current < sign_dy_prev:
            sells.append((idx, signal_y[idx]))

    return buys, sells

## (1) Define signal - clean (unnoised) periodic signal
time_steps = 1000
# 2 pi f, with f chosen so that 5 full periods fit into time_steps
angular_frequency = 2 * np.pi * (5/time_steps)
x = np.arange(time_steps)
# A sin(\omega t) + \delta
y = 5 * np.sin(angular_frequency * x) + 100

## (2) Get trading decisions and plot
buys, sells = periodic_trading_decision(y)
plt.plot(x, y)

# Buy points in green, sell points in red, drawn above the price curve.
plt.scatter([step for step, _ in buys],
            [price for _, price in buys], s=150, color='g', zorder=2)
plt.scatter([step for step, _ in sells],
            [price for _, price in sells], s=150, color='r', zorder=2)

plt.xlabel('Timestep')
plt.ylabel('Price ($)')
plt.show()

In [None]:
def periodic_trading_decision_noised(signal_y, timestep_lag = 20, ma_length = 25):
    """
    Trade a noisy periodic signal using sign changes of the averaged slope.

    At each evaluated index the mean first difference over the most recent
    `ma_length` samples is compared with the mean over the `ma_length`
    samples before that. A buy is emitted when the averaged slope turns from
    negative to non-negative, a sell when it turns from non-negative to
    negative. Samples are assumed to be one time step apart.

    :param signal_y: Sequence of prices (list or 1-D array).
    :param timestep_lag: Number of samples skipped at both ends (int).
    :param ma_length: Length of each averaging window (int).
    :return: (buys, sells), each a list of (index, price) tuples.
    """
    buys, sells = [], []
    # Differentiate the signal that was passed in (not a notebook global).
    # A leading 0.0 is inserted so that dydx[i] lines up with signal_y[i].
    dydx = np.insert(np.diff(np.asarray(signal_y, dtype=float)), 0, 0.0, axis=0)

    # Both windows need 2 * ma_length samples of history. Starting earlier
    # would give negative slice starts, i.e. empty windows and NaN means.
    first_idx = max(timestep_lag, 2 * ma_length)

    for idx in range(first_idx, len(signal_y) - timestep_lag):
        sign_dy_current = math.copysign(1, np.mean(dydx[(idx - ma_length):idx]))
        sign_dy_prev = math.copysign(1, np.mean(dydx[(idx - 2*ma_length):(idx - ma_length)]))

        if sign_dy_current > sign_dy_prev:
            buys.append((idx, signal_y[idx]))
        elif sign_dy_current < sign_dy_prev:
            sells.append((idx, signal_y[idx]))

    return buys, sells

def smooth(y, box_pts = 10):
    """
    Smooth `y` with a moving average of width `box_pts`.

    The signal is convolved with a constant kernel whose entries sum to one;
    mode='same' keeps the output the same length as `y`.
    """
    kernel = np.full(box_pts, 1.0 / box_pts)
    return np.convolve(y, kernel, mode='same')

## Define signal - Add standard normal noise (~Gaussian(0,1)) to the periodic signal
time_steps = 1000
# 2 pi f, with f chosen so that 5 full periods fit into time_steps
angular_frequency = 2 * np.pi * (5/time_steps)
x = np.arange(time_steps)
# A sin(\omega t) + \delta + \gamma
y = 10 * np.sin(angular_frequency * x) + 100 + np.random.normal(0, 1, time_steps)
y_smooth = smooth(y, box_pts = 10)

## Get trading decisions and plot
buys, sells = periodic_trading_decision_noised(y_smooth)
# The first and last 10 samples are left out of the plot.
plt.plot(x[10:-10], y_smooth[10:-10])

plt.xlabel('Timestep')
plt.ylabel('Price ($)')
plt.show()

In [None]:
# periodic_trading_decision_noised takes the price series as its first
# argument; passing `x` first would treat the time axis as the signal and
# `y_smooth` as timestep_lag.
buys, sells = periodic_trading_decision_noised(y_smooth)
# The first and last 10 samples are left out of the plotted curve.
plt.plot(x[10:-10], y_smooth[10:-10])

# Buy points in green, sell points in red, drawn above the price curve.
plt.scatter([step for step, _ in buys],
            [price for _, price in buys], s=150, color='g', zorder=2)
plt.scatter([step for step, _ in sells],
            [price for _, price in sells], s=150, color='r', zorder=2)

plt.xlabel('Timestep')
plt.ylabel('Price ($)')
plt.show()

In [None]:
import numpy as np

def aggregate_volume(symbol, seconds_in_past) :
    """
    Sum the 'CashTraded' column over the symbol's trades from the last
    `seconds_in_past` seconds.

    The trades are pulled through cloud_ex.MarketDataAPI.PullTrades using the
    project, Bigtable and table names found in `config`.

    :param symbol: Symbol to query (str).
    :param seconds_in_past: Length of the look-back window in seconds.
    :return: Sum of the 'CashTraded' column of the pulled trades.
    """
    u.debug("Getting aggregate volume for {}".format(symbol))

    # The window is expressed in milliseconds and ends now.
    now_ms = int(time.time()*1e3)
    window_start_ms = now_ms - int(seconds_in_past*1e3)

    pulled_trades = cloud_ex.VectorTrade()
    cloud_ex.MarketDataAPI.PullTrades(
        config['project_id'], config['bigtable_id'], config['table_name'],
        symbol, window_start_ms, now_ms, pulled_trades)

    trades_df = TradeDF(pulled_trades)
    print("Returning volume for symbol:" + symbol)
    return np.sum(trades_df['CashTraded'])

def periodicity_metric(symbol, seconds_in_past, normalize_with_dc = True):
    """
    Score how periodic a symbol's recent trade prices are.

    The score is the largest non-DC bin of the price power spectrum divided
    by the total spectral power (including the DC bin when
    `normalize_with_dc` is True, excluding it otherwise).

    Side effect: the pulled trades are written to a CSV file named
    "periodicity_<symbol>_<timestamp>.csv" in the working directory.

    :param symbol: Symbol to query (str).
    :param seconds_in_past: Length of the look-back window in seconds.
    :param normalize_with_dc: Whether the DC bin is part of the denominator.
    :return: (periodicity, freqs). periodicity is 0.0 and freqs is empty when
             fewer than two usable prices are available; periodicity is also
             0.0 when the denominator is zero.
    """
    u.debug("Getting periodicity of {}".format(symbol))
    end_time_ms = int(time.time()*1e3)
    start_time_ms = end_time_ms - int(seconds_in_past*1e3)
    symbol_trades_vec = cloud_ex.VectorTrade()
    cloud_ex.MarketDataAPI.PullTrades(config['project_id'], config['bigtable_id'],
                                          config['table_name'], symbol, start_time_ms,
                                          end_time_ms, symbol_trades_vec)
    sym_df = TradeDF(symbol_trades_vec)
    sym_df.to_csv("periodicity_" + str(symbol) + "_" + str(time.time()) +".csv", sep=',')

    # Drop unparseable prices so that a single NaN cannot poison the spectrum.
    close_price_vector = pd.to_numeric(sym_df['ExecPrice'], errors='coerce').dropna().to_numpy(dtype=float)

    # The FFT needs at least one sample and ps[1:] needs at least two;
    # report "no periodicity" instead of raising so ranking can continue.
    if len(close_price_vector) < 2:
        print(symbol, 0.0)
        return 0.0, np.array([])

    ps = np.abs(np.fft.fft(close_price_vector))**2
    # NOTE(review): the sample spacing is taken from BIN_INTERVAL_MS, but the
    # trades are not re-binned in this function -- confirm the frequency axis
    # means what callers expect.
    freqs = np.fft.fftfreq(len(close_price_vector), GLOBALS['BIN_INTERVAL_MS']/10**3)

    total_power = float(np.sum(ps)) if normalize_with_dc else float(np.sum(ps[1:]))
    # Zero total power would yield NaN/inf, which breaks sorting by score.
    periodicity = float(np.max(ps[1:]) / total_power) if total_power > 0 else 0.0

    print(symbol, periodicity)
    return periodicity, freqs


def multithreaded_volume_request(syms,seconds_in_past) :
    """
    Run aggregate_volume for every symbol on a thread pool.

    :param syms: Iterable of symbols (str).
    :param seconds_in_past: Look-back window handed to aggregate_volume.
    :return: List of aggregate_volume results, in the order of `syms`.
    """
    import concurrent.futures

    pending = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for sym in syms:
            print("Submitting thread for sym: " + sym )
            pending.append(pool.submit(aggregate_volume, sym, seconds_in_past))

    # Leaving the with-block has waited for every job; collect the results.
    return [job.result() for job in pending]
    
def get_volume_for_symbol_in_thread(symbol, seconds_in_past) :
    """
    Start a thread that runs aggregate_volume(symbol, seconds_in_past).

    :return: The already-started threading.Thread. The value returned by
             aggregate_volume is not captured by the thread.
    """
    worker = threading.Thread(target=aggregate_volume,
                              args=(symbol, seconds_in_past))
    worker.start()
    return worker

def ranked_volume_symbols(symbols , seconds_in_past) :
    """
    Rank symbols by aggregate traded volume, fetching the volumes concurrently.

    Plain threading.Thread objects discard the target's return value, so the
    volumes are collected through multithreaded_volume_request instead.

    :param symbols: Iterable of symbols (str).
    :param seconds_in_past: Look-back window in seconds.
    :return: List of [symbol, volume] pairs sorted by volume, largest first
             (the same shape rank_symbols_by_volume returns).
    """
    symbols = list(symbols)
    volumes = multithreaded_volume_request(symbols, seconds_in_past)
    ranked = [[symbol, volume] for symbol, volume in zip(symbols, volumes)]
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked

def rank_symbols_by_volume(symbols , seconds_in_past) :
    """
    Query aggregate_volume for each symbol, one after another, and rank them.

    :return: List of [symbol, volume] pairs sorted by volume, largest first.
    """
    volumes = [[sym, aggregate_volume(sym, seconds_in_past)] for sym in symbols]
    return sorted(volumes, key=lambda pair: pair[1], reverse=True)

def get_top_n_symbols_by_volume(symbols,seconds_in_past, n )  :
    """
    Pick the `n` symbols with the highest aggregate volume.

    :return: (top_symbols, ranked) where ranked is the full output of
             rank_symbols_by_volume and top_symbols holds the symbols of its
             first `n` entries.
    """
    ranked = rank_symbols_by_volume(symbols, seconds_in_past)
    top_symbols = [entry[0] for entry in ranked[:n]]
    return top_symbols, ranked

def rank_symbols_by_periodicity(symbols , seconds_in_past) :
    """
    Query periodicity_metric for each symbol and rank them by its first
    element (the periodicity score).

    :return: List of [symbol, periodicity_metric result] pairs, highest
             score first.
    """
    scored = [[sym, periodicity_metric(sym, seconds_in_past)] for sym in symbols]
    return sorted(scored, key=lambda pair: pair[1][0], reverse=True)

def get_top_n_symbols_by_periodicity(symbols,seconds_in_past, n )  :
    """
    Pick the `n` symbols with the highest periodicity score.

    :return: (top_symbols, ranked) where ranked is the full output of
             rank_symbols_by_periodicity and top_symbols holds the symbols of
             its first `n` entries.
    """
    ranked = rank_symbols_by_periodicity(symbols, seconds_in_past)
    top_symbols = [entry[0] for entry in ranked[:n]]
    return top_symbols, ranked

In [None]:
def place_order(trader, symbol, price, num_shares, buy=True,limit=True):
    """
    Submit a single order through `trader`.

    :param trader: Object whose SubmitOrder method is used to send the order.
    :param symbol: Symbol to buy or sell (str).
    :param price: Price for the order; converted with int() before submission.
    :param num_shares: Number of shares to buy or sell (int).
    :param buy: True for a buy order, False for a sell order (bool).
    :param limit: True for a limit order, False for a market order (bool).
    :return: The order id of the submitted order, or None when SubmitOrder
             returns anything other than cloud_ex.OrderResult.in_sequencer.
    """
    order = cloud_ex.Order()
    order_type = cloud_ex.OrderType.limit if limit else cloud_ex.OrderType.market
    side = cloud_ex.OrderAction.buy if buy else cloud_ex.OrderAction.sell

    # SubmitOrder fills in `order` with the details of the submitted order.
    status = trader.SubmitOrder(symbol, order, order_type,
                                side, num_shares, int(price))
    if status != cloud_ex.OrderResult.in_sequencer:
        return None
    return order.order_id_

In [None]:
outstanding_orders()

In [None]:
periodicity_metric("CC",60*18,False)

#### Conclusion from above is that multithreaded market data request is not helpful 

In [None]:
top_symbols, symbol_ranks = get_top_n_symbols_by_volume(default_symbols, 60*3, 10)

In [None]:
aggregate_volume('AD', 60*4)

In [None]:
most_periodic, periodicity_ranks = get_top_n_symbols_by_periodicity(default_symbols, 24*60*60, 10)

In [None]:
most_periodic

In [None]:
periodicity_ranks

In [None]:
getSymbols()

In [None]:
top_symbols

In [None]:
symbol_ranks

### QUERY ORDER HISTORY

In [None]:
### Order Monitoring 
def outstanding_orders() :
    """
    Fetch this client's outstanding orders from the module-level `trader`.

    :return: DataFrame built by OutstandingOrderDF from the fetched orders.
    """
    open_orders = cloud_ex.MapStringOrder()
    trader.GetOutstandingOrders(open_orders)
    u.debug("You have {} outstanding orders.".format(len(open_orders)))
    # Hand back the orders as a DataFrame.
    return OutstandingOrderDF(open_orders)
    
def historical_orders() :
    """
    Fetch every order this client has submitted, using the module-level `trader`.

    :return: DataFrame built by OrderDF from the fetched orders.
    """
    past_orders = cloud_ex.VectorOrder()
    trader.GetAllHistoricalOrders(past_orders)
    u.debug("You have submitted a total of {} order(s).".format(len(past_orders)))
    return OrderDF(past_orders)

def historical_trades() :
    """
    Fetch every trade this client has made, using the module-level `trader`.

    :return: DataFrame built by TradeDF from the fetched trades.
    """
    past_trades = cloud_ex.VectorTrade()
    trader.GetAllHistoricalTrades(past_trades)
    u.debug("You have made a total of {} trade(s).".format(len(past_trades)))
    return TradeDF(past_trades)

In [None]:
outstanding_orders()

In [None]:
historical_trades()

# 1. Backtesting

Let's see how we can backtest our trading algorithms to get them ready for live trading. In the following cells we will download historical data and evaluate how well a mean reversion trader would have done.

In [None]:
# Tunable settings shared by the symbol-ranking and live-trading helpers in this notebook.
GLOBALS = { 
    'NUM_SHARES' :  100,  # passed as num_shares to every deployed algo
    'BIN_INTERVAL_MS' : 500,  #interval to bin the data with; also the FFT sample spacing in periodicity_metric
    'WAIT_INTERVAL_SECONDS' : 0.5,  # passed to algo.trade(); stop_trading_threads sleeps slightly longer than this
    'BACKTEST_LOOKBACK_PERIOD_SECONDS' : 6*60  , #amount of historical data to backtest on 
    'MAX_NUM_ORDERS' : 2*60*2 , #how long the algo will trade for, in # of bins (passed as max_num_orders)
}

## 1.1 Get our bank of strategies

### Note on strategy and strategy parameters terminology

A strategy is always run with a specific set of parameters. <b>The pair (strategy, parameters) is called an "algo".</b>
The backtest logic takes a dictionary that maps string keys (identifiers) to (strategy, params) tuples. This dictionary is called the "algo_bank".

For example => 

In [None]:
from periodic_trader_vin import PeriodicTraderStrategy
# Each entry maps an algo name to a (strategy, strategy_parameters) pair.
# deploy_top_N_algorithms unpacks the value into exactly those two names,
# the strategy is then called to build the trading object, and the
# parameters are expanded with ** into algo.trade() -- so the value must be
# the class itself (not an instance) plus a dict of keyword arguments.
# NOTE(review): an empty dict assumes trade() needs no extra parameters -- confirm.
algo_bank = {'periodic_trader': (PeriodicTraderStrategy, {})}

# Deploying the trading threads 


In [None]:
ACTIVE_THREADS = [] # global handle on the currently deployed threads 

In [None]:
def run_and_evaluate_algorithm(**kwargs) :
    """
    Intended as the target of a new thread.

    1. Builds the strategy object for one symbol and starts trading.
    2. Writes the submitted order ids to a log file once trading returns.

    No ROI is calculated here.

    Required keyword arguments: name, strategy, strategy_parameters,
    num_shares, max_num_orders, symbol, trader.
    """
    algo_name = kwargs['name']
    strategy_cls = kwargs['strategy']
    trade_params = kwargs['strategy_parameters']
    share_count = kwargs['num_shares']
    order_limit = kwargs['max_num_orders']
    ticker = kwargs['symbol']
    shared_trader = kwargs['trader']  # reference to the one trader instance connected to CloudEx
    print("RE: Running algo: {} - {}".format(algo_name, ticker))

    # Build the strategy object for this single symbol.
    algo = strategy_cls(shared_trader, [ticker],
                        bin_interval_ms=GLOBALS['BIN_INTERVAL_MS'])

    # Identifier used for logging: algo name plus the current whole-second timestamp.
    run_id = algo_name + "_" + str(time.time()).split(".")[0]

    # Trade until the strategy returns the ids of the orders it submitted.
    submitted_ids = algo.trade(ticker, share_count, order_limit,
                               GLOBALS['WAIT_INTERVAL_SECONDS'],
                               trader_id=run_id, **trade_params)

    # Persist the order ids to a log file.
    u.logfile(run_id + "_order_ids", json.dumps(submitted_ids))


def run_algorithm_in_thread(kwargs) :
    """
    Start run_and_evaluate_algorithm(**kwargs) on a new thread.

    :param kwargs: Dict of keyword arguments for run_and_evaluate_algorithm;
                   must contain at least 'name' and 'symbol'.
    :return: The already-started threading.Thread.
    """
    print("Running algo in thread: {} - {}".format(kwargs['name'],kwargs['symbol']))
    worker = threading.Thread(target=run_and_evaluate_algorithm, kwargs=kwargs)
    worker.start()
    return worker

def deploy_top_N_algorithms(trader, ranked_algos, N,  num_shares, max_num_orders) :
    """
    Start one trading thread for each of the first `N` entries of `ranked_algos`.

    Each entry is a 4-item sequence (roi, symbol, algo name, unused); the
    algo name is looked up in the module-level `algo_bank` to get the
    strategy and its parameters.

    :return: List of the started threads, which is also stored in the
             module-level ACTIVE_THREADS.
    """
    global ACTIVE_THREADS
    selected = ranked_algos[:N]
    print("Deploying algos: ")
    print(selected)

    # potential fix for the multi-threading??
    active_symbols = [entry[1] for entry in selected]
    print("Setting trader active symbols to: {}".format(json.dumps(active_symbols)))
    trader.set_active_symbols(active_symbols)

    launched = []
    for _roi, symbol, algoname, _unused in selected:
        strategy, strategy_parameters = algo_bank[algoname]
        launched.append(run_algorithm_in_thread({
            'name' : algoname,
            'strategy' : strategy,
            'strategy_parameters' : strategy_parameters,
            'num_shares' : num_shares,
            'max_num_orders' : max_num_orders,
            'symbol' : symbol,
            'trader' : trader,
        }))

    ACTIVE_THREADS = launched
    return launched


def get_most_periodic(symbols):
    """
    Rank `symbols` by periodicity in the row format deploy_top_N_algorithms unpacks.

    :param symbols: Iterable of symbols (str).
    :return: List of [score, symbol, 'periodic_trader', ['buy', 'sell']]
             rows, most periodic first. The score is stored as a plain float
             (without the frequency array) so the rows can be passed to
             json.dumps.
    """
    ranked_symbols = rank_symbols_by_periodicity(symbols , GLOBALS['BACKTEST_LOOKBACK_PERIOD_SECONDS'])
    # Each ranked entry is [symbol, (periodicity, freqs)].
    return [
        [float(metric[0]), symbol, 'periodic_trader', ['buy', 'sell']]
        for symbol, metric in ranked_symbols
    ]


def trade_algorithmically(trader, algobank, symbols, N=5) :
    """
    Repeatedly rank a random subset of symbols and trade the top `N`.

    Each round samples up to 25 symbols, ranks them with get_most_periodic,
    logs the ranking, deploys the top `N` entries as trading threads and
    waits for those threads to finish. The loop has no exit condition.

    :param trader: Trader passed on to deploy_top_N_algorithms.
    :param algobank: Currently unused; deploy_top_N_algorithms reads the
                     module-level `algo_bank` instead.
    :param symbols: Sequence of candidate symbols (str).
    :param N: Number of top-ranked entries to deploy each round.
    """
    from random import sample
    while True :
        # random.sample raises ValueError when asked for more items than
        # exist, so cap the subset size at the number of symbols.
        sym_subset = sample(symbols, min(25, len(symbols)))
        u.logfile("tradeloop", "\nBacktesting on symbols =>")
        u.logfile("tradeloop", "\n{}\n".format(json.dumps(sym_subset)))

        u.logfile("tradeloop", "\n{}, Doing backtest".format(time.time()))
        ranked = get_most_periodic(sym_subset)

        fname = "backtest_results_{}".format(time.time())
        u.logfile(fname, json.dumps(ranked))
        u.logfile("tradeloop","backtest_results=>")
        u.logfile("tradeloop", "\n{}\n".format(json.dumps(ranked[0:N])))
        u.logfile("tradeloop", "\n{}, Launching Trading".format(time.time()))

        algo_threads = deploy_top_N_algorithms(trader, ranked, N, GLOBALS['NUM_SHARES'] , GLOBALS['MAX_NUM_ORDERS'])
        # Wait for this round's threads before starting the next round.
        for t in algo_threads :
            t.join()
    
def stop_trading_threads() :
    """
    Raise the STOP_TRADING environment flag briefly, then lower it again.

    The flag is set to "TRUE", held for slightly longer than
    WAIT_INTERVAL_SECONDS, and reset to "FALSE". The live thread count is
    printed before and after.
    NOTE(review): presumably the trading loops poll STOP_TRADING between
    waits -- that code is not visible here; confirm.
    """
    import os
    import time

    print("There were {} threads running".format(len(threading.enumerate())))
    os.environ['STOP_TRADING'] = "TRUE"
    time.sleep(GLOBALS['WAIT_INTERVAL_SECONDS']+0.1)
    print("There are now {} threads running".format(len(threading.enumerate())))
    os.environ['STOP_TRADING'] = "FALSE"

In [None]:
trade_algorithmically(getTrader(), algo_bank, default_symbols, N=1)