# Cross-Validation in Finance

### Loading Libraries

In [8]:
# Randomness
import random

# Numerical Computing
import numpy as np

# Data Manipulation
import pandas as pd
from pandas import Timestamp

# Data Visualization
import seaborn as sns
import matplotlib.pyplot as plt

import plotly.graph_objects as go
import plotly.io as pio
%matplotlib inline

# Date & Time
from datetime import datetime, timedelta

# Typing
from typing import Tuple, List, Dict, Union, Optional, Any, Generator

# Scikit-Learn
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold as _BaseKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, plot_roc_curve

# Scientific Statistical Python
from scipy.stats import jarque_bera

## Why K-Fold CV Fails in Finance

### A Solution: Purged K-Fold CV
#### Purging The Training Set

In [9]:
def get_train_times(t1: pd.Series, testTimes: pd.Series) -> pd.Series:
    train = t1.copy(deep=True)
    for start, end in testTimes.iteritems():
        df0 = train[(start <= train.index) & (train.index <= end)].index    # train starts within test
        df1 = train[(start <= train) & (train <= end)].index                # train ends within test
        df2 = train[(train.index <= start) & (end <= train)].index          # train envelops test
        train = train.drop(df0.union(df1).union(df2))
    return train

### Embargo
#### On Training Observations

In [10]:
def get_embargo_times(times: np.ndarray, pctEmbargo: float = 0.0) -> pd.Series:
    step = int(times.shape[0] * pctEmbargo)
    if step == 0:
        mbrg = pd.Series(times, index=times)
    else:
        mbrg = pd.Series(times[step:], index=times[:-step])
        mbrg = mbrg.append(pd.Series(times[-1], index=times[-step:]))
    return mbrg

### The Purged K-Fold Class
#### Cross-Validation Class When Obsertations Overlap

In [11]:
class PurgedKFold(_BaseKFold):
    def __init__(self, n_splits: int = 3, t1: Optional[pd.Series] = None, pctEmbargo: float = 0.0) -> None:
        if not isinstance(t1, pd.Series):
            raise ValueError('Label Through Dates must be a pd.Series')
        super(PurgedKFold, self).__init__(n_splits, shuffle=False, random_state=None)
        self.t1 = t1
        self.pctEmbargo = pctEmbargo
        
    def split(self, X: pd.DataFrame, y: Optional[pd.Series] = None, groups: Optional[np.ndarray] = None
             ) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
        if (X.index == self.t1.index).sum() != len(self.t1):
            raise ValueError('X and ThruDateValues must have the same index')
        indices = np.arange(X.shape[0])
        mbrg = int(X.shape[0] * self.pctEmbargo)
        test_starts = [(i[0], i[-1] + 1) for i in np.array_split(np.arange(X.shape[0]), self.n_splits)]
        for i, j in test_starts:
            t0 = self.t1.index[i]    # start of test set
            test_indices = indices[i: j]
            maxT1Idx = self.t1.index.searchsorted(self.t1[test_indices].max())
            train_indices = self.t1.index.searchsorted(self.t1[self.t1 <= t0].index)
            if maxT1Idx < X.shape[0]:    # right train (with embargo)
                train_indices = np.concatenate((train_indices, indices[maxT1Idx + mbrg:]))
            yield train_indices, test_indices

### Bugs Scikit-Learn's Cross-Validation
#### Using `PurgedKFold` Class

In [12]:
def cvScore(
    clf: Any, X: pd.DataFrame, y: pd.Series, sample_weight: pd.Series, scoring: str ='neg_log_loss',
    t1: Optional[pd.Series] = None, cv: Optional[int] = None,
    cvGen: Optional[PurgedKFold] = None, pctEmbargo: Optional[float] = None) -> np.ndarray:
    
    if scoring not in ['neg_log_loss', 'accuracy']:
        raise Exception('wrong scoring method')
    if cvGen is None:
        cvGen = PurgedKFold(n_splits=cv, t1=t1, pctEmbargo=pctEmbargo)    # purged
    score = []
    for train, test in cvGen.split(X=X):
        fit = clf.fit(X=X.iloc[train, :], y=y.iloc[train], sample_weight=sample_weight.iloc[train].values)
        if scoring == 'neg_log_loss':
            prob = fit.predict_proba(X.iloc[test, :])
            score_ = -log_loss(y.iloc[test], prob, sample_weight=sample_weight.iloc[test].values, labels=clf.classes_)
        else:
            pred = fit.predict(X.iloc[test, :])
            score_ = accuracy_score(y.iloc[test], pred, sample_weight=sample_weight.iloc[test].values)
        score.append(score_)
    return np.array(score)