 参数估计中的频率主义学派和贝叶斯学派
 
 事实上,概率模型的训练过程就是参数估计(parameter estimation)过程对于参数估计,统计学界的两个学派分别提供了不同的解决方案:频率主义学派(Frequentist)认为参数虽然未知,但却是客观存在的固定值,因此,可通过优化似然函数等准则来确定参数值;贝叶斯学派(Bayesian)则认为参数是未观察到的随机变量,其本身也可有分布,因此,可假定参数服从一个先验分布,然后基于观测到的数据来计算参数的后验分布.本节介绍源自频率主义学派的极大似然估计(Maximum Likelihood Estimation,简称MLE),这是根据数据采样来估计概率分布参数的经典方法.

In [35]:
#Compute the log of the sum of exponentials of input elements
from scipy.special import  logsumexp
#View inputs as arrays with at least two dimensions.
#np.atleast_2d
from sklearn import datasets
#from sklearn.naive_bayes import GaussianNB
import numpy as np
from abc import ABCMeta, abstractmethod

BaseNB <- GaussianNB

假设类条件概率服从高斯分布，其概率密度函数为：
$$P(x_i | y_k)=\frac{1}{\sqrt{2\pi\sigma^2_{yk}}}exp(-\frac{(x_i-\mu_{yk})^2}{2\sigma^2_{yk}})$$
数学期望(mean)：$\mu$，方差：$\sigma^2=\frac{\sum(X-\mu)^2}{N}$

可以利用MLE对$\mu$和$\sigma^2$进行参数

In [36]:
class BaseNB(metaclass=ABCMeta):
    
    @abstractmethod
    def _joint_log_likelihood(self, X):
        """
        计算没有被归一化的后验概率的对数 I.e. ``log P(c) + log P(x|c)``, 右边分子部分
        Returns: [n_sample, n_classes]
        """
        pass
    def predict(self, X):
        """
        X : array-like, shape = [n_samples, n_features]
        """
        jll = self._joint_log_likelihood(X)
        return self.classes_[np.argmax(jll, axis=1)]

    def predict_log_proba(self, X):
        """
        X : array-like, shape = [n_samples, n_features]
        Returns : array-like, shape = [n_samples, n_classes]
        """
        jll = self._joint_log_likelihood(X)
        log_prob_x = logsumexp(jll, axis=1)
        return jll - np.atleast_2d(log_prob_x).T

    def predict_proba(self, X):
        """
        X : array-like, shape = [n_samples, n_features]
        Returns : array-like, shape = [n_samples, n_classes]
        """
        return np.exp(self.predict_log_proba(X))

In [37]:
class GaussianNB(BaseNB):
    def __init__(self, priors=None, var_smoothing=1e-9):
        self.priors = priors
        self.var_smoothing = var_smoothing
    def _get_mean_variance(self, X, sample_weight=None):
        # Compute (potentially weighted) mean and variance of new datapoints
        if sample_weight is not None:
            n_new = float(sample_weight.sum())
            new_mu = np.average(X, axis=0, weights=sample_weight / n_new)
            new_var = np.average((X - new_mu) ** 2, axis=0,
                                 weights=sample_weight / n_new)
        else:
            n_new = X.shape[0]
            new_var = np.var(X, axis=0)
            new_mu = np.mean(X, axis=0)

        return new_mu, new_var
    def fit(self, X, y, sample_weight=None):
        # If the ratio of data variance between dimensions is too small, it
        # will cause numerical errors. To address this, we artificially
        # boost the variance by epsilon, a small fraction of the standard
        # deviation of the largest dimension.
        self.epsilon_ = self.var_smoothing * np.var(X, axis=0).max()
        self.classes_ = np.unique(y)
        n_features = X.shape[1]
        n_classes = len(self.classes_)
        self.theta_ = np.zeros((n_classes, n_features))
        self.sigma_ = np.zeros((n_classes, n_features))
        self.class_count_ = np.zeros(n_classes, dtype=np.float64)

        for (i, y_i) in enumerate(self.classes_):
            X_i = X[y == y_i, :]
            if sample_weight is not None:
                sw_i = sample_weight[y == y_i]
                N_i = sw_i.sum()
            else:
                sw_i = None
                N_i = X_i.shape[0]
            self.theta_[i, :], self.sigma_[i, :] = self._get_mean_variance(X_i, sw_i)
            self.class_count_[i] += N_i
        self.sigma_[:, :] += self.epsilon_

        # Set priors 
        if self.priors:
            self.class_prior_ = np.asarray(self.priors)
        else:
            # Empirical prior, with sample_weight taken into account
            self.class_prior_ = self.class_count_ / self.class_count_.sum()
        return self
    
    def _joint_log_likelihood(self, X):
        joint_log_likelihood = []
        for i in range(np.size(self.classes_)):
            jointi = np.log(self.class_prior_[i])
            n_ij = - 0.5 * np.sum(np.log(2. * np.pi * self.sigma_[i, :]))
            n_ij -= 0.5 * np.sum(((X - self.theta_[i, :]) ** 2) /
                                 (self.sigma_[i, :]), 1)
            joint_log_likelihood.append(jointi + n_ij)

        joint_log_likelihood = np.array(joint_log_likelihood).T
        return joint_log_likelihood

In [38]:
iris = datasets.load_iris()
gnb = GaussianNB(priors=[.3, .2, .5])
gnb.fit(iris.data, iris.target)
y_pred = gnb.fit(iris.data, iris.target).predict(iris.data)
#gnb.predict_proba(iris.data)
(y_pred == iris.target).sum() / len(iris.target)

0.9533333333333334

----------------
 类别不平衡问题
 
 再缩放的思想虽简单,但实际操作却并不平凡,主要因为“训练集是真实样本总体的无偏采样”这个假设往往并不成立,也就是说,我们未必能有效地基于训练集观测几率来推断出真实几率.现有技术大体上有三类做法:第一类是直接对训练集里的反类样例进行“欠采样”(undersampling),即去除一些反例使得正、反例数目接近,然后再进行学习;第二类是对训练集里的正类样例进行“过采样”(oversampling)即增加一些正例使得正、反例数目接近,然后再进行学习;第三类则是直接基于原始训练集进行学习,但在用训练好的分类器进行预测时,将式(3.48)入到其决策过程中,称为“阈值移动”(threshold-moving).

In [28]:
class BaseDiscreteNB(BaseNB):
    """Abstract base class for naive Bayes on discrete/categorical data
    Any estimator based on this class should provide:
    __init__
    _joint_log_likelihood(X) as per BaseNB
    """

    def _update_class_log_prior(self, class_prior=None):
        n_classes = len(self.classes_)
        if class_prior is not None:
            self.class_log_prior_ = np.log(class_prior)
        elif self.fit_prior:
            # empirical prior, with sample_weight taken into account
            self.class_log_prior_ = (np.log(self.class_count_) -
                                     np.log(self.class_count_.sum()))
        else:
            self.class_log_prior_ = np.full(n_classes, -np.log(n_classes))


    def fit(self, X, y, sample_weight=None):
        """Fit Naive Bayes classifier according to X, y
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
        y : array-like, shape = [n_samples]
        sample_weight : array-like, shape = [n_samples], (default=None)
            Weights applied to individual samples (1. for unweighted).
        """
      
        _, n_features = X.shape
        labelbin = LabelBinarizer()
        Y = labelbin.fit_transform(y)
        self.classes_ = labelbin.classes_
        if Y.shape[1] == 1:
            Y = np.concatenate((1 - Y, Y), axis=1)

        # LabelBinarizer().fit_transform() returns arrays with dtype=np.int64.
        # We convert it to np.float64 to support sample_weight consistently;
        # this means we also don't have to cast X to floating point
        Y = Y.astype(np.float64)
        if sample_weight is not None:
            sample_weight = np.atleast_2d(sample_weight)
            Y *= check_array(sample_weight).T

        class_prior = self.class_prior

        # Count raw events from data before updating the class log prior
        # and feature log probas
        n_effective_classes = Y.shape[1]
        self.class_count_ = np.zeros(n_effective_classes, dtype=np.float64)
        self.feature_count_ = np.zeros((n_effective_classes, n_features),
                                       dtype=np.float64)
        self._count(X, Y)
        self._update_feature_log_prob(alpha)
        self._update_class_log_prior(class_prior=class_prior)
        return self

    # XXX The following is a stopgap measure; we need to set the dimensions
    # of class_log_prior_ and feature_log_prob_ correctly. for interpreting NB-model as a linear model.
    def _get_coef(self):
        return (self.feature_log_prob_[1:]
                if len(self.classes_) == 2 else self.feature_log_prob_)

    def _get_intercept(self):
        return (self.class_log_prior_[1:]
                if len(self.classes_) == 2 else self.class_log_prior_)

    coef_ = property(_get_coef)
    intercept_ = property(_get_intercept)


0.9533333333333334

In [None]:
class MultinomialNB(BaseDiscreteNB):

    def __init__(self, alpha=1.0, fit_prior=True, class_prior=None):
        self.alpha = alpha
        self.fit_prior = fit_prior
        self.class_prior = class_prior

    def _count(self, X, Y):
        """Count and smooth feature occurrences."""
        if np.any((X.data if issparse(X) else X) < 0):
            raise ValueError("Input X must be non-negative")
        self.feature_count_ += safe_sparse_dot(Y.T, X)
        self.class_count_ += Y.sum(axis=0)

    def _update_feature_log_prob(self, alpha):
        """Apply smoothing to raw counts and recompute log probabilities"""
        smoothed_fc = self.feature_count_ + alpha
        smoothed_cc = smoothed_fc.sum(axis=1)

        self.feature_log_prob_ = (np.log(smoothed_fc) -
                                  np.log(smoothed_cc.reshape(-1, 1)))

    def _joint_log_likelihood(self, X):
        """Calculate the posterior log probability of the samples X"""
        return (np.dot(X, self.feature_log_prob_.T) +
                self.class_log_prior_)