In [4]:
#functions

def create_df(test, pred):
    """Assemble a backtest table comparing true prices with predictions.

    Parameters
    ----------
    test : pandas.Series
        Observed prices.
    pred : pandas.Series
        Predicted prices (ideally close to ``test``).

    Returns
    -------
    pandas.DataFrame
        Columns, in order:
        ``true``     - the observed prices,
        ``pred``     - the predicted prices,
        ``log_ret``  - log return of today's true price over yesterday's,
        ``pred_ret`` - log return of today's prediction over yesterday's
                       true price,
        ``cum_ret``  - cumulative (buy-and-hold) return of the true prices,
        ``pos``      - sign of ``pred_ret`` (+1 long, -1 short, 0 flat),
        ``stg_ret``  - cumulative return of holding ``pos`` each day.

    Notes
    -----
    Shift the prediction columns by -1 to read them as expectations for
    tomorrow; shift ``pos`` by -1 to read it as the position to hold
    tomorrow, i.e. the action to perform today.
    """
    yesterday_true = test.shift(1)

    frame = pd.DataFrame({
        'true': test,
        'pred': pred,
        'log_ret': np.log(test / yesterday_true),
        'pred_ret': np.log(pred / yesterday_true),
    })

    # Buy-and-hold benchmark.
    frame['cum_ret'] = np.exp(frame['log_ret'].cumsum())

    # Go long when the predicted return is positive, short when negative.
    frame['pos'] = np.sign(frame['pred_ret'])
    strategy_log_ret = frame['pos'] * frame['log_ret']
    frame['stg_ret'] = np.exp(strategy_log_ret.cumsum())

    return frame

########################################################

def log_ret(prices):
    """Return the one-period log returns of ``prices``.

    Each entry is ``log(price_t / price_{t-1})``; the first entry has no
    predecessor and is therefore NaN.
    """
    previous = prices.shift(1)
    return np.log(prices / previous)

def cum_ret(prices):
    """Return the cumulative gross return of ``prices``.

    Computed as the exponential of the running sum of the log returns
    produced by ``log_ret``.
    """
    running_log_growth = log_ret(prices).cumsum()
    return np.exp(running_log_growth)

def stg_ret(true_log_ret, pred_log_ret, short=True):
    """Return the cumulative return of a sign-following strategy.

    Parameters
    ----------
    true_log_ret : pandas.Series
        Realised log returns.
    pred_log_ret : pandas.Series
        Predicted log returns; only their sign is used.
    short : bool, default True
        When true, a negative prediction takes a short position (-1).
        When false, negative predictions are turned into a flat position
        (0), i.e. the strategy is long-only.

    Returns
    -------
    pandas.Series
        ``exp(cumsum(position * true_log_ret))``.
    """
    pos = np.sign(pred_log_ret)  # +1 long, -1 short, 0 flat
    if not short:
        pos = pos.replace(-1, 0)  # long-only: stay out instead of shorting
    daily_stg_log_ret = pos * true_log_ret
    return np.exp(daily_stg_log_ret.cumsum())
    
########################################################

def get_prices(ticker, start, end):
    """Download the ``Close`` series for ``ticker`` between ``start`` and ``end``.

    The data comes from ``yf.Ticker(ticker).history(...)`` (``yf`` is
    presumably yfinance -- confirm against the file's imports). The index
    of the returned series is rebuilt from the calendar dates of the
    original index.
    """
    history = yf.Ticker(ticker).history(start=start, end=end)
    closes = history.Close
    # Keep only the calendar date of each timestamp.
    closes.index = pd.to_datetime(closes.index.date)
    return closes


def split_train_test(data_raw, test_start):
    """Split ``data_raw`` into the rows before ``test_start`` and the rest.

    Parameters
    ----------
    data_raw : pandas.Series or pandas.DataFrame
        Data whose index supports label slicing with ``test_start``.
    test_start : label
        First label that belongs to the test set.

    Returns
    -------
    (train, test)
        ``test`` is ``data_raw.loc[test_start:]``; ``train`` is every row
        that precedes it. The two parts never overlap and together cover
        all of ``data_raw``.
    """
    test = data_raw.loc[test_start:]
    # Derive the training part from the size of the test part instead of
    # slicing up to the label and dropping one row: that approach loses a
    # real training row whenever ``test_start`` is not itself in the index
    # (e.g. a weekend date) and overlaps with ``test`` for partial-string
    # or duplicated labels.
    n_train = len(data_raw) - len(test)
    train = data_raw.iloc[:n_train]
    return train, test


def make_xy(data, use_steps, steps_ahead):
    """Build sliding-window inputs and next-step targets from a series.

    Parameters
    ----------
    data : array-like
        The series, either one-dimensional or a single column of shape
        ``(n, 1)``.
    use_steps : int
        Number of consecutive observations in each input window.
    steps_ahead : int
        Accepted for interface compatibility. The returned target is
        always the single observation that immediately follows each
        window, so this value does not influence the output.

    Returns
    -------
    X : numpy.ndarray, shape ``(n - use_steps, use_steps, 1)``
        ``X[i, :, 0]`` is ``data[i : i + use_steps]``.
    y_1d : numpy.ndarray, shape ``(n - use_steps,)``
        ``y_1d[i]`` is ``data[i + use_steps]``.

    Raises
    ------
    ValueError
        If ``data`` is not a single series, if ``use_steps`` is smaller
        than 1, or if ``data`` is too short to form at least one window.
    """
    values = np.asarray(data, dtype=float)
    if values.ndim == 2 and values.shape[1] == 1:
        values = values[:, 0]
    if values.ndim != 1:
        raise ValueError(
            'data must be one-dimensional or of shape (n, 1), got shape {}'
            .format(np.shape(data)))

    batch_size = values.shape[0] - use_steps
    if use_steps < 1 or batch_size < 1:
        raise ValueError(
            'need use_steps >= 1 and more than use_steps observations '
            '(got use_steps={}, observations={})'
            .format(use_steps, values.shape[0]))

    # Row i of the index matrix is [i, i + 1, ..., i + use_steps - 1].
    window_idx = np.arange(batch_size)[:, None] + np.arange(use_steps)[None, :]
    X = values[window_idx][:, :, None]

    # The target of window i is the observation right after it.
    y_1d = values[use_steps:].copy()
    return X, y_1d


from sklearn.preprocessing import MinMaxScaler
# Module-level scaler shared across helpers: preprocess_train_test fits it on
# the training data, and create_pred uses the same instance to undo the
# scaling of model outputs. Re-running preprocess_train_test refits it.
scaler=MinMaxScaler()

def preprocess_train_test(train, test, use_steps, steps_ahead):
    """Scale the train/test series and turn them into model inputs.

    The module-level ``scaler`` is fitted on ``train`` only and then
    applied to ``test``. Both scaled columns are passed to ``make_xy``.

    Returns
    -------
    (X_train, y_train, X_test, y_test)
        The ``make_xy`` outputs for the scaled train and test data.
    """
    # Fit on the training data only; the test data reuses that fit.
    scaled_train = scaler.fit_transform(train.values.reshape(-1, 1))
    scaled_test = scaler.transform(test.values.reshape(-1, 1))

    windows_train, targets_train = make_xy(scaled_train, use_steps, steps_ahead)
    windows_test, targets_test = make_xy(scaled_test, use_steps, steps_ahead)
    return windows_train, targets_train, windows_test, targets_test



########################################################
########################################################


def get_paths(name):
    """Return the directory paths used for the fit called ``name``.

    Returns
    -------
    (fits_path, cp_path, sm_path)
        ``fits/<name>``, its ``checkpoints`` subdirectory and its
        ``saved_models`` subdirectory. Nothing is created on disk.
    """
    fits_path = os.path.join('fits', '{}'.format(name))
    cp_path, sm_path = (
        os.path.join(fits_path, subdir)
        for subdir in ('checkpoints', 'saved_models')
    )
    return fits_path, cp_path, sm_path

def check_make_dir(name):
    """Ensure the directory tree for the fit called ``name`` exists.

    Creates ``fits/<name>/checkpoints`` and ``fits/<name>/saved_models``
    (paths taken from ``get_paths``) together with any missing parents.
    Existing directories are left untouched, so a tree that was only
    partially created earlier is completed rather than skipped.

    Returns
    -------
    None
    """
    _, cp_path, sm_path = get_paths(name)
    for path in (cp_path, sm_path):
        # exist_ok avoids the check-then-create race and repairs a run
        # directory that is missing one of its subdirectories.
        os.makedirs(path, exist_ok=True)
        
def fit_save(name, X_train,y_train,X_test,y_test,model,
             #checkpoints, optimizer, 
             epochs, patience=0):
    """Compile and train ``model``, then save it under ``fits/<name>``.

    Parameters
    ----------
    name : str
        Fit name; selects the output directories via ``get_paths``.
    X_train, y_train
        Training inputs and targets passed to ``model.fit``.
    X_test, y_test
        Passed to ``model.fit`` as ``validation_data``.
        NOTE(review): the "test" data therefore drives early stopping and
        checkpoint selection -- confirm this is intended.
    model
        Keras model; its ``name`` attribute is part of the file names.
    epochs : int
        Maximum number of training epochs.
    patience : int, default 0
        ``patience`` of the early-stopping callback.

    Side effects
    ------------
    Writes ``<name>_<model.name>_cp.keras`` in the checkpoints directory
    during training, reloads the weights from that file afterwards and
    saves the model as ``<name>_<model.name>.keras`` in the saved-models
    directory. Nothing is returned.
    """
    check_make_dir(name)
    _, cp_dir, sm_dir = get_paths(name)
    best_weights_file = os.path.join(
        cp_dir, '{}_{}_cp.keras'.format(name, model.name))
    final_model_file = os.path.join(
        sm_dir, '{}_{}.keras'.format(name, model.name))

    stop_when_stalled = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=patience,
        restore_best_weights=True,
    )
    keep_best = keras.callbacks.ModelCheckpoint(
        best_weights_file,
        monitor="val_loss",
        mode="min",
        save_best_only=True,
        verbose=1,
    )

    model.compile(loss='mean_squared_error',
                  optimizer=keras.optimizers.RMSprop())
    model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=epochs,
        # Order matters: the checkpoint callback runs before early stopping.
        callbacks=[keep_best, stop_when_stalled],
    )

    # Persist the model with the weights stored in the checkpoint file.
    model.load_weights(best_weights_file)
    model.save(final_model_file)
    
########################################################
########################################################

def create_pred(model,X_test):
    """Predict with ``model`` and return the result in original units.

    The raw predictions are passed through ``scaler.inverse_transform``,
    flattened, and wrapped in a Series indexed by ``test.index[use_steps:]``.

    Note that ``scaler``, ``test`` and ``use_steps`` are not parameters:
    they are read from the enclosing (module/notebook) scope and must be
    defined before this function is called.
    """
    scaled_output = model.predict(X_test)
    unscaled = scaler.inverse_transform(scaled_output).reshape(-1)
    # The first ``use_steps`` labels of ``test`` are skipped when indexing.
    return pd.Series(unscaled, index=test.index[use_steps:])



#Load saved models
def load_models(fit_name):
    """Load every ``.keras`` model saved for the fit called ``fit_name``.

    Looks in the saved-models directory returned by ``get_paths`` and
    returns a dict mapping each file name without its ``.keras`` suffix to
    the model loaded by ``keras.saving.load_model``. Files with any other
    ending are ignored.
    """
    _, _, sm_path = get_paths(fit_name)
    suffix = '.keras'
    return {
        entry[:-len(suffix)]: keras.saving.load_model(os.path.join(sm_path, entry))
        for entry in os.listdir(sm_path)
        if entry.endswith(suffix)
    }






def prices_from_logrets(preds,test_price):
    """Convert a table of log returns into price paths.

    Parameters
    ----------
    preds : pandas.DataFrame
        Log returns, one column per series.
    test_price : pandas.Series
        Prices; only the first value is used, as the starting price.

    Returns
    -------
    pandas.DataFrame
        ``test_price.iloc[0]`` times the cumulative returns, where every
        column is divided by the first cumulative return of the FIRST
        column so that the first column starts at 1 before scaling.
    """
    # Cumulative returns are prices normalised to an arbitrary start.
    growth = np.exp(preds.cumsum())
    # Returns were taken before the train/test split, so re-anchor on the
    # first value of the first column.
    anchor = growth.iloc[0, 0]
    start_price = test_price.iloc[0]
    return start_price * (growth / anchor)



def get_rps(test_price,preds,kind):
    """Derive log returns, prices and strategy curves from predictions.

    Parameters
    ----------
    test_price : pandas.Series
        Prices; used only when ``kind == 'returns'`` to anchor the
        reconstructed price paths (see ``prices_from_logrets``).
    preds : pandas.DataFrame
        One column per series. The first column is treated as the
        reference series; every other column is treated as a prediction
        whose sign drives a strategy on the reference series.
    kind : {'prices', 'returns'}
        Whether ``preds`` holds price levels or log returns.

    Returns
    -------
    (logrets, prices, strats)
        ``strats`` has the reference cumulative return in its first column
        and, in every other column, the ``stg_ret`` curve obtained from
        that column's log returns.

    Raises
    ------
    ValueError
        If ``kind`` is neither ``'prices'`` nor ``'returns'``.
    """
    if kind not in ('prices', 'returns'):
        raise ValueError(
            "kind must be 'prices' or 'returns', got {!r}".format(kind))

    # Rows containing NaN are dropped so all three outputs share one index.
    preds = preds.dropna()

    if kind == 'prices':
        prices = preds
        logrets = log_ret(prices)
        cumrets = cum_ret(prices)
    else:
        logrets = preds
        prices = prices_from_logrets(preds, test_price)
        cumrets = np.exp(logrets.cumsum())

    strats = cumrets.copy()
    reference_logrets = logrets.iloc[:, 0]
    for col in strats.columns[1:]:
        strats[col] = stg_ret(reference_logrets, logrets[col])

    return logrets, prices, strats





#plot the (log returns, prices, strategies) output of get_rps and print the final strategy values
def get_plots(rps_output):
    """Plot the output of ``get_rps`` on three stacked axes.

    Parameters
    ----------
    rps_output : sequence of pandas objects
        Expected to be ``(logrets, prices, strats)``; item ``i`` is drawn
        on axis ``i`` and the axes are titled 'Returns', 'Prices' and
        'Strategies'.

    Besides drawing, prints the last row of the last item (the final
    value of each strategy curve). Nothing is returned.
    """
    fig, ax = plt.subplots(nrows=3)
    plt.subplots_adjust(top=2, bottom=0, left=0, right=2,
                        hspace=0.25, wspace=0)

    for i, df in enumerate(rps_output):
        df.plot(ax=ax[i], lw=1, alpha=1)

    # Reference line at 1: the level of a strategy that made no money.
    ax[2].axhline(y=1, c='r', lw=0.5, alpha=0.5)

    ax[0].set_title('Returns')
    ax[1].set_title('Prices')
    ax[2].set_title('Strategies')

    print(rps_output[-1].iloc[-1,:])

In [None]:
##old

def make_preds(models,test,X_test):
    """Collect the true series and every model's predictions in one dict.

    Parameters
    ----------
    models : dict
        Maps model name to model.
    test
        Stored unchanged under the key ``'stock'``.
    X_test
        Inputs forwarded to ``create_pred`` for each model.

    Returns
    -------
    dict
        ``'stock'`` first, followed by one entry per model name holding
        the result of ``create_pred(model, X_test)``.
    """
    preds = {'stock': test}
    preds.update(
        (modelname, create_pred(model, X_test))
        for modelname, model in models.items()
    )
    return preds