In [None]:
from functions import *
from functions2 import *
from parameters import *

from sklearn.linear_model import Lasso
from sklearn.linear_model import lasso_path
import torch
import torch.nn.functional as F

from statsmodels.nonparametric.smoothers_lowess import lowess

from tqdm import tqdm
from IPython.display import display, HTML

In [None]:
# Load the raw data set as a pandas data-frame and parse its 'date' column.
# `LOG`, `pd`, `signs` and `load` are not defined in this cell; they are expected
# to come from the star imports at the top of the notebook.
LOG( "Data (data-frame)" )
filename = "raw/data_ml.csv"
LOG( f"  Reading {filename} [20 seconds]" )
d = pd.read_csv(filename)
d['date'] = pd.to_datetime( d['date'] )

# Predictor names are the keys of `signs`; the target column is 'R1M_Usd'.
predictors = list( signs.keys() )
target = 'R1M_Usd'

# Second representation of the data, read from a pre-computed file.
# It is indexed by column name further down (e.g. dd['R1M_Usd']).
# NOTE(review): `load` is an opaque helper; if it unpickles, only use it on trusted files.
LOG( "Data (list of matrices)" )
LOG( "  Reading data/data_ml.pickle" )
dd = load( "data/data_ml.pickle" )

# Deep learning: lattice networks

In [None]:
class LinearPositive(torch.nn.Module):
    """Affine layer whose effective weights are element-wise non-negative.

    The trainable parameters are those of a regular ``torch.nn.Linear`` stored
    in ``self.layer``; the sign constraint is imposed at call time by taking
    the absolute value of the weight matrix. The bias is left unconstrained.
    """

    def __init__(self, n, k):
        """Build a layer mapping ``n`` input features to ``k`` outputs."""
        super().__init__()
        self.layer = torch.nn.Linear(n, k)

    def weight_actual(self):
        """Return the non-negative weight matrix actually used by ``forward``."""
        # Other options that were considered: the raw weights (no constraint)
        # or a softplus of the raw weights.
        return torch.abs(self.layer.weight)

    def forward(self, x):
        """Apply the constrained affine map along the last dimension of ``x``."""
        projected = F.linear(x, self.weight_actual())
        return projected + self.layer.bias
    
class Calibrator(torch.nn.Module):
    """Learnable, non-decreasing, piecewise-linear activation function.

    The function is defined by ``k`` knots. Their abscissas ``xs`` are fixed and
    equally spaced on ``[xmin, xmax]``; their ordinates are learned.
    Monotonicity holds by construction: the trainable parameters ``self.ys``
    (``k-1`` unconstrained numbers) are passed through a softplus to obtain the
    positive increments between consecutive ordinates, and the first ordinate
    is pinned at ``xmin``. Outside ``[xmin, xmax]`` the function is constant.

    Parameters
    ----------
    k : int
        Number of knots (at least 2).
    xmin, xmax : float
        Range spanned by the knots, and by the initial (random) ordinates.
    """

    def __init__(self, k=5, xmin=-1, xmax=+1):
        assert k >= 2
        assert xmin < xmax
        super(Calibrator, self).__init__()
        self.k = k
        # Registered as a non-persistent buffer: it follows the module across
        # .to(device)/.double() calls, but the state_dict keys are unchanged.
        self.register_buffer(
            "xs", torch.Tensor( np.linspace(xmin, xmax, k) ), persistent=False
        )
        # Random increasing ordinates going from xmin to xmax. Two of the draws are
        # overwritten with the end points; which ones does not matter since the
        # values are sorted right after.
        ys = np.random.uniform(xmin, xmax, size=k)
        ys[0] = xmin
        ys[1] = xmax
        ys = np.sort(ys)
        # Store the increments, in "pre-softplus" space, so that ys_actual()
        # reproduces these ordinates at initialization.
        ys = np.diff(ys)
        ys = inverse_softplus(ys)
        self.ys = torch.nn.Parameter( torch.Tensor( ys ) )

    def ys_actual(self):
        """Return the ``k`` knot ordinates (non-decreasing, first one equal to ``xs[0]``)."""
        increments = F.softplus(self.ys)
        heights = torch.cumsum(increments, 0)
        # Prepend a zero for the first knot, then shift so that the curve starts
        # at the lower end of the range (this was hard-coded to -1, which is only
        # right for the default xmin).
        y = torch.cat( [ heights.new_zeros(1), heights ] )
        return y + self.xs[0]

    def interpolate(self, xs, ys, x_new ):
        """Piecewise-linear interpolation of the points ``(xs, ys)`` at ``x_new``.

        The result is written as a sum of ReLUs, one per knot, each of them
        changing the slope at that knot; it is constant (``ys[0]`` on the left,
        ``ys[-1]`` on the right) outside ``[xs[0], xs[-1]]``.
        The output has the same shape as ``x_new``.
        """
        ## k points, k-1 intervals between them, k+1 intervals if we include the infinite ones on the left and right
        k = len(xs)
        slopes = ( ys[1:] - ys[:-1] ) / ( xs[1:] - xs[:-1] )
        current_slope = 0
        # zeros_like: same device as the input
        X = torch.zeros_like( x_new )
        X = X + ys[0]
        for i in range(k-1):
            X = X + F.relu( x_new - xs[i] ) * ( slopes[i] - current_slope )
            current_slope = slopes[i]
        # Back to a zero slope after the last knot
        X = X + F.relu( x_new - xs[k-1] ) * ( 0 - current_slope )
        return X

    def interpolate_test(self):
        """Visual check of ``interpolate``: plot the interpolated curve and the knots."""
        xs = torch.Tensor( np.linspace(0,1,6) )
        ys = torch.Tensor( np.random.uniform( size=6 ) )
        x_new = torch.Tensor( np.linspace(-.1,1.1,100) )
        y_new = Calibrator().interpolate(xs, ys, x_new)
        x_new = x_new.detach().numpy()
        y_new = y_new.detach().numpy()
        fig, ax = plt.subplots()
        ax.plot( x_new, y_new )
        ax.scatter( xs.detach().numpy(), ys.detach().numpy() )
        plt.show()

    def forward(self, x):
        """Apply the calibration curve element-wise to ``x``."""
        return self.interpolate( self.xs, self.ys_actual(), x )
    
def inverse_softplus(y):
    """Inverse of the softplus function: return ``x`` with ``log(1 + exp(x)) == y``.

    Parameters
    ----------
    y : float or array-like
        Values in the range of softplus, i.e. strictly positive.

    Returns
    -------
    numpy.ndarray
        Array with the shape of ``y`` (0-dimensional for a scalar input).
        ``y == 0`` gives ``-inf`` and negative values give ``nan``.
    """
    y = np.asarray(y, dtype=float)
    # log(exp(y) - 1) written as y + log(1 - exp(-y)):
    #  - no overflow for large y (exp(-y) underflows to 0 and the result is y);
    #  - no cancellation for small y (expm1 is accurate near 0).
    return np.asarray( y + np.log( -np.expm1(-y) ) )

### Positive weights + parametrized activation function

In [None]:
class Monotonic2(torch.nn.Module):
    """Score network with positive weights and learned activations, turned into portfolio weights.

    Architecture: LinearPositive(k, 16) → Calibrator → LinearPositive(16, 4)
    → Calibrator → LinearPositive(4, 1). The resulting score is made positive
    with a softplus, masked by the investment universe and normalized along
    the first axis.
    """

    def __init__(self, k):
        """``k`` is the number of input features."""
        super().__init__()
        # Linear layers first, activations afterwards: the creation order is
        # also the order in which the parameters are registered.
        self.fc1 = LinearPositive(k, 16)
        self.fc2 = LinearPositive(16, 4)
        self.fc3 = LinearPositive(4, 1)
        self.f1 = Calibrator()
        self.f2 = Calibrator()

    def forward(self, xs):
        """Compute the weights from a pair ``(x, universe)``.

        ``x`` is n×l×k (the linear layers act on the last dimension) and
        ``universe`` is n×l×1; the result is n×l, non-negative, and sums to 1
        along the first axis (up to the 1e-16 guard in the denominator).
        """
        features, universe = xs
        hidden = self.f1(self.fc1(features))
        hidden = self.f2(self.fc2(hidden))
        score = self.fc3(hidden)                      # n×l×1
        # Strictly positive raw weight (an exponential was used previously)
        positive = F.softplus(score) + 1e-6
        masked = (positive * universe)[:, :, 0]       # n×l
        totals = masked.sum(dim=0)
        return masked / (1e-16 + totals)

In [None]:
## Train Monotonic2 to maximize the information ratio (mean/std of the portfolio
## log-returns) on random sub-samples, then plot the IR along the training.
LOG( "[LONG] 1h30min for 10,000 epochs; convergence issues" )

## get_data_3 is defined elsewhere; the arrays are used below as id×date×feature (x)
## and id×date (y, universe) -- TODO confirm against get_data_3.
x, y, universe = get_data_3( date=DATE1, signs=signs, flip_signs=True )

universe = universe.reshape( y.shape[0], y.shape[1], 1 )
y = y.reshape( y.shape[0], y.shape[1], 1 )
x = torch.tensor(x, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32)
universe = torch.tensor(universe, dtype=torch.float32)

model = Monotonic2(x.shape[2])

optimizer = torch.optim.Adam(model.parameters())
N = 10_000
IRs = np.nan * np.zeros(N)   # IR of each step; remains NaN for the steps not run
pbar = tqdm(range(N))
for t in pbar:

    ## x is id×date×feature
    ## Take half the stocks at random
    i = np.random.choice( x.shape[0], x.shape[0] // 2, replace=False )
    ## Take a 3-year period, at random
    ## NOTE(review): the start is drawn in [0, n_dates-36), so the window never
    ## contains the last date -- confirm this is intended.
    j = np.random.choice( x.shape[1] - 36 )
    j = np.arange( j, j+36 )

    w = model( (x[i,:,:][:,j,:], universe[i,:,:][:,j,:]) )
    ratio_returns = w * y[i,:,:][:,j,:][:,:,0].expm1()     # y already contains the forward returns
    ratio_returns = ratio_returns.sum(axis=0)              # portfolio return, for each date
    log_returns = ratio_returns.log1p()
    IR = log_returns.mean() / log_returns.std()
    loss = -IR
    IRs[t] = IR.item()
    pbar.set_description( f"IR={np.nanmean(IRs):.3f}" )    # average IR since the beginning
    if not np.isfinite( loss.item() ):
        ## Stop before the optimizer step, to keep the parameters finite
        LOG( f"{t} PROBLEM" )
        break
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

LOG( "DONE" )

## Add the final IR, on the whole sample
## (evaluation only: no need to record the computation graph for the whole data set)
with torch.no_grad():
    w = model( (x,universe) )
    ratio_returns = w * y[:,:,0].expm1()     # y already contains the forward returns
    ratio_returns = ratio_returns.sum(axis=0)
    log_returns = ratio_returns.log1p()
    IR = log_returns.mean() / log_returns.std()
    IR = IR.item()

## IR at each step (dots), smoothed (black line) and final whole-sample IR (orange cross)
fig, ax = plt.subplots()
ax.scatter( 1+np.arange(len(IRs)), IRs )
r = lowess( IRs, 1+np.arange(len(IRs)) )
ax.plot( r[:,0], r[:,1], color = 'black', linewidth=5 )
ax.scatter( 1+len(IRs), IR, color = 'tab:orange', marker='x', s=200, linewidth=5)
ax.set_xlabel("Epoch")
ax.set_ylabel("IR")
ax.set_xscale('log')
fig.savefig("plots/model8_monotonic2_loss.pdf")
plt.show()

In [None]:
## Wealth curves
##
## This is not the strategy actually learned, but the quintile portfolios from the score.
## The strategy learned provided actual weights.
##

## Whole sample this time (all=True); get_data_3 is defined elsewhere.
## Only the shape of y is needed here: the returns used for the backtest are
## rebuilt from dd below.
x, y, universe = get_data_3(all=True, signs=signs, flip_signs=True)
universe = universe.reshape( y.shape[0], y.shape[1], 1 )
x = torch.tensor(x, dtype=torch.float32)
universe = torch.tensor(universe, dtype=torch.float32)

## Inference only: do not record the computation graph
with torch.no_grad():
    signal = model( (x,universe) ).numpy()

trailing_log_returns = LAG( np.log1p( dd[ 'R1M_Usd' ] ) )
y = trailing_log_returns.fillna(0)    # new object: trailing_log_returns is left untouched

assert signal.shape == y.shape
signal = pd.DataFrame( signal, index = y.index, columns = y.columns )

res = signal_backtest(signal, y, date=DATE1)

## Row 5 of the out-of-sample table, i.e., the last of the 6 curves plotted below
## NOTE(review): presumably the portfolio of interest (e.g., long-short) -- confirm with signal_backtest
stats = res['out-of-sample'].iloc[5,:]

fig, ax = plt.subplots()
for i in range(6):
    ax.plot( res['dates'], res['prices'].iloc[i,:], color = quintile_colours[i] )
ax.axvline( pd.to_datetime(DATE1), color='black', linewidth=1 )   # start of the out-of-sample period
ax.set_yscale('log')
ax.set_title('Maximizing the IR (non-linear, signal)')
ax.text(0.02, .97, f"μ={100*stats['CAGR']:.1f}%",                  horizontalalignment='left', verticalalignment='top', transform = ax.transAxes)
ax.text(0.02, .90, f"σ={100*stats['Annualized Volatility']:.1f}%", horizontalalignment='left', verticalalignment='top', transform = ax.transAxes)
ax.text(0.02, .83, f"IR={stats['Information Ratio']:.2f}",         horizontalalignment='left', verticalalignment='top', transform = ax.transAxes)
fig.savefig("plots/model8_monotonic2_wealth.pdf")
plt.show()

res['out-of-sample']

In [None]:
# TODO: Check the activation functions learned