# Import libraries

In [15]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import itertools as itt
import numbers
from typing import Iterable, Tuple 

# Define purge function to remove overlapping training samples

In [16]:
def purge(cv, train_indices, test_fold_start, test_fold_end):
    """
    Remove training indices where prediction times overlap with test fold's evaluation times.
    This prevents leakage by ensuring no training sample's pred_time falls within the test fold's time range.
    """
    train_times = cv.pred_times.iloc[train_indices]
    eval_times_test = cv.eval_times.iloc[test_fold_start:test_fold_end]
    pred_times_test = cv.pred_times.iloc[test_fold_start:test_fold_end]
    
    # Mask to exclude train indices where pred_time is within [min(pred_times_test), max(eval_times_test)]
    mask = ~((train_times >= pred_times_test.min()) & 
             (train_times <= eval_times_test.max()))
    return train_indices[mask]

# Define embargo function to enforce a time gap between train and test sets

In [17]:
def embargo(cv, train_indices, test_indices, test_fold_end):
    """
    Apply embargo by removing training samples where pred_time is too close to test eval_time.
    This ensures a temporal buffer (embargo period) to prevent leakage due to temporal correlation.
    """
    eval_times_test = cv.eval_times.iloc[test_indices]
    max_eval_time = eval_times_test.max()
    embargo_time = max_eval_time + cv.embargo_td
    
    train_times = cv.pred_times.iloc[train_indices]
    mask = train_times < embargo_time
    return train_indices[mask]


# Define base class for time series cross-validation

In [18]:
class BaseTimeSeriesCrossValidator:
    """
    Abstract base class for time series cross-validation.
    Ensures samples have prediction and evaluation times, and enforces time-ordering and index alignment.
    """
    def __init__(self, n_splits=10):
        # Validate n_splits as an integer >= 2
        if not isinstance(n_splits, numbers.Integral):
            raise ValueError(f"The number of folds must be of Integral type. {n_splits} of type {type(n_splits)} was passed.")
        n_splits = int(n_splits)
        if n_splits <= 1:
            raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting n_splits = 2 or more, got n_splits = {n_splits}.")
        self.n_splits = n_splits
        self.pred_times = None
        self.eval_times = None
        self.indices = None

    def split(self, X: pd.DataFrame, y: pd.Series = None, pred_times: pd.Series = None, eval_times: pd.Series = None):
        """
        Validate input data and store prediction/evaluation times and indices.
        Ensures X, y, pred_times, and eval_times are pandas objects with aligned indices and sorted times.
        """
        if not isinstance(X, pd.DataFrame) and not isinstance(X, pd.Series):
            raise ValueError('X should be a pandas DataFrame/Series.')
        if not isinstance(y, pd.Series) and y is not None:
            raise ValueError('y should be a pandas Series.')
        if not isinstance(pred_times, pd.Series):
            raise ValueError('pred_times should be a pandas Series.')
        if not isinstance(eval_times, pd.Series):
            raise ValueError('eval_times should be a pandas Series.')
        if y is not None and (X.index == y.index).sum() != len(y):
            raise ValueError('X and y must have the same index')
        if (X.index == pred_times.index).sum() != len(pred_times):
            raise ValueError('X and pred_times must have the same index')
        if (X.index == eval_times.index).sum() != len(eval_times):
            raise ValueError('X and eval_times must have the same index')

        if not pred_times.equals(pred_times.sort_values()):
            raise ValueError('pred_times should be sorted')
        if not eval_times.equals(eval_times.sort_values()):
            raise ValueError('eval_times should be sorted')

        self.pred_times = pred_times
        self.eval_times = eval_times
        self.indices = np.arange(X.shape[0])

# Define class purged combinatorial K-fold cross-validator

In [19]:
class CombPurgedKFoldCVLocal(BaseTimeSeriesCrossValidator):
    """
    Implements purged and embargoed combinatorial K-fold cross-validation.
    Splits data into n_splits folds, uses n_test_splits folds as test set, and purges/embargoes to prevent leakage.
    Based on Marcos Lopez de Prado's 'Advances in Financial Machine Learning'.
    """
    def __init__(self, n_splits=10, n_test_splits=2, embargo_td=pd.Timedelta(minutes=0)):
        # Initialize base class and validate n_test_splits and embargo_td
        super().__init__(n_splits)
        if not isinstance(n_test_splits, numbers.Integral):
            raise ValueError(f"The number of test folds must be of Integral type. {n_test_splits} of type {type(n_test_splits)} was passed.")
        n_test_splits = int(n_test_splits)
        if n_test_splits <= 0 or n_test_splits > self.n_splits - 1:
            raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting n_test_splits between 1 and n_splits - 1, got n_test_splits = {n_test_splits}.")
        self.n_test_splits = n_test_splits
        if not isinstance(embargo_td, pd.Timedelta):
            raise ValueError(f"The embargo time should be of type Pandas Timedelta. {embargo_td} of type {type(embargo_td)} was passed.")
        if embargo_td < pd.Timedelta(minutes=0):
            raise ValueError(f"The embargo time should be positive, got embargo = {embargo_td}.")
        self.embargo_td = embargo_td

    def split(self, X: pd.DataFrame, y: pd.Series = None, pred_times: pd.Series = None, eval_times: pd.Series = None) -> Iterable[Tuple[np.ndarray, np.ndarray]]:
        """
        Generate train/test indices for each fold.
        Yields purged and embargoed train/test indices to prevent temporal leakage.
        """
        super().split(X, y, pred_times, eval_times)
        # Create fold boundaries
        fold_bounds = [(fold[0], fold[-1] + 1) for fold in np.array_split(self.indices, self.n_splits)]
        # Generate all combinations of n_test_splits folds for test sets
        selected_fold_bounds = list(itt.combinations(fold_bounds, self.n_test_splits))
        selected_fold_bounds.reverse()  # Start with test set at the end

        for fold_bound_list in selected_fold_bounds:
            test_fold_bounds, test_indices = self.compute_test_set(fold_bound_list)
            train_indices = self.compute_train_set(test_fold_bounds, test_indices)
            yield train_indices, test_indices

    def compute_train_set(self, test_fold_bounds: list, test_indices: np.ndarray) -> np.ndarray:
        """
        Compute training indices by excluding test indices and applying purge/embargo.
        """
        train_indices = np.setdiff1d(self.indices, test_indices)
        for test_fold_start, test_fold_end in test_fold_bounds:
            train_indices = purge(self, train_indices, test_fold_start, test_fold_end)
            train_indices = embargo(self, train_indices, test_indices, test_fold_end)
        return train_indices

    def compute_test_set(self, fold_bound_list: list) -> Tuple[list, np.ndarray]:
        """
        Compute test indices and fold boundaries for the test set.
        Merges contiguous folds and collects all test indices.
        """
        test_indices = np.empty(0)
        test_fold_bounds = []
        for fold_start, fold_end in fold_bound_list:
            if not test_fold_bounds or fold_start != test_fold_bounds[-1][-1]:
                test_fold_bounds.append((fold_start, fold_end))
            elif fold_start == test_fold_bounds[-1][-1]:
                test_fold_bounds[-1] = (test_fold_bounds[-1][0], fold_end)
            test_indices = np.union1d(test_indices, self.indices[fold_start:fold_end]).astype(int)
        return test_fold_bounds, test_indices

# Generate synthetic time series data

In [20]:
np.random.seed(42)  # For reproducibility
n_samples = 100
dates = pd.date_range(start='2023-01-01', periods=n_samples, freq='D')

# Create feature DataFrame with two random features

In [21]:
X = pd.DataFrame({
    'feature1': np.random.randn(n_samples),
    'feature2': np.random.randn(n_samples)
}, index=dates)

# Create target series as a linear combination of features plus noise

In [22]:
y = pd.Series(np.random.randn(n_samples) + 0.5 * X['feature1'] + 0.3 * X['feature2'], index=dates)

# Define prediction and evaluation times

In [23]:
pred_times = pd.Series(dates, index=dates)  # Prediction at index time
eval_times = pd.Series(dates + pd.Timedelta(days=1), index=dates)  # Evaluation 1 day later

# Initialize cross-validator

In [24]:
# Use 5 folds, 1 test fold per round, and 2-day embargo
cv = CombPurgedKFoldCVLocal(n_splits=5, n_test_splits=1, embargo_td=pd.Timedelta(days=2))

# Perform cross-validation with Linear Regression

In [25]:
model = LinearRegression()
mse_scores = []
fold = 1

for train_indices, test_indices in cv.split(X, y, pred_times, eval_times):
    # Split data into train and test sets
    X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
    y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
    
    # Train model and predict
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    mse_scores.append(mse)
    
    # Print fold details
    print(f"Fold {fold}:")
    print(f"  Train indices: {train_indices[:5]}... (length: {len(train_indices)})")
    print(f"  Test indices: {test_indices[:5]}... (length: {len(test_indices)})")
    print(f"  MSE: {mse:.4f}\n")
    fold += 1

Fold 1:
  Train indices: [0 1 2 3 4]... (length: 80)
  Test indices: [80 81 82 83 84]... (length: 20)
  MSE: 0.8725

Fold 2:
  Train indices: [0 1 2 3 4]... (length: 61)
  Test indices: [60 61 62 63 64]... (length: 20)
  MSE: 1.6806

Fold 3:
  Train indices: [0 1 2 3 4]... (length: 41)
  Test indices: [40 41 42 43 44]... (length: 20)
  MSE: 1.3040

Fold 4:
  Train indices: [0 1 2 3 4]... (length: 21)
  Test indices: [20 21 22 23 24]... (length: 20)
  MSE: 1.2827

Fold 5:
  Train indices: [21]... (length: 1)
  Test indices: [0 1 2 3 4]... (length: 20)
  MSE: 8.4302



# Summarize cross-validation results

In [26]:
print("Cross-Validation Results:")
print(f"Mean MSE: {np.mean(mse_scores):.4f}")
print(f"Std MSE: {np.std(mse_scores):.4f}")

Cross-Validation Results:
Mean MSE: 2.7140
Std MSE: 2.8695
