In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.model_selection._split import _BaseKFold
# Daily NVDA bars between 2021-01-01 and 2021-12-31, reduced to the adjusted close.
# auto_adjust=False is spelled out because newer yfinance releases default to
# auto_adjust=True, and an auto-adjusted frame carries no 'Adj Close' column,
# so the selection below would raise KeyError there.
df=yf.download('NVDA',start='2021-01-01',end='2021-12-31',auto_adjust=False)
# Double brackets keep the result a DataFrame rather than a Series.
df=df[['Adj Close']]

[*********************100%***********************]  1 of 1 completed


In [2]:
#Purge training observations that overlap the test set, to prevent data leakage.
def getTrainTimes(trainObs,testTimes):
    """Drop every training observation whose time span overlaps a test span.

    Parameters
    ----------
    trainObs : pd.Series
        index  = time when each training observation started,
        values = time when that observation ended.
    testTimes : pd.Series
        index  = start of each test span,
        values = end of that test span.

    Returns
    -------
    pd.Series
        A deep copy of ``trainObs`` without the overlapping rows; the input
        itself is not modified.
    """
    trn=trainObs.copy(deep=True)
    for testStart,testEnd in testTimes.items():
        # Case 1: the training observation starts inside the test span.
        startsInside=trn[(testStart<=trn.index)&(trn.index<=testEnd)].index
        # Case 2: the training observation ends inside the test span.
        endsInside=trn[(testStart<=trn)&(trn<=testEnd)].index
        # Case 3: the training observation envelops the test span, i.e. it
        # starts at/before the test start AND ends at/after the test end.
        # The end comparison must use the values (end times), not the index:
        # comparing the index on both sides can never detect an enveloping span.
        envelops=trn[(testEnd<=trn)&(trn.index<=testStart)].index
        # Rows are removed by label, so the next test span sees the purged set.
        trn=trn.drop(startsInside.union(endsInside).union(envelops))
    return trn

In [None]:
#Embargo on training observations
def getEmbargo(trn, pctEmbargo):
    """Map every timestamp to the timestamp reached after the embargo.

    Parameters
    ----------
    trn : index-like of timestamps
        Sliced positionally below, so an ordered ``pd.Index`` /
        ``pd.DatetimeIndex`` is assumed -- TODO confirm against callers.
    pctEmbargo : float
        Embargo length as a fraction of ``trn.shape[0]``; the number of
        positions skipped is ``int(trn.shape[0]*pctEmbargo)``.

    Returns
    -------
    pd.Series
        Indexed by the entries of ``trn``. Each value is the entry ``step``
        positions later; the final ``step`` entries all map to the last
        entry. With ``step == 0`` every entry maps to itself.
    """
    step=int(trn.shape[0]*pctEmbargo)
    if step==0:
        return pd.Series(trn,index=trn)
    # Shift by `step` positions: entry k is paired with entry k+step.
    shifted=pd.Series(trn[step:],index=trn[:-step])
    # The last `step` entries have nothing `step` positions ahead, so they are
    # pinned to the final timestamp.
    tail=pd.Series(trn[-1],index=trn[-step:])
    # pd.concat replaces Series.append, which no longer exists in pandas 2.x.
    return pd.concat([shifted,tail])

In [None]:
#Cross validation class when observations overlap
class PurgedKFold(_BaseKFold):
    """K-fold splitter for observations whose labels span a time interval.

    ``t1`` holds, for every observation (index entry), the time at which its
    label ends (value). Test folds are contiguous, unshuffled blocks of rows.
    For each fold the training set consists of
      * rows whose ``t1`` value is at or before the first index entry of the
        test block, and
      * rows positioned at or after the latest ``t1`` value found in the test
        block, pushed back by an embargo of ``int(n_rows*pctEmbargo)`` rows.
    """
    def __init__(self,n_splits=3,t1=None,pctEmbargo=0.):
        """
        n_splits   : number of folds, forwarded to the base class.
        t1         : pd.Series of label end times (required despite the None
                     default; anything else raises ValueError).
        pctEmbargo : embargo size as a fraction of the number of rows;
                     None is treated as no embargo.
        """
        if not isinstance(t1,pd.Series):
            raise ValueError('Label Through Dates must be a pandas series.')
        super().__init__(n_splits,shuffle=False,random_state=None)
        self.t1=t1 #data set
        self.pctEmbargo=pctEmbargo
    def split(self,X,y=None,groups=None):
        """Yield ``(train_indices, test_indices)`` positional arrays per fold.

        ``X`` must carry exactly the same index as ``t1``; ``y`` and ``groups``
        are accepted for signature compatibility and are not used here.
        Raises ValueError when the indexes differ.
        """
        # Length is checked first so a size mismatch produces the intended
        # message rather than an element-wise comparison error.
        if len(X.index)!=len(self.t1.index) or not (X.index==self.t1.index).all():
            raise ValueError('X and ThruDateValues must have the same index')
        nObs=X.shape[0]
        indices=np.arange(nObs)
        # None (e.g. an unset default forwarded by a caller) means no embargo.
        mbrg=int(nObs*(self.pctEmbargo or 0.))
        # (start, stop) positions of each contiguous test block.
        test_starts=[(arr[0],arr[-1]+1) for arr in np.array_split(
            indices,self.n_splits)]
        for i,j in test_starts:
            t0=self.t1.index[i] #start of test set
            test_indices=indices[i:j]
            # .iloc: test_indices are positions, not labels. Plain [] with
            # integers on a non-integer index is a deprecated fallback.
            maxT1Idx=self.t1.index.searchsorted(
                self.t1.iloc[test_indices].max())
            # Left-hand training rows: labels that end no later than t0.
            train_indices=self.t1.index.searchsorted(
                self.t1[self.t1<=t0].index)
            # Right-hand training rows: everything after the last test label
            # ends, skipping `mbrg` embargoed rows (empty slice if past the end).
            train_indices=np.concatenate((train_indices,
                                          indices[maxT1Idx+mbrg:]))
            yield train_indices, test_indices

In [5]:
def cvScore(clf,X,y,sample_weight=None,scoring='neg_log_loss',t1=None,
           cv=None,cvGen=None,pctEmbargo=None):
    """Score a classifier fold by fold with a purged cross-validation splitter.

    Parameters
    ----------
    clf : classifier exposing fit / predict / predict_proba / classes_.
    X, y : features and labels; both are sliced with ``.iloc``.
    sample_weight : pd.Series or None
        Per-row weights used for fitting and scoring. None means every row
        gets weight 1.
    scoring : 'neg_log_loss' or 'accuracy'.
    t1, cv, pctEmbargo :
        Only used to build a ``PurgedKFold`` when ``cvGen`` is None
        (``cv`` is passed as ``n_splits``; ``pctEmbargo=None`` means 0).
    cvGen : optional splitter object with a ``split(X=...)`` method.

    Returns
    -------
    np.ndarray with one score per fold ('neg_log_loss' is the negated log loss).

    Raises
    ------
    ValueError if ``scoring`` is not one of the two supported names.
    """
    if scoring not in ['neg_log_loss','accuracy']:
        raise ValueError('wrong scoring method.')
    from sklearn.metrics import log_loss,accuracy_score
    if sample_weight is None:
        sample_weight=pd.Series(np.ones(X.shape[0]),index=X.index)
    if cvGen is None:
        # Imported only when a default splitter is actually needed, so callers
        # that pass their own cvGen do not depend on this module.
        # NOTE(review): this resolves PurgedKFold from the external
        # clfSequential module -- confirm that is the intended implementation.
        from clfSequential import PurgedKFold
        embargo=0. if pctEmbargo is None else pctEmbargo
        cvGen=PurgedKFold(n_splits=cv,t1=t1,pctEmbargo=embargo)
    score=[]
    for train,test in cvGen.split(X=X):
        # The same estimator object is refit on every fold.
        fit=clf.fit(X=X.iloc[train,:],y=y.iloc[train],
                   sample_weight=sample_weight.iloc[train].values)
        testWeights=sample_weight.iloc[test].values
        if scoring=='neg_log_loss':
            prob=fit.predict_proba(X.iloc[test,:])
            # labels= lets log_loss line up the probability columns with
            # clf.classes_ instead of inferring labels from the test fold.
            score_=-log_loss(y.iloc[test],prob,
                            sample_weight=testWeights,
                            labels=clf.classes_)
        else:
            pred=fit.predict(X.iloc[test,:])
            score_=accuracy_score(y.iloc[test],pred,
                                 sample_weight=testWeights)
        score.append(score_)
    return np.array(score)