# Group Testing Puzzle
## Problem Statement
Given a population of $N$ people who may or may not have COVID, what is the expected minimum number of COVID tests you'd need to administer to determine who does and doesn't have COVID? What would your testing algorithm be?
### Assumptions
- A COVID test is 100% accurate (i.e. no false-positives or false-negatives).
- You can apply a single test to multiple people at a time, it will show positive if at least 1 person partaking in that test is positive.
- The chance of a person having COVID is a constant $C$

### Extensions
- What if the tests are not 100% accurate?
- What if the probability of having COVID isn't uniform over the population?

In [None]:
import numpy as np
import itertools

In [None]:
N = 2
C = 0.1

In [None]:
P = lambda state: C**sum(state)*(1-C)**(N-sum(state))

In [None]:
H = sum([-P(state)*np.log2(P(state)) for state in itertools.product([0,1], repeat=N)])

In [None]:
print(H)

In [None]:
import heapq

queue = [(P(state),state) for state in itertools.product([0,1], repeat=N)]
heapq.heapify(queue)

while len(queue) > 1:
    freq1, tree1 = heapq.heappop(queue)
    freq2, tree2 = heapq.heappop(queue)
    new_node = (freq1+freq2, (tree1,tree2))
    heapq.heappush(queue, new_node)
print(queue[0][1])

#### State Space
$$ X \in \mathbb{Z}_{2}^{N} $$

In [None]:
state_space = list(itertools.product([0,1], repeat=N))

#### Probability Space
$$ P(X=x) = C^{|x|} (1-C)^{N-|x|} $$

In [None]:
P = {state:P(state) for state in state_space}

In [None]:
for state,p in P.items():
    print(state,p)

#### Subset Test Probability
$$ P(X \in T) = \sum_{t \in T} P(X=t) $$

In [None]:
P_subset = {
    tested: sum([
        p 
        for state,p in P.items() 
        if any(np.array(state)*np.array(tested))
    ]) 
    for tested in state_space
}

In [None]:
for tested,p in P_subset.items():
    print(tested,p)

#### Conditional Entropy
$$ H(X|Y) = - \sum_{x,y} P(X=x,Y=y) \log \frac{P(X=x,Y=y)}{P(Y=y)}$$

In [None]:
def subset_conditional_H(P, subset):
    p_subset = sum([p for state,p in P.items() if any(np.array(state)*np.array(subset))])
    subset = np.array(subset)
    H = 0.0
    for subset_result in [0,1]:
        for state,p in P.items():
            state = np.array(state)
            viable = subset_result*any(subset*state) + (1-subset_result)*(not any(subset*state))
            if viable:
                #print(subset_result, state)
                H -= p*np.log2(p/(subset_result*p_subset + (1-subset_result)*(1-p_subset)))
    return H    

In [None]:
min_H = None
for test_set in state_space:
    H = subset_conditional_H(P, test_set)
    if not min_H or H < min_H:
        min_H = H
        min_test_set = test_set
print(min_test_set)
print(min_H)

In [None]:
subset_result = 1

In [None]:
# clean probability space
state_space = [state for state in state_space if subset_result*any(np.array(min_test_set)*np.array(state)) + (1-subset_result)*(not any(np.array(min_test_set)*np.array(state)))]
P_sum = sum([P[state] for state in state_space])
P = {state:P[state]/P_sum for state in state_space}

In [None]:
for state,p in P.items():
    print(state,p)

In [None]:
test_sets_arr = np.array([[0,1]])
results_arr = np.array([1])
state_space_arr = np.array(state_space)
#print(state_space_arr[:,:,np.newaxis].shape, test_sets.T[np.newaxis,:,:].shape)
#print(state_space_arr[:,:,np.newaxis] * test_sets.T[np.newaxis,:,:])

#print(state_space_arr)

#np.any(np.array(state_space)*mask, axis=1)
#~np.any(np.array(state_space)*mask, axis=1)

test_results = np.any(state_space_arr[:,:,np.newaxis] * test_sets_arr.T[np.newaxis,:,:], axis=1)
consistent_state = np.all(test_results == results_arr, axis=1)
reduced_state_space = state_space_arr[consistent_state]
#true_states = np.all(, axis=1)
#print(results_arr)
#print(test_results)
print(consistent_state)

reduced_state_space = state_space_arr[consistent_state]
known_values = np.logical_or(reduced_state_space.all(axis=0), np.logical_not(reduced_state_space).all(axis=0))
#np.any(np.array([1,1,0,0])*known_values)
test_set_arr = np.array([1,0,0,1])
covered_tests = np.logical_not(np.any(test_sets_arr * np.logical_not(test_set_arr), axis=1))
covered_tests = test_sets_arr[covered_tests]
covering = np.any(covered_tests, axis=0)
covering

In [None]:
state_space_arr = np.array(state_space)

import functools
@functools.lru_cache(maxsize=None)
def dynamic_solution(test_sets, results):
    # array-ify
    if not isinstance(results,tuple):
        results = [results]
    if len(test_sets) > 0 and not isinstance(test_sets[0],tuple):
        test_sets = [test_sets]
    test_sets_arr = np.array(test_sets)
    test_sets_arr = test_sets_arr.reshape((test_sets_arr.size//N,N))
    results_arr = np.array(results)
    results_arr = results_arr.reshape((results_arr.size))
    # compute the valid states
    test_results = np.any(state_space_arr[:,:,np.newaxis] * test_sets_arr.T[np.newaxis,:,:], axis=1)
    consistent_state = np.all(test_results == results_arr, axis=1)
    reduced_state_space = state_space_arr[consistent_state]
    if reduced_state_space.shape[0] == 1:
        return 0, tuple()
    if reduced_state_space.shape[0] == 0:
        return -1, tuple()
    # iterate over tests
    known_values = np.logical_or(reduced_state_space.all(axis=0), np.logical_not(reduced_state_space).all(axis=0))
    test_sets_set = set(test_sets)
    min_expected_depth = np.inf
    optimal_test_set = tuple([0])*N
    for test_set in state_space:
        test_set_arr = np.array(test_set)
        # check if not already tested, or is looking at known values
        if test_set not in test_sets_set and not np.any(test_set_arr * known_values):
            # check if already covered by previous tests
            covered_tests = np.logical_not(np.any(test_sets_arr * np.logical_not(test_set_arr), axis=1))
            covered_tests = test_sets_arr[covered_tests]
            covering = np.any(covered_tests, axis=0)
            if np.any(covering != test_set_arr):
                expected_depth = 0
                # Compute probability of positive result
                P_1 = 0.5
                # Loop over possible results
                print(reduced_state_space * np.array(test_set))
                for result in [0,1]:
                    # Create the sorted tuples for hashing
                    new_test_sets = list(test_sets) + [test_set]
                    new_results = list(results) + [result]
                    new_sorted = sorted(zip(new_test_sets,new_results))
                    new_test_sets = tuple([test[0] for test in new_sorted])
                    new_results = tuple([test[1] for test in new_sorted])
                    print(new_test_sets)
                    print(new_results)
                    # Perform the loop
                    temp_expected_depth, _ = dynamic_solution(new_test_sets, new_results)
                    expected_depth += (1+temp_expected_depth) * (P_1*result + (1-P_1)*(1-result))
                # Update if this test set is more optimal
                if expected_depth < min_expected_depth:
                    min_expected_depth = expected_depth
                    optimal_test_set = test_set
    return min_expected_depth, optimal_test_set

In [None]:
dynamic_solution(((0,1)), (0))

In [None]:
dynamic_solution((), ())

## References
- https://en.wikipedia.org/wiki/Decision_theory
- https://numpy.org/doc/stable/reference/generated/numpy.logical_not.html
- https://en.wikipedia.org/wiki/Hamming_space
- https://en.wikipedia.org/wiki/Bernoulli_trial
- https://en.wikipedia.org/wiki/Huffman_coding
- https://math.stackexchange.com/questions/2299145/why-does-the-average-number-of-questions-bits-needed-for-storage-in-shannons-en
- https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence
- https://en.wikipedia.org/wiki/Mutual_information
- https://en.wikipedia.org/wiki/Conditional_probability
- https://en.wikipedia.org/wiki/Shannon%27s_source_coding_theorem