In [1]:
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.svm import SVC

In [2]:
iris = load_iris()

X, y = iris.data, iris.target

In [7]:
X.shape

(150, 4)

In [3]:
# This dataset is way too high-dimensional. Better do PCA:
pca = PCA(n_components=2)

# Maybe some original features were good, too?
selection = SelectKBest(k=1)

# Build estimator from PCA and Univariate selection:

combined_features = FeatureUnion([("pca", pca), ("univ_select", selection)])


In [4]:
# Use combined features to transform dataset:
X_features = combined_features.fit(X, y).transform(X)

In [5]:
print("Combined space has", X_features.shape[1], "features")

Combined space has 3 features


class ToyModel(Component):
    def __init__(self):
        TurnoverMax = 7.5
        TurnoverMaxAge = 30
        TurnoverMultiplier = 1.0

        turnover = Effect1D(None, None, 
                            'wala', 'ht_cpr',
                            lambda x : TurnoverMultiplier * TurnoverMax * (2/ (1 + np.exp(-4.5 * x/TurnoverMaxAge))-1)/100)
        
        RefiMax = 60
        RefiInflection = 150
        RefiRamp = 50
        RefiMultiplier = 1

        refi = Effect1D(None, None,
                        'incentive', 'rf_cpr',
                        lambda x: RefiMultiplier * RefiMax / (1+np.exp(-(x-RefiInflection)/RefiRamp))/100)
        super().__init__('ToyModel', [turnover, refi], False)
    
    def __call__(self, x):
        '''return smm'''
        return cpr2smm(super().__call__(x))

In [9]:
import numpy as np

def smm2cpr(smm):
    return 1 - np.power(1 - smm, 12)


def cpr2smm(cpr):
    return 1 - np.power(1 - cpr, 1/12)

In [39]:
from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.linear_model._glm import _GeneralizedLinearRegressor
import pandas as pd
from sklearn.metrics import root_mean_squared_error
from sklearn._loss import HalfPoissonLoss, HalfSquaredError

In [None]:



class PrepayRegressor(_GeneralizedLinearRegressor):
    '''Regression model for prepayment:
    Functions:
    - fit: fit the model
    - predict: predict the prepayment rate
    - set_params: set a subset of effects for calibration while others are fixed
    - get_params: get the parameters of the model
    - get_effects: get the effects of the model
    - set_effects: set the effects of the model
    - seet_regression_type: set the regression type Least-squre or poisson regression
    '''
    def __init__(self,         
                 effects: FeatureUnion,
                 multiplicative = True,
                 solver="lbfgs",
                 max_iter=100,
                 tol=1e-4,
                 warm_start=False,
                 verbose=0,
                 ):
        super().__init__(
                        alpha=0,
                        fit_intercept=False,
                        solver=solver,
                        max_iter=max_iter,
                        tol=tol,
                        warm_start=warm_start,
                        verbose=verbose,
                    )
        self.effects = effects
        self.multiplicative = multiplicative
        self.set_fit_request()

    def _get_loss(self):
        if self.regression_type == 'ls':
            return HalfSquaredError()
        elif self.regression_type == 'poisson':
            return HalfPoissonLoss()
    
    def _linear_predictor(self, X):
        if self.multiplicative:
            return self.effects.transform(X).prod(axis=1)
        else:
            return self.effects.transform(X).sum(axis=1)
    
    def get_effect_names(self):
        return self.effects.get_feature_names_out()
    
    def set_fit_request(self, 
                        free_effects: list[str] = None,
                        regression_type:list['ls', 'poisson'] = 'ls', 
                        sample_weight = None):
        self.regression_type = regression_type
        self.sample_weight = sample_weight

    def score(self, X, y, sample_weight = None):
        '''if ls regression, return the rmse,
        if poisson regression, return the negative log likelihood'''
        y_pred = self.transform(X)
        if sample_weight is None:
            sample_weight = self.sample_weight
        
        if isinstance(sample_weight, str):
            sample_weight = X[sample_weight]

        if self.regression_type == 'ls':
            return root_mean_squared_error(y, y_pred, sample_weight)
        
        elif self.regression_type == 'poisson':
            return super().score(X, y, sample_weight)
        
        else:
            pass
    
    def fit(self, X, y, sample_weight = None):
        # optimization to find the best parameters
        return self

    def predict(self, X: pd.DataFrame):
        return self._linear_predictor(X)
    



In [None]:
class ToyTurnover(RegressorMixin, BaseEstimator):
    def __init__(self,         
                 TurnoverMax = 7.5,
                 TurnoverMaxAge = 30,
                 TurnoverMultiplier = 1.0):
        self.TurnoverMax = TurnoverMax
        self.TurnoverMaxAge = TurnoverMaxAge
        self.TurnoverMultiplier = TurnoverMultiplier

    def fit(self, X, y):
        return self

    def predict(self, X: pd.DataFrame):
        '''X must have wala column'''
        self.fit()
        if isinstance(X, pd.DataFrame):
            X_in = X['wala']
        elif isinstance(X, pd.Series):
            X_in = X.values
        else:
            X_in = X[:, 0]
        return cpr2smm(self.TurnoverMultiplier * self.TurnoverMax * (2/ (1 + np.exp(-4.5 * X_in/self.TurnoverMaxAge))-1)/100)


class ToyRefinance(RegressorMixin, BaseEstimator):
    def __init__(self,         
                 RefiMax = 60,
                 RefiInflection = 150,
                 RefiRamp = 50,
                 RefiMultiplier = 1):
        self.RefiMax = RefiMax
        self.RefiInflection = RefiInflection
        self.RefiRamp = RefiRamp
        self.RefiMultiplier = RefiMultiplier

    def fit(self, X, y):
        return self

    def predict(self, X: pd.DataFrame):
        '''X: incentive in bps'''
        self.fit(   )

        if isinstance(X, pd.DataFrame):
            X_in = X['incentive']
        elif isinstance(X, pd.Series):
            X_in = X.values
        else:
            X_in = X[:, 0]
        return cpr2smm(self.RefiMultiplier * self.RefiMax / (1+np.exp(-(X_in-self.RefiInflection)/self.RefiRamp))/100)


In [32]:
from sklearn.utils.estimator_checks import check_estimator

check_estimator(ToyTurnover())

TypeError: ToyTurnover.fit() missing 2 required positional arguments: 'X' and 'y'

In [None]:
class ToyModelRegressor(RegressorMixin, BaseEstimator):
    def __init__(self):
        pass

    def fit(self, X, y):
        return self

    def predict(self, X):
        return X.sum(axis=1)
    
    def score(self, X, y, sample_weight = None):
        return super().score(X, y, sample_weight)
    
    
comp = ColumnTransformer([
                        ('tf_smm', ToyTurnover(), ['wala']),
                        ('rf_smm', ToyRefinance(), ['incentive'])
                        ])

Pipeline()