In [69]:
import numpy as np
import statistics as stat
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from typing import Callable

In [71]:
def dropout(x: np.ndarray, m: int, ϕ: float) -> tuple:
    """Regularize matrix x with m Monte Carlo dropout replicates.

    For each of the m replicates one Bernoulli(1 - ϕ) keep-indicator is
    drawn per column; a dropped column is zeroed for every row of that
    replicate. The m masked copies of x are stacked vertically.

    Parameters
    ----------
    x : array-like of shape (n, p)
        Data matrix to regularize.

    m : int
        Number of dropout replicates (must be at least 1).

    ϕ : float
        Probability in [0, 1] that a column is dropped in a replicate.

    Returns
    -------
    params : dict
        The dropout probability that was used, keyed by 'ϕ'.

    x_out : np.ndarray of shape (m * n, p)
        Replicate i occupies rows i*n through (i+1)*n - 1, in the row
        order of x.

    Raises
    ------
    ValueError
        If ϕ is not a probability, m < 1, or x is not 2-dimensional.
    """
    # explicit checks instead of assert, which is stripped under `python -O`
    if not 0 <= ϕ <= 1:
        raise ValueError('dropout probability must lie in [0, 1]')
    if m < 1:
        raise ValueError('m must be a positive integer')

    x = np.asarray(x)
    if x.ndim != 2:
        raise ValueError('x must be a 2-dimensional array')
    n, p = x.shape

    # one keep-mask per replicate: 1 keeps a column, 0 drops it
    keep = np.random.binomial(1, 1 - ϕ, (m, p))

    # broadcast (m, 1, p) * (n, p) -> (m, n, p), then stack the replicates
    x_out = (keep[:, np.newaxis, :] * x).reshape(m * n, p)

    return {'ϕ': ϕ}, x_out

In [72]:
def noise_addition(x, sd=1):
    """Regularize matrix x by adding independent normal(0, sd) noise.

    Parameters
    ----------
    x : array-like
        Data to perturb; one noise value is drawn per entry.

    sd : float, default 1
        Standard deviation of the noise (must be strictly positive).

    Returns
    -------
    params : dict
        The standard deviation that was used, keyed by the parameter
        name 'sd' so it can be passed back as a keyword argument.

    x_out : np.ndarray
        x plus noise, same shape as x.

    Raises
    ------
    ValueError
        If sd is not strictly positive.
    """
    # explicit check instead of assert, which is stripped under `python -O`
    if not sd > 0:
        raise ValueError('sd must be strictly positive')

    x = np.asarray(x)

    # add normal(0, sd) noise to x
    x_out = x + np.random.normal(loc=0, scale=sd, size=x.shape)

    # the original returned the undefined name σ here (NameError on every call)
    return {'sd': sd}, x_out

In [73]:
def robust(x: np.ndarray, c: np.ndarray, Δ: np.ndarray=None) -> tuple:
    """Regularize matrix x by adding a column-norm-bounded perturbation.

    When no perturbation is supplied, a standard-normal matrix of the same
    shape as x is drawn and each column is rescaled so that its L2-norm
    equals the matching entry of c.

    Parameters
    ----------
    x : array-like of shape (n, p)
        Data matrix to perturb.

    c : array-like of shape (p,)
        Strictly positive target L2-norm for each column of the noise.
        It is validated even when Δ is supplied, but only used to
        generate a new Δ.

    Δ : np.ndarray, optional
        A previously generated perturbation to re-apply instead of
        drawing a new one.

    Returns
    -------
    params : dict
        The perturbation that was applied, keyed by 'Δ'.

    x_out : np.ndarray
        x + Δ.

    Raises
    ------
    ValueError
        If any entry of c is not strictly positive.
    """
    # np.asarray / np.all also accept lists and scalars, unlike all(c > 0)
    c = np.asarray(c)
    if not np.all(c > 0):
        raise ValueError('all entries of c must be strictly positive')

    if Δ is None:
        x = np.asarray(x)
        # add normalized noise
        rand = np.random.randn(*x.shape)                # n-by-p std. normal draws
        Δ    = rand * c / np.linalg.norm(rand, axis=0)  # col j rescaled to norm c[j]

    return {'Δ': Δ}, x + Δ

In [None]:
def regularize_blackbox(
    model,
    x_train: np.ndarray,
    y_train: np.ndarray,
    m: int,
    k: int,
    c: np.ndarray,
    reg_method: str,
    eval_method: str
) -> Callable:
    """Automatically tunes a Blackbox regression model.

    The training inputs are standardized, every candidate regularization
    setting is applied to them, and each regularized data set is scored
    with k-fold cross-validation. The model is then fit on the regularized
    data that achieved the lowest CV error.

    Parameters
    ----------
    model : object
        A scikit-learn compatible regressor. It is passed to
        cross_val_score, and the object returned by its fit method must
        provide a predict method.

    x_train : array-like of shape (n,p)
        An array of training data.

    y_train : array-like of shape (n,)
        A vector of response values to x_train.

    m : int
        A positive integer. For Dropout it is the number of Monte Carlo
        replicates stacked per candidate; for Robust it is the number of
        random perturbations that are tried. It is not used by
        NoiseAddition.

    k : int
        A positive integer indicating the number of CV-folds to be
        used to tune the amount of regularization.

    c : array-like of shape (p,)
        A vector of column bounds to be used if the method specified
        is Robust.

    reg_method : str
        A regularization method that belongs to the set
        {Dropout, NoiseAddition, Robust}.

    eval_method : str
        A criterion to be used to evaluate the regularization that belongs to
        the set {MSE, MAD} where MSE encodes mean square error and
        MAD encodes mean absolute deviation.

    Returns
    -------
    reg_model: function
        Maps an array of shape (n_new, p) to predictions. Inputs are
        standardized with the statistics of x_train and are not
        regularized.

    Raises
    ------
    ValueError
        If eval_method or reg_method is not one of the supported values,
        or if no candidate produced a usable CV error (e.g. m < 1 with
        Robust).
    """

    # evaluation name for scikit later
    scorers = {
        'MSE': 'neg_mean_squared_error',
        'MAD': 'neg_mean_absolute_error',
    }
    if eval_method not in scorers:
        raise ValueError('Invalid eval_method value: {}'.format(eval_method))
    score_type = scorers[eval_method]

    # Tunable settings: regularizer + lazily generated positional arguments
    # (those following x). Positional calls keep this function independent
    # of the regularizers' non-ASCII keyword names.
    regularizers = {
        'Dropout': (
            dropout,
            ((m, rate) for rate in np.linspace(0, 1, 101)),
        ),
        'NoiseAddition': (
            noise_addition,
            ((sd,) for sd in np.linspace(0.01, 5, 500)),
        ),
        # every call draws a fresh random perturbation, so m candidates
        'Robust': (
            robust,
            ((c,) for _ in range(m)),
        ),
    }
    if reg_method not in regularizers:
        raise ValueError('Invalid reg_method value: {}'.format(reg_method))
    method, candidates = regularizers[reg_method]

    # standardize data; the fitted scaler is reused at prediction time
    scaler = StandardScaler()
    x_scaled = scaler.fit_transform(x_train)
    y_train = np.asarray(y_train)
    n = x_scaled.shape[0]

    best_error = np.inf
    x_best = y_best = None

    # iterate over candidate settings
    # NOTE(review): with Dropout the replicates of one observation can land
    # in different CV folds - confirm this is acceptable for tuning.
    for args in candidates:
        # regularize data & repeat the responses once per stacked replicate
        _, x_new = method(x_scaled, *args)
        y_new = np.concatenate([y_train] * (x_new.shape[0] // n), axis=0)

        # sklearn's "neg_*" scorers return negated errors (higher is
        # better); flip the sign so that smaller means better.
        cv_error = -np.mean(
            cross_val_score(model, x_new, y_new, cv=k, scoring=score_type)
        )

        # Keep the regularized data itself: the final fit then uses exactly
        # the data that earned the best score (for Robust, the same Δ).
        if cv_error < best_error:
            best_error = cv_error
            x_best, y_best = x_new, y_new

    if x_best is None:
        raise ValueError('No regularization candidate could be evaluated')

    # best model with regularized data
    best_fit = model.fit(x_best, y_best)

    # wrapper for model with tuned regularization
    def reg_model(x: np.ndarray):
        # Only rescale with the training statistics. Regularizing here would
        # replicate rows (Dropout) or require a training-shaped Δ (Robust).
        return best_fit.predict(scaler.transform(x))

    return reg_model
