In [None]:

class BayesianTargetEncoder:
    """Encode categorical columns with Normal-Inverse-Gamma posterior moments.

    For every level of every categorical column, the target is modelled as
    Gaussian with unknown mean and variance under a Normal-Inverse-Gamma
    prior ``(u_0, v, alpha, beta)``. ``fit`` estimates the posterior
    parameters on ``n_samples`` bootstrap samples and stores their average;
    ``transform`` emits moments of that averaged posterior as features.

    Args:
        categorical_cols: Names of the columns to encode.
        u_0: Prior mean of the target.
        v: Prior pseudo-count attached to ``u_0``.
        alpha: Prior shape of the inverse-gamma on the variance.
        beta: Prior scale of the inverse-gamma on the variance.
        n_samples: Number of bootstrap samples drawn per column.
        sample_size: Bootstrap sample size as a fraction of ``len(X)``.
        moments: ``"m"`` to emit posterior means, ``"v"`` to emit posterior
            variances; a string containing both emits both.
        random_state: Base seed; bootstrap sample ``i`` uses
            ``random_state + i``.
    """
    # https://github.com/aslakey/CBM_Encoding/blob/master/gaussian_inverse_gamma_encoder.py

    # Names of the posterior-parameter columns kept per level.
    _PARAM_COLS = ['_u', '_v', '_alpha', '_beta']

    def __init__(
        self,
        categorical_cols: List[str],
        u_0: float = 0.0,
        v: float = 1.0,
        alpha: float = 3.0,
        beta: float = 1.0,
        n_samples: int = 10,
        sample_size: float = .75,
        moments: str = "m",
        random_state: int = 1
    ):

        self._categorical_cols = categorical_cols
        self._alpha_prior = alpha
        self._beta_prior = beta
        self._u_0_prior = u_0
        self._v_prior = v
        self._gig_distributions = dict()
        self._random_state = random_state
        self._n_samples = n_samples
        self._sample_size = sample_size
        self.moments = moments
        # NOTE: seeds NumPy's *global* RNG. The bootstrap below passes an
        # explicit ``random_state`` and does not need this; it is kept only
        # so existing callers observe the same global RNG state as before.
        np.random.seed(random_state)

    def _prior_fill_values(self) -> dict:
        """Return the prior value for each posterior-parameter column."""
        return {
            '_u': self._u_0_prior,
            '_v': self._v_prior,
            '_alpha': self._alpha_prior,
            '_beta': self._beta_prior,
        }

    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series
    ) -> "BayesianTargetEncoder":
        """Estimate averaged posterior parameters for every level.

        Any state from a previous ``fit`` call is discarded.

        Args:
            X: Frame containing every column in ``categorical_cols``.
            y: Target values; assigned to a copy of ``X`` as a column, so a
                ``Series`` is aligned on ``X``'s index.

        Returns:
            ``self``.

        Raises:
            ValueError: If ``n_samples`` is smaller than 1.
        """
        if self._n_samples < 1:
            raise ValueError("'n_samples' must be at least 1")

        X_temp = X.copy(deep=True)

        # add target
        target_col = '_target'
        X_temp[target_col] = y

        sample_rows = int(len(X_temp) * self._sample_size)
        prior_values = self._prior_fill_values()

        # Built locally and swapped in at the end so that refitting never
        # accumulates on top of an earlier fit.
        fitted = dict()

        for categorical_col in self._categorical_cols:

            # All levels seen in the full data. Bootstrap samples may not
            # contain every level, so missing ones are filled with priors.
            all_levels = (
                X_temp.groupby(categorical_col)[target_col]
                .count()
                .reset_index()[[categorical_col]]
            )

            totals = np.zeros((len(all_levels), len(self._PARAM_COLS)))

            for i in range(self._n_samples):

                X_sample = X_temp[[categorical_col, target_col]].sample(
                    n=sample_rows,
                    replace=True,
                    random_state=self._random_state + i
                )

                stats = X_sample.groupby(categorical_col)[target_col].agg(["count", "mean", "std"])

                count_array = stats["count"].to_numpy()
                mean_array = stats["mean"].to_numpy()
                std_array = stats["std"].to_numpy()

                posterior = stats.index.to_frame(index=False)

                # weighted means
                posterior['_u'] = self._calc_weighted_mean(
                    u_0=self._u_0_prior,
                    v=self._v_prior,
                    mean_array=mean_array,
                    count_array=count_array,
                )

                # new count
                posterior['_v'] = self._v_prior + count_array

                # new alpha
                posterior['_alpha'] = self._alpha_prior + count_array * .5

                # new beta
                posterior['_beta'] = self._calc_new_beta(
                    u_0=self._u_0_prior,
                    v=self._v_prior,
                    beta=self._beta_prior,
                    count_array=count_array,
                    mean_array=mean_array,
                    std_array=std_array
                )

                # Left-join onto the full level list (row order and count of
                # ``all_levels`` are preserved) and fill gaps with the prior.
                merged = pd.merge(all_levels, posterior, on=categorical_col, how='left')
                merged = merged.fillna(value=prior_values)
                totals += merged[self._PARAM_COLS].to_numpy(dtype=float)

            # report the mean of each parameter over the bootstrap samples
            averaged = totals / self._n_samples
            distribution = all_levels.copy()
            for position, param_col in enumerate(self._PARAM_COLS):
                distribution[param_col] = averaged[:, position]
            fitted[categorical_col] = distribution

        self._gig_distributions = fitted
        return self

    def transform(self, X, y=None):
        """Replace categorical columns with posterior-moment features.

        Levels not seen during ``fit`` are encoded with the prior parameters.

        Args:
            X: Frame containing every column in ``categorical_cols``.
            y: Ignored; accepted for pipeline compatibility.

        Returns:
            A frame indexed like ``X``. For each column ``c`` it holds
            ``c__M_u`` and ``c__M_v`` when ``"m"`` is in ``moments`` and
            ``c__V_u`` and ``c__V_v`` when ``"v"`` is in ``moments``.

        Raises:
            AssertionError: If a column was not seen by ``fit``.
            ValueError: If a posterior ``alpha`` is too small for the
                requested moment to exist.
        """
        X_copy = X[self._categorical_cols].copy(deep=True)
        # ``merge`` below resets the index, so values are written back
        # positionally against the caller's original index.
        output_df = pd.DataFrame(index=X.index)
        prior_values = self._prior_fill_values()

        for categorical_col in self._categorical_cols:
            if categorical_col not in self._gig_distributions:
                raise AssertionError("Column " + categorical_col + " not fit by GIGEncoder")

            # add posterior parameter columns via lookup, impute with prior
            X_temp = X_copy.merge(self._gig_distributions[categorical_col], on=[categorical_col], how='left')

            u = X_temp['_u'].fillna(prior_values['_u']).to_numpy()
            v = X_temp['_v'].fillna(prior_values['_v']).to_numpy()
            alpha = X_temp['_alpha'].fillna(prior_values['_alpha']).to_numpy()
            beta = X_temp['_beta'].fillna(prior_values['_beta']).to_numpy()

            #   encode with moments
            if 'm' in self.moments:
                # the expected variance only exists for alpha > 1
                if (alpha <= 1).any():
                    raise ValueError("'alpha' must be greater than 1")
                output_df[categorical_col + '__M_u'] = u
                output_df[categorical_col + '__M_v'] = beta / (alpha - 1)

            if 'v' in self.moments:
                # validate before dividing: the variance of the variance
                # only exists for alpha > 2
                if (alpha <= 2).any():
                    raise ValueError("'alpha' must be greater than 2")
                output_df[categorical_col + '__V_u'] = beta / ((alpha - 1) * v)
                output_df[categorical_col + '__V_v'] = (beta**2) /\
                    (((alpha - 1)**2) * (alpha - 2))

        return output_df

    def _calc_weighted_mean(
        self,
        u_0: float,
        v: float,
        mean_array: np.ndarray,
        count_array: np.ndarray
    ) -> np.ndarray:
        """Return the posterior mean: prior and sample means weighted by counts."""
        return (v * u_0 + count_array * mean_array) / (v + count_array)

    def _calc_new_beta(
        self,
        u_0: float,
        v: float,
        beta: float,
        count_array: np.ndarray,
        mean_array: np.ndarray,
        std_array: np.ndarray
    ) -> np.ndarray:
        """Return the posterior inverse-gamma scale for each level.

        Computes ``beta + 0.5 * SS + 0.5 * (n * v) / (n + v) * (mean - u_0)**2``
        where ``SS`` is the within-level sum of squared deviations.

        Args:
            u_0: Prior mean.
            v: Prior pseudo-count.
            beta: Prior scale.
            count_array: Observations per level (``n``).
            mean_array: Sample mean per level.
            std_array: Sample standard deviation per level computed with
                ``ddof=1`` (the pandas default); NaN entries are treated as
                zero spread.
        """
        counts = np.asarray(count_array, dtype=float)
        means = np.asarray(mean_array, dtype=float)
        # std is NaN for a single observation, whose spread is zero.
        stds = np.nan_to_num(np.asarray(std_array, dtype=float), nan=0.0)

        # Recover the sum of squared deviations from the ddof=1 variance.
        # Using the bare variance would leave beta flat while alpha grows
        # with n, driving the estimated variance toward zero.
        sum_sq_dev = np.maximum(counts - 1.0, 0.0) * stds**2

        shrinkage = (counts * v) / (counts + v)
        return beta + .5 * sum_sq_dev + .5 * shrinkage * (means - u_0)**2
