# MA + TRIX + adjusting

This strategy tunes its parameters separately for each asset in order to improve performance.

In [None]:
# Import basic libraries for manipulating data.

# Please refer to xarray.pydata.org for xarray documentation.

# xarray works optimally with N-dimensional datasets in Python
# and is well suited for financial datasets with labels "time",
# "field" and "asset". xarray data structures can also be easily
# converted to pandas dataframes.

import xarray as xr
import xarray.ufuncs as xruf

import numpy as np
import pandas as pd

# Import quantnet libraries.

import qnt.data as qndata          # data loading and manipulation
import qnt.stats as qnstats        # key statistics
import qnt.graph as qngraph        # graphical tools
import qnt.forward_looking as qnfl # forward looking checking
import qnt.ta as qnta              # technical analysis indicators

# display function for fancy displaying:
from IPython.display import display
# lib for charts
import plotly.graph_objs as go

import math
import time

In [None]:
# Load all available data since the given min_date.

# It is possible to set a max_date in the call in order to
# develop the system on a limited in-sample period and later
# test the system on unseen data after max_date.

# A submission will be accepted only if no max_date is set,
# as submissions will be evaluated on live data on a daily basis.

# The array is requested with dimension order (time, field, asset); other
# cells slice it positionally along the first axis (e.g. data[:-out_smpl])
# and treat the last rows as the most recent ones.
data = qndata.load_data(min_date="2014-01-01",
                        max_date="2019-08-13", # You should not limit max date for final calculation!
                        dims=("time", "field", "asset"),
                        forward_order=True
                       )

Let's define the strategy based on MA for one asset:

In [None]:
# Asset used for the single-asset walkthrough.
stock_name = 'NASDAQ:AAPL'

# select only 1 stock (the result keeps the 'time' and 'field' dimensions)
stock = data.sel(asset=stock_name)#.dropna('time', 'all')

# Series reused by the indicator, signal and chart code of this cell.
pd_time = stock.time.to_pandas()
close = stock.sel(field='close')
is_liquid = stock.sel(field='is_liquid') > 0  # boolean mask: True where is_liquid > 0

# chart with prices (OHLC candlesticks)
price_fig = [
   go.Candlestick(
       x=stock.time.to_pandas(),
       open=stock.sel(field='open').values,
       high=stock.sel(field='high').values,
       low=stock.sel(field='low').values,
       close=stock.sel(field='close').values,
       name=stock_name
   )
]

#
# Strategy parameters (*)
#
ma_periods = 120
roc_periods = 40
trix_periods = 40

# calculate MA
ma = qnta.wma(close, ma_periods) # you can use also SMA, EMA, etc.
# calculate ROC of the MA (not of the raw close)
roc = qnta.roc(ma, roc_periods)
# trix (computed from the close price)
trix = qnta.trix(close, trix_periods)

# positive trend direction: the ROC of the MA is above zero
positive_trend = roc > 0
# negative trend direction: the ROC of the MA is below zero
negative_trend = roc < 0

# charts with trend indicator:
# the thin orange line is the MA itself; the thick green / red overlays show
# the MA only where the trend is positive / negative (other points are NaN).

trend_fig = [
    go.Scatter(
        x = pd_time,
        y = ma,
        name='ma',
        line = dict(width=1,color='orange')
    ),
    go.Scatter(
        x = pd_time,
        y = ma.where(positive_trend),
        name='positive-trend',
        line = dict(width=3,color='green')
    ),
    go.Scatter(
        x = pd_time,
        y = ma.where(negative_trend),
        name='negative-trend',
        line = dict(width=3,color='red')
    ) 
]


# Entry and exit conditions (boolean series over time).
# Long entry: TRIX above zero while the trend is positive.
buy_signal = (trix > 0) & positive_trend
# Long exit: asset not liquid, trend turned negative, or TRIX below zero.
buy_stop_signal = ~is_liquid | negative_trend | (trix < 0)

# Short entry / exit mirror the long rules.
sell_signal = (trix < 0) & negative_trend
sell_stop_signal = ~is_liquid | positive_trend | (trix > 0)

# Positions: start from an all-NaN series shaped like `close`, mark entries
# with +1 / -1, then write 0 where a stop fires while the carried-forward
# position has the matching sign.
position = xr.full_like(close, np.nan)
position = xr.where(buy_signal, 1, position)
position = xr.where(sell_signal, -1, position)
position = xr.where(buy_stop_signal & (position.ffill('time') > 0), 0, position)
position = xr.where(sell_stop_signal & (position.ffill('time') < 0), 0, position)

# Carry the last decision forward; bars before the first decision are flat.
position = position.ffill('time').fillna(0)

# calc real orders (bars where the position actually changes)
# shift() leaves NaN on the first bar, and NaN != 0 evaluates to True, which
# used to flag a spurious "stop" on the very first bar. Treat the position
# before the first bar as flat (0) instead.
prev_position = position.shift(time=1).fillna(0)
real_buy = xruf.logical_and(position > 0, prev_position <= 0)
real_sell = xruf.logical_and(position < 0, prev_position >= 0)
real_stop = xruf.logical_and(position == 0, prev_position != 0)

# plot orders chart: one marker trace per order type, drawn at the close
# price of the bars selected by the corresponding boolean mask.
signals_fig = [
    go.Scatter(
        x=points.time.to_pandas(),
        y=points,
        mode="markers",
        hovertext=label,
        name=label,
        marker_size=9,
        marker_color=color
    )
    for label, color, points in (
        ('buy', 'green', close.loc[real_buy]),
        ('sell', 'red', close.loc[real_sell]),
        ('stop', 'gray', close.loc[real_stop]),
    )
]

# draw chart: candlesticks, MA/trend overlays and order markers in one figure
fig = go.Figure(data = price_fig + trend_fig + signals_fig)
fig.update_yaxes(fixedrange=False) # unlock vertical scrolling
fig.show()

# draw trix chart (separate figure)
fig = go.Figure(data = [
    go.Scatter(
        x=trix.time.to_pandas(),
        y=trix,
        hovertext='trix',
    )
])
fig.show()


In [None]:
# calc stats
# `position` and `stock` were selected for a single asset; wrap each one in a
# length-1 'asset' dimension before passing them to calc_stat.
position_with_asset = xr.concat([position], pd.Index([stock_name], name='asset'))
stock_with_asset = xr.concat([stock], pd.Index([stock_name], name='asset'))
stats = qnstats.calc_stat(stock_with_asset, position_with_asset)
display(stats.to_pandas().tail())

# equity curve taken from the statistics table
performance = stats.loc[:,"equity"]

# draw performance chart
fig = go.Figure(data = [
    go.Scatter(
        x=performance.time.to_pandas(),
        y=performance,
        hovertext='performance',
    )
])
fig.update_yaxes(fixedrange=False) # unlock vertical scrolling
fig.show()

If you replace 'NASDAQ:AAPL' with other assets, the strategy will generally not work. This means that the strategy does not work for all assets with these parameters (*).

So we need a way to find suitable parameters for every asset. For example, you can use an optimization method to find them.

Below is one of the simplest descent methods that we can use to find suitable parameters.

In [None]:
def descent(
    cost_function, # cost function
    min_arg,       # min value of arg
    max_arg,       # max value of arg
    min_range,     # stop condition: iterate while the current range is >= min_range
    arg_fix_function = lambda a:a, # function that fixes the argument (e.g. rounds it when an int is needed)
    probe_count=5, # probes count
    range_div=2,   # range divisor, the range on the next step will be 2 times less than on the current step
    debug=False    # flag for debug output
):
    """Find an argument in [min_arg, max_arg] that (approximately) minimizes cost_function.

    Each step evaluates ``probe_count`` evenly spaced probes over the current
    window, keeps the cheapest point seen so far, divides the window size by
    ``range_div`` and re-centers the window on that point. The window is
    clamped so it never leaves [min_arg, max_arg].

    Costs are cached per ``arg_fix_function(arg)``, so probes that map to the
    same fixed argument are evaluated only once.

    Returns:
        ``arg_fix_function`` applied to the best point found.

    Raises:
        ValueError: if ``probe_count < 2``, ``range_div <= 1`` or
            ``min_range <= 0`` (these would divide by zero or never terminate).
    """
    if probe_count < 2:
        raise ValueError("probe_count must be at least 2")
    if range_div <= 1:
        raise ValueError("range_div must be greater than 1")
    if min_range <= 0:
        raise ValueError("min_range must be positive")

    cost_cache = dict()
    def cost(arg):
        arg = arg_fix_function(arg)
        if arg not in cost_cache:
            cost_cache[arg] = cost_function(arg)
        return cost_cache[arg]

    rng = max_arg - min_arg
    rng_start = min_arg
    min_point = (max_arg + min_arg)/2

    while rng >= min_range:
        probes = [(rng_start + rng * i / (probe_count - 1)) for i in range(probe_count)]
        for p in probes:
            if cost(min_point) > cost(p):
                min_point = p
        rng = rng/range_div
        # Center the next window on the best point, but keep the whole window
        # [rng_start, rng_start + rng] inside [min_arg, max_arg]. The upper
        # bound is max_arg - rng: the previous max_arg - rng/2 let probes (and
        # the result) run past max_arg.
        rng_start = min(max(min_point - rng/2, min_arg), max_arg - rng)
        if debug:
            print("rng_start =",  rng_start, "rng =", rng, "cache_size =", len(cost_cache),
                  "min_point =", min_point, "cost =", cost(min_point))

    return arg_fix_function(min_point)

# Usage example: find the 'a' in [-3, 3] at which sin(a) is minimal (or close to the minimum).
descent(lambda a: math.sin(a), -3, 3, 0.01, debug = True)

In [None]:
def calc_positions(stock, ma_periods, roc_periods, trix_periods):
    """Build the position series for one asset.

    The trend is the sign of the ROC (over ``roc_periods``) of a WMA (over
    ``ma_periods``) of the close price; TRIX (over ``trix_periods``) is
    computed from the close price.

    Rules:
      * long entry  (+1): TRIX > 0 and positive trend;
      * short entry (-1): TRIX < 0 and negative trend;
      * a long is closed when the asset is not liquid, the trend is negative
        or TRIX < 0; a short is closed on the mirrored conditions.

    Returns a series over time holding 1, -1 or 0; the last decision is
    carried forward and bars before the first decision are 0.
    """
    close = stock.sel(field='close')
    not_liquid = ~(stock.sel(field='is_liquid') > 0)

    trend = qnta.roc(qnta.wma(close, ma_periods), roc_periods)
    trix = qnta.trix(close, trix_periods)

    trend_up, trend_down = trend > 0, trend < 0
    trix_up, trix_down = trix > 0, trix < 0

    enter_long = trix_up & trend_up
    exit_long = not_liquid | trend_down | trix_down

    enter_short = trix_down & trend_down
    exit_short = not_liquid | trend_up | trix_up

    # Start from "no decision" everywhere, then stamp entries and exits.
    position = xr.full_like(close, np.nan)
    position = xr.where(enter_long, 1, position)
    position = xr.where(enter_short, -1, position)
    # An exit only applies where the carried-forward position has the matching sign.
    position = xr.where(exit_long & (position.ffill('time') > 0), 0, position)
    position = xr.where(exit_short & (position.ffill('time') < 0), 0, position)

    return position.ffill('time').fillna(0)


def calc_sr(stock, position):
    """Return the last 'sharpe_ratio' value reported by qnstats.calc_stat.

    ``stock`` and ``position`` describe a single asset; both are wrapped in a
    length-1 'asset' dimension (labelled "a") before calling calc_stat, and
    ``max_periods`` is set to the number of time steps in ``stock``.
    """
    asset_index = pd.Index(["a"], name='asset')
    stats = qnstats.calc_stat(
        xr.concat([stock], asset_index),
        xr.concat([position], asset_index),
        max_periods=len(stock.time),
    )
    last_sharpe = stats.sel(field='sharpe_ratio').isel(time=-1)
    return last_sharpe.values.item()


def cost_function(stock, ma_periods, roc_periods, trix_periods):
    """Cost to minimize: the negated Sharpe ratio of the strategy on ``stock``."""
    positions = calc_positions(stock, ma_periods, roc_periods, trix_periods)
    return -calc_sr(stock, positions)

In [None]:
stock_name = "NASDAQ:AAPL"
stock_data = data.sel(asset=stock_name)
# in-sample window for adjusting: excludes the last 3*252 bars
stock_data_insmpl = stock_data.isel(time=slice(-9*252, -3*252))

# Initial parameters, we will optimize them
ma_periods = 120
roc_periods = 40
trix_periods = 40

# The parameters are optimized one at a time; each search keeps the other two
# at their current values. round() keeps the probed periods integer.

print("optimize trix_periods")
cost_f = lambda trix_periods: cost_function(stock_data_insmpl, ma_periods, roc_periods, trix_periods) 
trix_periods = descent(cost_f, 40-32, 40+32, 4, lambda a: round(a), debug=True)

print("optimize roc_periods")
cost_f = lambda roc_periods: cost_function(stock_data_insmpl, ma_periods, roc_periods, trix_periods) 
roc_periods = descent(cost_f, 40-32, 40+32, 4, lambda a: round(a), debug=True)

print("optimize ma_periods")
cost_f = lambda ma_periods: cost_function(stock_data_insmpl, ma_periods, roc_periods, trix_periods) 
ma_periods = descent(cost_f, 120-64, 120+64, 4, lambda a: round(a), debug=True)

# resulting SR with the adjusted parameters.
# NOTE: this is computed on the full series (stock_data), so it covers the
# in-sample window as well as the bars left out of the adjustment.
calc_sr(stock_data, calc_positions(stock_data, ma_periods, roc_periods, trix_periods))

Parameter adjustment works, but it is slow. The main bottleneck is the "sharpe_ratio" calculation. Let's implement a fast function for the SR calculation. It will be inexact, but much faster and still useful for the optimization task.

In [None]:
def fast_sr(data, positions):
    """Quick, approximate Sharpe ratio of ``positions`` on a single asset.

    Steps:
      * drop time steps where any field of ``data`` is NaN and align
        ``positions`` to the remaining time index;
      * clip positions to [-1, 1];
      * delay positions by one bar (a decision is executed on the next bar)
        and compute open-to-open returns, including 'divs';
      * charge slippage of 5% of the 14-period ATR on every position change.

    Returns the annualized mean return divided by the annualized volatility,
    using 252 periods per year.
    """
    # `how` is passed by keyword: newer xarray versions deprecate passing it positionally.
    data = data.dropna('time', how='any')
    positions = xr.align(positions, data, join='right')[0]
    positions = positions.ffill('time').fillna(0)

    # normalization: clip to [-1, 1]
    positions = positions.where(positions < 1, 1)
    positions = positions.where(positions > -1, -1)

    slippage = qnta.atr(data.sel(field='high'), data.sel(field='low'), data.sel(field='close'), 14)
    slippage = slippage.bfill('time') * 0.05

    # positions[t] is the decision taken on bar t-1; prev_positions[t] is the
    # one from bar t-2, i.e. the exposure held between open[t-1] and open[t].
    positions = positions.shift(time=1).fillna(0)
    prev_positions = positions.shift(time=1).fillna(0)

    open_prices = data.sel(field='open')
    prev_open_prices = open_prices.shift(time=1).bfill('time')

    divs = data.sel(field='divs')

    rr = prev_positions * (open_prices - prev_open_prices + divs) / prev_open_prices
    rr -= abs(positions - prev_positions) * slippage / open_prices

    volatility_annualized = rr.std().values * pow(252, 0.5)

    # NumPy ufuncs work directly on xarray objects; this avoids the deprecated
    # xarray.ufuncs module (and the implicit `xr.ufuncs` attribute access).
    mean_return_annualized = math.exp(np.log(rr + 1).mean().values*252) - 1

    return mean_return_annualized / volatility_annualized

# execution time test: compare fast_sr with the calc_stat-based calc_sr

# close / close is 1 wherever the close price is a finite non-zero number,
# i.e. a constant long position.
fake_positions = stock_data.sel(field='close')/stock_data.sel(field='close')

t1 = time.time()

fast = fast_sr(stock_data, fake_positions)

t2 = time.time()

slow = calc_sr(stock_data, fake_positions)

t3 = time.time()

# prints: fast SR, slow SR, seconds spent in fast_sr, seconds spent in calc_sr
print(fast, slow,  t2-t1, t3-t2)

The fast cost function:

In [None]:
def fast_cost_function(stock, ma_periods, roc_periods, sideways_threshold):
    """Cost to minimize: the negated fast_sr() of the strategy on ``stock``.

    NOTE: despite its name, ``sideways_threshold`` is forwarded to
    calc_positions() as its fourth argument, i.e. as ``trix_periods``.
    """
    return -fast_sr(stock, calc_positions(stock, ma_periods, roc_periods, sideways_threshold))

In [None]:
stock_name = "NASDAQ:AAPL"
stock_data = data.sel(asset=stock_name)
# In-sample period for adjusting: 3*252 bars, ending 3*252 bars before the end of the data
stock_data_insmpl = stock_data.isel(time=slice(-6*252, -3*252))

# Initial parameters, we will optimize them
ma_periods = 120
roc_periods = 40
trix_periods = 40

# Same one-parameter-at-a-time search as before, but using the fast cost function.

print("optimize trix_periods")
cost_f = lambda trix_periods: fast_cost_function(stock_data_insmpl, ma_periods, roc_periods, trix_periods) 
trix_periods = descent(cost_f, 40-32, 40+32, 4, lambda a: round(a), debug=True)

print("optimize roc_periods")
cost_f = lambda roc_periods: fast_cost_function(stock_data_insmpl, ma_periods, roc_periods, trix_periods) 
roc_periods = descent(cost_f, 40-32, 40+32, 4, lambda a: round(a), debug=True)

print("optimize ma_periods")
cost_f = lambda ma_periods: fast_cost_function(stock_data_insmpl, ma_periods, roc_periods, trix_periods) 
ma_periods = descent(cost_f, 120-64, 120+64, 4, lambda a: round(a), debug=True)

# resulting SR with the adjusted parameters, computed with the exact (slow) calc_sr.
# NOTE: it is evaluated on the full series (stock_data), which includes the
# in-sample window.
calc_sr(stock_data, calc_positions(stock_data, ma_periods, roc_periods, trix_periods))

Now the parameter adjustment is fast enough. Let's apply this adjustment model to multiple assets.

# Multiple assets strategy

We will use in-sample and out-of-sample periods to check our model.

In [None]:
# Window lengths in bars (252 is the number of periods per year used for
# annualization elsewhere in this notebook).
in_smpl = 3*252   # bars used for adjusting the parameters
out_smpl = 252    # most recent bars, kept aside for checking the result

We will trade only the most liquid assets:

In [None]:
def select_most_liq_data(data, in_smpl, out_smpl, top=200, top_period=252):
    """Return ``data`` restricted to the assets that were among the most liquid.

    Liquidity is measured as close * vol, averaged over a rolling window of
    ``top_period`` bars. Only the ``in_smpl`` bars that precede the last
    ``out_smpl`` bars are looked at. An asset is kept if its rank was within
    ``top`` on at least one bar of that window. All time steps of the selected
    assets are returned.
    """
    turnover = data.sel(field='close') * data.sel(field='vol')

    # Restrict to the in-sample window: drop the head, then the out-sample tail.
    window = turnover[-(in_smpl + out_smpl):]
    if out_smpl > 0:
        window = window[:-out_smpl]

    avg_turnover = window.rolling(time=top_period).mean()

    # Rank 1 = highest average turnover (hence the negation).
    ranks = (-avg_turnover).rank('asset')
    in_top = ranks.where(ranks <= top)
    selected_assets = in_top.dropna('asset', how='all').asset.values

    return data.sel(asset=selected_assets)

# Keep only the assets selected as most liquid and show how many there are.
liq_data = select_most_liq_data(data, in_smpl, out_smpl)
print(len(liq_data.asset))

In [None]:
# Silence NumPy floating-point warnings raised while probing parameters.
np.seterr('ignore')

def adjust_all(
    in_smpl_data,  # data for adjusting. Also known as "in-sample" in ML.
    in_smpl_length # length of "in-sample"
):
    """Optimize the strategy parameters separately for every asset.

    For each asset in ``in_smpl_data`` the last ``in_smpl_length`` complete
    bars (no NaN in any field) are used to tune trix_periods, roc_periods and
    ma_periods, one at a time, with descent() on fast_cost_function(). Assets
    with fewer than ``in_smpl_length`` complete bars are skipped.

    Only ``in_smpl_data`` is read: the series are no longer taken from the
    global ``data``, which silently pulled the out-of-sample bars into the fit.

    Returns:
        dict mapping asset name to
        {'ma_periods': ..., 'roc_periods': ..., 'trix_periods': ...}.
    """
    print("Adjusting...")
    params = dict()
    asset_names = in_smpl_data.asset.values
    total = len(asset_names)
    st = time.time()

    for i, stock_name in enumerate(asset_names, start=1):
        if i % 10 == 0:
            dt = time.time() - st
            # progress indicator: done / total, elapsed seconds, estimated seconds left
            print(i, "/", total, dt, dt/i * (total - i))

        stock_data = in_smpl_data.sel(asset=stock_name).dropna('time', how='any')

        if len(stock_data.time) < in_smpl_length: # skip short series
            continue

        stock_data = stock_data[-in_smpl_length:] # cut the head of data to prevent overfitting

        # Initial parameters, we will optimize them
        ma_periods = 120
        roc_periods = 40
        trix_periods = 40

        cost_f = lambda trix_periods: fast_cost_function(stock_data, ma_periods, roc_periods, trix_periods)
        trix_periods = descent(cost_f, 40-32, 40+32, 4, lambda a: round(a), debug=False)

        cost_f = lambda roc_periods: fast_cost_function(stock_data, ma_periods, roc_periods, trix_periods)
        roc_periods = descent(cost_f, 40-32, 40+32, 4, lambda a: round(a), debug=False)

        cost_f = lambda ma_periods: fast_cost_function(stock_data, ma_periods, roc_periods, trix_periods)
        ma_periods = descent(cost_f, 120-64, 120+64, 4, lambda a: round(a), debug=False)

        params[stock_name] = {
            'ma_periods': ma_periods,
            'roc_periods': roc_periods,
            'trix_periods': trix_periods
        }
    print("Finish.")
    return params


# Everything before the last out_smpl bars is in-sample. The previous slice,
# liq_data[-out_smpl:], selected the out-of-sample tail instead.
params = adjust_all(
    liq_data[:-out_smpl],    # in-sample data
    in_smpl                  # in-sample data length
)

In [None]:
def calc_output_all(data, params):
    """Compute positions for every asset listed in ``params``.

    ``params`` maps an asset name to a dict with 'ma_periods', 'roc_periods'
    and 'trix_periods'. For each asset, the bars with a NaN in any field are
    dropped, calc_positions() is run on the rest and the result is written
    into the output at the matching time steps.

    Returns an array shaped like ``data.sel(field='close')``; entries that are
    never written (assets without parameters, dropped bars) stay NaN.
    """
    positions = xr.full_like(data.sel(field='close'), np.nan)

    for stock_name, asset_params in params.items():
        series = data.sel(asset=stock_name).dropna('time', how='any')
        asset_positions = calc_positions(
            series,
            asset_params['ma_periods'],
            asset_params['roc_periods'],
            asset_params['trix_periods'],
        )
        positions.loc[{'asset': stock_name, 'time': asset_positions.time}] = asset_positions

    return positions

# Positions of the most liquid assets, computed with the adjusted per-asset parameters.
output = calc_output_all(liq_data, params)

In [None]:
# calc_output_all should not use forward-looking (future) data;
# let the qnt helper recompute the output and check that.
output = qnfl.calc_output_and_check_forward_looking(data, lambda d: calc_output_all(d, params))

In [None]:
def print_stats_in_out_smpl(output):
    """Display the tail of the statistics table for both sample periods.

    Uses the notebook-level ``data``, ``in_smpl`` and ``out_smpl``:
      * in-sample:  the ``in_smpl`` bars of ``output`` that precede the last
        ``out_smpl`` bars, evaluated against ``data`` without its last
        ``out_smpl`` bars;
      * out-sample: the last ``out_smpl`` bars of ``output``, evaluated
        against the full ``data``.
    """
    sections = (
        ("in-sample", data[:-out_smpl], output[-(in_smpl + out_smpl):-out_smpl], in_smpl),
        ("out-sample", data, output[-out_smpl:], out_smpl),
    )
    for title, prices, weights, periods in sections:
        section_stats = qnstats.calc_stat(prices, weights, max_periods=periods)
        print(title)
        display(section_stats.to_pandas().tail())
    
# Statistics of the unfiltered output for both sample periods.
print_stats_in_out_smpl(output)

In [None]:
def filter_output(output, top_period = 90, top_size = 20, top_step = 60):
    """Keep only the positions of the recently best-performing assets.

    Every ``top_step`` bars (starting once ``top_period`` bars are available)
    the assets are ranked by their per-asset Sharpe ratio computed with
    ``max_periods=top_period``. For the following ``top_step`` bars only the
    positions of the ``top_size`` best-ranked assets are kept; all others are
    set to 0. You can also apply optimization to these parameters.

    The result is normalized so that the absolute weights of each bar sum to
    1. Bars without any open position become NaN (0 / 0).

    Uses the notebook-level ``data`` for the statistics.
    """
    stats_per_asset = qnstats.calc_stat(data, output, per_asset=True, max_periods=top_period)
    # Rank 1 = highest Sharpe ratio (hence the negation).
    ranks = (-stats_per_asset.sel(field='sharpe_ratio')).rank('asset')

    top_output = output.copy(True)
    top_output[:] = 0

    bars = len(ranks.time)
    for offset in range(top_period - 1, bars, top_step):
        start_date = ranks.time[offset].values
        end_date = ranks.time[min(offset + top_step - 1, bars - 1)].values
        rank = ranks.loc[start_date]
        top = rank.where(rank <= top_size).dropna('asset').asset.values
        #print(start_date, end_date, top)
        top_output.loc[start_date:end_date, top] = output.loc[start_date:end_date, top]

    # normalization by gross exposure: the positions are long (+1) and short
    # (-1), so dividing by the signed sum could divide by ~0 or flip the sign
    # of every weight whenever shorts outweigh longs.
    top_output = top_output / abs(top_output).sum('asset')

    return top_output

# Apply the filter and compare its statistics for both sample periods.
filtered_output = filter_output(output)

print_stats_in_out_smpl(filtered_output)

# Final calculations

In [None]:
# Reload the data without an upper date limit for the final run.
data = qndata.load_data(min_date="2014-01-01",
                        # max_date="2019-08-13", # You should not limit max date for final calculation!
                        dims=("time", "field", "asset"),
                        forward_order=True
                       )

# select liquid data: rank by liquidity over the last 252 bars, no out-sample part
liq_data = select_most_liq_data(data, 252, 0)

# adjust params using an in-sample length of 3*252 bars
params = adjust_all(liq_data, 252*3)

# calc output for the assets that received parameters
output = calc_output_all(data, params)

# filter output: keep the recently best-performing assets and normalize
output = filter_output(output)

## Statistics

In [None]:
# Calculate statistics on a rolling basis.

# Transactions are punished with slippage equal to a given
# fraction of ATR14 (read more about slippage in our full
# Strategy Buy and Hold template). We evaluate submissions
# using 5% of ATR14 for slippage.

# Mean return, volatility and Sharpe ratio are computed on a
# rolling basis using a lookback period of 3 years.

# slippage_factor=0.05 is the 5% fraction mentioned above.
stat = qnstats.calc_stat(data, output, slippage_factor=0.05)

# show the most recent rows of the statistics table
display(stat.to_pandas().tail())

In [None]:
def print_stat(stat):
    """Print key indicators computed over the whole simulation period.

    Printed values:
      - Sharpe ratio;
      - mean return, in percent;
      - volatility, in percent;
      - maximum drawdown, in percent.

    Sharpe ratio, mean return and volatility use a window equal to the full
    number of time steps in ``stat`` (not the rolling 3-year basis), and the
    last value of each series is reported.
    """
    n_days = len(stat.coords["time"])
    returns = stat.loc[:, "relative_return"]
    equity = stat.loc[:, "equity"]

    # Every indicator is a series over time; only its final value is printed.
    def last_value(series):
        return series.to_pandas().values[-1]

    full_window = dict(max_periods=n_days, min_periods=n_days)

    sharpe_ratio = last_value(qnstats.calc_sharpe_ratio_annualized(returns, **full_window))
    profit = last_value(qnstats.calc_mean_return_annualized(returns, **full_window)) * 100.0
    volatility = last_value(qnstats.calc_volatility_annualized(returns, **full_window)) * 100.0
    max_ddown = last_value(qnstats.calc_max_drawdown(qnstats.calc_underwater(equity))) * 100.0

    fmt = "{0:.3f}".format
    print("Sharpe Ratio         : ", fmt(sharpe_ratio))
    print("Mean Return [%]      : ", fmt(profit))
    print("Volatility [%]       : ", fmt(volatility))
    # the sign is flipped before printing
    print("Maximum Drawdown [%] : ", fmt(-max_ddown))

# Key indicators over the whole simulation period.
print_stat(stat)

In [None]:
# show plot with profit and losses (equity curve, log scale).
# The first 3*252 rows are skipped -- presumably the warm-up of the 3-year
# rolling statistics; TODO confirm.
performance = stat.to_pandas()["equity"].iloc[(252*3):]
qngraph.make_plot_filled(performance.index, performance, name="PnL (Equity)", type="log")

In [None]:
# show underwater chart (first 3*252 rows skipped, as in the other charts):
UWchart = stat.to_pandas()["underwater"].iloc[(252*3):]
qngraph.make_plot_filled(UWchart.index, UWchart, color="darkred", name="Underwater Chart", range_max=0)

In [None]:
# show rolling Sharpe ratio on a 3-year basis (first 3*252 rows skipped):
SRchart = stat.to_pandas()["sharpe_ratio"].iloc[(252*3):]
qngraph.make_plot_filled(SRchart.index, SRchart, color="#F442C5", name="Rolling SR")

In [None]:
# show bias chart (the "bias" column of the statistics; first 3*252 rows skipped):
biaschart = stat.to_pandas()["bias"].iloc[(252*3):]
qngraph.make_plot_filled(biaschart.index, biaschart, color="#5A6351", name="Bias Chart")

# Check correlation

In [None]:
# correlation check:
# your strategy should not correlate with other strategies before submission
qnstats.print_correlation(output, data)

# Write output

In [None]:
# Finally, the last mandatory step for submission:
# write the output to a file.

qndata.write_output(output)

At this stage the code is ready for submission. Just click on the submission button in your account page and we will evaluate your strategy live on our servers!