## Imports and Environment Setup

### Kaggle API reference
https://github.com/Kaggle/kaggle-api/blob/main/docs/README.md

In [1]:
import os
import pandas as pd
import polars as pl
import numpy as np
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Callable, Union, Tuple
import lightgbm as lgb
import optuna
import dill
import warnings
import datetime
warnings.filterwarnings('ignore')

In [2]:
f"pandas {pd.__version__}, polars {pl.__version__}, numpy {np.__version__}, lightgbm {lgb.__version__}, optuna {optuna.__version__}"

'pandas 2.2.3, polars 1.6.0, numpy 2.0.2, lightgbm 4.5.0, optuna 4.0.0'

In [3]:
# Detect the runtime by the presence of the KAGGLE_KERNEL_RUN_TYPE environment
# variable (presumably set by the Kaggle kernel runtime -- confirm).
IS_KAGGLE = 'KAGGLE_KERNEL_RUN_TYPE' in os.environ
if not IS_KAGGLE: 
    # Local run: switch the working directory to ./kaggle so the relative
    # './data' and './models' paths below resolve inside it.
    # NOTE(review): chdir is a process-wide side effect; re-running this cell
    # would look for ./kaggle/kaggle -- confirm this is only executed once.
    os.chdir('./kaggle')
    # The Kaggle API client is imported only for local runs; KaggleApi is
    # therefore undefined when IS_KAGGLE is True.
    from kaggle.api.kaggle_api_extended import KaggleApi
# Root directory of the competition data (train.parquet / test.parquet).
BASE_PATH = '/kaggle/input/jane-street-real-time-market-data-forecasting' if IS_KAGGLE else './data'
# Directory used for model artifacts.
MODEL_PATH = '/kaggle/working' if IS_KAGGLE else './models'

In [4]:
IS_KAGGLE, BASE_PATH, MODEL_PATH

(False, './data', './models')

In [5]:
# TYPE_CHECKING is False at runtime, so this import only runs under a static
# type checker. The classes below refer to Config / ModelConfig through string
# annotations, which lets them be defined before the config classes exist.
if TYPE_CHECKING:
    from __main__ import Config, ModelConfig

#### Define Metrics

In [6]:
def r2_metric(y_true, y_pred, weights=None):
    """Calculate the weighted, zero-mean R2 score.

    The score is ``1 - sum(w * (y_true - y_pred)**2) / sum(w * y_true**2)``.
    Note that the denominator uses the raw targets (no mean subtraction).

    Args:
        y_true: Target values; any array-like, flattened before use.
        y_pred: Predicted values; any array-like, flattened before use.
        weights: Optional per-sample weights (array-like). Uniform weights
            are used when omitted.

    Returns:
        A ``('r2', score, True)`` tuple: metric name, metric value and a
        "higher is better" flag.
    """
    # np.asarray is a no-op for ndarrays and additionally lets plain lists or
    # other array-likes through, which a bare ``.ravel()`` call would reject.
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    # If weights is None, use uniform weights
    if weights is None:
        weights = np.ones_like(y_true)
    else:
        weights = np.asarray(weights).ravel()

    numerator = np.sum(weights * (y_true - y_pred) ** 2)
    denominator = np.sum(weights * (y_true ** 2))
    r2_score = 1 - (numerator / denominator)
    return 'r2', r2_score, True

#### Version control

In [7]:
from functools import wraps

def versioned_function(version: str, description: str = ""):
    """Build a decorator that tags a function with version metadata.

    The decorated callable behaves exactly like the original one and gains
    two attributes, ``version`` and ``description``, holding the values
    supplied here.
    """
    def decorator(func: Callable) -> Callable:
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # Copy name/docstring/etc. from the wrapped function, then attach
        # the version metadata to the resulting wrapper.
        tagged = wraps(func)(wrapper)
        setattr(tagged, "version", version)
        setattr(tagged, "description", description)
        return tagged

    return decorator

## Model Class

In [8]:
class BaseModel:
    """Common interface shared by all model wrappers.

    Subclasses implement ``fit`` and ``predict`` and may override
    ``_register_custom_metrics``. The fitted estimator is kept in
    ``self.model`` and is the only thing persisted by ``save``/``load``.
    """

    def __init__(self, config: "ModelConfig"):
        # Store the configuration first: the metric hook below may read it.
        self.config = config
        self.model = None
        self._register_custom_metrics()

    def _register_custom_metrics(self):
        """Hook for subclasses that need to register metrics; no-op here."""

    def fit(
        self,
        train_data: Tuple[np.ndarray, np.ndarray],
        val_data: Optional[Tuple[np.ndarray, np.ndarray]] = None,
    ):
        """Train the model; must be implemented by subclasses."""
        raise NotImplementedError

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return predictions for ``X``; must be implemented by subclasses."""
        raise NotImplementedError

    def save(self, path: str):
        """Serialize ``self.model`` to ``path`` with dill."""
        with open(path, 'wb') as out_file:
            dill.dump(self.model, out_file)

    def load(self, path: str):
        """Replace ``self.model`` with the object stored at ``path``."""
        with open(path, 'rb') as in_file:
            self.model = dill.load(in_file)

class LightGBMModel(BaseModel):
    """LightGBM wrapper trained through ``lgb.train`` on weighted datasets."""

    def _register_custom_metrics(self):
        """Register custom metrics for LightGBM.

        Only the metric *names* (the keys of ``config.custom_metrics``) are
        written to ``params['metric']``; this mutates ``config.params``.
        NOTE(review): the metric callables themselves are not forwarded to
        ``lgb.train`` -- confirm LightGBM recognizes the configured names.
        """
        # Instead of registering metrics directly, we'll add them to params
        if self.config.custom_metrics:
            self.config.params['metric'] = list(self.config.custom_metrics.keys())

    def fit(self, train_data: Tuple[np.ndarray, np.ndarray, np.ndarray],
           val_data: Optional[Tuple[np.ndarray, np.ndarray, np.ndarray]] = None):
        """Train a booster for up to 1000 rounds.

        Args:
            train_data: ``(features, target, sample_weight)`` used to build
                the training ``lgb.Dataset``.
            val_data: Optional ``(features, target, sample_weight)``. When
                given it is evaluated under the name ``'valid'`` and drives
                early stopping (patience of 100 rounds).
        """
        train_X, train_y, train_w = train_data
        train_set = lgb.Dataset(train_X, train_y, weight=train_w, free_raw_data=False)

        # Evaluation results are logged every 100 rounds.
        callbacks = [lgb.log_evaluation(period=100)]
        valid_sets = None
        valid_names = None

        if val_data is not None:
            val_X, val_y, val_w = val_data
            val_set = lgb.Dataset(val_X, val_y, weight=val_w, free_raw_data=False)
            valid_sets = [val_set]
            valid_names = ['valid']
            # Early stopping requires at least one validation set; registering
            # it unconditionally makes LightGBM raise when val_data is None.
            callbacks.insert(0, lgb.early_stopping(stopping_rounds=100))

        self.model = lgb.train(
            self.config.params,
            train_set,
            num_boost_round=1000,
            valid_sets=valid_sets,
            valid_names=valid_names,
            callbacks=callbacks
        )

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the trained booster's predictions for ``X``."""
        return self.model.predict(X)

# Example model implementations
"""
class XGBoostModel(BaseModel):
    def _register_custom_metrics(self):
        # XGBoost에서는 custom_metrics를 train 파라미터로 전달
        if self.config.custom_metrics:
            self.config.params['custom_metric'] = list(self.config.custom_metrics.values())
    
    def fit(self, train_data: Tuple[np.ndarray, np.ndarray], 
           val_data: Optional[Tuple[np.ndarray, np.ndarray]] = None):
        train_X, train_y = train_data
        train_set = xgb.DMatrix(train_X, train_y)
        
        val_set = None
        watchlist = [(train_set, 'train')]
        if val_data is not None:
            val_X, val_y = val_data
            val_set = xgb.DMatrix(val_X, val_y)
            watchlist.append((val_set, 'valid'))
        
        self.model = xgb.train(
            self.config.params,
            train_set,
            evals=watchlist
        )
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(xgb.DMatrix(X))

class NeuralNetworkModel(BaseModel):
    def _register_custom_metrics(self):
        # PyTorch/TensorFlow에서는 metrics를 모델 컴파일 시 등록
        self.metrics = list(self.config.custom_metrics.values())
    
    def fit(self, train_data: Tuple[np.ndarray, np.ndarray], 
           val_data: Optional[Tuple[np.ndarray, np.ndarray]] = None):
        train_X, train_y = train_data
        
        # PyTorch example
        self.model = torch.nn.Sequential(
            torch.nn.Linear(train_X.shape[1], 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1)
        )
        
        optimizer = torch.optim.Adam(self.model.parameters(), 
                                   lr=self.config.params.get('learning_rate', 0.001))
        
        # Training loop with custom metrics
        for epoch in range(self.config.params.get('epochs', 10)):
            self.model.train()
            # ... training implementation ...
            
            if val_data is not None:
                self.model.eval()
                # ... validation implementation ...
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X)
            return self.model(X_tensor).numpy()

class EnsembleModel(BaseModel):
    def __init__(self, config: "ModelConfig", models: List[BaseModel]):
        super().__init__(config)
        self.models = models
    
    def _register_custom_metrics(self):
        # 각 모델의 custom metrics는 이미 등록되어 있음
        pass
    
    def fit(self, train_data: Tuple[np.ndarray, np.ndarray], 
           val_data: Optional[Tuple[np.ndarray, np.ndarray]] = None):
        for model in self.models:
            model.fit(train_data, val_data)
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        predictions = [model.predict(X) for model in self.models]
        # Weighted average if weights are specified in config
        weights = self.config.params.get('weights', None)
        if weights is not None:
            return np.average(predictions, axis=0, weights=weights)
        return np.mean(predictions, axis=0)
"""

'\nclass XGBoostModel(BaseModel):\n    def _register_custom_metrics(self):\n        # XGBoost에서는 custom_metrics를 train 파라미터로 전달\n        if self.config.custom_metrics:\n            self.config.params[\'custom_metric\'] = list(self.config.custom_metrics.values())\n    \n    def fit(self, train_data: Tuple[np.ndarray, np.ndarray], \n           val_data: Optional[Tuple[np.ndarray, np.ndarray]] = None):\n        train_X, train_y = train_data\n        train_set = xgb.DMatrix(train_X, train_y)\n        \n        val_set = None\n        watchlist = [(train_set, \'train\')]\n        if val_data is not None:\n            val_X, val_y = val_data\n            val_set = xgb.DMatrix(val_X, val_y)\n            watchlist.append((val_set, \'valid\'))\n        \n        self.model = xgb.train(\n            self.config.params,\n            train_set,\n            evals=watchlist\n        )\n    \n    def predict(self, X: np.ndarray) -> np.ndarray:\n        return self.model.predict(xgb.DMatrix(X))\n\ncl

## Data Handler Class

In [9]:
class DataHandler:
    """Loads the competition data and turns DataFrames into model inputs.

    Attributes:
        train_data / test_data: polars DataFrames set by ``load_data`` and
            updated in place by ``prepare_data``.
        features: names of the columns starting with ``'feature_'``,
            refreshed whenever features are generated.
        preprocessor / feature_generator: optional callables taking and
            returning a DataFrame, stored by ``prepare_data``.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self.train_data = None
        self.test_data = None
        self.features = None
        self.preprocessor = None
        self.feature_generator = None

    def load_data(self) -> Tuple[pl.DataFrame, Optional[pl.DataFrame]]:
        """Load train and test data from ``BASE_PATH``.

        When ``config.partition_range`` is set, only those train partitions
        are read and concatenated; otherwise the whole train dataset is read.
        Loading is best-effort: errors are printed rather than raised. If the
        training data cannot be read both frames are ``None``; if only the
        test data cannot be read, the training data is kept and the test
        frame is ``None``.

        Returns:
            ``(train_data, test_data)``.
        """
        partition_range = getattr(self.config, 'partition_range', None)
        try:
            if partition_range is not None:
                # Load specific partitions
                train_parts = [
                    pl.read_parquet(
                        f"{BASE_PATH}/train.parquet/partition_id={i}/part-0.parquet"
                    )
                    for i in partition_range
                ]
                self.train_data = pl.concat(train_parts, how='vertical')
            else:
                # Load full dataset
                self.train_data = pl.read_parquet(f"{BASE_PATH}/train.parquet")
        except Exception as e:
            print(f"Error loading data: {e}")
            self.train_data = None
            self.test_data = None
            return self.train_data, self.test_data

        # The test frame is optional everywhere downstream, so a missing or
        # unreadable test file must not throw away the loaded training data.
        try:
            self.test_data = pl.read_parquet(f"{BASE_PATH}/test.parquet")
        except Exception as e:
            print(f"Error loading data: {e}")
            self.test_data = None

        return self.train_data, self.test_data

    def prepare_data(self, preprocessor: Optional[Callable] = None,
                    feature_generator: Optional[Callable] = None,
                    is_inference: bool = False) -> Tuple[pl.DataFrame, pl.DataFrame]:
        """Prepare data with custom preprocessing and feature generation.

        The preprocessor (if any) is always applied to the train frame and,
        when present, the test frame. The feature generator is applied here
        only when ``is_inference`` is True; during training it is applied
        per fold through ``generate_features``.

        Returns:
            ``(train_data, test_data)`` after processing.
        """
        self.preprocessor = preprocessor
        self.feature_generator = feature_generator

        # Process both train and test data with preprocessor only
        if self.preprocessor:
            self.train_data = self.preprocessor(self.train_data)
            if self.test_data is not None:
                self.test_data = self.preprocessor(self.test_data)

        # Feature generation will be done separately for each fold
        # Only generate features for full dataset during inference
        if is_inference and self.feature_generator:
            self.train_data = self.feature_generator(self.train_data)
            if self.test_data is not None:
                self.test_data = self.feature_generator(self.test_data)
            self.features = [col for col in self.train_data.columns
                            if col.startswith('feature_')]

        return self.train_data, self.test_data

    def generate_features(self, df: pl.DataFrame) -> pl.DataFrame:
        """Apply the stored feature generator to ``df``.

        When a generator is configured, ``self.features`` is refreshed from
        the resulting columns; otherwise ``df`` is returned unchanged.
        """
        if self.feature_generator:
            df = self.feature_generator(df)
            self.features = [col for col in df.columns
                            if col.startswith('feature_')]
        return df

    def get_feature_data(self, df: pl.DataFrame) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
        """Extract features, target, and weights as NumPy arrays.

        Returns:
            ``(X, y, w)`` where ``y`` comes from the ``'responder_6'`` column
            and ``w`` from the ``'weight'`` column; each is ``None`` when its
            column is absent. ``y`` and ``w`` are produced from single-column
            selections, i.e. they are not flattened here.

        Raises:
            ValueError: if the feature list has not been set yet, or if any
                listed feature is missing from ``df``.
        """
        if self.features is None:
            # Fail with a clear message instead of "NoneType is not iterable".
            raise ValueError(
                "Feature list is not set; generate features or load a pipeline first"
            )

        available = set(df.columns)
        missing_features = [f for f in self.features if f not in available]
        if missing_features:
            raise ValueError(f"Missing features in input data: {missing_features}")

        X = df.select(self.features).to_numpy()
        y = df.select('responder_6').to_numpy() if 'responder_6' in available else None
        w = df.select('weight').to_numpy() if 'weight' in available else None

        return X, y, w

## Split Strategies

In [10]:
from sklearn.model_selection import KFold, TimeSeriesSplit
from abc import ABC, abstractmethod

class SplitStrategy(ABC):
    """Base class for split strategies.

    Args:
        test_ratio: approximate fraction of unique ``date_id`` values held
            out as the final test set.
    """
    def __init__(self, test_ratio: float = 0.2):
        self.test_ratio = test_ratio

    def get_holdout_test(self, data: pl.DataFrame) -> Tuple[pl.DataFrame, pl.DataFrame]:
        """Split out final holdout test set using date_id.

        Rows with ``date_id`` up to and including the split date go to the
        train part; later dates go to the test part. The test part is empty
        when the split date is the last available date.

        Raises:
            ValueError: if ``data`` contains no ``date_id`` values.
        """
        unique_dates = data['date_id'].unique().sort()
        n_dates = len(unique_dates)
        if n_dates == 0:
            raise ValueError("Cannot create holdout split: no date_id values in data")

        # Clamp so extreme ratios (or very few dates) cannot index past the
        # ends of unique_dates.
        split_idx = int(n_dates * (1 - self.test_ratio))
        split_idx = max(0, min(split_idx, n_dates - 1))
        split_date = unique_dates[split_idx]

        train_data = data.filter(pl.col('date_id') <= split_date)
        test_data = data.filter(pl.col('date_id') > split_date)

        print("\nHoldout Test Split Info:")
        print(f"Total unique dates: {n_dates}")
        print(f"Train dates range: {unique_dates[0]} - {unique_dates[split_idx]}")
        if split_idx + 1 < n_dates:
            print(f"Test dates range: {unique_dates[split_idx + 1]} - {unique_dates[-1]}")
        else:
            # No date lies after the split date, so there is nothing to report.
            print("Test dates range: (empty)")
        print(f"Train samples: {len(train_data):,}, Test samples: {len(test_data):,}")

        return train_data, test_data

    @abstractmethod
    def split(self, data: pl.DataFrame) -> List[Tuple[pl.DataFrame, pl.DataFrame]]:
        """Split remaining data into train/val sets for cross validation
        Returns:
            List of (train, val) DataFrame tuples
        """
        pass

class TimeBasedSplit(SplitStrategy):
    """Single chronological train/validation split on ``date_id``.

    Args:
        train_ratio: approximate fraction of unique dates used for training.
        test_ratio: forwarded to ``SplitStrategy`` for the holdout split.
    """
    def __init__(self, train_ratio: float = 0.75, test_ratio: float = 0.2):
        super().__init__(test_ratio)
        self.train_ratio = train_ratio

    def split(self, data: pl.DataFrame) -> List[Tuple[pl.DataFrame, pl.DataFrame]]:
        """Single split based on date_id.

        Rows with ``date_id`` up to and including the split date form the
        training part; later dates form the validation part.

        Returns:
            A one-element list containing the ``(train, val)`` pair.

        Raises:
            ValueError: if ``data`` contains no ``date_id`` values.
        """
        unique_dates = data['date_id'].unique().sort()
        n_dates = len(unique_dates)
        if n_dates == 0:
            raise ValueError("Cannot create time based split: no date_id values in data")

        # Clamp so extreme ratios (or very few dates) cannot index past the
        # ends of unique_dates.
        split_idx = int(n_dates * self.train_ratio)
        split_idx = max(0, min(split_idx, n_dates - 1))
        split_date = unique_dates[split_idx]

        train = data.filter(pl.col('date_id') <= split_date)
        val = data.filter(pl.col('date_id') > split_date)

        print("\nTime Based Split Info:")
        print(f"Train dates range: {unique_dates[0]} - {unique_dates[split_idx]}")
        if split_idx + 1 < n_dates:
            print(f"Val dates range: {unique_dates[split_idx + 1]} - {unique_dates[-1]}")
        else:
            # No date lies after the split date, so the validation part is empty.
            print("Val dates range: (empty)")
        print(f"Train samples: {len(train):,}, Val samples: {len(val):,}")

        return [(train, val)]

class TimeSeriesKFold(SplitStrategy):
    """Expanding-window cross-validation over sorted ``date_id`` values.

    Args:
        n_splits: number of folds to produce.
        test_ratio: forwarded to ``SplitStrategy`` for the holdout split.
    """
    def __init__(self, n_splits: int = 5, test_ratio: float = 0.2):
        super().__init__(test_ratio)
        self.n_splits = n_splits

    def split(self, data: pl.DataFrame) -> List[Tuple[pl.DataFrame, pl.DataFrame]]:
        """Multiple splits based on date_id.

        Each fold trains on all dates before its validation window; the
        training window therefore grows by one validation block per fold.
        The last fold's validation window absorbs any leftover dates.

        Returns:
            List of ``(train, val)`` DataFrame tuples, one per fold.

        Raises:
            ValueError: if ``n_splits`` is smaller than 1 or there are fewer
                than ``n_splits + 1`` unique dates (some fold would be empty).
        """
        if self.n_splits < 1:
            raise ValueError(f"n_splits must be at least 1, got {self.n_splits}")

        unique_dates = data['date_id'].unique().sort()
        n_dates = len(unique_dates)
        if n_dates < self.n_splits + 1:
            # With fewer dates the initial training window would be empty and
            # the range printing below would fail with an IndexError.
            raise ValueError(
                f"Need at least {self.n_splits + 1} unique dates for "
                f"{self.n_splits} splits, got {n_dates}"
            )

        splits = []

        # Calculate initial training size and increment
        initial_train_size = n_dates // (self.n_splits + 1)
        remaining_dates = n_dates - initial_train_size
        val_size = remaining_dates // self.n_splits

        print(f"\nTime Series {self.n_splits}-Fold Split Info:")
        print(f"Total unique dates: {n_dates}")
        print(f"Initial train size: {initial_train_size} dates")
        print(f"Validation size: ~{val_size} dates per fold")

        for i in range(self.n_splits):
            train_end_idx = initial_train_size + (i * val_size)
            val_end_idx = train_end_idx + val_size
            if i == self.n_splits - 1:  # Last fold uses all remaining dates
                val_end_idx = n_dates

            train_dates = unique_dates[:train_end_idx]
            val_dates = unique_dates[train_end_idx:val_end_idx]

            train = data.filter(pl.col('date_id').is_in(train_dates))
            val = data.filter(pl.col('date_id').is_in(val_dates))

            print(f"\nFold {i+1}:")
            print(f"Train dates range: {train_dates[0]} - {train_dates[-1]}")
            print(f"Val dates range: {val_dates[0]} - {val_dates[-1]}")
            print(f"Train samples: {len(train):,}, Val samples: {len(val):,}")

            splits.append((train, val))

        return splits

## Optimization Handler Class

In [11]:
class OptimizationHandler:
    """Optuna-based hyperparameter search for a model class.

    Args:
        config: pipeline configuration; ``config.model.params`` is updated
            in place with each trial's parameters.
        model_class: model wrapper class instantiated for every trial with
            ``config.model``.
    """
    def __init__(self, config: "Config", model_class: type):
        self.config = config
        self.model_class = model_class

    def get_search_space(self, trial: optuna.Trial) -> Dict[str, Any]:
        """Define search space for each model type.

        Returns an empty dict for model names without a defined space.
        """
        if self.config.model.name == 'lightgbm':
            return {
                'learning_rate': trial.suggest_float('learning_rate', 1e-3, 0.1, log=True),
                'num_leaves': trial.suggest_int('num_leaves', 16, 96),
                'min_data_in_leaf': trial.suggest_int('min_data_in_leaf', 10, 100),
                'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
                'lambda_l1': trial.suggest_float('lambda_l1', 1e-8, 10.0, log=True),
                'lambda_l2': trial.suggest_float('lambda_l2', 1e-8, 10.0, log=True),
            }
        # Example search space for a future XGBoost model (not active):
        # elif self.config.model.name == 'xgboost':
        #     return {
        #         'learning_rate': trial.suggest_float('learning_rate', 1e-3, 0.1, log=True),
        #         'max_depth': trial.suggest_int('max_depth', 3, 10),
        #         'min_child_weight': trial.suggest_int('min_child_weight', 1, 20),
        #         'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        #         'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        #         'alpha': trial.suggest_float('alpha', 1e-8, 10.0, log=True),
        #         'lambda': trial.suggest_float('lambda', 1e-8, 10.0, log=True),
        #     }
        return {}

    def objective(self, trial: optuna.Trial, train_data: Tuple[np.ndarray, np.ndarray],
                 val_data: Tuple[np.ndarray, np.ndarray]) -> float:
        """Optimization objective: unweighted RMSE on the validation data.

        ``train_data`` and ``val_data`` are passed to the model's ``fit``
        unchanged. They may be ``(X, y)`` or ``(X, y, w)`` tuples; only the
        first two elements of ``val_data`` are used for scoring.
        """
        params = self.get_search_space(trial)
        self.config.model.params.update(params)

        model = self.model_class(self.config.model)
        model.fit(train_data, val_data)

        # Index instead of 2-way unpacking: the pipeline passes 3-tuples
        # (features, target, weight), which ``val_X, val_y = val_data`` rejects.
        val_X, val_y = val_data[0], val_data[1]

        # Flatten both sides: an (n,) prediction minus an (n, 1) target would
        # broadcast to an (n, n) matrix and give a wrong, very costly score.
        predictions = np.asarray(model.predict(val_X)).ravel()
        targets = np.asarray(val_y).ravel()

        return float(np.mean((predictions - targets) ** 2) ** 0.5)

    def optimize(self, train_data: Tuple[np.ndarray, np.ndarray],
                val_data: Tuple[np.ndarray, np.ndarray], n_trials: int = 100) -> Dict[str, Any]:
        """Run optimization.

        Creates a minimizing Optuna study, runs ``n_trials`` trials of
        ``objective`` and returns the best parameter set found.
        """
        study = optuna.create_study(direction='minimize')
        study.optimize(
            lambda trial: self.objective(trial, train_data, val_data),
            n_trials=n_trials,
        )

        print(f"Best score: {study.best_value:.4f}")
        print("Best params:", study.best_params)

        return study.best_params

## Kaggle Handler Class

In [12]:
class KaggleHandler:
    """Uploads a saved pipeline to Kaggle as a private dataset (local use only)."""

    def __init__(self, config: "Config"):
        self.config = config
        # KaggleApi is only imported when not running on Kaggle.
        self.api = KaggleApi()
        self.api.authenticate()

    def upload_pipeline(self, pipeline: Any, dataset_title: Optional[str] = None):
        """Upload pipeline to Kaggle dataset.

        The pipeline is saved next to its configured model path under a file
        name derived from ``config.dataset_name``, copied into a temporary
        upload folder together with ``dataset-metadata.json``, and published
        with ``dataset_create_new``.

        Args:
            pipeline: object exposing ``config.model_path`` and ``save()``.
            dataset_title: dataset title; defaults to the last path segment
                of ``config.dataset_name``.

        Raises:
            ValueError: when called inside a Kaggle kernel.
            Exception: any error from saving or from the Kaggle API is
                re-raised after cleanup.
        """
        if IS_KAGGLE:
            raise ValueError("This function is for local environment only")

        import json
        import shutil

        tmp_dir = "./kaggle_upload"
        dataset_slug = self.config.dataset_name.split('/')[-1]
        filename = f"{dataset_slug}.pkl"
        pipeline_path = os.path.join(tmp_dir, filename)
        original_path = pipeline.config.model_path

        os.makedirs(tmp_dir, exist_ok=True)
        try:
            # Temporarily point the pipeline at the dataset-named file so that
            # save() writes the artifact under the name used for the upload.
            pipeline.config.model_path = os.path.join(os.path.dirname(original_path), filename)
            pipeline.save()
            shutil.copy2(pipeline.config.model_path, pipeline_path)

            metadata = {
                "title": dataset_title or dataset_slug,
                "id": f"{self.config.dataset_name}",
                "licenses": [{"name": "CC0-1.0"}]
            }
            with open(os.path.join(tmp_dir, "dataset-metadata.json"), "w") as f:
                json.dump(metadata, f)

            try:
                print(f"Creating new dataset: {self.config.dataset_name}")
                self.api.dataset_create_new(
                    folder=tmp_dir,
                    public=False, # For private datasets
                    quiet=False
                )
                print("Dataset created successfully")
            except Exception as e:
                print(f"Error creating Kaggle dataset: {e}")
                raise
        finally:
            # Restore the caller's model path (it was captured but never put
            # back) and always remove the temp folder, even if saving failed.
            pipeline.config.model_path = original_path
            shutil.rmtree(tmp_dir, ignore_errors=True)
## Pipeline Class

In [13]:
import gc
class Pipeline:
    def __init__(self, config: "Config"):
        """Wire up the data handler, the model and (locally) the Kaggle handler."""
        self.config = config
        self.data_handler = DataHandler(config)
        self.model = self._get_model()
        # The Kaggle uploader is only created outside Kaggle kernels.
        if IS_KAGGLE:
            self.kaggle_handler = None
        else:
            self.kaggle_handler = KaggleHandler(config)
    
    def _get_model(self) -> BaseModel:
        """Instantiate the model wrapper selected by ``config.model.name``.

        Raises:
            ValueError: if the configured name has no registered model class.
        """
        registry = {
            'lightgbm': LightGBMModel,
            # 'xgboost': XGBoostModel,
            # 'neural_network': NeuralNetworkModel,
        }
        selected = registry.get(self.config.model.name)
        if selected is not None:
            return selected(self.config.model)
        raise ValueError(f"Unknown model: {self.config.model.name}")
    
    def train(self, preprocessor: Optional[Callable] = None, 
            feature_generator: Optional[Callable] = None,
            optimize: bool = False,
            n_trials: int = 100) -> pl.DataFrame:
        """Train pipeline and return holdout test set.

        Steps: load and preprocess the data, carve out the holdout set, train
        one model per cross-validation split (features are generated per
        fold), keep the model with the highest validation R2, save the
        pipeline, then generate features for the holdout and full datasets.

        Args:
            preprocessor: optional callable applied to the train/test frames.
            feature_generator: optional callable producing feature columns.
            optimize: run hyperparameter optimization on the first fold.
            n_trials: number of optimization trials.

        Returns:
            The holdout test DataFrame with features generated.

        Raises:
            ValueError: if no fold produced a usable model.
        """
        # Load and prepare data
        print("Loading and preparing data...")
        self.data_handler.load_data()
        self.data_handler.prepare_data(preprocessor, feature_generator, is_inference=False)
        
        # Split data using configured strategy
        print("Splitting data using configured strategy...")
        train_data_full, holdout_test = self.config.split_strategy.get_holdout_test(self.data_handler.train_data)
        splits = list(self.config.split_strategy.split(train_data_full))
        n_folds = len(splits)
        
        # Clear memory after splitting
        del train_data_full
        gc.collect()
        
        best_model = None
        best_score = float('-inf')
        
        # Train and validate on each split
        for i in range(n_folds):
            train_df, val_df = splits[i]
            # Drop the list's reference too; otherwise the ``del`` below
            # cannot actually release this fold's DataFrames.
            splits[i] = None
            print(f"\nTraining fold {i+1}/{n_folds}")
           
            # Generate features for this fold's train and validation data
            train_df_with_features = self.data_handler.generate_features(train_df)
            val_df_with_features = self.data_handler.generate_features(val_df)
            
            # Clear original dataframes
            del train_df, val_df
            gc.collect() 

            train_data = self.data_handler.get_feature_data(train_df_with_features)
            val_data = self.data_handler.get_feature_data(val_df_with_features)

            # Clear feature dataframes
            del train_df_with_features, val_df_with_features
            gc.collect()

            # Create new model instance for each fold
            fold_model = self._get_model()

            # Optionally run optimization (only on first fold)
            if optimize and i == 0:
                print("Running hyperparameter optimization...")
                optimizer = OptimizationHandler(self.config, type(fold_model))
                best_params = optimizer.optimize(train_data, val_data, n_trials)
                self.config.model.params.update(best_params)
                # Recreate model with optimized parameters
                del fold_model
                gc.collect()
                fold_model = self._get_model()
            
            # Train model
            print("Training model...")
            fold_model.fit(train_data, val_data)
            
            # Evaluate on validation set using R2
            val_X, val_y, val_w = val_data
            val_pred = fold_model.predict(val_X)
            _, val_score, _ = r2_metric(val_y, val_pred, val_w)
            print(f"Validation R2 score for fold {i+1}: {val_score:.4f}")
            
            # Keep track of best model
            if val_score > best_score:
                best_score = val_score
                if best_model is not None:
                    del best_model
                    gc.collect()
                best_model = fold_model
            else:
                del fold_model
                gc.collect()

            # Clear fold data
            del train_data, val_data, val_X, val_y, val_w, val_pred
            gc.collect()
        
        # Fail early with a clear message instead of letting save() complain
        # about an invalid model (no splits, or every fold scored NaN).
        if best_model is None:
            raise ValueError(
                "Training produced no usable model: no splits were generated "
                "or no fold achieved a valid validation score"
            )

        # Use best model for final predictions
        self.model = best_model
        print(f"\nBest validation score: {best_score:.4f}")
        
        # Save pipeline
        print("\nSaving pipeline...")
        self.save()

        # After finding best model, generate features for holdout test
        if holdout_test is not None:
            holdout_test = self.data_handler.generate_features(holdout_test)
        
        # For inference, generate features for full dataset. The data was
        # already preprocessed above, so only the feature generator is applied
        # here; calling prepare_data again would run the preprocessor twice.
        # The train frame is processed last so that the stored feature list
        # reflects the training columns.
        if self.data_handler.feature_generator:
            if self.data_handler.test_data is not None:
                self.data_handler.test_data = self.data_handler.generate_features(
                    self.data_handler.test_data
                )
            self.data_handler.train_data = self.data_handler.generate_features(
                self.data_handler.train_data
            )
        
        # Clear any remaining memory
        gc.collect()

        return holdout_test

    def predict(self, df: pl.DataFrame) -> np.ndarray:
        """Run the current model on ``df`` and return its predictions.

        The feature matrix is extracted with the data handler's feature
        list; progress information is printed along the way.
        """
        handler = self.data_handler
        print("Starting prediction...")
        print(f"Input DataFrame shape: {df.shape}")
        print(f"Available features: {handler.features}")

        # Only the feature matrix is needed; target and weights are ignored.
        feature_matrix, _target, _weight = handler.get_feature_data(df)
        print(f"Extracted feature matrix shape: {feature_matrix.shape}")

        result = self.model.predict(feature_matrix)
        print(f"Predictions shape: {result.shape}")
        return result

    def save(self):
        """Save pipeline with detailed logging and version tracking.

        Writes a dill-serialized dict to ``config.model_path`` containing the
        model wrapper, a plain-dict snapshot of the configuration, the data
        handler's callables with their version metadata, the feature list,
        a format version and a timestamp. A summary is printed before saving.

        Raises:
            ValueError: if ``self.model`` lacks ``config`` or ``model``.
            Exception: any error raised while writing the file is re-raised.
        """
        # dirname is '' for a bare file name, and os.makedirs('') fails.
        model_dir = os.path.dirname(self.config.model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        
        # Verify model state
        if not hasattr(self.model, 'config') or not hasattr(self.model, 'model'):
            raise ValueError("Model appears to be uninitialized or invalid")
        
        handler = self.data_handler
        # Tolerate unset features / metrics instead of crashing on None.
        features = handler.features if handler.features is not None else []
        custom_metrics = self.config.model.custom_metrics or {}

        # Record whichever strategy-specific parameters the strategy exposes,
        # so strategies other than the two built-in ones can be saved too.
        strategy = self.config.split_strategy
        strategy_params = {'test_ratio': strategy.test_ratio}
        for attr in ('train_ratio', 'n_splits'):
            if hasattr(strategy, attr):
                strategy_params[attr] = getattr(strategy, attr)

        # Create detailed config dictionary
        config_dict = {
            'model': {
                'name': self.config.model.name,
                'params': dict(self.config.model.params),
                'custom_metrics': {k: getattr(v, '__name__', repr(v))
                                   for k, v in custom_metrics.items()}
            },
            'paths': {
                'model_path': self.config.model_path,
                'dataset_name': self.config.dataset_name
            },
            'split_strategy': {
                'type': type(strategy).__name__,
                'params': strategy_params
            },
            'seed': self.config.seed
        }
        
        preprocessor = handler.preprocessor
        feature_generator = handler.feature_generator
        preprocessor_version = getattr(preprocessor, 'version', 'unknown')
        preprocessor_description = getattr(preprocessor, 'description', '')
        feature_generator_version = getattr(feature_generator, 'version', 'unknown')
        feature_generator_description = getattr(feature_generator, 'description', '')

        # Create pipeline metadata with version information
        pipeline_data = {
            'model': self.model,
            'config': config_dict,
            'data_handler': {
                'preprocessor': preprocessor,
                'preprocessor_version': preprocessor_version,
                'preprocessor_description': preprocessor_description,
                'feature_generator': feature_generator,
                'feature_generator_version': feature_generator_version,
                'feature_generator_description': feature_generator_description,
                'features': features,
                'n_features': len(features)
            },
            'version': '1.0',
            'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        
        # Print pipeline information
        print("\n" + "="*50)
        print("Pipeline Information:")
        print("="*50)
        
        print("\n1. Model Configuration:")
        print(f"   - Model Type: {config_dict['model']['name']}")
        print(f"   - Number of Features: {len(features)}")
        print("   - Model Parameters:")
        for k, v in config_dict['model']['params'].items():
            print(f"     * {k}: {v}")
        
        print("\n2. Data Processing:")
        print(f"   - Preprocessor: {preprocessor.__name__ if preprocessor else 'None'}")
        print(f"     * Version: {preprocessor_version}")
        print(f"     * Description: {preprocessor_description}")
        print(f"   - Feature Generator: {feature_generator.__name__ if feature_generator else 'None'}")
        print(f"     * Version: {feature_generator_version}")
        print(f"     * Description: {feature_generator_description}")
        print(f"   - First 5 Features: {features[:5]}")
        
        print("\n3. Split Strategy:")
        print(f"   - Type: {config_dict['split_strategy']['type']}")
        for k, v in config_dict['split_strategy']['params'].items():
            print(f"   - {k}: {v}")
        
        print("\n4. Save Location:")
        print(f"   - Path: {self.config.model_path}")
        print(f"   - Dataset Name: {self.config.dataset_name}")
        print(f"   - Timestamp: {pipeline_data['timestamp']}")
        
        try:
            with open(self.config.model_path, 'wb') as f:
                dill.dump(pipeline_data, f)
            print("\nPipeline saved successfully! ✓")
        except Exception as e:
            print(f"\nError saving pipeline: {str(e)}")
            raise

    def load(self, path: Optional[str] = None):
        """Load a saved pipeline from disk and validate it phase by phase.

        The file is deserialized with ``dill`` and checked in this order:
        required keys, model attributes plus a dummy prediction, saved vs.
        current function versions, smoke tests of the preprocessor and
        feature generator, the features list, and finally an end-to-end
        prediction through ``data_handler.get_feature_data``.

        Side effects: ``self.model`` and ``self.data_handler.preprocessor`` /
        ``feature_generator`` / ``features`` are overwritten as soon as their
        phase is reached, so a failure in a later phase leaves this object
        partially updated.

        Args:
            path: File to read. When None, ``self.config.model_path`` is used.

        Raises:
            ValueError: A required component is missing, or the features
                list is empty or contains duplicates.
            AttributeError: The stored model lacks ``predict``, ``model`` or
                ``config``.
            TypeError: A feature name is not a string.
            RuntimeError: A smoke test failed; the underlying exception is
                chained as ``__cause__``.
            Exception: Anything raised while opening or unpickling the file
                is printed and re-raised unchanged.
        """
        load_path = path if path is not None else self.config.model_path

        print("\n" + "="*50)
        print("Loading Pipeline:")
        print("="*50)

        try:
            # SECURITY: dill (like pickle) can execute arbitrary code while
            # deserializing. Only load pipeline files from a trusted source.
            with open(load_path, 'rb') as f:
                pipeline_data = dill.load(f)

            # Phase 1: Structure Validation
            print("\nPhase 1: Structure Validation")
            print("-" * 30)

            required_components = ['model', 'config', 'data_handler', 'version']
            missing = [comp for comp in required_components if comp not in pipeline_data]
            if missing:
                raise ValueError(f"Missing required components in pipeline: {missing}")
            print("✓ Basic structure validation passed")

            handler_data = pipeline_data['data_handler']
            required_data_handler = ['preprocessor', 'feature_generator', 'features', 'n_features',
                                'preprocessor_version', 'feature_generator_version']
            missing_dh = [comp for comp in required_data_handler if comp not in handler_data]
            if missing_dh:
                raise ValueError(f"Missing data handler components: {missing_dh}")
            print("✓ Data handler structure validation passed")

            # Phase 2: Model Validation
            print("\nPhase 2: Model Validation")
            print("-" * 30)

            self.model = pipeline_data['model']
            config_dict = pipeline_data['config']

            required_model_attrs = ['predict', 'model', 'config']
            missing_attrs = [attr for attr in required_model_attrs if not hasattr(self.model, attr)]
            if missing_attrs:
                raise AttributeError(f"Model missing required attributes: {missing_attrs}")
            print("✓ Model attributes validation passed")

            # Test model with dummy data: 5 random rows, one column per saved feature
            try:
                n_features = len(handler_data['features'])
                dummy_input = np.random.random((5, n_features))
                dummy_pred = self.model.predict(dummy_input)
                if not isinstance(dummy_pred, np.ndarray):
                    raise TypeError(f"Model prediction returned {type(dummy_pred)}, expected numpy.ndarray")
                if dummy_pred.ndim != 1 or len(dummy_pred) != 5:
                    raise ValueError(f"Unexpected prediction shape: {dummy_pred.shape}, expected (5,)")
                print("✓ Model prediction test passed")
            except Exception as e:
                # Chain the cause so the original traceback is not lost
                raise RuntimeError(f"Model prediction test failed: {str(e)}") from e

            # Phase 3: Function Version Validation
            print("\nPhase 3: Function Version Validation")
            print("-" * 30)

            self.data_handler.preprocessor = handler_data['preprocessor']
            self.data_handler.feature_generator = handler_data['feature_generator']
            self.data_handler.features = handler_data['features']

            # Version validation: a mismatch only warns, it does not abort the load
            preprocessor_version = getattr(self.data_handler.preprocessor, 'version', 'unknown')
            saved_preprocessor_version = handler_data['preprocessor_version']
            if preprocessor_version != saved_preprocessor_version:
                print(f"⚠️ Warning: Current preprocessor version ({preprocessor_version}) "
                    f"differs from saved version ({saved_preprocessor_version})")

            feature_gen_version = getattr(self.data_handler.feature_generator, 'version', 'unknown')
            saved_feature_gen_version = handler_data['feature_generator_version']
            if feature_gen_version != saved_feature_gen_version:
                print(f"⚠️ Warning: Current feature generator version ({feature_gen_version}) "
                    f"differs from saved version ({saved_feature_gen_version})")

            # Phase 4: Data Handler Function Validation
            print("\nPhase 4: Data Handler Function Validation")
            print("-" * 30)

            def _make_raw_dummy_frame():
                # 5-row frame with time_id, symbol_id, weight and
                # feature_00..feature_78, rebuilt for every smoke test
                return pl.DataFrame({
                    'time_id': np.arange(5),
                    'symbol_id': np.ones(5),
                    'weight': np.ones(5),
                    **{f'feature_{i:02d}': np.random.random(5) for i in range(79)}
                })

            # Validate preprocessor
            if self.data_handler.preprocessor:
                try:
                    processed_df = self.data_handler.preprocessor(_make_raw_dummy_frame())
                    if not isinstance(processed_df, pl.DataFrame):
                        raise TypeError(f"Preprocessor returned {type(processed_df)}, expected polars.DataFrame")
                    print("✓ Preprocessor function test passed")
                except Exception as e:
                    raise RuntimeError(f"Preprocessor function test failed: {str(e)}") from e

            # Validate feature generator
            if self.data_handler.feature_generator:
                try:
                    generated_df = self.data_handler.feature_generator(_make_raw_dummy_frame())
                    if not isinstance(generated_df, pl.DataFrame):
                        raise TypeError(f"Feature generator returned {type(generated_df)}, expected polars.DataFrame")
                    print("✓ Feature generator function test passed")
                except Exception as e:
                    raise RuntimeError(f"Feature generator function test failed: {str(e)}") from e

            # Validate features list
            if not self.data_handler.features:
                raise ValueError("Features list is empty")
            if not all(isinstance(f, str) for f in self.data_handler.features):
                raise TypeError("All feature names must be strings")
            if len(self.data_handler.features) != len(set(self.data_handler.features)):
                raise ValueError("Duplicate feature names found")
            print("✓ Features list validation passed")

            # Phase 5: Pipeline Information
            print("\n" + "="*50)
            print("Pipeline Information:")
            print("="*50)

            print(f"\n1. Model Configuration:")
            print(f"   - Model Type: {config_dict['model']['name']}")
            print(f"   - Number of Features: {handler_data['n_features']}")
            print(f"   - Model Parameters:")
            for k, v in config_dict['model']['params'].items():
                print(f"     * {k}: {v}")

            # The description and timestamp keys are not validated in Phase 1,
            # so they are read with a fallback and cannot abort a valid load.
            print(f"\n2. Data Processing:")
            print(f"   - Preprocessor: {self.data_handler.preprocessor.__name__ if self.data_handler.preprocessor else 'None'}")
            print(f"     * Version: {saved_preprocessor_version}")
            print(f"     * Description: {handler_data.get('preprocessor_description', 'N/A')}")
            print(f"   - Feature Generator: {self.data_handler.feature_generator.__name__ if self.data_handler.feature_generator else 'None'}")
            print(f"     * Version: {saved_feature_gen_version}")
            print(f"     * Description: {handler_data.get('feature_generator_description', 'N/A')}")
            print(f"   - First 5 Features: {self.data_handler.features[:5]}")

            print(f"\n3. Split Strategy:")
            print(f"   - Type: {config_dict['split_strategy']['type']}")
            for k, v in config_dict['split_strategy']['params'].items():
                print(f"   - {k}: {v}")

            print(f"\n4. Load Location:")
            print(f"   - Path: {load_path}")
            print(f"   - Dataset Name: {config_dict['paths']['dataset_name']}")
            print(f"   - Original Save Timestamp: {pipeline_data.get('timestamp', 'unknown')}")

            # Final validation: Complete pipeline test on a frame that already
            # has the saved feature columns plus 'weight'
            try:
                dummy_features = {f: np.random.random(5) for f in self.data_handler.features}
                dummy_df = pl.DataFrame({
                    **dummy_features,
                    'weight': np.ones(5)
                })
                X, _, _ = self.data_handler.get_feature_data(dummy_df)
                self.model.predict(X)
                print("\n✓ Complete pipeline test passed")
            except Exception as e:
                raise RuntimeError(f"Complete pipeline test failed: {str(e)}") from e

            print("\nPipeline loaded and validated successfully! ✓")

        except Exception as e:
            print(f"\nError loading pipeline: {str(e)}")
            raise

    def upload_to_kaggle(self, dataset_title: Optional[str] = None):
        """Publish this pipeline through the attached Kaggle handler.

        Args:
            dataset_title: Passed through unchanged to
                ``kaggle_handler.upload_pipeline`` together with this
                pipeline instance.
        """
        uploader = self.kaggle_handler
        uploader.upload_pipeline(self, dataset_title)

## Configuration Class

In [14]:
@dataclass
class ModelConfig:
    """Model selection plus the hyperparameters handed to it.

    When ``params`` is left as None, the defaults registered for ``name``
    are filled in after construction; a name with no registered defaults
    gets an empty dict.
    """
    # Key used to look up the default parameter set
    name: str = 'lightgbm'
    # Explicit hyperparameters; None means "use the defaults for `name`"
    params: Dict[str, Any] = None
    # Extra metrics keyed by name; a fresh dict per instance
    custom_metrics: Dict[str, Callable] = field(default_factory=dict)

    def __post_init__(self):
        # Only a missing value triggers the fallback; an explicit dict,
        # even an empty one, is kept as given.
        if self.params is not None:
            return
        self.params = self.get_default_params()

    def get_default_params(self) -> Dict[str, Any]:
        """Return a newly built default parameter dict for ``self.name``."""
        defaults_by_model = dict(
            lightgbm=dict(
                objective='regression',
                metric='rmse',
                verbosity=-1,
                boosting_type='gbdt',
                learning_rate=0.05,
            ),
            xgboost=dict(
                objective='reg:squarederror',
                eval_metric='rmse',
                verbosity=0,
            ),
            # No defaults are registered for a neural network model yet
            # (the earlier draft sketched learning_rate=0.001,
            # batch_size=512, epochs=10).
        )
        try:
            return defaults_by_model[self.name]
        except KeyError:
            return {}

@dataclass
class Config:
    """Top-level experiment configuration.

    Constructing an instance seeds NumPy's global random generator with
    ``seed`` (see ``__post_init__``).
    """
    # Model
    # Built per instance: a shared ``ModelConfig()`` default would leak
    # mutations between Config objects, and Python 3.11+ rejects an
    # unhashable dataclass instance as a field default.
    model: ModelConfig = field(default_factory=ModelConfig)
    # Paths
    # File the pipeline is written to / read from by default
    model_path: str = f"{MODEL_PATH}/pipeline.pkl"
    dataset_name: str = "jane-street-model"
    # Data loading
    # None leaves the partition selection to the data loader
    partition_range: Optional[List[int]] = None
    # Training
    split_strategy: SplitStrategy = field(default_factory=lambda: TimeBasedSplit(train_ratio=0.75, test_ratio=0.2))
    seed: int = 42

    def __post_init__(self):
        # Global side effect: reseeds np.random for the whole process
        np.random.seed(self.seed)

## Custom Functions

#### Reduce Memory

In [15]:
def reduce_memory(df: pl.DataFrame) -> pl.DataFrame:
    """Downcast column dtypes to shrink a Polars DataFrame.

    Signed-integer columns are cast to the narrowest signed integer type
    that holds their observed min and max. Float columns become Float32
    when their range fits, otherwise Float64. Utf8 columns become
    Categorical. Columns with no non-null values, and columns of any other
    dtype, are left as they are. Memory usage before and after is printed.

    Args:
        df: Frame to optimize; it is not modified in place.

    Returns:
        A frame with the same columns and the narrower dtypes.
    """
    bytes_per_mb = 1024**2
    start_mem = df.estimated_size() / bytes_per_mb
    print(f'Memory usage of dataframe is {start_mem:.2f} MB')

    int_types = [pl.Int64, pl.Int32, pl.Int16, pl.Int8]
    float_types = [pl.Float64, pl.Float32]
    # Ordered narrowest first so the first fit wins
    int_candidates = [
        (pl.Int8, np.int8),
        (pl.Int16, np.int16),
        (pl.Int32, np.int32),
        (pl.Int64, np.int64),
    ]
    float32_limits = np.finfo(np.float32)

    # Collect every cast and apply them in a single with_columns call
    casts = []
    for col in df.columns:
        col_type = df[col].dtype

        if col_type in int_types or col_type in float_types:
            non_null = df[col].drop_nulls()
            c_min = non_null.min()
            c_max = non_null.max()

            # Nothing to measure on an empty or all-null column
            if c_min is None or c_max is None:
                continue

            if col_type in int_types:
                target = None
                for pl_type, np_type in int_candidates:
                    limits = np.iinfo(np_type)
                    # Inclusive bounds: a value equal to the type's limit
                    # still fits and must not force a wider type
                    if c_min >= limits.min and c_max <= limits.max:
                        target = pl_type
                        break
            elif c_min >= float32_limits.min and c_max <= float32_limits.max:
                target = pl.Float32
            else:
                # Also reached when min/max are NaN or infinite
                target = pl.Float64

            if target is not None and target != col_type:
                casts.append(pl.col(col).cast(target))

        elif col_type == pl.Utf8:
            casts.append(pl.col(col).cast(pl.Categorical))

    if casts:
        df = df.with_columns(casts)

    end_mem = df.estimated_size() / bytes_per_mb
    print(f'Memory usage after optimization is: {end_mem:.2f} MB')
    # An empty frame reports zero bytes; avoid dividing by it
    decrease_pct = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
    print(f'Decreased by {decrease_pct:.1f}%')

    return df

#### Preprocessor

In [16]:
@versioned_function("1.0.0", "Initial preprocessor with basic cleaning")
def default_preprocessor(df: pl.DataFrame) -> pl.DataFrame:
    """Stock preprocessing step: pass the frame through ``reduce_memory``."""
    return reduce_memory(df)

#### Feature Generator

In [17]:
import gc
import numpy as np

@versioned_function("1.1.0", "Added time-based features and symbol_id processing")
def default_feature_generator(df: pl.DataFrame) -> pl.DataFrame:
    """Build the model's feature frame from a raw input frame.

    Adds sine/cosine encodings of ``time_id`` at periods 967 and 483,
    replaces nulls with -1, renames ``symbol_id`` and ``weight`` so they
    carry the ``feature_`` prefix, and returns only the feature columns in
    a fixed order. ``responder_6`` is kept as the first column when the
    input has it.

    NOTE(review): 967 and 483 look like the full-day and half-day counts
    of time_id values (per the column names) - confirm against the data.
    """
    time_id = pl.col('time_id')
    full_cycle = 2 * np.pi * time_id / 967
    half_cycle = 2 * np.pi * time_id / 483

    # Cyclical encodings of the intraday position
    with_time = df.with_columns([
        full_cycle.sin().alias('feature_sin_time_id'),
        full_cycle.cos().alias('feature_cos_time_id'),
        half_cycle.sin().alias('feature_sin_time_id_halfday'),
        half_cycle.cos().alias('feature_cos_time_id_halfday'),
    ])

    # -1 stands in for every missing value; identifiers join the feature set
    filled = with_time.fill_null(-1)
    renamed = filled.rename({
        'symbol_id': 'feature_symbol_id',
        'weight': 'feature_weight',
    })

    # Output order: optional target, engineered columns, feature_00..feature_78
    engineered = [
        'feature_symbol_id',
        'feature_sin_time_id',
        'feature_cos_time_id',
        'feature_sin_time_id_halfday',
        'feature_cos_time_id_halfday',
        'feature_weight',
    ]
    raw_features = [f'feature_{i:02d}' for i in range(79)]
    selected = engineered + raw_features

    if 'responder_6' in renamed.columns:
        selected = ['responder_6'] + selected

    return renamed.select(selected)

#### Helper Functions

In [18]:
# Most recent non-None lags frame handed to predict()
lags_: pl.DataFrame | None = None

def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame:
    """Prediction callback passed to the competition inference server.

    Runs the global ``pipeline``'s preprocessor and feature generator (when
    set) on ``test``, predicts, and pairs the predictions with the incoming
    ``row_id`` values.

    Args:
        test: Batch to score; must contain a ``row_id`` column.
        lags: Lagged data for the batch, or None. A non-None value is
            stored in the module-level ``lags_``; it is not otherwise used
            here.

    Returns:
        A frame with exactly the columns ``row_id`` and ``responder_6``.
    """
    global pipeline
    global lags_

    # Read the ids first: the default feature generator selects only
    # feature columns, so row_id is not available afterwards.
    row_ids = test['row_id'].to_numpy()

    if lags is not None:
        lags_ = lags

    handler = pipeline.data_handler
    if handler.preprocessor:
        test = handler.preprocessor(test)
    if handler.feature_generator:
        test = handler.feature_generator(test)

    predictions = pipeline.predict(test)
    print(f"Predictions shape: {predictions.shape}")
    print(f"Predictions: {predictions}")

    submission = {'row_id': row_ids, 'responder_6': predictions}
    result = pl.DataFrame(submission)

    # Sanity checks on the frame handed back to the server
    assert isinstance(result, (pl.DataFrame, pd.DataFrame))
    assert result.columns == ['row_id', 'responder_6']
    assert len(result) == len(test)

    return result

def run_inference_only(dataset_name: str, model_filename: str = 'pipeline.pkl') -> Pipeline:
    """Load a pipeline from a Kaggle dataset and return it ready for inference.

    Args:
        dataset_name: Name of the dataset directory under ``/kaggle/input``;
            the pickle inside it is expected to be ``<dataset_name>.pkl``.
        model_filename: Accepted but not consulted by this function; the
            file name is derived from ``dataset_name``.
            NOTE(review): confirm whether this parameter should be honoured.

    Returns:
        A ``Pipeline`` built from a default ``Config`` with the saved state
        loaded into it.

    Raises:
        ValueError: When called outside the Kaggle environment.
    """
    if IS_KAGGLE:
        # Location of the model file inside the attached Kaggle dataset
        model_path = f'/kaggle/input/{dataset_name}/{dataset_name}.pkl'

        # Initialise a pipeline, then restore the saved model into it
        loaded = Pipeline(Config())
        print(f"Loading model from {model_path}")
        loaded.load(model_path)
        return loaded

    raise ValueError("This function is for Kaggle environment only")

# Run Experiments

In [19]:
# Experiment driver: either load a published pipeline (inference-only on
# Kaggle) or build a Config, train, and - when running locally - upload the
# result and score it on the holdout split.
INFERENCE_ONLY = False  # True: run inference only; False: include training
OPTIMIZE_HYPERPARAMS = False  # True: run hyperparameter optimization
NICKNAME = "alvinlee9"  # Kaggle nickname
BASE_DATASET_NAME = "jane-street-model-v1"  # Base dataset name
# On Kaggle the bare dataset name is used; locally it is prefixed with the owner
DATASET_NAME = f"{BASE_DATASET_NAME}" if IS_KAGGLE else f"{NICKNAME}/{BASE_DATASET_NAME}"

# Inference-only mode applies on Kaggle only; every other combination trains
if INFERENCE_ONLY and IS_KAGGLE:
    # Inference only mode
    print("Running in inference-only mode...")
    pipeline = run_inference_only(DATASET_NAME)
else:
    # Training mode
    config = Config(
        # partition_range=[6,7,8,9],
        model=ModelConfig(
            name='lightgbm',
            params={
                'objective': 'regression_l2',
                'metric': 'rmse',
                'boosting_type': 'gbdt',
                'learning_rate': 0.1,
                'random_state': 42,
                'verbose': 1,
                'device': 'cpu',
            },
            custom_metrics={},
        ),
        dataset_name=DATASET_NAME,
        split_strategy=TimeSeriesKFold(n_splits=5, test_ratio=0.2),
        seed=42
    )
    
    pipeline = Pipeline(config)
    
    if not IS_KAGGLE:
        # Local training; train() returns the holdout split used below
        print("Training model locally...")
        holdout_test = pipeline.train(
            preprocessor=default_preprocessor,
            feature_generator=default_feature_generator,
            optimize=OPTIMIZE_HYPERPARAMS,
            n_trials=100 if OPTIMIZE_HYPERPARAMS else None
        )

        # The upload happens before the holdout evaluation below
        print("\nUploading pipeline to Kaggle...")
        pipeline.upload_to_kaggle()

        # Evaluate on holdout test set using R2
        print("\nEvaluating on holdout test set...")
        test_X, test_y, test_w = pipeline.data_handler.get_feature_data(holdout_test)
        test_pred = pipeline.predict(holdout_test)
        
        # Calculate R2 score (r2_metric returns a (name, value, flag) triple)
        _, r2_score, _ = r2_metric(test_y, test_pred, test_w)
        print(f"Holdout test R2 score: {r2_score:.4f}")
        
        # Predict on competition test set if available
        if pipeline.data_handler.test_data is not None:
            print("\nPredicting on competition test set...")
            test_data = pipeline.data_handler.test_data
            print(f"Test data shape: {test_data.shape}")

            test_pred = pipeline.predict(test_data)
            print(f"Test predictions shape: {test_pred.shape}")
            print(test_pred)
    else:
        # Kaggle training: no hyperparameter search, no upload, no holdout scoring
        print("Training model in Kaggle environment...")
        pipeline.train(
            preprocessor=default_preprocessor,
            feature_generator=default_feature_generator,
            optimize=False
        )

Training model locally...
Loading and preparing data...
Memory usage of dataframe is 16372.62 MB
Memory usage after optimization is: 16058.01 MB
Decreased by 1.9%
Memory usage of dataframe is 0.01 MB
Memory usage after optimization is: 0.01 MB
Decreased by 6.0%
Splitting data using configured strategy...

Holdout Test Split Info:
Total unique dates: 1699
Train dates range: 0 - 1359
Test dates range: 1360 - 1698
Train samples: 34,712,738, Test samples: 12,414,600

Time Series 5-Fold Split Info:
Total unique dates: 1360
Initial train size: 226 dates
Validation size: ~226 dates per fold

Fold 1:
Train dates range: 0 - 225
Val dates range: 226 - 451
Train samples: 2,834,811, Val samples: 3,786,540

Fold 2:
Train dates range: 0 - 451
Val dates range: 452 - 677
Train samples: 6,621,351, Val samples: 5,124,619

Fold 3:
Train dates range: 0 - 677
Val dates range: 678 - 903
Train samples: 11,745,970, Val samples: 6,715,016

Fold 4:
Train dates range: 0 - 903
Val dates range: 904 - 1129
Train sa

100%|██████████| 258k/258k [00:01<00:00, 212kB/s]  


Upload successful: jane-street-model-v1.pkl (258KB)
Dataset created successfully

Evaluating on holdout test set...
Starting prediction...
Input DataFrame shape: (12414600, 86)
Available features: ['feature_symbol_id', 'feature_sin_time_id', 'feature_cos_time_id', 'feature_sin_time_id_halfday', 'feature_cos_time_id_halfday', 'feature_weight', 'feature_00', 'feature_01', 'feature_02', 'feature_03', 'feature_04', 'feature_05', 'feature_06', 'feature_07', 'feature_08', 'feature_09', 'feature_10', 'feature_11', 'feature_12', 'feature_13', 'feature_14', 'feature_15', 'feature_16', 'feature_17', 'feature_18', 'feature_19', 'feature_20', 'feature_21', 'feature_22', 'feature_23', 'feature_24', 'feature_25', 'feature_26', 'feature_27', 'feature_28', 'feature_29', 'feature_30', 'feature_31', 'feature_32', 'feature_33', 'feature_34', 'feature_35', 'feature_36', 'feature_37', 'feature_38', 'feature_39', 'feature_40', 'feature_41', 'feature_42', 'feature_43', 'feature_44', 'feature_45', 'feature_46

In [20]:
# Competition submission: on Kaggle, wrap predict() in the inference server
# and either serve the real rerun or replay the local test files.
if IS_KAGGLE:
    import kaggle_evaluation.jane_street_inference_server

    if not 'pipeline' in globals():  # pipeline has not been defined yet
        # Assume inference-only mode and load the model
        pipeline = run_inference_only(DATASET_NAME)
    
    print("Setting up for competition submission...")
    inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(
        predict
    )
    
    # The environment variable is set only during the scored competition rerun
    if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
        print("Starting inference server...")
        inference_server.serve()
    else:
        # Otherwise feed the bundled test and lags parquet files through predict()
        print("Running local gateway...")
        inference_server.run_local_gateway(
            (f'{BASE_PATH}/test.parquet', f'{BASE_PATH}/lags.parquet')
        )


## Custom Experiment Example

In [21]:
"""
# Custom preprocessing and feature generation example
def my_preprocessor(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
        pl.col('weight').fill_null(pl.col('weight').mean()),
        pl.col('feature_00').clip(-3, 3),
        pl.col('feature_01').clip(-3, 3),
    ])

def my_feature_generator(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
        # Moving statistics
        pl.col('feature_00').rolling_mean(window_size=10).alias('feature_00_ma10'),
        pl.col('feature_00').rolling_std(window_size=10).alias('feature_00_std10'),
        
        # Feature interactions
        (pl.col('feature_02') / (pl.col('feature_03') + 1e-7)).alias('feature_ratio_02_03'),
        
        # Group statistics
        pl.col('feature_00').mean().over('symbol_id').alias('feature_00_symbol_mean'),
    ])

# Run custom experiment
config = Config(...)
pipeline = Pipeline(config)
pipeline.train(
    preprocessor=my_preprocessor,
    feature_generator=my_feature_generator,
    optimize=True
)
"""

"\n# Custom preprocessing and feature generation example\ndef my_preprocessor(df: pl.DataFrame) -> pl.DataFrame:\n    return df.with_columns([\n        pl.col('weight').fill_null(pl.col('weight').mean()),\n        pl.col('feature_00').clip(-3, 3),\n        pl.col('feature_01').clip(-3, 3),\n    ])\n\ndef my_feature_generator(df: pl.DataFrame) -> pl.DataFrame:\n    return df.with_columns([\n        # Moving statistics\n        pl.col('feature_00').rolling_mean(window_size=10).alias('feature_00_ma10'),\n        pl.col('feature_00').rolling_std(window_size=10).alias('feature_00_std10'),\n        \n        # Feature interactions\n        (pl.col('feature_02') / (pl.col('feature_03') + 1e-7)).alias('feature_ratio_02_03'),\n        \n        # Group statistics\n        pl.col('feature_00').mean().over('symbol_id').alias('feature_00_symbol_mean'),\n    ])\n\n# Run custom experiment\nconfig = Config(...)\npipeline = Pipeline(config)\npipeline.train(\n    preprocessor=my_preprocessor,\n    feat