Skip to content

Commit

Permalink
PEP8. Refactored into Base, Naive, Tree
Browse files Browse the repository at this point in the history
  • Loading branch information
tommyod committed Jul 8, 2018
1 parent cc7fba3 commit 571bb53
Show file tree
Hide file tree
Showing 7 changed files with 652 additions and 702 deletions.
194 changes: 194 additions & 0 deletions KDEpy/BaseKDE.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 4 10:52:17 2018
@author: tommy
"""
from abc import ABC, abstractmethod
from collections.abc import Sequence
import pytest
import numbers
import numpy as np
from KDEpy.kernel_funcs import _kernel_functions
from KDEpy.bw_selection import _bw_methods


class BaseKDE(ABC):
    """
    Abstract Base Class for every kernel density estimator.

    This class is never instantiated, it merely defines some common methods
    which every subclass must implement. In summary, it facilitates:

    - The `_available_kernels` parameter
    - Correct handling of `kernel` and `bw` in __init__
    - Forces subclasses to implement `fit(data)`, converts `data` to
      correct shape (obs, dims)
    - Forces subclasses to implement `evaluate(grid_points)`, with handling
      of the `grid_points` argument (None, an integer, or an explicit grid)
    """

    # Mapping from kernel name to kernel function
    _available_kernels = _kernel_functions
    # Mapping from bandwidth-selection method name to the selecting function
    _bw_methods = _bw_methods

    @abstractmethod
    def __init__(self, kernel: str, bw: float):
        """Initialize the kernel density estimator.

        Parameters
        ----------
        kernel
            Kernel function, or string matching available options. Strings
            are stripped and lower-cased before the lookup.
        bw
            The bandwidth, either a positive number, a string naming a
            bandwidth selection method, or an array-like.

        Raises
        ------
        ValueError
            If `kernel` is neither a known kernel name nor a callable, or
            if `bw` is not a positive number, a known method name or an
            array-like.
        """
        # Verify that the choice of a kernel is valid, and set the function
        akernels = sorted(self._available_kernels.keys())
        msg = f'Kernel must be a string or callable. Options: {akernels}'
        if isinstance(kernel, str):
            kernel = kernel.strip().lower()
            if kernel not in akernels:
                raise ValueError(msg)
            self.kernel = self._available_kernels[kernel]
        elif callable(kernel):
            self.kernel = kernel
        else:
            raise ValueError(msg)

        # bw may either be a positive number, a string, or array-like such
        # that each point in the data has a unique bw
        if isinstance(bw, numbers.Number) and bw > 0:
            self.bw = bw
        elif isinstance(bw, str):
            # Normalize the bandwidth name itself. The kernel must not be
            # touched here: it may be a callable without string methods.
            bw = bw.strip().lower()
            amethods = sorted(self._bw_methods.keys())
            if bw not in amethods:
                msg = f'bw not recognized. Options are: {amethods}'
                raise ValueError(msg)
            self.bw = self._bw_methods[bw]
        elif isinstance(bw, (np.ndarray, Sequence)):
            self.bw = bw
        else:
            raise ValueError('Bandwidth must be > 0, array-like or a string.')

    @abstractmethod
    def fit(self, data):
        """
        Fit the kernel density estimator to the data.

        Converts `data` to a float ndarray of shape (obs, dims) and stores
        it as `self.data`.

        Raises
        ------
        ValueError
            If the data contains no observations.
        """
        # In the end, the data should be an ndarray of shape (obs, dims)
        data = self._process_sequence(data)

        # Invariant guaranteed by `_process_sequence`
        assert len(data.shape) == 2
        obs, dims = data.shape

        if not obs > 0:
            raise ValueError('Data must contain at least one data point.')
        assert dims > 0
        self.data = np.asarray(data, dtype=float)

    @abstractmethod
    def evaluate(self, grid_points=None):
        """
        Evaluate the kernel density estimator.

        This base implementation only prepares the grid: it stores the grid
        as `self.grid_points` and records in `self._user_supplied_grid`
        whether the grid was given explicitly by the caller.

        grid_points: None (automatic grid), a positive integer (number of
            points in the automatic grid), or a grid Sequence or ndarray of
            shape (obs, dims)

        Raises
        ------
        ValueError
            If `fit` has not been called, if a numeric `grid_points` is not
            a positive integer, or if the grid is empty.
        """
        if not hasattr(self, 'data'):
            raise ValueError('Must call fit before evaluating.')

        # If no information is supplied at all, call the autogrid method
        if grid_points is None:
            self._user_supplied_grid = False
            grid_points = self._autogrid(self.data)

        # If a number is specified, interpret it as the number of grid points
        elif isinstance(grid_points, numbers.Number):
            if not (isinstance(grid_points, numbers.Integral) and
                    grid_points > 0):
                raise ValueError('grid_points must be positive integer.')
            self._user_supplied_grid = False
            grid_points = self._autogrid(self.data, num_points=grid_points)

        else:
            self._user_supplied_grid = True
            grid_points = self._process_sequence(grid_points)

        if not grid_points.shape[0] > 0:
            raise ValueError('Grid must contain at least one data point.')

        self.grid_points = grid_points

    def _process_sequence(self, sequence_array_like):
        """
        Process a sequence of data input to ndarray of shape (obs, dims).

        A `Sequence` is converted to a single column of shape (obs, 1).
        An ndarray of one dimension is reshaped to (obs, 1), and an ndarray
        of two dimensions is used as it is.

        Raises
        ------
        ValueError
            If an ndarray has more than two dimensions, or (raised by
            `np.asarray_chkfinite`) if the values are not all finite.
        TypeError
            If the input is neither a `Sequence` nor an ndarray.
        """
        if isinstance(sequence_array_like, Sequence):
            out = np.asarray(sequence_array_like, dtype=float).reshape(-1, 1)
        elif isinstance(sequence_array_like, np.ndarray):
            if len(sequence_array_like.shape) == 1:
                out = sequence_array_like.reshape(-1, 1)
            elif len(sequence_array_like.shape) == 2:
                out = sequence_array_like
            else:
                raise ValueError('Must be of shape (obs, dims)')
        else:
            raise TypeError('Must be of shape (obs, dims)')

        # np.asfarray was removed in NumPy 2.0, convert with an explicit dtype
        return np.asarray_chkfinite(out, dtype=float)

    def _evalate_return_logic(self, evaluated, grid_points):
        """
        Return either evaluation points y, or tuple (x, y) based on inputs.

        If the user supplied the grid, only `evaluated` is returned. If the
        grid was generated automatically, `(grid_points, evaluated)` is
        returned. Arrays with a single column are flattened.
        """
        obs, dims = evaluated.shape
        if self._user_supplied_grid:
            if dims == 1:
                return evaluated.ravel()
            return evaluated

        if dims == 1:
            return grid_points.ravel(), evaluated.ravel()
        return grid_points, evaluated

    @staticmethod
    def _autogrid(data, num_points=1024, percentile=0.05):
        """
        Automatically select a grid if the user did not supply one.

        data : ndarray of shape (obs, dims)
        num_points : number of grid points, should be a power of two
            according to the original author's note. The grid gets
            `num_points // 2**(dims - 1)` rows.
        percentile : is how far out we go out, as a fraction of the range
            of the data in each dimension. The margin is never smaller
            than 3.

        Returns an ndarray where column `i` is an evenly spaced grid that
        covers the data in dimension `i`, plus the margin on both sides.
        """
        obs, dims = data.shape
        minimums, maximums = data.min(axis=0), data.max(axis=0)
        ranges = maximums - minimums

        # The number of rows is shared by the allocation and every linspace
        num_rows = num_points // 2**(dims - 1)
        grid_points = np.empty(shape=(num_rows, dims))

        generator = enumerate(zip(minimums, maximums, ranges))
        for i, (minimum, maximum, rang) in generator:
            outside_borders = max(percentile * rang, 3)
            grid_points[:, i] = np.linspace(minimum - outside_borders,
                                            maximum + outside_borders,
                                            num=num_rows)

        return grid_points

    def __call__(self, *args, **kwargs):
        """Shorthand for `evaluate`, all arguments are passed on."""
        return self.evaluate(*args, **kwargs)


if __name__ == "__main__":
    # Run the doctests and tests found below the current directory.
    # Adding '--durations=10' to the arguments shows potentially slow tests.
    pytest_arguments = ['.', '--doctest-modules', '-v']
    pytest.main(args=pytest_arguments)
195 changes: 195 additions & 0 deletions KDEpy/NaiveKDE.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 4 10:52:17 2018
@author: tommy
"""
import pytest
import numbers
import numpy as np
from KDEpy.BaseKDE import BaseKDE


class NaiveKDE(BaseKDE):
    """
    The class for a naive implementation of the KDE.

    The estimate is computed by looping over every data point and adding
    its weighted kernel, evaluated on all grid points, to the result.
    """

    def __init__(self, kernel='gaussian', bw=1):
        """
        Initialize a naive KDE.

        Parameters
        ----------
        kernel
            Kernel function, or string matching available options.
        bw
            The bandwidth, either a number, a string or an array-like.
        """
        super().__init__(kernel, bw)

    def fit(self, data, weights=None):
        """Fit the KDE to the data.

        Parameters
        ----------
        data
            The data points.
        weights
            The weights. If None, every data point gets the same weight.
            The weights are normalized to sum to one.

        Returns
        -------
        self
            Returns the instance.

        Raises
        ------
        ValueError
            If `weights` is given and its length differs from the data.

        Examples
        --------
        >>> data = [1, 3, 4, 7]
        >>> kde = NaiveKDE().fit(data)
        """
        # Sets self.data
        super().fit(data)

        if weights is None:
            self.weights = np.ones_like(self.data)
        else:
            if len(weights) != len(data):
                raise ValueError('Length of data and weights must match.')
            # np.asfarray was removed in NumPy 2.0, use an explicit dtype
            self.weights = np.asarray(self._process_sequence(weights),
                                      dtype=float)

        self.weights = self.weights / np.sum(self.weights)

        return self

    def evaluate(self, grid_points=None):
        """Evaluate on the grid points.

        Parameters
        ----------
        grid_points
            None, a positive integer, or an explicit grid. The argument is
            validated and converted by `BaseKDE.evaluate`.

        Returns
        -------
        The value produced by `_evalate_return_logic`: the evaluated
        points, or a tuple of grid and evaluated points if the grid was
        not supplied by the user.

        Raises
        ------
        ValueError
            If the bandwidth is array-like and its length differs from the
            number of data points.
        """
        # This method sets self.grid_points and verifies it
        super().evaluate(grid_points)

        # Return the array converted to a float type
        grid_points = np.asarray(self.grid_points, dtype=float)

        # Create zeros on the grid points
        evaluated = np.zeros_like(grid_points)

        # Expand the bandwidth so that there is one entry per data point
        if isinstance(self.bw, numbers.Number):
            bandwidths = np.asarray(np.ones_like(self.data) * self.bw,
                                    dtype=float)
        elif callable(self.bw):
            bandwidths = np.asarray(
                np.ones_like(self.data) * self.bw(self.data), dtype=float)
        else:
            bandwidths = self.bw
            # zip() below would silently drop data points or bandwidths
            if len(bandwidths) != len(self.data):
                raise ValueError(
                    'Length of data and bandwidths must match.')

        # For every data point, compute the kernel and add to the grid
        for weight, data_point, point_bw in zip(self.weights, self.data,
                                                bandwidths):
            evaluated += weight * self.kernel(grid_points - data_point,
                                              bw=point_bw)

        return self._evalate_return_logic(evaluated, grid_points)


if __name__ == "__main__":
    # Run the doctests and tests found below the current directory.
    # Adding '--durations=10' to the arguments shows potentially slow tests.
    pytest_arguments = ['.', '--doctest-modules', '-v']
    pytest.main(args=pytest_arguments)

if __name__ == '__main__':

    import matplotlib.pyplot as plt

    # Every example uses the same kernel and the same evaluation grid
    kernel = 'gaussian'
    x = np.linspace(0, 10, num=1024)

    # Basic example of the naive KDE
    # -----------------------------------------
    data = [3, 3.5, 4, 6, 8]
    bw = 1

    plt.figure(figsize=(10, 4))
    plt.title('Basic example of the naive KDE')

    # The same content is drawn in the left and in the right panel
    for panel in (1, 2):
        plt.subplot(1, 2, panel)
        kde = NaiveKDE(kernel=kernel, bw=bw)
        kde.fit(data)

        # Dashed lines: the scaled kernel of every single data point
        for point in data:
            single = NaiveKDE(kernel=kernel, bw=bw).fit([point])
            plt.plot(x, single.evaluate(x) / len(data), color='k', ls='--')

        # Solid line: the estimate for all the data, dots: the data itself
        plt.plot(x, kde.evaluate(x))
        plt.scatter(data, np.zeros_like(data))
    plt.show()

    # Naive KDE with weights
    # -----------------------------------------
    data = [3, 3.5, 4, 6, 8]
    weights = np.array([1, 1, 1, 1, 5])
    weights = weights / np.sum(weights)
    bw = 1

    kde = NaiveKDE(kernel=kernel, bw=bw)
    kde.fit(data, weights=weights)

    # Every kernel is scaled by the weight of its data point
    for point, weight in zip(data, weights):
        single = NaiveKDE(kernel=kernel, bw=bw).fit([point], weights=[weight])
        plt.plot(x, single.evaluate(x) * weight, color='k', ls='--')

    y = kde.evaluate(x)
    plt.title('Naive KDE with weights')
    plt.plot(x, y)
    plt.scatter(data, np.zeros_like(data))
    plt.show()

    # Naive KDE with variable h
    # -----------------------------------------
    data = [2, 3, 4, 5, 6, 7]
    bws = [1 / divisor for divisor in [1, 2, 3, 4, 5, 6]]

    kde = NaiveKDE(kernel=kernel, bw=bws)
    kde.fit(data)

    # Every data point has its own bandwidth
    for point, point_bw in zip(data, bws):
        single = NaiveKDE(kernel=kernel, bw=point_bw).fit([point])
        plt.plot(x, single.evaluate(x) / len(data), color='k', ls='--')

    y = kde.evaluate(x)
    plt.title('Naive KDE with variable h')
    plt.plot(x, y)
    plt.scatter(data, np.zeros_like(data))
    plt.show()

    # Naive KDE with silverman
    # -----------------------------------------
    data = [2, 3, 4, 5, 6, 7]
    bws = [1 / divisor for divisor in [1, 2, 3, 4, 5, 6]]

    kde = NaiveKDE(kernel=kernel, bw='silverman')
    kde.fit(data)

    # The bandwidths in `bws` are not used here, the zip only pairs them
    # with the data as the loop runs
    for point, _ in zip(data, bws):
        single = NaiveKDE(kernel=kernel, bw='silverman').fit([point])
        plt.plot(x, single.evaluate(x) / len(data), color='k', ls='--')

    y = kde.evaluate(x)
    plt.title('Naive KDE with silverman')
    plt.plot(x, y)
    plt.scatter(data, np.zeros_like(data))
    plt.show()

0 comments on commit 571bb53

Please sign in to comment.