# SGD Linear Regression with numpy and numba

# Initialization

In [None]:
import xarray as xr
import xarray.ufuncs as xruf
import numpy as np
import numba as nb
import qnt.forward_looking as qnfl

from qnt.data import load_data, write_output
from qnt.stepper import test_strategy
from qnt.stats import calc_stat, print_correlation
from qnt.graph import make_plot, make_plot_double, make_plot_filled

# Data

In [None]:
data = load_data(
    min_date = "2014-01-01", 
    max_date = "2019-08-13", # you should not set max_date for final calculations
    dims=('time', 'field', 'asset'), 
    forward_order=True
)
data.coords

# Model

If we'll see an increasing price trend, then we decide that:
* the next n_days=20 days the price will increase
* the weight for the next n_days=20 days will be equated to the tilt sign

After n_days=20 days, let's review the previous k_days=100 days, 
build the regression again and update the weights.

Let's check for first of k_days=100 prices items. We will normalize it on first price and will rate prices trands with the SGD Regression model.

In [None]:
# this is SGD Regressor implemented with Numba
@nb.jitclass([
    ('slope', nb.float64),
    ('intercept', nb.float64),
    ('learning_rate', nb.float64),
    ('start_slope', nb.float64),
    ('start_intercept', nb.float64),
    ('max_iter', nb.float64),
])
class SGDRegressor(object):

    def __init__(self, max_iter):
        self.slope = 0
        self.intercept = 0
        
        self.learning_rate = 0.0001
        self.start_slope = 0
        self.start_intercept = 0
        self.max_iter = max_iter
    
    def fit(self, X_train, y_train):
        s_slope = self.start_slope
        s_intercept = self.start_intercept
        
        for i in range(self.max_iter):
            int_slope = 0
            int_intercept = 0
            n_pt = float(len(X_train))
            
            for i in range(len(X_train)):
                int_intercept += - (2/n_pt) * (y_train[i] - ((s_slope * X_train[i]) + s_intercept))
                int_slope += - (2/n_pt) * X_train[i] * (y_train[i] - ((s_slope * X_train[i]) + s_intercept))
            
            final_slope = s_slope - (self.learning_rate * int_slope)
            final_intercept = s_intercept - (self.learning_rate * int_intercept)
            s_slope = final_slope
            s_intercept = final_intercept
            
            self.slope = s_slope
            self.intercept = s_intercept


In [None]:
k_days = 100
n_days = 20

            
X = np.arange(k_days)
last_weights = xr.DataArray(np.zeros([len(data.asset)]), dims=['asset'], coords={'asset':data.asset})


# this is function for output calculation step by step
def step(data_slice):
    # we will recalculate model every n_days
    if (len(data_slice.time) - k_days) % n_days == 0:
        last_weights[:] = 0
        
        for asset in data_slice.asset.values:
            prices = data.loc[:, "close", asset]
            prices = prices.dropna('time') # rm holes from prices
            prices = prices[-k_days:] 
            
            if len(prices) == k_days:
                y_train = prices.values
                y_train = y_train / y_train[-1]
                
                model = SGDRegressor(max_iter=50)
                model.fit(X, y_train)
            
                last_weights.loc[asset] = np.sign(model.slope)
     
    is_liquid = data_slice[-1].loc['is_liquid'] > 0

    out = last_weights.loc[is_liquid]
    out = out / abs(out).sum('asset')
    
    return out

# Backtest

In [None]:
output = test_strategy(data, step=step)

## Stats and plots

In [None]:
stat = calc_stat(data, output, slippage_factor=0.05)
display(stat.to_pandas().tail())

In [None]:
make_plot_filled(stat.coords['time'].to_pandas(), stat.loc[:, 'equity'].values,  color="blue", name="PnL (Equity)", type="log")

In [None]:
make_plot_filled(stat.coords['time'].to_pandas(), stat.loc[:, 'underwater'].values, color="red", name="Underwater Chart", range_max= 0)

In [None]:
make_plot_filled(stat.coords['time'].to_pandas(), stat.loc[:, 'sharpe_ratio'].values[20:], color="purple", name="Rolling SR")

In [None]:
make_plot_filled(stat.coords['time'].to_pandas(), stat.loc[:, 'bias'].values, color="gray", name="Bias")

# Improvement

In [None]:
# Well, the sharpe ratio of this strategy is not enough...

stat.sel(field='sharpe_ratio').to_pandas().tail()

In [None]:
# Let's build the output only with "good" "short term" and "long term" Sharpe ratios.
# Sharpe ratio is "good" when its average more then 0.
#
# This is only example of a heuristic which can improve you strategy 
# using statistics per asset. 
# I believe that you can invent a new better way to do it =)

short_term = 43
long_term = short_term*3

stat_per_asset_short_term = calc_stat(data, output, max_periods=short_term, per_asset = True)
stat_per_asset_long_term = calc_stat(data, output, max_periods=long_term, per_asset = True)

avg_short_term_sr = stat_per_asset_short_term.sel(field='sharpe_ratio')\
    .rolling(time=short_term, min_periods=short_term*19//20)\
    .mean() # min periods allows to pass small holes in data
avg_long_term_sr = stat_per_asset_long_term.sel(field='sharpe_ratio')\
    .rolling(time=long_term, min_periods=long_term*19//20)\
    .mean()

output2 = output
output2 = output2.where(avg_short_term_sr > 0)
output2 = output2.where(avg_long_term_sr > 0)
output2 = output2 / abs(output2).sum('asset')

In [None]:
# print stats

stat2 = calc_stat(data, output2)

print("Old stats:")
print("-\n3y SR:")
print(stat.sel(field='sharpe_ratio').to_pandas().tail())

print("---")

print("New stats:")
print("-\n3y SR:")
print(stat2.sel(field='sharpe_ratio').to_pandas().tail())


# Checks

In [None]:
# Use the function from 'qnfl' ensures that no forward-looking
# is taking place.
def strategy():
    """
    Entire code for strategy output calculation.
    """
    #load data
    data = load_data(
        min_date = "2014-01-01", 
        # max_date = "2019-08-13", # you should not set max_date for final calculations
        dims=('time', 'field', 'asset'), 
        forward_order=True
    )
    
    #calc output
    global last_weights
    last_weights = xr.DataArray(np.zeros([len(data.asset)]), dims=['asset'], coords={'asset':data.asset})
    output1 = test_strategy(data, step=step)

    #improve output with statistic per asset
    short_term = 43
    long_term = short_term*3

    stat_per_asset_short_term = calc_stat(data, output1, max_periods=short_term, per_asset = True)
    stat_per_asset_long_term = calc_stat(data, output1, max_periods=long_term, per_asset = True)

    avg_short_term_sr = stat_per_asset_short_term.sel(field='sharpe_ratio')\
        .rolling(time=short_term, min_periods=short_term*19//20)\
        .mean() # min periods allows to pass small holes in data
    avg_long_term_sr = stat_per_asset_long_term.sel(field='sharpe_ratio')\
        .rolling(time=long_term, min_periods=long_term*19//20)\
        .mean()

    output2 = output1
    output2 = output2.where(avg_short_term_sr > 0)
    output2 = output2.where(avg_long_term_sr > 0)
    output2 = output2 / abs(output2).sum('asset')

    return output2

# This function runs strategy twice on the different periods: 
# the entire data and data the with a cropped last half year.
# After that this function compares outputs. 
# Overlapped outputs must be same.
output_final = qnfl.load_data_calc_output_and_check_forward_looking(strategy)

In [None]:
print_correlation(output_final, data)

# Submit

In [None]:
write_output(output_final)

Finally, you can submit only the necessary code excluding graphs and checks. The minimal code is:

In [None]:
import xarray as xr
import xarray.ufuncs as xruf
import numpy as np
import numba as nb
import qnt.forward_looking as qnfl

from qnt.data import load_data, write_output
from qnt.stepper import test_strategy
from qnt.stats import calc_stat

@nb.jitclass([
    ('slope', nb.float64),
    ('intercept', nb.float64),
    ('learning_rate', nb.float64),
    ('start_slope', nb.float64),
    ('start_intercept', nb.float64),
    ('max_iter', nb.float64),
])
class SGDRegressor(object):

    def __init__(self, max_iter):
        self.slope = 0
        self.intercept = 0
        
        self.learning_rate = 0.0001
        self.start_slope = 0
        self.start_intercept = 0
        self.max_iter = max_iter
    
    def fit(self, X_train, y_train):
        s_slope = self.start_slope
        s_intercept = self.start_intercept
        
        for i in range(self.max_iter):
            int_slope = 0
            int_intercept = 0
            n_pt = float(len(X_train))
            
            for i in range(len(X_train)):
                int_intercept += - (2/n_pt) * (y_train[i] - ((s_slope * X_train[i]) + s_intercept))
                int_slope += - (2/n_pt) * X_train[i] * (y_train[i] - ((s_slope * X_train[i]) + s_intercept))
            
            final_slope = s_slope - (self.learning_rate * int_slope)
            final_intercept = s_intercept - (self.learning_rate * int_intercept)
            s_slope = final_slope
            s_intercept = final_intercept
            
            self.slope = s_slope
            self.intercept = s_intercept

            
data = load_data(
    min_date = "2014-01-01", 
    # max_date = "2019-08-13", # you should not set max_date for final calculations
    dims=('time', 'field', 'asset'), 
    forward_order=True
)
            
            
k_days = 100
n_days = 20

            
X = np.arange(k_days)
last_weights = xr.DataArray(np.zeros([len(data.asset)]), dims=['asset'], coords={'asset':data.asset})


# this is function for output calculation step by step
def step(data_slice):
    # we will recalculate model every n_days
    if (len(data_slice.time) - k_days) % n_days == 0:
        last_weights[:] = 0
        
        for asset in data_slice.asset.values:
            prices = data.loc[:, "close", asset]
            prices = prices.dropna('time') # rm holes from prices
            prices = prices[-k_days:] 
            
            if len(prices) == k_days:
                y_train = prices.values
                y_train = y_train / y_train[-1]
                
                model = SGDRegressor(max_iter=50)
                model.fit(X, y_train)
            
                last_weights.loc[asset] = np.sign(model.slope)
     
    is_liquid = data_slice[-1].loc['is_liquid'] > 0

    out = last_weights.loc[is_liquid]
    out = out / abs(out).sum('asset')
    
    return out

output = test_strategy(data, step=step)

write_output(output_final)