In [2]:
import sys

!'{sys.executable}' -m pip install --upgrade pip
!'{sys.executable}' -m pip install gurobipy
!'{sys.executable}' -m pip install cvxpy
!'{sys.executable}' -m pip install numpy
!'{sys.executable}' -m pip install jupyter-matlab-proxy
!'{sys.executable}' -m pip install matplotlib
!'{sys.executable}' -m pip install multiprocess
# !'{sys.executable}' -m pip install matlabengine

Collecting pip
  Using cached pip-23.1.2-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 22.3.1
    Uninstalling pip-22.3.1:
      Successfully uninstalled pip-22.3.1
Successfully installed pip-23.1.2
Collecting gurobipy
  Using cached gurobipy-10.0.2-cp39-cp39-macosx_10_9_universal2.whl (10.3 MB)
Installing collected packages: gurobipy
Successfully installed gurobipy-10.0.2
Collecting cvxpy
  Using cached cvxpy-1.3.2-cp39-cp39-macosx_10_9_universal2.whl (1.2 MB)
Collecting osqp>=0.4.1 (from cvxpy)
  Using cached osqp-0.6.3-cp39-cp39-macosx_10_9_universal2.whl
Collecting ecos>=2 (from cvxpy)
  Using cached ecos-2.0.12-cp39-cp39-macosx_10_9_universal2.whl
Collecting scs>=1.1.6 (from cvxpy)
  Using cached scs-3.2.3-cp39-cp39-macosx_13_0_universal2.whl
Collecting numpy>=1.15 (from cvxpy)
  Using cached numpy-1.25.0-cp39-cp39-macosx_11_0_arm64.whl (14.0 MB)
Collecting scipy>=1.1.0 (from cvxpy)
  Using cached scip

Generic probability distribution histogram code + some helper functions here

In [3]:
import math
import numpy as np
#import multiprocessing as mp
import multiprocess as mp
import itertools
from collections import defaultdict

def push_multinomial(n, k, occ=1):
    return math.lgamma(n + k * occ + 1) - math.lgamma(n + 1) - occ * math.lgamma(k + 1)

class DistHistogram:
    def __init__(self, hist = defaultdict(int)):
        self.hist = hist

    # If the histogram actually represents a prob distribution
    def check_integrity(self):
        if not math.isclose(1.0, sum(self.hist[p] * p for p in self.hist)):
            raise Exception("Bad histogram: {}".format(str(self.hist)))

    # Generate a fingerprint
    def generate_fingerprint(self, k):
        self.check_integrity()
        n = sum(self.hist[p] for p in self.hist)
        dist = np.concatenate([[p] * self.hist[p] for p in self.hist])
        hist = np.random.multinomial(k, dist, size=1)
        unique, counts = np.unique(hist, return_counts=True)
        fingerprint = dict(zip(unique, counts))
        if (zero := n - sum(fingerprint[f] for f in fingerprint)) != 0:
            fingerprint[0] = zero
        return fingerprint

# Get true probability of obtaining a fingerprint 
# Speedup calculation with vectorization
def get_probabilities(dists, fingerprint):
    # Needs to have all distributions of the same ??-point structure
    pts = {len(dist.hist) for dist in dists}
    if len(pts) > 1:
        raise Exception("All distributions need to have the same ??-point structure")
    k, pts = sum(fingerprint[f] * f for f in fingerprint), pts.pop()
    log_zero_arr = lambda : np.full((1, m), -np.inf, dtype=float)

    m = len(dists)
    lps, ys = np.log(np.transpose([list(dist.hist.keys()) for dist in dists])), np.array([list(dist.hist.values()) for dist in dists])
    coefficients = np.full((1, m), math.lgamma(k + 1))
    for f in fingerprint:
        coefficients -= fingerprint[f] * math.lgamma(f + 1)
    for y in np.transpose(ys):
        coefficients += np.vectorize(math.lgamma)(y + 1)
    dp = defaultdict(log_zero_arr, {np.zeros(pts).tobytes(): coefficients})

    # Partition lazy generator
    def partition(size, array, value, occ):
        ind = array.size
        if ind == pts - 1:
            i = size
            new_value = value - math.lgamma(i + 1) + occ * i * lps[ind]
            new_array = np.concatenate([array, [i]])
            yield (new_array, new_value)
        else:
            for i in range(size + 1):
                new_value = value - math.lgamma(i + 1) + occ * i * lps[ind]
                new_array = np.concatenate([array, [i]])
                yield from partition(size - i, new_array, new_value, occ)

    for occ, frq in fingerprint.items():
        new_dp = defaultdict(log_zero_arr)
        for arr, val in partition(frq, np.array([]), np.full((1, m), 0.0, dtype=float), occ):
            for prev_arr_byte, prev_val in dp.items():
                prev_arr = np.frombuffer(prev_arr_byte)
                idx = (arr + prev_arr).tobytes()
                new_dp[idx] = np.logaddexp(new_dp[idx], val + prev_val)
        dp = new_dp
    ans = np.exp([dp[np.array(ys[i], dtype=float).tobytes()][0, i] for i in range(m)])
    return ans

def get_probabilities_no_multithread(dists, fingerprints):
    return np.transpose([get_probabilities(dists, f) for f in fingerprints])

def get_probabilities_multithread(dists, fingerprints):
    pool = mp.Pool(processes=16)
    return np.transpose(pool.starmap(get_probabilities, zip(itertools.repeat(dists), fingerprints)))

def get_probabilities_nonuniform(dists, fingerprints):
    f1s=[f[0] for f in fingerprints]
    f2s=[f[1] for f in fingerprints]

    d1s = [d[0] for d in dists]
    d2s = [d[1] for d in dists]

    p1s = get_probabilities_multithread(d1s, f1s)
    p2s = get_probabilities_multithread(d2s, f2s)

    def jprob(k,j):
        return np.exp(math.lgamma(k + 1) - math.lgamma(k - j + 1) - math.lgamma(j + 1) - k * math.log(2))

    ans = []
    for i in range(len(fingerprints)):
        (f1, f2) = fingerprints[i]
        j=sum(f1[f] * f for f in f1)
        k=j + sum(f2[f] * f for f in f2)

        ans.append([jprob*a*b for a,b in zip(p1s[i],p2s[i])])

Generic tester + semilinear tester

In [4]:
class SemilinearTester: #this might not be entirely clean if the worst case distribution changes due to probabilism
    def __init__(self, c):
        self.c = c

    def get_coefficient(self, f):
        return sum([cnt * self.c[i] for i, cnt in f.items()])

    def optimize(self, lo, hi, fs, prob_hypo, prob_alts):
        def binary_search(lo, hi, pred, eps=1e-9):
            # assuming that pred(lo) is true and pred(hi) is false
            while lo + eps < hi:
                mi = (lo + hi) / 2
                if pred(mi):
                    lo = mi
                else:
                    hi = mi
            return lo, hi

        def predicate(t):
            verdict = np.array([self.get_coefficient(f) >= t for f in fs], dtype=float)
            t1 = (prob_hypo @ verdict)[0]
            t2 = np.max(prob_alts @ (1 - verdict))
            return t1 > t2
    
        self.lo, self.hi = binary_search(lo, hi, predicate)
        verdict_lo = np.array([self.get_coefficient(f) >= self.lo for f in fs], dtype=float)
        t1_lo = (prob_hypo @ verdict_lo)[0]
        t2_lo = np.max(prob_alts @ (1 - verdict_lo))
        verdict_hi = np.array([self.get_coefficient(f) >= self.hi for f in fs], dtype=float)
        t1_hi = (prob_hypo @ verdict_hi)[0]
        t2_hi = np.max(prob_alts @ (1 - verdict_hi))
        # solve t1_lo + (t1_hi - t1_lo) * self.tb = t2_lo + (t2_hi - t2_lo) * self.tb
        self.tb = (t1_lo - t2_lo) / ((t1_lo - t1_hi) - (t2_lo - t2_hi))
        return np.log(t1_lo + (t1_hi - t1_lo) * self.tb)

    def single_verdict(self, f):
        if (coef := self.get_coefficient(f)) < self.lo:
            return 0
        elif coef > self.hi:
            return 1
        else:
            return self.tb # this can be changed to a bernoulli sample with probability self.tb

    def multiple_verdicts(self, fs):
        return np.array([self.single_verdict(f) for f in fs], dtype=float)


Generating all alternative two-point distributions, sampling fingerprints, and generating probabilities

In [7]:
import copy

n, k = 10, 10
eps = 0.9
fg_size = 20000

hypo = DistHistogram({1/n: n})
alts = []
for q in range(1, n):
    # x1 * q + x2 * (n - q) = 1
    # (1 / n - x1) * q + (x2 - 1 / n) * (n - q) = eps
    x1, x2 = (2 * q - n * eps) / (2 * n * q), (2 * n + n * eps - 2 * q) / (2 * n * (n - q))
    if 0 < x1 <= 1 and 0 < x2 <= 1:
        alts.append(DistHistogram({x1: q, x2: n - q}))

def generate_fingerprint(n, k, mi, dct):
    if n == 1:
        dct[k] += 1
        yield dict(dct)
    else:
        if n * (mi + 1) <= k:
            yield from generate_fingerprint(n, k, mi + 1, copy.deepcopy(dct))
        dct[mi] += 1
        yield from generate_fingerprint(n - 1, k - mi, mi, dct)

fingerprints = list(generate_fingerprint(n, k, 0, defaultdict(int)))

def generate_fingerprint_nonuniform(n,k):
    ans=[]
    for j in range(0,k+1):
        ans += [(f1,f2) for f1 in generate_fingerprint(2, j, 0, defaultdict(int)) for f2 in generate_fingerprint(2,n-j,0,defaultdict(int))]

    return ans

fingerprints_nonuniform = generate_fingerprint_nonuniform(n,k)

# seen = set()
# fingerprints = []
# while len(fingerprints) < fg_size:
#     f = hypo.generate_fingerprint(k)
#     t = tuple(f.items())
#     if t not in seen:
#         seen.add(t)
#         fingerprints.append(f)

# fingerprints = [hypo.generate_fingerprint(k) for _ in range(fg_size)]
print(len(fingerprints))
prob_hypo = get_probabilities_multithread([hypo], fingerprints)
prob_hypo = prob_hypo / prob_hypo.sum(axis=1)[:, np.newaxis] # normalization step
# prob_alts = np.array([[alt.get_probability(fingerprint) for fingerprint in fingerprints] for alt in alts])
prob_alts = get_probabilities_multithread(alts, fingerprints)
prob_alts = prob_alts / prob_alts.sum(axis=1)[:, np.newaxis] # normalization step

[{1: 10}, {0: 1, 1: 8, 2: 1}, {0: 2, 1: 6, 2: 2}, {0: 2, 1: 7, 3: 1}, {0: 3, 1: 4, 2: 3}, {0: 3, 1: 5, 2: 1, 3: 1}, {0: 3, 1: 6, 4: 1}, {0: 4, 1: 2, 2: 4}, {0: 4, 1: 3, 2: 2, 3: 1}, {0: 4, 1: 4, 3: 2}, {0: 4, 1: 4, 2: 1, 4: 1}, {0: 4, 1: 5, 5: 1}, {0: 5, 2: 5}, {0: 5, 1: 1, 2: 3, 3: 1}, {0: 5, 1: 2, 2: 1, 3: 2}, {0: 5, 1: 2, 2: 2, 4: 1}, {0: 5, 1: 3, 3: 1, 4: 1}, {0: 5, 1: 3, 2: 1, 5: 1}, {0: 5, 1: 4, 6: 1}, {0: 6, 2: 2, 3: 2}, {0: 6, 2: 3, 4: 1}, {0: 6, 1: 1, 3: 3}, {0: 6, 1: 1, 2: 1, 3: 1, 4: 1}, {0: 6, 1: 1, 2: 2, 5: 1}, {0: 6, 1: 2, 4: 2}, {0: 6, 1: 2, 3: 1, 5: 1}, {0: 6, 1: 2, 2: 1, 6: 1}, {0: 6, 1: 3, 7: 1}, {0: 7, 3: 2, 4: 1}, {0: 7, 2: 1, 4: 2}, {0: 7, 2: 1, 3: 1, 5: 1}, {0: 7, 2: 2, 6: 1}, {0: 7, 1: 1, 4: 1, 5: 1}, {0: 7, 1: 1, 3: 1, 6: 1}, {0: 7, 1: 1, 2: 1, 7: 1}, {0: 7, 1: 2, 8: 1}, {0: 8, 5: 2}, {0: 8, 4: 1, 6: 1}, {0: 8, 3: 1, 7: 1}, {0: 8, 2: 1, 8: 1}, {0: 8, 1: 1, 9: 1}, {0: 9, 10: 1}]
[({0: 2}, {5: 2}), ({0: 2}, {4: 1, 6: 1}), ({0: 2}, {3: 1, 7: 1}), ({0: 2}, {2: 1, 8:

Solving linear program

In [5]:
import cvxpy as cp

x = cp.Variable(len(fingerprints))
t = cp.Variable()
objective = cp.Minimize(t)
constraints = [
    x >= 0,
    x <= 1,
    prob_hypo @ x <= t,
    prob_alts @ (1 - x) <= t
]
prob = cp.Problem(objective, constraints)

#result = prob.solve(solver=cp.GUROBI)
result = prob.solve()

print(np.log(t.value))

-2.4148196359833625


In [6]:
import matlab.engine
import io

# --2.783246248005155
# -2.5430245416741255 deterministic tester

lo, hi = -1e6, 1e6

eng = matlab.engine.start_matlab()
c_matlab = np.array(eng.get_coefs(n * 1.0, k * 1.0, eps * 1.0, stdout=io.StringIO())).flatten()
if len(c_matlab) < k + 1:
    c_matlab = np.pad(c_matlab, (0, k + 1 - len(c_matlab)), 'constant')
matlab_tester = SemilinearTester(c_matlab)
print(matlab_tester.optimize(lo, hi, fingerprints, prob_hypo, prob_alts))
print(matlab_tester.lo, matlab_tester.hi, matlab_tester.tb)

c_chi = np.array([(i - k / n) ** 2 for i in range(k + 1)])
chi_squared_tester = SemilinearTester(c_chi)
print(chi_squared_tester.optimize(lo, hi, fingerprints, prob_hypo, prob_alts))
print(chi_squared_tester.lo, chi_squared_tester.hi, chi_squared_tester.tb)

c_tv = np.array([abs(i - k / n) for i in range(k + 1)])
tv_tester = SemilinearTester(c_tv)
print(tv_tester.optimize(lo, hi, fingerprints, prob_hypo, prob_alts))
print(tv_tester.lo, tv_tester.hi, tv_tester.tb)

c_singleton = -np.array([i == 1 for i in range(k + 1)], dtype=float)
singleton_tester = SemilinearTester(c_singleton)
print(singleton_tester.optimize(lo, hi, fingerprints, prob_hypo, prob_alts))
print(singleton_tester.lo, singleton_tester.hi, singleton_tester.tb)

ModuleNotFoundError: No module named 'matlab'

In [None]:
print(*list(zip(x.value, fingerprints)), sep='\n')