In [69]:
import numbers
import warnings
from scipy.optimize import minimize_scalar
from scipy.special import gammaln
from scipy import stats

class KernelDensityEstimator(object):
    """
    Base class for kernel density estimators.

    Subclasses provide the kernel by overriding `evaluate_kernel` and, when
    cross-validation is required, `evaluate_kernel_product_integral`.

    Parameters
    ----------
    X : array-like
        Samples the density estimate is built from.
    bandwidth_method : str or None
        'cross_validation' selects the bandwidth by minimising
        `cross_validation_score`. Any other value leaves `bandwidth` as
        None so that a subclass or the caller can set it afterwards.

    Attributes
    ----------
    X : ndarray
        The samples as an array.
    n : int
        Number of samples.
    bandwidth : float or None
        The bandwidth used by `evaluate` when none is passed explicitly.
    """
    def __init__(self, X, bandwidth_method=None):
        # Store all the values
        self.X = np.asarray(X)
        self.n = len(self.X)
        self.bandwidth_method = bandwidth_method
        # Always define the attribute so that a missing bandwidth can be
        # reported clearly by `evaluate` instead of as an AttributeError
        self.bandwidth = None

        # Use cross-validation if desired
        if bandwidth_method == 'cross_validation':
            self.bandwidth = self.cross_validation_optimisation()

    def evaluate_kernel(self, bandwidth, X, x):
        """
        Evaluate the kernel for every pair of evaluation point and sample.

        Must be overridden. `evaluate` averages the result over axis 1, so
        implementations are expected to return one row per entry of `x` and
        one column per entry of `X`.
        """
        raise NotImplementedError

    def evaluate_kernel_product_integral(self, bandwidth, X1, X2):
        """
        Evaluate the integral of the product of two kernels for every pair
        of samples in `X1` and `X2`.

        Must be overridden for cross-validation to be available.
        """
        raise NotImplementedError

    def evaluate(self, x, bandwidth=None):
        """
        Evaluate the density estimate.

        Parameters
        ----------
        x : scalar or array-like
            Points at which to evaluate the estimate. Scalars are promoted
            to a one-element array.
        bandwidth : float, optional
            Bandwidth to use; defaults to `self.bandwidth`.

        Returns
        -------
        ndarray
            The mean kernel contribution over all samples, one value per
            evaluation point.

        Raises
        ------
        ValueError
            If no bandwidth was passed and none is stored on the instance.
        """
        # Compare against None explicitly: `bandwidth or self.bandwidth`
        # would discard a bandwidth of 0 and fail for array-valued input
        if bandwidth is None:
            bandwidth = getattr(self, 'bandwidth', None)
        if bandwidth is None:
            raise ValueError('No bandwidth available: pass `bandwidth` or set `self.bandwidth`.')
        # The kernels index `x` as an array, so promote scalars and lists
        x = np.atleast_1d(np.asarray(x))
        # Evaluate the density at all points
        contributions = self.evaluate_kernel(bandwidth, self.X, x)
        # Average the contributions of the individual samples
        return np.mean(contributions, axis=1)

    def cross_validation_score(self, bandwidth):
        """
        Evaluate the cross-validation score for a bandwidth.

        The score is the mean of the kernel product integrals over all pairs
        of samples minus twice the mean of the kernel evaluated at all pairs
        of distinct samples.

        Returns
        -------
        float
            The score, or `np.inf` for non-positive bandwidths.

        Raises
        ------
        ValueError
            If fewer than two samples are available.
        NotImplementedError
            If `evaluate_kernel_product_integral` is not implemented.
        """
        # The leave-one-out term divides by n * (n - 1)
        if self.n < 2:
            raise ValueError('Cross validation requires at least two samples.')
        # Return something unpleasant for negative bandwidths
        if bandwidth <= 0:
            return np.inf
        try:
            # Evaluate the integral of the product of the kernels for all points
            contributions = self.evaluate_kernel_product_integral(bandwidth, self.X, self.X)
            score = np.mean(contributions)
        except NotImplementedError:
            raise NotImplementedError('`evaluate_kernel_product_integral` must be implemented for cross validation.')
        # Evaluate the estimate at all data points
        contributions = self.evaluate_kernel(bandwidth, self.X, self.X)
        # Drop the contribution of each sample to the estimate at itself
        np.fill_diagonal(contributions, 0)
        # Obtain the part due to the linear terms in (3.35)
        score -= 2 * np.sum(contributions) / (self.n * (self.n - 1))
        return score

    def cross_validation_optimisation(self):
        """
        Return the bandwidth that minimises `cross_validation_score` as found
        by `scipy.optimize.minimize_scalar` with its default settings.
        """
        return minimize_scalar(self.cross_validation_score)['x']
    
class GaussianEstimator(KernelDensityEstimator):
    """
    Kernel density estimator using Gaussian kernels.

    With `bandwidth_method='silverman'` (the default) the bandwidth is set by
    the rule of thumb implemented in `silverman`; other methods are handled
    by the base class constructor.
    """
    def __init__(self, X, bandwidth_method='silverman'):
        super(GaussianEstimator, self).__init__(X, bandwidth_method)

        # The rule of thumb is only applied when explicitly requested
        if bandwidth_method == 'silverman':
            self.bandwidth = self.silverman()

    def silverman(self):
        """Return the rule-of-thumb bandwidth 1.06 * std(X) * n ** -0.2."""
        spread = np.std(self.X)
        return 1.06 * spread * self.n ** -0.2

    def evaluate_kernel(self, bandwidth, X, x):
        """
        Evaluate a normalised Gaussian of width `bandwidth` centred on each
        sample at each evaluation point.

        Returns an array with one row per entry of `x` and one column per
        entry of `X`.
        """
        # Squared distance between every evaluation point and every sample
        squared_distance = (X[None, :] - x[:, None]) ** 2
        # Normalisation constant of a Gaussian with the given width
        normalisation = np.sqrt(2 * np.pi) * bandwidth
        return np.exp(-.5 * squared_distance / bandwidth**2) / normalisation

    def evaluate_kernel_product_integral(self, bandwidth, X1, X2):
        """
        Evaluate the integral of the product of two Gaussian kernels, which
        is the same Gaussian kernel with the bandwidth scaled by sqrt(2).
        """
        widened = np.sqrt(2) * bandwidth
        return self.evaluate_kernel(widened, X1, X2)
    
class ImproperGammaEstimator(KernelDensityEstimator):
    """
    Kernel density estimator using gamma-distribution kernels.

    The kernel has scale `bandwidth ** 2` and a shape that depends on the
    evaluation point. With `bandwidth_method='plugin'` (the default) the
    bandwidth is set from the closed-form expression in `plugin`; other
    methods are handled by the base class constructor.
    """
    def __init__(self, X, bandwidth_method='plugin'):
        super(ImproperGammaEstimator, self).__init__(X, bandwidth_method)

        # The plugin bandwidth is only applied when explicitly requested
        if bandwidth_method == 'plugin':
            self.bandwidth = self.plugin()

    def _log_moments(self):
        """Return the mean and standard deviation of the log of the samples."""
        log_samples = np.log(self.X)
        return np.mean(log_samples), np.std(log_samples)

    def evaluate_kernel(self, bandwidth, X, x):
        """
        Evaluate, at every sample in `X`, the gamma density with scale
        `bandwidth ** 2` and shape `1 + x / bandwidth ** 2`.

        Returns an array with one row per entry of `x` and one column per
        entry of `X`.
        """
        scale = bandwidth ** 2
        shape = 1 + x[:, None] / scale
        samples = X[None, :]
        # Work with the log-density and exponentiate at the end
        log_density = (
            (shape - 1) * np.log(samples) - samples / scale
            - shape * np.log(scale) - gammaln(shape)
        )
        return np.exp(log_density)

    def evaluate_kernel_product_integral(self, bandwidth, X1, X2):
        """
        Evaluate the kernel product integral for every pair of samples, with
        `X1` running along the columns and `X2` along the rows.
        """
        scale = bandwidth ** 2
        shape_1 = 1 + X1[None, :] / scale
        shape_2 = 1 + X2[:, None] / scale
        # Combine the gamma-function factors in log space
        log_integral = (
            (1 - shape_1 - shape_2) * np.log(2) + gammaln(shape_1 + shape_2 - 1)
            - gammaln(shape_1) - gammaln(shape_2)
        )
        return np.exp(log_integral) / scale

    def plugin(self):
        """
        Return the closed-form plugin bandwidth computed from the mean and
        standard deviation of the log-samples.

        This is the bandwidth at which `plugin_score` is stationary.
        """
        mu, sigma = self._log_moments()

        numerator = 2**0.8*np.exp(mu/2.)*sigma
        denominator = np.exp((17*sigma**2)/8.)*self.n*(12 + 20*sigma**2 + 9*sigma**4)
        return numerator/denominator**0.2

    def plugin_score(self, bandwidth):
        """
        Return the closed-form score for `bandwidth` computed from the mean
        and standard deviation of the log-samples.
        """
        mu, sigma = self._log_moments()

        prefactor = np.exp(-3*mu + sigma**2/8.)
        # Term that shrinks with the number of samples
        sample_term = (64*np.exp((5*mu)/2.))/self.n
        # Term that grows with the fifth power of the bandwidth
        bandwidth_term = (np.exp((17*sigma**2)/8.)* bandwidth**5*(12 + 20*sigma**2 + 9*sigma**4))/sigma**5
        return prefactor * (sample_term + bandwidth_term) / (128.*np.sqrt(np.pi)*bandwidth)

In [62]:
# Fit a Gaussian KDE with a cross-validated bandwidth to 100 normal samples
# and plot it on a grid spanning [-5, 5].
# NOTE(review): no random seed is set, so the figure changes on every run.
X = np.random.normal(size=100)
kde = GaussianEstimator(X, 'cross_validation')
x = np.linspace(-5, 5)

# Density estimate on the evaluation grid
plt.plot(x, kde.evaluate(x))
# NOTE(review): `kernel_gaussian` is not defined in the visible source;
# presumably this draws a unit-bandwidth Gaussian centred on zero as a
# reference curve - confirm it is defined in an earlier cell.
plt.plot(x, kernel_gaussian(1, np.zeros(1), x).T)

[<matplotlib.lines.Line2D at 0x1141d0990>]

In [110]:
# Compare the gamma-kernel estimate under both bandwidth choices with the
# true density of a log-normal distribution (shape 1, scale 3).
# NOTE(review): no random seed is set, so the figure changes on every run.
distribution = stats.lognorm(1, scale=3)
X = distribution.rvs(size=500)
kde = ImproperGammaEstimator(X, 'cross_validation')
# Start just above zero rather than at zero itself
x = np.linspace(1e-4, 10)

# First curve: estimate with the cross-validated bandwidth
plt.plot(x, kde.evaluate(x))
# Second curve: same estimator after overwriting the bandwidth with the
# plugin value (this mutates `kde` for any later cell that uses it)
kde.bandwidth = kde.plugin()
plt.plot(x, kde.evaluate(x))
# Third curve: the density the samples were drawn from
plt.plot(x, distribution.pdf(x))

[<matplotlib.lines.Line2D at 0x122de8890>]

In [121]:
# Evaluate the plugin and cross-validation scores on a common bandwidth grid
# for repeated samples from the same log-normal distribution.
# NOTE(review): no random seed is set, so the scores change on every run.
runs = 200
# 50 bandwidths, logarithmically spaced between 0.01 and 1
bandwidths = np.logspace(-2, 0, 50)
distribution = stats.lognorm(1, scale=3)

# One list of scores (one entry per bandwidth) is appended per run
plugin_scores=[]
cv_scores=[]

for run in range(runs):
    X = distribution.rvs(size=500)
    # `None` skips bandwidth selection; only the score functions are needed
    kde = ImproperGammaEstimator(X, None)
    plugin_scores.append([kde.plugin_score(bw) for bw in bandwidths])
    cv_scores.append([kde.cross_validation_score(bw) for bw in bandwidths])

In [132]:
def plot_scores(scores, f=5, color='k', ax=None, x=None):
    """
    Plot the median score with a percentile band and mark the minimum of
    the median.

    Parameters
    ----------
    scores : array-like
        Scores with one row per run and one column per grid point.
    f : float
        Lower percentile of the band; the upper percentile is `100 - f`.
    color : str
        Colour used for the line, the band and the marker.
    ax : matplotlib axes, optional
        Axes to draw on; defaults to the current axes.
    x : array-like, optional
        Grid the scores were evaluated on; defaults to the notebook-level
        `bandwidths` so that existing calls keep working.
    """
    scores = np.asarray(scores)
    # Compare against None explicitly instead of relying on truthiness
    if ax is None:
        ax = plt.gca()
    if x is None:
        # Fall back to the grid defined by the simulation cell
        x = bandwidths
    x = np.asarray(x)

    # Summarise the runs at every grid point
    median = np.median(scores, axis=0)
    lower = np.percentile(scores, f, axis=0)
    upper = np.percentile(scores, 100 - f, axis=0)

    ax.plot(x, median, color=color)
    ax.fill_between(x, lower, upper, alpha=.5, color=color)
    # Mark the grid point at which the median score is smallest
    best = np.argmin(median)
    ax.scatter(x[best], median[best], color=color)
    
# Overlay both criteria on the current axes: plugin scores in the default
# colour, cross-validation scores in green.
plot_scores(plugin_scores)
plot_scores(cv_scores, color='g')
# The bandwidth grid is logarithmically spaced
plt.xscale('log')

In [115]:
# Compare the two score curves for a single sample, each shifted so that its
# minimum is zero, and mark the bandwidth each method selects.
#
# When the notebook is run top to bottom, `plugin_scores` and `cv_scores`
# hold one row per simulated run, which cannot be plotted against the 1-D
# bandwidth grid. Use the last row: it belongs to the final simulated sample,
# which is the one `kde` was built from and therefore matches the vertical
# lines below. `np.atleast_2d` keeps the cell working for a single 1-D curve.
plugin_curve = np.atleast_2d(plugin_scores)[-1]
cv_curve = np.atleast_2d(cv_scores)[-1]

plt.plot(bandwidths, plugin_curve - np.min(plugin_curve), label='Plugin', ls='--', color='k')
plt.plot(bandwidths, cv_curve - np.min(cv_curve), label='Cross-validation', color='k')
# Bandwidths selected by each method for the same sample
plt.axvline(kde.cross_validation_optimisation(), color='k')
plt.axvline(kde.plugin(), ls='--', color='k')
plt.xscale('log')
plt.legend(loc='best', frameon=False)
plt.xlabel('Bandwidth')

<matplotlib.text.Text at 0x1221c90d0>