In [23]:
from typing import List, Tuple, Dict, Any, Callable, Union, Optional
import warnings
import time
from datetime import datetime
import numpy as np
import pandas as pd
from scipy.optimize import brentq

from sklearn.neighbors import KernelDensity
from scipy import integrate
from scipy.stats import qmc


In [24]:
import json

def serialize_dict(dic: Dict) -> str:
    return json.dumps(
        obj=dic,
        default=lambda o: ' '.join(repr(o).split()),
        indent=4
    )

# Initalize search parameters

In [25]:
N_DIM = 2

# Generate uniform random samples in [0, 1]^p space
sampler = qmc.LatinHypercube(d=N_DIM)
X_TRAIN = sampler.random(n=50)

THR = 0.1  # The density threshold below which is considered "low"

TARGET_RATIO = 0.1  # Desired low density volume ratio

# Smooth Indicator Function for Low Density Value

In [26]:
def smooth_lower_bound_indicator(
    X: Union[float, np.ndarray],
    threshold: float,
    sigma: float = 0.01,
    eps: float = 0.01
) -> Union[float, np.ndarray]:
    """
    Compute a smooth approximation of the indicator function I(X < threshold)
    on [0, 1].
    
    This function uses a scaled and shifted logistic function to approximate
    the step function at the threshold, with a smooth transition. It avoids
    computing large exponential values for X >> threshold.

    Args:
        X (Union[float, np.ndarray]): Input value(s) to evaluate.
        threshold (float): The threshold value for the indicator function.
        sigma (float): Controls the steepness of the sigmoid.
        eps (float): Marginal value after transition

    Note:
        The transition width is controlled by sigma, where the function
        transitions from 1-eps/2 to 0+eps/2 over an interval of 2*sigma_eps
        centered at pos.
    """
    sigma_eps = sigma * np.log((2-eps)/eps)  # Half-width of the transition region
    pos = threshold + sigma_eps  # Center of the transition region

    # Compute the centered and scaled input
    z = (X - pos) / sigma

    return np.where(
        z > 10, 0,  # Avoid large exponential computations
        1 / (1 + np.exp(z))  # Compute sigmoid for other values
    )

In [27]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def sigmoid_plot_with_slider(sigma_range=(0.001, 0.1), sigma_steps=30):
    EPS = 0.01
    
    def sigmoid_width(sigma, eps):
        # Transition effective width according to eps
        return sigma * np.log((2-eps)/eps)
    
    def sigmoid_range(threshold, sigma, eps):
        sigma_eps = sigmoid_width(sigma, eps)
        # Create x values with denser sampling around POS
        x_min, x_max = 0, 1.8
        x_left = np.linspace(x_min, threshold, 100)
        x_middle = np.linspace(threshold, threshold+2*sigma_eps, 100)
        x_right = np.linspace(threshold+2*sigma_eps, x_max, 100)
        x = np.unique(np.concatenate([x_left, x_middle, x_right]))
        
        return x

    # Create figure
    fig = make_subplots(rows=1, cols=1)

    # Create x values
    x = np.linspace(0, 2, 300)

    # Create sigma values
    sigmas = np.logspace(np.log10(sigma_range[0]), np.log10(sigma_range[1]), sigma_steps)

    # Create traces for all sigma values
    for sigma in sigmas:
        sigma_eps = sigmoid_width(sigma, EPS)
        x = sigmoid_range(THR, sigma, EPS)
        x_eps = np.array([THR, THR+2*sigma_eps])
        y = smooth_lower_bound_indicator(x, THR, sigma, EPS)
        y_eps = smooth_lower_bound_indicator(x_eps, THR, sigma, EPS)
        fig.add_trace(go.Scatter(x=x, y=y, mode='lines', name=f'Sigmoid', visible=False))
        fig.add_trace(go.Scatter(x=x_eps, y=y_eps, mode='markers', 
                                 name='Transition points', marker=dict(color='red', size=5), visible=False))

    # Add veritical line at threshold
    fig.add_vline(x=THR, line_dash="dash", line_color="red")

    # Make the first sigmoid trace and threshold vline visible
    fig.data[0].visible = True
    fig.data[1].visible = True

    # Create slider steps
    steps = []
    for i, sigma in enumerate(sigmas):
        sigma_eps = sigmoid_width(sigma, EPS)
        step = dict(
            method="update",
            args=[
                {"visible": [False] * len(fig.data)},
                {"annotations": [{
                    "text": f"σ_eps: {sigma_eps:.4f}",
                    "x": 1, "y": 1, "xref": "paper", "yref": "paper",
                    "showarrow": False, "font": {"size": 12},
                    "bgcolor": "rgba(255,255,255,0.8)", "bordercolor": "rgba(0,0,0,0.5)", "borderwidth": 1
                }]}
            ],
            label=f"{sigma:.4f}"
        )
        step["args"][0]["visible"][i*2] = True  # Make sigmoid visible
        step["args"][0]["visible"][i*2+1] = True  # Make transition points visible
        steps.append(step)

    # Add slider
    sliders = [dict(
        active=0,
        currentvalue={"prefix": "σ: "},
        pad={"t": 50},
        steps=steps
    )]

    # Update layout
    fig.update_layout(
        title=f"Sigmoid Lower Bound Indicator of threshold={THR} and eps={EPS} for varying sigma",
        xaxis_title="X Values",
        yaxis_title="Sigmoid(X)",
        showlegend=False,
        sliders=sliders
    )

    return fig

# Create and show the plot
fig = sigmoid_plot_with_slider()
fig.show()


overflow encountered in exp



### A plot function of low density volume ratio evolution for varying bandwidth

In [28]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def plot_ratio(bandwidths: np.ndarray, ratios: np.ndarray):
    fig = make_subplots(rows=1, cols=1)

    # Add points of evalaluated points
    fig.add_trace(go.Scatter(
        x=bandwidths,
        y=ratios,
        mode='lines+markers',
        name='Evaluated Ratios',
    ))
    # Update layout
    fig.update_layout(
        title='Low Density Ratio',
        xaxis_title='Bandwidth',
        yaxis_title='Low Density Ratio',
        showlegend=True,
        hovermode='closest'
    )
    
    fig.show()
    

# Monte Carole Estimation for Low Density Volume Ratio

In [29]:
def estimate_low_density_ratio(
    kde: KernelDensity,
    threshold: float,
    mc_samples: Optional[np.ndarray],
) -> np.ndarray:
    """
    Estimate the volume ratio of feature space [0, 1]^p with normalized density
    below a threshold using Monte Carlo sampling.

    This function uses Latin Hypercube Sampling to generate uniform random
    samples in the feature space, which is equivalent to Monte Carlo sampling
    for this purpose.

    Parameters:
    kde : sklearn.neighbors.KernelDensity object
        The trained KDE object
    threshold : float, optional
        The density threshold below which is considered "low"
    mc_samples : np.ndarray, optional
        The Monte Carlo samples to use 
    """

    # Evaluate the KDE at each sample point
    densities = np.exp(kde.score_samples(mc_samples))
    max_density = densities.max()
    densities_scaled = densities / max_density if max_density > 0 else 0

    # Smooth indicator function for density < threshold
    low_density_mask = smooth_lower_bound_indicator(densities_scaled, threshold)

    # Estimate the ratio of low-density volume
    # n_low_density_samples / n_samples
    low_density_ratio = np.mean(low_density_mask)

    return low_density_ratio

In [30]:
# Generate uniform random samples in [0, 1]^p space
sampler = qmc.LatinHypercube(d=N_DIM)
mc_samples = sampler.random(n=10000)

bandwidths = np.logspace(np.log10(0.001), np.log10(1), 10)
ratios = []
for bandwidth in bandwidths:
    kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth).fit(X_TRAIN)
    low_bound_ratio = estimate_low_density_ratio(kde, THR, mc_samples)
    ratios.append(low_bound_ratio)

    # Print progress
    start_time = datetime.now().strftime("%H:%M:%S")
    print(f"[{start_time}] bandwidth: {bandwidth:.2e}, low_bound_ratio: {low_bound_ratio:.2e}")

[15:15:51] bandwidth: 1.00e-03, low_bound_ratio: 9.99e-01
[15:15:51] bandwidth: 2.15e-03, low_bound_ratio: 9.97e-01
[15:15:51] bandwidth: 4.64e-03, low_bound_ratio: 9.87e-01
[15:15:51] bandwidth: 1.00e-02, low_bound_ratio: 9.48e-01
[15:15:51] bandwidth: 2.15e-02, low_bound_ratio: 8.19e-01
[15:15:51] bandwidth: 4.64e-02, low_bound_ratio: 4.40e-01
[15:15:51] bandwidth: 1.00e-01, low_bound_ratio: 5.65e-02
[15:15:51] bandwidth: 2.15e-01, low_bound_ratio: 1.52e-03
[15:15:51] bandwidth: 4.64e-01, low_bound_ratio: 0.00e+00
[15:15:51] bandwidth: 1.00e+00, low_bound_ratio: 0.00e+00


In [31]:
plot_ratio(bandwidths, ratios)

# Integral of Low Density Indicator

In [32]:
from sklearn.cluster import MeanShift

def integrate_low_density_indicator(
    X: np.ndarray,
    bandwidth: float,
    threshold: float
) -> float:
    """
    Compute the integral of the Indicator function of (density < threshold)
    over [0, 1]^p.

    Parameters:
    kde : sklearn.neighbors.KernelDensity object
        The trained KDE object
    threshold : float, optional
        The density threshold below which is considered "low"
    """
    n_dim = X.shape[1]

    # Fit KDE model
    kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth).fit(X)
    
    # Search for max density
    mean_shift = MeanShift(bandwidth=bandwidth).fit(X)
    modes = mean_shift.cluster_centers_  # Get modes (highest density points)
    max_density = np.exp(kde.score_samples(modes)).max()

    def indicator_function(*x):
        x = np.array(x).reshape(1, -1)
        density = np.exp(kde.score_samples(x))
        density_scaled = density / max_density if max_density > 0 else 0

        # Return sigmoid function equal 1 for density < threshold
        return smooth_lower_bound_indicator(density_scaled, threshold)

    # Perform the integration
    result, _ = integrate.nquad(
        func=indicator_function,
        ranges=[(0, 1)] * n_dim,
        opts=dict(epsabs=1e-2, epsrel=1e-2, limit=50)
    )
    return result

In [33]:
bandwidths = np.logspace(np.log10(0.001), np.log10(1), 10)

ratios = []
for bandwidth in bandwidths:
    low_bound_ratio = integrate_low_density_indicator(X_TRAIN, bandwidth, THR)
    ratios.append(low_bound_ratio)

    # Print progress
    start_time = datetime.now().strftime("%H:%M:%S")
    print(f"[{start_time}] bandwidth: {bandwidth:.2e}, low_bound_ratio: {low_bound_ratio:.2e}")

[15:15:52] bandwidth: 1.00e-03, low_bound_ratio: 1.00e+00
[15:15:52] bandwidth: 2.15e-03, low_bound_ratio: 1.00e+00
[15:15:54] bandwidth: 4.64e-03, low_bound_ratio: 9.96e-01
[15:17:15] bandwidth: 1.00e-02, low_bound_ratio: 9.47e-01
[15:20:06] bandwidth: 2.15e-02, low_bound_ratio: 8.18e-01
[15:21:03] bandwidth: 4.64e-02, low_bound_ratio: 4.43e-01
[15:21:07] bandwidth: 1.00e-01, low_bound_ratio: 5.17e-02
[15:21:08] bandwidth: 2.15e-01, low_bound_ratio: 9.53e-04
[15:21:09] bandwidth: 4.64e-01, low_bound_ratio: 0.00e+00
[15:21:09] bandwidth: 1.00e+00, low_bound_ratio: 0.00e+00


In [34]:
plot_ratio(bandwidths, ratios)

Integral drawbacks:
- Needs a search for maximum density. Perhaps could be optimized by using the same max_density over multiple bandwidths.
- Integrals computation is very expensive compared to MC. And not much difference of accuracy is noticed.

# Class to Search for Bandwidth

In [35]:
class LowDensityRatioBandwidth:
    """
    Find the largest bandwidth where low density volume ratio is close to
    target ratio. Formally:

    best_bandwidth = max({bw | low_density_ratio > target_ratio})
                   = max({
        bandwidth in bw_range |
        V({X in [0, 1]^p | Density(X) < 0 + tol }) / V([0, 1]^p) > target_ratio
    })

    This class stores the optimal bandwidth for next search to optimize the
    search process.

    Note:
        - When X data is not distributed all over [0, 1]^p, the threshold must
          be adapted and high enough to have a meaning. Otherwise condition will
          be unusefully always satisfied. But in IRBS context X is always well
          distributed thanks to LHS initialization.
    """
    def __init__(self,
        default_bounds: Tuple[float, float],  # Bandhwidth range
        threshold: float,  # Threshold for indicator function
        target_ratio: float,
    ):
        self.default_bounds = default_bounds
        self.threshold = threshold
        self.target_ratio = target_ratio
        self.a_rdecrease = 0.2  # Relative decrease of lower bound
        self.f_atol = self.target_ratio * 0.01  # Absolute tol around 0
        self.previous_root = None
        self.X_train = None
        self._first_estimation = True

        # Initialize profiling variables
        self.ncalls_log: List[int] = []
        self.cumtime_log: List[float] = []
        self.ncalls: int = 0
        self.cumtime: float = 0.0
        self.cache: Dict[float, float] = {}
        self._first_reset: bool = True

    def init_profiler(self):
        """ Resets profilier and store logs of current value. """
        if self._first_reset:
            self._first_reset = False
        else:
            # Log calls and cumulative time
            self.ncalls_log.append(self.ncalls)
            self.cumtime_log.append(self.cumtime)

        # Reset profiler variables
        self.ncalls = 0
        self.cumtime = 0.0
        self.cache.clear()

    def init_mc_samples(self, n_dim: int):
        if self._first_estimation:
            self._first_estimation = False
            # Generate uniform random samples in [0, 1]^p space for Monte Carlo
            sampler = qmc.LatinHypercube(d=n_dim)
            self.mc_samples = sampler.random(n=10000)

    def compute_ratio(self, bandwidth: float) -> float:
        # Check if the result is already in the cache
        if bandwidth in self.cache:
            return self.cache[bandwidth]

        # Mark start time
        start_time = time.perf_counter()

        # Set the bandwidth
        kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth)
        kde.fit(self.X_train)

        # Compute function for which to find root
        low_density_ratio = estimate_low_density_ratio(kde, self.threshold, self.mc_samples)

        # Update profiler
        self.ncalls += 1
        self.cumtime += time.perf_counter() - start_time

        # Store the result in the cache
        self.cache[bandwidth] = low_density_ratio

        return low_density_ratio

    def compute_ratio_discrepancy(self, bandwidth: float) -> float:
        """
        Compute the discrepancy between the computed low density ratio and the
        target ratio.
        """
        return self.compute_ratio(bandwidth) - self.target_ratio
    
    def set_search_range(self) -> Tuple[float, float]:
        """ Set the search range for the Brent's method. """
        if self.previous_root is None:
            return self.default_bounds
        else:
            a = max(self.default_bounds[0], self.previous_root * (1 - self.a_rdecrease))
            b = min(self.default_bounds[1], self.previous_root)
            return a, b

    def check_root_in_range(self, a: float, b: float) -> Optional[float]:
        """
        Check if the root is in the given range based on the Intermediate Value
        Theorem, minimizing function evaluations.
        
        Note: Low density ratio is decreasing with bandwidth. To find a root,
              bounds must verify f(a) <= 0 <= f(b).
        """
        # Evaluate f(b) first, as it's more likely to be close to the root
        fb = self.compute_ratio_discrepancy(b)

        # f is a decreasing function, f(b) must be negative
        if fb >= 0 - self.f_atol:
            # If fb is positive, then b is nearest value to the root
            return b
        
        # Only evaluate f(a) if necessary
        fa = self.compute_ratio_discrepancy(a)

        # f is a decreasing function, f(a) must be positive
        if fa <= 0 + self.f_atol:
            # If fb is negative, then a is nearest value to the root
            return a

        # If we reach here, the root is within the bounds
        return None

    def brentq_search(self, X: np.ndarray) -> float:
        """ 
        Set the range for the Brent's method search of a decreasing function. 
        
        Assumptions:
            (1) ratio_discrepancy is a decreasing function. To find a root,
                bounds must verify f(a) <= 0 <= f(b).
            (2) ratio_discrepancy is smaller that previous one, which results in
                a smaller expected root, i.e. expected_root <= previous_root
            (3) If previous_root is available, new root is expected to be smaller
                 by maximum 20%.

        Algorithm:
            1. Set search range:
                - If previous_root is not available, use full search range
                - Else, use narrowed search range with previous_root as upper
                  bound.
            2. Check if root is in search range:
                - If f changes sign, search root using brentq
                - Else, use nearest bound to root
        """
        # Reset profiling variables
        self.init_profiler()
    
        # Initialize Monte Carlo samples
        self.init_mc_samples(X.shape[1])

        # Updat training data
        self.X_train = X

        # Set search range
        a, b = self.set_search_range()

        # Check if root in search range (f(a) and f(b) have different signs)
        x_root = self.check_root_in_range(a, b)

        if x_root is None:
            # Use Brent's method to find the root
            x_root, results = brentq(
                self.compute_ratio_discrepancy,
                a, b,
                xtol=1e-3,  # absolute tol over root bandwidth
                rtol=1e-2,
                maxiter=50,
                full_output=True,
                disp=False
            )

        # Use cache to evaluate low density ratio
        low_density_ratio = self.compute_ratio(x_root)

        # Store root for next search
        self.previous_root = x_root

        return x_root, low_density_ratio

In [36]:
# Set bounds
# bounds = (optimal_bw * (1 + 0.1), optimal_bw * 2)  # a > optimal_bw
# bounds = (1e-5, optimal_bw * (1 - 0.1))            # b < optimal_bw
# bounds = (1e-5, optimal_bw * (1 + 1e-3))           # b ~ optimal_bw
bounds = (1e-5, 10)

# Estimate optimal bandwidth
kde_optimal_bw = LowDensityRatioBandwidth(
    default_bounds=bounds,
    threshold=THR,
    target_ratio=TARGET_RATIO,
)
res_bw, res_ratio = kde_optimal_bw.brentq_search(X_TRAIN)
# found_bw, found_ratio = kde_optimal_bw.brentq_search(X_TRAIN)  # Do it twice to test previous_root and logs

report = serialize_dict({
    'bounds': bounds,
    'optimal_bw': res_bw,
    'low_density_ratio': res_ratio,
    'target_ratio': TARGET_RATIO,
    'absolute_error': kde_optimal_bw.compute_ratio_discrepancy(res_bw),
    'f_atol': kde_optimal_bw.f_atol,
    'ncalls': kde_optimal_bw.ncalls,
    'cumtime': kde_optimal_bw.cumtime,
    'ncalls_log': kde_optimal_bw.ncalls_log,
    'cumtime_log': kde_optimal_bw.cumtime_log,
})

print(f"brentq_search -> report: \n{report}")

brentq_search -> report: 
{
    "bounds": [
        1e-05,
        10
    ],
    "optimal_bw": 0.07953667518337112,
    "low_density_ratio": 0.09975803688013564,
    "target_ratio": 0.1,
    "absolute_error": -0.00024196311986436625,
    "f_atol": 0.001,
    "ncalls": 14,
    "cumtime": 0.665791499999898,
    "ncalls_log": [],
    "cumtime_log": []
}


### Plot the brentq log

In [37]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def plot_cache(cache: np.ndarray):
    fig = make_subplots(rows=1, cols=1)
    
    # Convert cache to sorted numpy arrays
    x_cache = np.array(list(cache.keys()))
    y_cache = np.array(list(cache.values()))

    # Get the sorting indices
    sort_indices = np.argsort(x_cache)

    # Create the hover text in the original order
    hover_text = [f'Evaluation {i+1}' for i in range(len(x_cache))]

    # Add points of evalaluated points by brentq
    fig.add_trace(go.Scatter(
        x=x_cache[sort_indices],
        y=y_cache[sort_indices],
        mode='lines+markers',
        name='Evaluated Points',
        text=[hover_text[i] for i in sort_indices],  # Reorder hover text
        hoverinfo='text+x+y',
    ))
    
    # Add point of root
    fig.add_trace(go.Scatter(
        x=[x_cache[-1]],
        y=[y_cache[-1]],
        mode='markers',
        name='Root',
    ))
    
    # Update layout
    fig.update_layout(
        title='Brentq Search History',
        xaxis_title='Search Range',
        yaxis_title='Low Density Ratio',
        showlegend=True,
        hovermode='closest'
    )
    
    fig.show()
    
plot_cache(kde_optimal_bw.cache)

In [38]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.neighbors import KernelDensity

def plot_kde_and_samples(X_train: np.ndarray, bandwidth: float, target_ratio: float):
    """
    Plot the density of a fitted KDE over [0, 1]^2 using a contour map and show the training samples.

    Args:
    kde (KernelDensity): Fitted KernelDensity object
    X_train (np.ndarray): Training data used to fit the KDE
    resolution (int): Resolution of the contour plot (default: 100)

    Returns:
    go.Figure: Plotly figure object
    """
    # Fit KDE
    kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth)
    kde.fit(X_train)

    # Create a grid of points
    resolution = 100
    x = np.linspace(0, 1, resolution)
    y = np.linspace(0, 1, resolution)
    xx, yy = np.meshgrid(x, y)
    xy = np.vstack([xx.ravel(), yy.ravel()]).T

    # Compute the log-density on the grid
    Z = np.exp(kde.score_samples(xy)).reshape(xx.shape)

    # Create the figure
    fig = make_subplots(rows=1, cols=1)

    # Add contour plot
    contour = go.Contour(
        z=Z,
        x=x,
        y=y,
        colorscale='Viridis',
        colorbar=dict(title='Density')
    )
    fig.add_trace(contour)

    # Add scatter plot of training samples
    scatter = go.Scatter(
        x=X_train[:, 0],
        y=X_train[:, 1],
        mode='markers',
        marker=dict(color='red', size=5),
        name='Training Samples'
    )
    fig.add_trace(scatter)

    # Update layout
    fig.update_layout(
        title=f'KDE Contour and Training Samples<br>target_ratio: {target_ratio:.2f}, bw: {bandwidth:.1e}, n: {len(X_train)}',
        xaxis_title='X',
        yaxis_title='Y',
        xaxis_range=[0, 1],
        yaxis_range=[0, 1],
        width=700,
        height=600
    )

    return fig

fig = plot_kde_and_samples(X_TRAIN, res_bw, res_ratio)
fig.show()