Adapted from Karpov Courses

In [1]:
import multiprocessing as mp
import os
from typing import List, Optional, Tuple

import numpy as np
from tqdm import tqdm_notebook

In [2]:
# Number of bootstrap resamples to simulate (each resample is of size N).
N_bootstraps: int = 10_000

# Force the 'spawn' start method for worker processes, overriding any
# method that was set earlier in this session.
# NOTE(review): the original comment here read "spawn -> fork"; presumably a
# reminder that 'fork' is the alternative start method -- confirm.
mp.set_start_method(method="spawn", force=True)

In [3]:
def poisson_bootstrap_tp_fp_fn_tn(
    bundle: Tuple[float, List[Tuple[float, float, float, int]]],
    n_bootstraps: Optional[int] = None,
) -> List[np.ndarray]:
    """Poisson-bootstrap the confusion-matrix counts for one chunk of data.

    Args:
        bundle: A ``(threshold, data)`` pair. ``data`` is a list of
            ``(label, predict, weight, index)`` records; ``index`` is used
            as the RNG seed for that record, so it must be a valid seed for
            ``np.random.seed``.
        n_bootstraps: Number of bootstrap resamples to simulate. When
            omitted, the module-level ``N_bootstraps`` is used.

    Returns:
        ``[TP, FP, FN, TN]``: four float arrays of length ``n_bootstraps``.
        Element ``k`` of each array is the weighted count that this chunk
        contributes to the corresponding cell in resample ``k``.

    Note:
        The global NumPy RNG is re-seeded for every record, so the result
        for a record depends only on its ``index``, not on which chunk or
        worker processes it.
    """
    threshold, data = bundle
    if n_bootstraps is None:
        n_bootstraps = N_bootstraps

    # We take multiple bootstrap resamples from our data and want to
    # figure out how many TP, FP, FN and TN will be in each of them.
    # This helps us to simulate the presence of multiple
    # datasets that we use to evaluate our model.
    TP = np.zeros(n_bootstraps)
    FP = np.zeros(n_bootstraps)
    FN = np.zeros(n_bootstraps)
    TN = np.zeros(n_bootstraps)

    # Iterating over each individual datapoint (prediction)
    for current_label, current_predict, weight, index in data:
        np.random.seed(index)  # for reproducibility

        # Add noise for testing robustness. A scalar draw (no ``size``) is
        # used so the thresholded value below is a scalar; converting a
        # 1-element array with int() is deprecated since NumPy 1.25.
        noisy_predict = current_predict + np.random.normal(0, 0.0125)
        # Clip the noisy score back into [0, 1], then binarise it.
        predicted_label = int(np.clip(noisy_predict, 0, 1) >= threshold)

        # With regular bootstrap each resample contains N records.
        # For IID data, we can actually describe any resample by the number
        # of occurrences of each observation. Thus, we need to either sample
        # from multinomial(N, (1/N, 1/N ...)) or sample each record
        # independently from binomial(N, 1/N).
        # However, as N is large, 1/N is small -> we can use
        # Poisson(N * (1/N) = 1) to approximate Binomial(N, 1/N) and use it
        # to calculate the number of occurrences of record #index in each of
        # the resamples:
        p_sample = np.random.poisson(1, n_bootstraps) * weight

        # Each record contributes its per-resample occurrence count to
        # exactly one confusion-matrix cell. Records whose label is neither
        # 0 nor 1 contribute nothing.
        if current_label == 1 and predicted_label == 1:
            TP += p_sample
        elif current_label == 1 and predicted_label == 0:
            FN += p_sample
        elif current_label == 0 and predicted_label == 1:
            FP += p_sample
        elif current_label == 0 and predicted_label == 0:
            TN += p_sample

    return [TP, FP, FN, TN]

In [4]:
# Size of the synthetic evaluation dataset.
N = 10**7
# Random binary ground-truth labels drawn from {0, 1}.
labels = np.random.randint(0, 2, N)
# Random scores: normal(0.5, 1) draws clipped into [0, 1].
predicts = np.clip(np.random.normal(0.5, 1, N), 0, 1)
# Every observation gets a weight of 1. np.ones avoids building a
# 10M-element Python list just to convert it to an array.
weights = np.ones(N, dtype=int)

print(labels[:10])
print(predicts[:10])
print(weights[:10])

[0 0 1 0 1 0 1 1 1 0]
[0.46677325 0.47942523 0.76572266 1.         0.         0.35443751
 0.         1.         1.         1.        ]
[1 1 1 1 1 1 1 1 1 1]


In [5]:
# Number of records handed to a worker in one task.
chunk_size = 1000
# Decision threshold applied to the (noisy) predicted scores.
threshold = 0.81

# Lazily yields one (threshold, records) bundle per chunk, where records is a
# list of (label, predict, weight, index) tuples for consecutive indices.
# The last chunk is cut off at N.
generator = (
    (
        threshold,
        [
            (labels[i], predicts[i], weights[i], i)
            for i in range(start, min(start + chunk_size, N))
        ],
    )
    for start in range(0, N, chunk_size)
)

In [6]:
# Use all but three of the reported CPUs, but never fewer than one worker.
# os.cpu_count() may return None, in which case fall back to a single CPU.
cpu_to_use = max((os.cpu_count() or 1) - 3, 1)
print(cpu_to_use)

with mp.Pool(processes=cpu_to_use) as pool:
    # One [TP, FP, FN, TN] result per chunk, collected in chunk order.
    stat_list = list(
        tqdm_notebook(
            pool.imap(poisson_bootstrap_tp_fp_fn_tn, generator),
            # Ceiling division so a final partial chunk is counted too.
            total=(N + chunk_size - 1) // chunk_size,
        )
    )

# Sum over chunks only (axis=0), leaving a (4, n_bootstraps) array that
# unpacks into one per-resample array for each confusion-matrix cell.
# Without axis=0, np.sum collapses everything to a single scalar and the
# unpacking fails.
TP, FP, FN, TN = np.sum(stat_list, axis=0)
print(TP, FP, FN, TN)