In [4]:
import numpy as np
from scipy import optimize, stats

def project_to_monotone_component_density_ratios(component_param_sets, xmin, xmax,
                               grid_n=120, eps_density=1e-12,
                               maxiter=200,
                               skew_bounds=(-5,5),):
    """
    Find the parameter set Q that minimizes sum_k=1^K KL(P_k || Q_k) where P are the current
    skew-normal components defined by `component_param_sets`, subject to the constraint that
    density ratio between consecutive components is *decreasing* on [xmin,xmax].

    Inputs
    ------
    component_param_sets : tuple or list
        ((skew1, loc1, scale1),
         (skew2, loc2, scale2),
         ...,
         (skewK, locK, scaleK)))
    xmin, xmax : floats
        domain over which monotonicity must hold and (also) used for numeric KL integral.
    grid_n : int
        number of grid points to discretize [xmin, xmax] for constraints & integration.
    eps_density : float
        small floor to densities to avoid log(0).
    maxiter : int
        maximum iterations for optimizer.

    Returns
    -------
    result_params, info
        result_params: ((skew1, loc1, scale1), (skew2, loc2, scale2), (skew3, loc3, scale3), w_adj)
        info: dict containing optimizer success flag and message
    """
    # --- unpack input params ---
    
    # Grid for constraints and integration
    x_grid = np.linspace(xmin, xmax, grid_n)

    # helper: build skew-normal pdf (scipy uses 'a' as skew)
    def skew_pdf(a, loc, scale, x):
        # scipy.stats.skewnorm.pdf handles a, loc, scale
        return stats.skewnorm.pdf(x, a, loc=loc, scale=scale)
    
    def skew_log_pdf(a, loc, scale, x):
        # scipy.stats.skewnorm.pdf handles a, loc, scale
        return stats.skewnorm.logpdf(x, a, loc=loc, scale=scale)

    P_grids = [
        np.maximum(skew_pdf(p[0],p[1],p[2], x_grid),eps_density) for p in component_param_sets
    ] # reference distributions for each skew normal component

    # --- parameter vectorization for optimizer ---
    # vector: [skew1, mu1, log scale1, skew2, loc2, log scale2, ..., skewK, locK, log scaleK, logit_wP1, logit_wP2,..., logit_wP_K-1, logit_wB1, logit_wB2,...,logit_wB_K-1]
    def packVector(component_param_sets):
        log_scales = np.log([p[2] for p in component_param_sets])
        pack = []
        for compNum in range(len(component_param_sets)):
            pack.extend([component_param_sets[compNum][0],
                         component_param_sets[compNum][1],
                         log_scales[compNum]])
        return np.array(pack, dtype=float)

    def unpack_vector(vec):
        # [
        #  skew1, loc1, log scale1,
        #  skew2, loc2, log scale2,
        #  ...,
        #  skewK, locK, log scaleK
        # ]
        # 
        # |n| = 3 * K + 2 * (K-1)
        # |n| = 5 * K - 2
        # K = (|n| - 2) / 5
        n_components = len(vec) // 3
        component_param_sets = []
        for compNum in range(n_components):
            start_idx = 3 * compNum
            skew, loc, log_scale = vec[start_idx : 3 + start_idx]
            component_param_sets.append([skew, loc, np.exp(log_scale)])
        return component_param_sets


    x_grid_local = x_grid  # closure
    # --- objective: KL(P || Q) over x_grid (trapezoidal integration) ---
    def objective(vec):
        component_param_sets = unpack_vector(vec)
        Q_grids = [np.maximum(skew_pdf(p[0], p[1], p[2], x_grid_local),eps_density) \
                   for p in component_param_sets]

        # KL(P||Q) approx = integral P * log(P/Q)
        integrands = [P_grid * np.log(P_grid / Q_grid) \
                      for P_grid, Q_grid in zip(P_grids, Q_grids)]
        integrand = np.stack(integrands).sum(0)
        return np.trapezoid(integrand, x_grid_local)

    # --- monotonicity constraints as vector valued inequality ---
    # positive likelihood ratio should be monotonically decreasing

    def lrPlus_diff_constraints(vec):
        component_param_sets = unpack_vector(vec)
        log_density_ratios = [
            skew_log_pdf(pi[0], pi[1], pi[2], x_grid_local) - \
            skew_log_pdf(pj[0], pj[1], pj[2], x_grid_local)
            for pi,pj in zip(component_param_sets[:-1],
                              component_param_sets[1:])]
        diffs = np.concatenate([lrPlus[:-1] - lrPlus[1:] for lrPlus in log_density_ratios])
        return diffs

    # --- bounds & initial guess ---
    # initial vector from current params
    x0 = packVector(component_param_sets)

    # Bounds:
    # - skews: unbounded (but we can put wide bounds)
    # - mus: unbounded (put wide bounds)
    # - logs: bound scales to avoid extremely small or huge values
    # - logits: unbounded (but clamp to wide bounds)
    big = 100
    min_log_scale = np.log(1e-6)
    max_log_scale = np.log(1e3)
    n_components = len(component_param_sets)
    bnds = [skew_bounds,
            (-big, big),
            (min_log_scale, max_log_scale)] * n_components

    # Nonlinear inequality constraint: ratio_diff_constraints(vec) >= 0
    cons = ({'type': 'ineq', 'fun': lrPlus_diff_constraints},)

    options = {'maxiter': maxiter, 'ftol': 1e-9}

    res = optimize.minimize(objective, x0, method='SLSQP',
                            bounds=bnds, constraints=cons,
                            options=options)

    if not res.success:
        # If failed, return original params
        return component_param_sets, {'success':False, 'message':res.message}

    # return unpacked optimized vector
    return unpack_vector(res.x), {'success': True, 'message': res.message, 'fun_val': res.fun}

In [5]:
cur = [(-1.9902822273568392, -2.9914679027930475, 2.0048547595071127),
       (0.13876378297632538, -0.11873237569251935, 1.013600859117432),
       (2.050437314179738, 2.968422139946502, 1.0192270181740002)]
xmin, xmax = -11.0, 8.0
proj_params, info = project_to_monotone_component_density_ratios(cur, xmin, xmax)
print(info)
print(proj_params)

{'success': True, 'message': 'Optimization terminated successfully', 'fun_val': np.float64(-5.944564695948051e-07)}
[[np.float64(-1.983825879714495), np.float64(-2.9937788487345425), np.float64(2.0021040577937987)], [np.float64(0.13876383095201852), np.float64(-0.11873245967622098), np.float64(1.013600957931795)], [np.float64(2.050298056222733), np.float64(2.9684384635580368), np.float64(1.019191814449914)]]


In [10]:
proj_params

((np.float64(-1.9900109499332066),
  np.float64(-2.9928246082834407),
  np.float64(2.003009253817416)),
 (np.float64(0.13863454287155721),
  np.float64(-0.11887256909949559),
  np.float64(1.013813189630775)),
 (np.float64(2.050437047484353),
  np.float64(2.9684626914641825),
  np.float64(1.019196599939703)),
 array([0.29991589, 0.50008312, 0.20000098]))

In [None]:
import numpy as np
from scipy import optimize, stats

def project_to_monotone_lrPlus(params, xmin, xmax,
                                           grid_n=120, eps_density=1e-12,
                                           maxiter=200,
                                           skew_bounds=(-5,5),):
    """
    Find the parameter set Q that minimizes KL(P || Q) where P is the current
    skew-normal mixture defined by `params`, subject to the constraint that
    the positive likelihood ratio log fp(x) - log fb(x) *decreasing* on [xmin,xmax].

    Inputs
    ------
    params : tuple or list
        ((skew1, loc1, scale1),
         (skew2, loc2, scale2),
         ...,
         (skewK, locK, scaleK),
         ((wp0, wp1, ..., wpk),(wb0, wb1, ...,wbk)))
    xmin, xmax : floats
        domain over which monotonicity must hold and (also) used for numeric KL integral.
    grid_n : int
        number of grid points to discretize [xmin, xmax] for constraints & integration.
    eps_density : float
        small floor to densities to avoid log(0).
    maxiter : int
        maximum iterations for optimizer.

    Returns
    -------
    result_params, info
        result_params: ((skew1, loc1, scale1), (skew2, loc2, scale2), (skew3, loc3, scale3), w_adj)
        info: dict containing optimizer success flag and message
    """
    # --- unpack input params ---
    component_param_sets = params[:-1]
    wP = np.array(params[-1][0],dtype=float)
    wB = np.array(params[-1][1],dtype=float)
    # Grid for constraints and integration
    x_grid = np.linspace(xmin, xmax, grid_n)

    # helper: build skew-normal pdf (scipy uses 'a' as skew)
    def skew_pdf(a, loc, scale, x):
        # scipy.stats.skewnorm.pdf handles a, loc, scale
        return stats.skewnorm.pdf(x, a, loc=loc, scale=scale)
    
    def mixture_pdf(param_sets, w, x):
        return np.stack([w[i] * skew_pdf(p[0], p[1], p[2], x) for i,p in enumerate(param_sets)]).sum(0)

    P_grids = [
        np.maximum(skew_pdf(p[0],p[1],p[2], x_grid),eps_density) for p in component_param_sets
    ] # reference distributions for each skew normal component

    # --- parameter vectorization for optimizer ---
    # vector: [skew1, mu1, log scale1, skew2, loc2, log scale2, ..., skewK, locK, log scaleK, logit_wP1, logit_wP2,..., logit_wP_K-1, logit_wB1, logit_wB2,...,logit_wB_K-1]
    def packVector(component_param_sets, wPB):
        log_scales = np.log([p[2] for p in component_param_sets])
        # convert w (length 3) to two logits
        # use inverse-softmax: for w=[w1,w2,w3], logits = log(w[:2]/w3)
        wP = wPB[0]
        wB = wPB[1]
        wP = np.maximum(wP, 1e-12)
        wB = np.maximum(wB, 1e-12)
        K = len(component_param_sets)
        lP = [np.log(wP[i]) - np.log(wP[-1]) for i in range(K - 1)]
        lB = [np.log(wB[i]) - np.log(wB[-1]) for i in range(K-1)]
        pack = []
        for compNum in range(len(component_param_sets)):
            pack.extend([component_param_sets[compNum][0],
                         component_param_sets[compNum][1],
                         log_scales[compNum]])
        pack.extend([*lP,*lB])
        return np.array(pack, dtype=float)

    def unpack_vector(vec):
        # [
        #  skew1, loc1, log scale1,
        #  skew2, loc2, log scale2,
        #  ...,
        #  skewK, locK, log scaleK,
        #  lP1, lP2,...,lP_K-1,
        #  lB1, lB2, ...,lB_K-1
        # ]
        # 
        # |n| = 3 * K + 2 * (K-1)
        # |n| = 5 * K - 2
        # K = (|n| - 2) / 5
        n_components = int((len(vec) - 2) / 5)
        component_param_sets = []
        for compNum in range(n_components):
            start_idx = 3 * compNum
            skew, loc, log_scale = vec[start_idx : 3 + start_idx]
            component_param_sets.append([skew, loc, np.exp(log_scale)])
        lB = vec[-n_components:]
        lP = vec[-2 * n_components: -n_components]

        # softmax-like conversion to weights (via logits relative to component 3)
        logsP = np.array([*lP, 0.0])
        exP = np.exp(logsP - np.max(logsP))
        wP = exP / np.sum(exP)
        logsB = np.array([*lB, 0.0])
        exB = np.exp(logsB - np.max(logsB))
        wB = exB / np.sum(exB)
        wPB = np.stack([wP,wB])
        return [*component_param_sets, wPB]


    x_grid_local = x_grid  # closure
    # --- objective: KL(P || Q) over x_grid (trapezoidal integration) ---
    def objective(vec):
        component_param_sets = unpack_vector(vec)[:-1]
        Q_grids = [np.maximum(skew_pdf(p[0], p[1], p[2], x_grid_local),eps_density) \
                   for p in component_param_sets]

        # KL(P||Q) approx = integral P * log(P/Q)
        integrands = [P_grid * np.log(P_grid / Q_grid) \
                      for P_grid, Q_grid in zip(P_grids, Q_grids)]
        integrand = np.stack(integrands).sum(0)
        return np.trapezoid(integrand, x_grid_local)

    # --- monotonicity constraints as vector valued inequality ---
    # positive likelihood ratio should be monotonically decreasing

    def lrPlus_diff_constraints(vec):
        unpacked = unpack_vector(vec)
        component_param_sets = unpacked[:-1]
        wPB = unpacked[-1]
        wP = wPB[0]
        wB = wPB[1]
        log_fPath = np.log(mixture_pdf(component_param_sets, wP, x_grid_local))
        log_fBenign = np.log(mixture_pdf(component_param_sets, wB, x_grid_local))
        lrPlus = log_fPath - log_fBenign
        diffs = lrPlus[:-1] - lrPlus[1:]
        return diffs

    # --- bounds & initial guess ---
    # initial vector from current params
    x0 = packVector(component_param_sets,np.stack([wP,wB]))

    # Bounds:
    # - skews: unbounded (but we can put wide bounds)
    # - mus: unbounded (put wide bounds)
    # - logs: bound scales to avoid extremely small or huge values
    # - logits: unbounded (but clamp to wide bounds)
    big = 100
    min_log_scale = np.log(1e-6)
    max_log_scale = np.log(1e3)
    n_components = len(component_param_sets)
    bnds = [skew_bounds,
            (-big, big),
            (min_log_scale, max_log_scale)] * n_components
    bnds += [*(2 * \
               [(-big,big),] * (n_components - 1))
            ]

    # Nonlinear inequality constraint: ratio_diff_constraints(vec) >= 0
    cons = ({'type': 'ineq', 'fun': lrPlus_diff_constraints},)

    options = {'maxiter': maxiter, 'ftol': 1e-9}

    res = optimize.minimize(objective, x0, method='SLSQP',
                            bounds=bnds, constraints=cons,
                            options=options)

    if not res.success:
        # If failed, return original params
        return params, {'success':False, 'message':res.message}

    # return unpacked optimized vector
    return unpack_vector(res.x), {'success': True, 'message': res.message, 'fun_val': res.fun}

In [19]:
# cur = ((2.0, 0.0, 1.0), (0.5, 1.0, 1.2), (-1.0, 2.0, 0.8), [0.3, 0.5, 0.2])
cur = [(-1.9902822273568392, -2.9914679027930475, 2.0048547595071127),
       (0.13876378297632538, -0.11873237569251935, 1.013600859117432),
       (2.050437314179738, 2.968422139946502, 1.0192270181740002),
       [[0.8, 0.1, 0.1],
        [0.01, 0.01, 0.98]]]
xmin, xmax = -11.0, 8.0
proj_params, info = project_to_monotone_lrPlus(cur, xmin, xmax)
print(info)
print(proj_params)

{'success': True, 'message': 'Optimization terminated successfully', 'fun_val': np.float64(-8.316335187511058e-08)}
[[np.float64(-1.990254183216807), np.float64(-2.991473664344085), np.float64(2.0046860044942747)], [np.float64(0.13877150006750713), np.float64(-0.11869243376498217), np.float64(1.013593083004448)], array([[0.79999761, 0.10000245, 0.09999994],
       [0.00999828, 0.01000063, 0.98000109]])]
