# Pooled testing
This is an application of pooled testing to COVID-19. For further details refer to my blog
## Sources
- BBC podcast https://www.bbc.co.uk/sounds/play/w3cszh0k
- The paper https://arxiv.org/pdf/2004.14934.pdf

In [1]:
import numpy as np
from tqdm import tqdm

- Set n, number of tests,
- p, estimated prevelance of virus in the population
- m number infected in sample of n, expected value n*p

In [2]:
n = int(1e6)
p = 1e-4
m = int(n*p)

Randomly set the status m out of n samples to be 1, i.e. infected

## Prepare run - create a random sample and set up parameters
- Generate a random sample
- Optimal group size, Nopt, is 0.35/p
- expected number of tests T per person ≈ ep ln(0.734/p)
- Using L=3 as and approximation of e for the optimal number of lattice points, L.
- Choose N to be closest power of 3 to optimal
- Pad the samples to the next multiple of N and reshape to groups x N

In [3]:
def randomSample(n,p,m):
    # Random sample
    positives = np.random.choice(range(n),m)
    status = np.zeros(n)
    status[positives] = 1
    return status


def prepareRun(status,n,p):
    """Set up parameters """
    # Set up parameters
    T = np.exp(1)*p*np.log(0.734/p)
    Nopt = round(0.35/p,0)
    print(f'n = {n:,d}. Incidence = {p*100:.4f}%. Optimal group size if {Nopt}. Expected tests per person {T*100:.2f}%')
    L = 3
    D = int(round(np.log(Nopt)/np.log(L)))
    N = int(L**D)
    groups = int(np.ceil(n/N))
    print(f'OK we are going to use a {groups:,d} groups of size {N} to process the {n:,d} samples')
    status = np.append(status,np.zeros(int(groups*N-len(status)))).reshape(groups,N)
    return status, D, L, N, groups


## Stage 1 Test groups
In each group, pool all samples and a perform a test. If this is negative, all members of that group are negative. This selects the groups were futher tests are required to identify individuals

In [4]:
def initialScan(status,n,p):
    positivesPool = status.sum(axis=1)>0
    positiveGroups = np.arange(groups)[positivesPool]
    totalTests = groups
    print(f'In initial scan, {totalTests:,d} tests performed. Positives were found in {len(positiveGroups):,d} groups')
    return status,D,L,N,positiveGroups,totalTests

## Stage 2 Now perform hypercube slicing algorithm on each group containing one or more positive
In this stage we think of the samples in each group as lying on a D-dimentional hypercube lattice with L points in each direction. The coordinates of the sample provide a base L, i.e. tertiary number, identifying the sample ID. 
- Slices are obtained by sequentially rolling the axes of the hypercube and taking the sum of all the elements of the slices in down axis=1. 
- There are LxD slices, resulting in LxD tests being performed
- Slices are themselves hypercubes of dimension D-1
- Every point on the lattice is contained in exactly D slices. So a single positive will make tests on D slices positive. These D slices uniquely identify the ID of the positive. 
    - For example for D=3 and L=3, we have a 3x3x3 lattice
    - Slices from left to right match the tertiary unit column
    - Slices from back to front match the teritary 3s column
    - And slices from top to bottom match the tertiary 9s column
    - Positives in slices 2,2,2 would result from sample 26 being positive

The results are stored in a the *test* array of shape (D,L). We can think of the rows of *test* representing the results of pooled samples from slices in each dimension. The results are arranged to match the positions of the slices through the hypercube. The first row of *test* represents the units, the next row the 3s, the next row 9s etc. 
- For example a 3x3x3x3 cube, *test* will have 4 rows and 3 columns. 
- The number 59 in tertiary is 2102
- The position of the 1 in each row ot *test* represents a tertiary digit, where the tertiary number is read upwards from the bottom
- [[0 0 1] (position 2)
-  [1 0 0] (position 0)
-  [0 1 0] (position 1)
-  [0 0 1]] (position 2)

Create a couple of helper functions 
- convert pooled test result to an ID, this will be a uniquie ID if there are exactly D positive test results
- convert a unique ID into a test result
- and thirdly, create an important function to test all the slices in a hypercube lattice of samples

In [5]:
def test2ID(test,L=3):
    """Function that converts a set of test results into an id
    If there are exactly D positives, this will be the unique id of the infected sample"""
    r,c = test.shape
    # locator converts positive test results into a location by multiplying by powers of 3
    locator = np.arange(L).reshape(1,L) * L**np.arange(r).reshape(r,1)
    return int((test*locator).sum())

def ID2test(ID,D,L=3):
    """Function that converts the id of a positive sample into the pooled test result,
    as if it were the only positive in the group"""
    
    assert ID < L**D
    testID = np.zeros((D,L))
    # intitialise as the ID for zero
    testID[:,0] = 1
    # Generate teritary number and set that position to 1 in test
    row = 0
    while ID>0:
        r = ID%L           # remainder on division by L
        testID[row,0] = 0
        testID[row,r] = 1
        ID = ID//L
        row += 1
    return testID

def testGroup(hypercube,L=3):
    """Take a D-dimentionsal hypercube lattice of samples and obtain result of testing each slice
    In real life, this array of test results would be produced by the PCR testing machine"""
    Dcube = hypercube.ndim
    test = np.empty((Dcube,L))
    for d in range(Dcube):
        if Dcube>1:
            hypercube = np.rollaxis(hypercube,-1)
            test[d] = hypercube.sum(axis=1).reshape(3,-1).sum(axis=1)
        else:
            test[d]=hypercube
    # Test can only be postive or negative even if more than one positive in a slice 
    test[test>1]=1
    return test


## Hypercube algorithm
- This is the main algorithm. The next cell creates a random set of test results, size n, with indidence p.
- The next cell implements the algoritm. Unfortunately the code gets a bit messy dealing with awkward, low-probability edge cases, where multiple positives happen to fall into one slice.

In [6]:
n = 1e6
p = 1e-2
m = n*p
status=randomSample(int(n),p,int(m))

In [7]:
n=int(n)
status=status.ravel()
status, D, L, N, groups = prepareRun(status,n,p)


status,D,L,N,positiveGroups,totalTests = initialScan(status,n,p)

positivesFound = []

for g in positiveGroups:
    #print(f'Performing {int(N/L)} tests on group {g}')
    # newSlice keeps track of the positions of the samples in the hypercube lattice
    # initially newSlice tracks all the samples in status[g]
    pool = status[g]
    newSlice = np.arange(len(pool)).reshape([3]*D)
    
    hypercube = pool[newSlice]
    test = testGroup(hypercube)
    totalTests += L**(hypercube.ndim-1)
    
    # Check for single positive test
    if test.sum() == hypercube.ndim:
        groupPositive = newSlice.ravel()[test2ID(test)]
        positivesFound += [groupPositive + g*N]
        #print(f'Positive case identified for individual number {int(samplePositive)} found in group {g} ')
    else:
        #      More than one positive in group g
        # Find principle axis with max number of postive slices, princDir
        positiveSlices=(test>0).sum(axis=1)
        # Choose a direction with the highest number of positive slices
        princDir = np.argmax(positiveSlices)
        # Make a list of axes locating positive slices
        princAxes = np.where(test[princDir]==1)[0]

        # Now try to find unique positives in each slice
        # Iterate through poistive slices in the next dimension down
        for princAxis in princAxes:
            # Track samples included in this slice
            tracker = np.arange(len(pool)).reshape([3]*hypercube.ndim)
            tracker = np.rollaxis(tracker,-1)
            for i in range(princDir):
                tracker = np.rollaxis(tracker,-1)
            newSlice = tracker[princAxis]

            # Apply hypercube again to newSlice of this sample
            hypercube2 = pool[newSlice]
            test2 = testGroup(hypercube2)
            totalTests += L**(hypercube2.ndim-1)
            
            # Check for single positive test
            if test2.sum()==hypercube2.ndim:
                # found a positive 
                newPositive = newSlice.ravel()[test2ID(test2)]
                if newPositive + g*N not in positivesFound:
                    positivesFound += [newPositive + g*N]
            else:
                # More than 2 positives in group {g} so now recursively search sub-slices
                # It would be a lot neater to do this as a recursive call
                positiveSlices2=(test2>0).sum(axis=1)
                princDir2 = np.argmax(positiveSlices2)
                princAxes2 = np.where(test2[princDir2]==1)[0]

                # Iterate though positives slices in next dimension down
                for princAxis2 in princAxes2:
                    # Track samples included in this slice
                    tracker = np.arange(newSlice.size).reshape([3]*hypercube2.ndim)
                    tracker = np.rollaxis(tracker,-1)
                    for i in range(princDir2):
                        tracker = np.rollaxis(tracker,-1)
                    newSlice2 = tracker[princAxis2]
                    if D>2:
                        # Apply hypercube again to newSlice of this sample, only for groups where D>2
                        test3 = testGroup(pool[newSlice.ravel()[newSlice2]])
                        totalTests += L**(D-3)
                        if test3.sum()==(D-2):
                            # found a positive 
                            newPositive = newSlice.ravel()[newSlice2.ravel()[test2ID(test3)]]
                            if newPositive + g*N not in positivesFound:
                                positivesFound += [newPositive + g*N]
                        elif test3.sum()>D-2:
                            # Check every sample in this slice
                            for i in newSlice.ravel()[newSlice2]:
                                totalTests += 1
                                if status[g][i]:
                                    newPositive = i
                                    if newPositive + g*N not in positivesFound:
                                        positivesFound += [newPositive + g*N]
                                                        
                    else:
                        # Multiple positives in this low diminsional slice, so go through one by one
                        # Because you can't slice any further
                        totalTests += newSlice.size
                        for i in newSlice.ravel():
                            if status[g][i]:
                                newPositive = i
                                if newPositive + g*N not in positivesFound:
                                    positivesFound += [newPositive + g*N]

            
#print(f'{status.sum()} True positives {np.where(status.ravel()==1)}')
print(f'{len(positivesFound):,d} out of {int(status.sum()):,d} found from a total of {int(totalTests):,d} tests performed for {n:,d} individuals or {totalTests/n*100:.2f}%. \nSaving @ £30 per test  = £{int(n-totalTests)*30:,d}')
FP = set(positivesFound)-set(np.where(status.ravel()==1)[0])
FN= set(np.where(status.ravel()==1)[0])-set(positivesFound)
print(f'{len(FP)/n*100:.4f}% False Positives {sorted(list(FP))} \n{len(FN)/n*100:.4f}% False Negatives {sorted(list(FN))}')



n = 1,000,000. Incidence = 1.0000%. Optimal group size if 35.0. Expected tests per person 11.68%
OK we are going to use a 37,038 groups of size 27 to process the 1,000,000 samples
In initial scan, 37,038 tests performed. Positives were found in 8,695 groups
9,940 out of 9,940 found from a total of 122,423 tests performed for 1,000,000 individuals or 12.24%. 
Saving @ £30 per test  = £26,327,310
0.0000% False Positives [] 
0.0000% False Negatives []
