In [2]:
import numpy as np
import pandas as pd
import warnings
from scipy.stats import nbinom, gamma
from scipy.special import digamma
from scipy.optimize import minimize

In [None]:
#conversion from R function to python

def phv_ebgm_qn(theta_hat, N, E):
    r1, b1, r2, b2, p = theta_hat
    prob_f1 = b1 / (b1 + E)
    prob_f2 = b2 / (b2 + E)
    f1_nb = nbinom.pmf(N, n=r1, p=prob_f1)
    f2_nb = nbinom.pmf(N, n=r2, p=prob_f2)
    num = p * f1_nb
    den = num + (1 - p) * f2_nb
    return num / den

def phv_ebgm_score(theta_hat, N, E, qn):
    r1, b1, r2, b2, _ = theta_hat
    e1 = digamma(r1 + N) - np.log(b1 + E)
    e2 = digamma(r2 + N) - np.log(b2 + E)
    exp_log = qn * e1 + (1 - qn) * e2
    eb_log2 = exp_log / np.log(2)
    return 2 ** eb_log2

def phv_ebgm_quant_bisect(cut_point, theta_hat, N, E, qn,
                          digits=2, limits=(-1e5, 1e5), max_iter=2000):
    r1, b1, r2, b2, _ = theta_hat
    lower, upper = limits
    tol = 0.5 * 10 ** (-digits)

    def post_cdf_minus_cp(x):
        c1 = gamma.cdf(x, a=r1 + N, scale=1/(b1 + E))
        c2 = gamma.cdf(x, a=r2 + N, scale=1/(b2 + E))
        return qn * c1 + (1 - qn) * c2 - cut_point

    N = np.asarray(N)
    E = np.asarray(E)
    qn = np.asarray(qn)

    guess = np.ones_like(N, dtype=float)
    err0 = post_cdf_minus_cp(guess)
    is_pos = err0 > 0
    left = np.where(is_pos, lower, guess)
    right = np.where(is_pos, guess, upper)

    for _ in range(max_iter):
        mid = (left + right) / 2
        err_mid = post_cdf_minus_cp(mid)
        if np.max((right - left) / 2) < tol:
            q = np.round(mid, digits)
            if np.any(q == upper):
                raise ValueError("increase maximum for 'limits'")
            return q
        err_left = post_cdf_minus_cp(left)
        same = np.sign(err_left) == np.sign(err_mid)
        left = np.where(same, mid, left)
        right = np.where(same, right, mid)

    raise RuntimeError("failed to converge -- try adjusting 'limits' or 'max_iter'")

def dbinbinom(x, size1, prob1, size2, prob2, w):
    return w * nbinom.pmf(x, n=size1, p=prob1) + (1 - w) * nbinom.pmf(x, n=size2, p=prob2)

def phvid_objective(theta, N, E):
    r1, b1, r2, b2, w = theta
    prob1 = b1 / (b1 + E)
    prob2 = b2 / (b2 + E)
    pmf = dbinbinom(N, r1, prob1, r2, prob2, w)
    return np.sum(-np.log(pmf + 1e-16))  # add tiny epsilon to avoid log(0)

def phv_ebgm(a, b, c, d, alpha=0.05, theta_init=None, squashing=True):
    """
    a, b, c, d: counts (scalars or array‐like)
    alpha: two‐sided significance level
    theta_init: array‐like [r1, b1, r2, b2, p] or pandas.DataFrame of guesses
    squashing: (not implemented – placeholder)
    """
    # turn inputs into arrays
    a, b, c, d = np.broadcast_arrays(a, b, c, d)
    if np.any(a < 0) or np.any(b < 0) or np.any(c < 0) or np.any(d < 0):
        raise ValueError("a, b, c, d must be non‐negative")
    # total counts
    tot = a + b + c + d
    N = a.astype(float)
    E = (a + b) / tot * (a + c)

    # squashing placeholder
    if squashing and np.any(a == 0):
        warnings.warn("squashing=True but no squash implemented; continuing with raw counts")

    # prepare initial guess
    if theta_init is None:
        x0 = np.array([0.2, 0.1, 2.0, 4.0, 1/3])
    else:
        if isinstance(theta_init, pd.DataFrame):
            x0 = theta_init.iloc[0].values
        else:
            x0 = np.asarray(theta_init, dtype=float)
        if x0.shape[0] != 5:
            raise ValueError("theta_init must have length 5")

    # bounds: r1,b1,r2,b2 > 0; p in (0,1)
    bounds = [(1e-6, None)]*4 + [(1e-6, 1-1e-6)]
    res = minimize(phvid_objective, x0, args=(N, E),
                   bounds=bounds, method='L-BFGS-B')
    if not res.success:
        raise RuntimeError("hyperparameter estimation failed: " + res.message)
    theta_hat = res.x

    # compute posterior weights, EBGM and CIs
    qn = phv_ebgm_qn(theta_hat, N, E)
    ebgm = phv_ebgm_score(theta_hat, N, E, qn)
    half = alpha / 2
    ci_low  = phv_ebgm_quant_bisect(half,      theta_hat, N, E, qn)
    ci_high = phv_ebgm_quant_bisect(1 - half,  theta_hat, N, E, qn)

    return pd.DataFrame({
        'ebgm':   ebgm,
        'ci_low': ci_low,
        'ci_high':ci_high
    })


In [12]:
#read in processed data, this is for aspirin specific. 

# In a 2×2 pharmacovigilance contingency table for each adverse event (PT):
# a = number of reports with the drug of interest AND the event
# b = number of reports with the drug of interest AND WITHOUT the event
# c = number of reports WITHOUT the drug of interest AND WITH the event
# d = number of reports WITHOUT the drug of interest AND WITHOUT the event

counts = pd.read_csv("phv_2x2_counts.csv")
result = phv_ebgm(
    a=counts['a'],
    b=counts['b'],
    c=counts['c'],
    d=counts['d'],
    alpha=0.05
)



In [None]:
#df of every ebgm score with high low confidence intervals, for each event vs. drug of choice (aspirin)
counts.merge(result, left_index=True, right_index=True)

Unnamed: 0,pt,a,c,b,d,ebgm,ci_low,ci_high
0,11-beta-hydroxylase deficiency,0,1,310,461312,0.222695,0.0,11.54
1,5-hydroxyindolacetic acid in urine increased,0,3,310,461310,0.221096,0.0,10.94
2,5-hydroxyindolacetic acid increased,0,1,310,461312,0.222695,0.0,11.54
3,ABO incompatibility,0,1,310,461312,0.222695,0.0,11.54
4,ADAMTS13 activity abnormal,0,3,310,461310,0.221096,0.0,10.94
...,...,...,...,...,...,...,...,...
12234,Zoonotic bacterial infection,0,1,310,461312,0.222695,0.0,11.54
12235,pH body fluid abnormal,0,4,310,461309,0.220330,0.0,10.67
12236,pH body fluid increased,0,2,310,461311,0.221884,0.0,11.24
12237,pH urine decreased,0,3,310,461310,0.221096,0.0,10.94
