# Sequential probability ratio test, applied to PRNGs

Sequential testing procedure from Weiss (1962)

The method tests the null hypothesis that a multinomial random variable has equal category probabilities $1/k$ against the alternative that the most common $s$ categories have a probability of occuring more often than $s/k$.

The function `sequential_multinomial_test` tests the null with type 1 error at most $\alpha$ and power at least $1 - \beta$.

In this particular use case, we want to test whether samples of size $k$ drawn from a population of size $n$ using a particular PRNG and sampling algorithm actually occur with equal frequency.  They should be distributed as multinomial with probability ${n \choose k}^{-1}$.

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import math
import numpy as np
import sys
sys.path.append('../modules')
from sample import PIKK, sample_by_index
from sha256prng import SHA256
from prng import lcgRandom
from scipy.misc import comb

In [2]:
def sequential_multinomial_test(sampling_function, num_categories, alpha, beta, multiplier, \
                                s = None, maxsteps=10**5):
    '''
    Conduct Wald's SPRT for multinomial distribution with num_categories categories
    Let p = sum_{s most frequent categories} p_category
    H_0: selection probabilities are all 1/num_categories so p=s/num_categories
    H_1: p = p1 = multiplier * s/num_categories
    
    sampling_function: a function which generates a random number or random sample.
    num_categories: number of categories
    alpha: desired type 1 error rate
    beta: desired power
    multiplier: value larger than 1. Determines alternative: p1 = multiplier * s/num_categories
    s: tuning parameter, integer between 1 and k. Default is 1% of num_categories.
    '''

    assert multiplier > 1
    assert maxsteps > 0
    
    if s is None:
        s = [math.ceil(0.01*num_categories)]
    if isinstance(s, int):
        s = [s]

    k = num_categories # Rename for ease of use!
    lessthan_multiplier = 2 - multiplier
    
    # Set parameters
    lower = beta/(1-alpha)
    upper = (1-beta)/alpha

    # Initialize counter
    sampleCounts = dict()
    while len(sampleCounts.keys()) < max(s):
        Xn = str(sorted(sampling_function()))
        if Xn not in sampleCounts.keys():
            sampleCounts[Xn] = 0
    steps = 0
    LR_upper = {ss: [1] for ss in s}
    decision_upper = {ss: "None" for ss in s}
    num_steps_upper = {ss: maxsteps for ss in s}
    LR_lower = {ss: [1] for ss in s}
    decision_lower = {ss: "None" for ss in s}
    num_steps_lower = {ss: maxsteps for ss in s}    
    tests_running = len(s)*2
    
    # Draw samples
    while tests_running and steps < maxsteps:
        Xn = str(sorted(sampling_function()))
        top_categories = sorted(sampleCounts, key = sampleCounts.get, reverse = True)
        
        steps += 1
        for ss in s:
            # Run test for greater than alternative
            # Event occurs if Xn is among the s most frequent values of X1,...,X_n-1
            if Xn in top_categories[:ss]:
                LR_upper[ss].append(LR_upper[ss][-1] * multiplier) # p1/p0 = multiplier
            else:
                LR_upper[ss].append(LR_upper[ss][-1] * (1 - multiplier*ss/k)/(1-ss/k)) # (1-p1)/(1-p0)

            # Run test at step n
            if LR_upper[ss][-1] <= lower:
                # accept the null and stop
                decision_upper[ss] = 0
                num_steps_upper[ss] = steps
                tests_running -= 1
                if decision_lower[ss] != "None":
                    s.remove(ss)
                
            if LR_upper[ss][-1] >= upper:
                # reject the null and stop
                decision_upper[ss] = 1
                num_steps_upper[ss] = steps
                tests_running -= 1
                if decision_lower[ss] != "None":
                    s.remove(ss)
            
            # Run test for less than alternative
            # Event occurs if Xn is among the s least frequent values of X1,...,X_n-1
            if Xn in top_categories[-ss:]:
                LR_lower[ss].append(LR_lower[ss][-1] * lessthan_multiplier) # p1/p0 = lessthan_multiplier
            else:
                LR_lower[ss].append(LR_lower[ss][-1] * (1 - lessthan_multiplier*ss/k)/(1-ss/k)) # (1-p1)/(1-p0)

            # Run test at step n
            if LR_lower[ss][-1] <= lower:
                # accept the null and stop
                decision_lower[ss] = 0
                num_steps_lower[ss] = steps
                tests_running -= 1
                if decision_upper[ss] != "None":
                    s.remove(ss)
                
            if LR_lower[ss][-1] >= upper:
                # reject the null and stop
                decision_lower[ss] = 1
                num_steps_lower[ss] = steps
                tests_running -= 1
                if decision_upper[ss] != "None":
                    s.remove(ss)

        # add Xn to sampleCounts and repeat
        if Xn in sampleCounts.keys():
            sampleCounts[Xn] += 1
        else:
            sampleCounts[Xn] = 1
    return {'decision_upper' : decision_upper,
            'decision_lower' : decision_lower,
            'lower_threshold' : lower,
            'LR_upper' : LR_upper,
            'LR_lower' : LR_lower,
            'upper_threshold' : upper,
            'steps_upper' : num_steps_upper,
            'steps_lower' : num_steps_lower
            }

In [3]:
# Choose pop and sample size, multiplier > 1, alpha, beta
n = 13
k = 4
alpha = 0.05
beta = 0.05
multiplier = 1.2
s = 10

# RANDU

In [4]:
prng = lcgRandom(100) # from random.org Timestamp: 2017-01-14 22:56:40 UTC
sampling_func = lambda: PIKK(n, k, prng)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=[5, 10])
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({5: 'None', 10: 'None'},
 {5: 100000, 10: 100000},
 {5: 'None', 10: 0},
 {5: 100000, 10: 1672})

In [5]:
prng = lcgRandom(100) # from random.org Timestamp: 2017-01-14 22:56:40 UTC
sampling_func = lambda: sample_by_index(n, k, prng)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=s)
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({10: 'None'}, {10: 100000}, {10: 0}, {10: 3329})

# Super-Duper LCG

In [6]:
# Parameters for the Super Duper LCG
A_SD = 0
B_SD = 69069
M_SD = 2**32
sdlcg = lcgRandom(seed=547691802, A=A_SD, B=B_SD, M=M_SD) 
sampling_func = lambda: PIKK(n, k, sdlcg)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=[5, 10, 20])
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({5: 'None', 10: 'None', 20: 0},
 {5: 100000, 10: 100000, 20: 2178},
 {5: 'None', 10: 'None', 20: 'None'},
 {5: 100000, 10: 100000, 20: 100000})

In [7]:
# Parameters for the Super Duper LCG
A_SD = 0
B_SD = 69069
M_SD = 2**32
sdlcg = lcgRandom(seed=547691802, A=A_SD, B=B_SD, M=M_SD) 
sampling_func = lambda: sample_by_index(n, k, sdlcg)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=s)
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({10: 'None'}, {10: 100000}, {10: 0}, {10: 11446})

# Mersenne Twister

In [8]:
prng = np.random
prng.seed(547691802)
sampling_func = lambda: PIKK(n, k, prng)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=s)
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({10: 'None'}, {10: 100000}, {10: 0}, {10: 6835})

In [9]:
prng = np.random
prng.seed(547691802)
sampling_func = lambda: sample_by_index(n, k, prng)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=s)
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({10: 0}, {10: 13813}, {10: 'None'}, {10: 100000})

# SHA256

In [10]:
prng = SHA256(547691802)
sampling_func = lambda: PIKK(n, k, prng)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=s)
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({10: 'None'}, {10: 100000}, {10: 0}, {10: 15470})

In [11]:
prng = SHA256(547691802)
sampling_func = lambda: sample_by_index(n, k, prng)
sampling_func()

res = sequential_multinomial_test(sampling_func, num_categories=comb(n, k), 
                                  alpha=alpha, beta=beta, multiplier=multiplier, s=s)
res['decision_upper'], res['steps_upper'], res['decision_lower'], res['steps_lower'] 

({10: 'None'}, {10: 100000}, {10: 0}, {10: 1888})