In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 
import pymc3 as pm
import arviz as az

# Switch on seaborn's default styling for the matplotlib figures drawn below.
sns.set()

In [None]:
def bayes_rule(base_rate,sensitivity,specificity,p_inf_given_pos = True):
    """Apply Bayes' rule to a single diagnostic test result.

    Parameters
    ----------
    base_rate : prior probability of being infected (scalar or numpy array).
    sensitivity : P(positive test | infected).
    specificity : P(negative test | healthy).
    p_inf_given_pos : when truthy, return P(infected | positive test);
        otherwise return P(healthy | negative test).

    Returns
    -------
    The requested posterior probability, with the same shape as the inputs.
    """
    healthy_rate = 1 - base_rate

    if p_inf_given_pos:
        # Everyone who tests positive: infected-and-detected plus healthy-but-flagged.
        hits = base_rate * sensitivity
        false_alarms = healthy_rate * (1 - specificity)
        return hits / (hits + false_alarms)

    # Everyone who tests negative: healthy-and-cleared plus infected-but-missed.
    cleared = healthy_rate * specificity
    misses = base_rate * (1 - sensitivity)
    return cleared / (cleared + misses)

In [None]:
# Scenario parameters shared by the cells below.
incidence = 0.001  # prior P(infected); passed to bayes_rule as `base_rate`

sensitivity = 0.95  # P(positive test | infected)
specificity = 0.99  # P(negative test | healthy)


In [None]:
# Worked example: one positive test followed by three negative tests.

# Test 1 is positive: P(infected | positive) replaces the incidence as the prior.
post = bayes_rule(incidence,sensitivity,specificity)
print (post)
p_infected = post

# Tests 2-4 are negative. Each call returns P(healthy | negative), so its
# complement is the updated P(infected) that is fed into the next test.
for _test_nr in (2,3,4):
    post = bayes_rule(p_infected,sensitivity,specificity,p_inf_given_pos=False)
    p_infected = 1 - post
    print (p_infected)

In [None]:

def test_sequence(test_outcome,incidence,sensitivity,specificity):
    """Track P(infected) through a sequence of test results.

    Starting from `incidence` as the prior, every test result is folded in with
    `bayes_rule`, and the resulting P(infected) becomes the prior for the next test.

    Parameters
    ----------
    test_outcome : iterable of bool, True for a positive test, False for a negative one.
    incidence : prior P(infected) before the first test.
    sensitivity : P(positive test | infected).
    specificity : P(negative test | healthy).

    Returns
    -------
    pandas.DataFrame indexed by 'test nr' (starting at 1) with columns
    'test_outcome' ('Positive'/'Negative') and 'p_infected_given_test_sequence'.
    """
    outcome_map = {True : 'Positive',False : 'Negative'}

    prob_infected_after_test = np.zeros(len(test_outcome))
    post = incidence

    for i,outcome in enumerate(test_outcome):
        post = bayes_rule(post,sensitivity,specificity,p_inf_given_pos=outcome)

        if not outcome:
            # For a negative test bayes_rule returns P(healthy | negative);
            # take the complement so `post` is always P(infected).
            post = 1 - post

        prob_infected_after_test[i] = post

    test_df = pd.DataFrame({'test_outcome' : [outcome_map[outcome] for outcome in test_outcome],
                            'p_infected_given_test_sequence' : prob_infected_after_test})

    test_df.index = range(1,len(test_df) + 1)
    test_df.index.name='test nr'

    return test_df


In [None]:
# Twenty tests alternating positive/negative, starting with a positive.
test_outcome = [True,False] * 10

# Running P(infected) after each test; `foo` is reused by later plotting cells.
foo = test_sequence(test_outcome,incidence,sensitivity,specificity)

foo

In [None]:
# Colour used for the label of each test result.
status_2_color = {'Positive' : 'red','Negative' : 'green'}

ax = foo.plot(y='p_infected_given_test_sequence',style='o--',figsize=(18,12))

# Tag every point with the first three letters of its outcome ('Pos' / 'Neg').
for x,y,status in zip(foo.index,foo['p_infected_given_test_sequence'],foo['test_outcome']):
    ax.annotate(xy=(x,y),text=status[:3], color=status_2_color[status])

In [None]:
# Sweep the incidence from 0 to 10% and evaluate P(infected | positive test) at each value.
base_rates = np.linspace(0,0.1,100)
p_inf_g_pos = bayes_rule(base_rates,sensitivity,specificity)

plot_title = 'P(infected | positive test) given different incidence rates. Sensitivity : {:.3f} Specificity : {:.3f}'.format(
    sensitivity,specificity)

plt.figure(figsize=(18,12))
plt.plot(base_rates,p_inf_g_pos)
plt.xlabel('incidence')
plt.ylabel('P(infected | positive test)')
plt.title(plot_title)

# Persist the figure next to the notebook.
plt.savefig('bayesian_update_p_infected_given_incidence.jpg',format='jpg')


In [None]:
fig,axes = plt.subplots(2,figsize=(18,12))

# Top panel: one positive then three negatives. Bottom panel: four positives in a row.
scenarios = ([True,False,False,False],
             [True,True,True,True])

for panel,test_outcome in zip(axes,scenarios):
    test_df = test_sequence(test_outcome,incidence,sensitivity,specificity)
    test_df['p_healthy_given_test_sequence'] = 1 - test_df['p_infected_given_test_sequence']
    test_df.plot(ax=panel,x='test_outcome',y='p_infected_given_test_sequence',style='o--')
    print (test_df.head())

fig.suptitle(('Bayesian Updating : Probability of being infected after n test outcomes\n'
              'incidence : {:.3f} sensitivity : {:.3f} specificity : {:.2f}').format(incidence,sensitivity,specificity))


In [None]:
# Re-plot the most recently computed test_df on a figure of its own.
title = ('Bayesian Updating : Probability of being infected after n test outcomes\n'
         'incidence : {:.3f} sensitivity : {:.3f} specificity : {:.2f}').format(incidence,sensitivity,specificity)

test_df.plot(x='test_outcome',
             y='p_infected_given_test_sequence',
             style='o--',
             figsize=(18,12),
             title=title)
plt.ylabel('Probability being infected after sequence of tests')

In [None]:
# Histogram of 1000 random draws from a Beta(100, 3) distribution.
beta_draws = np.random.beta(100,3,1000)
plt.hist(beta_draws)

In [None]:
# https://towardsdatascience.com/towards-better-estimates-of-recovered-covid-19-cases-d6d1e35b8bda #
population = 100000
k = 1094 # number of positive tests observed among `population` people tested

claimed_se = 0.95
claimed_sp = 0.99

with pm.Model() as model:
    # Priors: sensitivity and specificity concentrated around 0.95, incidence flat on [0, 0.3].
    se = pm.Beta('se',100,5)
    sp = pm.Beta('sp',100,5)
    inci = pm.Uniform('inci',0,0.3)

    # 'bayes' is the proportion of the population that tests positive:
    # true positives (se * inci) plus false positives ((1-inci) * (1-sp)).
    # This -- not the posterior P(infected | positive) -- is the success
    # probability of the Binomial count of positive tests `k`.
    bayes = pm.Deterministic('bayes',se * inci + (1-inci) * (1-sp))
    obs = pm.Binomial('obs',n=population,p=bayes,observed=k)

    trace = pm.sample()

In [None]:
# Trace plots and summary table for the samples in `trace`.
with model:
    az.plot_trace(trace)
    print (az.summary(trace))

In [None]:
# One posterior plot (89% HDI) per named variable of the model.
with model:
    for var_name in ('se','sp','inci','bayes'):
        az.plot_posterior(trace,hdi_prob=0.89,var_names=[var_name])

In [None]:
# Small second model: infer a Bernoulli success probability from eight trials.
with pm.Model() as model2:
    bernoulli_trials = [0,0] * 2 + [1,1] * 2  # four 0s followed by four 1s
    
    p = pm.Uniform('p',0,1)  # flat prior on the success probability
    obs = pm.Bernoulli('obs',p,observed=bernoulli_trials)
    
    trace2 = pm.sample()

In [None]:
# Trace plots and summary table for model2.
with model2:
    az.plot_trace(trace2)
    # Summarise trace2 (this model's samples) so the table matches the plotted trace.
    print (az.summary(trace2))

In [None]:
# Posterior of model2's variables with an 89% highest-density interval.
with model2:
    az.plot_posterior(trace2,hdi_prob=0.89)

In [None]:
population = 100000

def bayes_by_arithmetic(population, incidence,sensitivity,specificity):
    """Work through Bayes' rule with head counts, printing every intermediate number.

    Parameters
    ----------
    population : size of the cohort being tested.
    incidence : fraction of the cohort that is infected.
    sensitivity : fraction of infected people who test positive.
    specificity : fraction of healthy people who test negative.

    Returns
    -------
    tuple (total_positive, p_infected_given_positive,
           total_negative, p_healthy_given_negative)
    """
    def report(label,value):
        # Print one labelled intermediate result and hand the value back.
        print (label,value)
        return value

    # Split the cohort by true status.
    infected = report('infected ',incidence * population)
    healthy = report('healthy ',population - infected)

    # Split each group by test result.
    true_positive = report('true positive ',infected * sensitivity)
    false_negative = report('false negative ',infected - true_positive)

    true_negative = report('true negative ',healthy * specificity)
    false_positive = report('false positive ',healthy - true_negative)

    # Regroup by test result.
    total_positive = report('total_positive ',true_positive + false_positive)
    total_negative = report('total negative ',true_negative + false_negative)

    # Posterior probabilities and their complements.
    p_infected_given_positive = report('p_infected | positive ',true_positive / total_positive)
    report('p_healthy | positive ',1 - p_infected_given_positive)

    p_healthy_given_negative = report('p_healthy | negative ',true_negative / total_negative)
    report('p_infected | negative ',1 - p_healthy_given_negative)

    return (total_positive,p_infected_given_positive,total_negative,p_healthy_given_negative)

In [None]:
tests = ['Positive','Negative','Positive','Negative','Positive','Negative','Positive']

test_param_list = []

# The first test is taken by the whole population at the base incidence.
cohort,prior = population,0.001

for outcome in tests:
    test_params = bayes_by_arithmetic(cohort,prior,sensitivity,specificity)
    test_param_list.append(test_params)
    print ('test params ',test_params)
    print ()

    if outcome == 'Positive':
        # After a positive result the next cohort is everyone who tested positive
        # (test_params[0]) and the new incidence is P(infected | positive).
        cohort,prior = test_params[0],test_params[1]
    else:
        # After a negative result the next cohort is everyone who tested negative
        # (test_params[2]) and the new incidence is 1 - P(healthy | negative).
        cohort,prior = test_params[2],1-test_params[3]


arithmetic_df = pd.DataFrame(
    test_param_list,
    columns=['total positive','p_inf_given_positive','total negative','p_healthy_given_negative'])
arithmetic_df['p_healthy_given_positive'] = 1 - arithmetic_df['p_inf_given_positive']
arithmetic_df['p_inf_given_negative'] = 1 - arithmetic_df['p_healthy_given_negative']
arithmetic_df['test_outcome'] = tests
arithmetic_df

In [None]:
# Free-text note kept as a string literal so it renders as the cell's output.
# NOTE(review): the figures are typed in by hand rather than read from arithmetic_df.
''' So above, with the given test outcomes, our P of being infected is: 
    [0.086837,0.004780,0.313311,0.022525,0.686437,0.099556,0.913070]
    that is, same as above using test_sequence() '''

In [None]:
# Overlay the hand-entered arithmetic results (7 points) on the test_sequence() results in `foo`.
# NOTE(review): the values below are hard-coded copies; they go stale if the parameters change.
arithmetic_outcomes = [0.086837,0.004780,0.313311,0.022525,0.686437,0.099556,0.913070]
plt.plot(arithmetic_outcomes,'o--')
plt.plot(foo['p_infected_given_test_sequence'].values)

In [None]:
### note that this actually overcounts the covid impact, since all cause mortality is based on ###
### yearly deaths, while covid deaths are counted from beginning of pandemic to YTD ###

# NOTE(review): read_pickle unpickles the file, which can execute arbitrary code;
# only load 'covid_prop_impacted.pkl' from a trusted source.
covid_prop_impacted = pd.read_pickle('covid_prop_impacted.pkl')
# Rename all seven columns positionally.
covid_prop_impacted.columns = ['prop covid cases','prop covid ICU','prop covid dead',
                               'bin_mid','CFR','all_cause_baseline','covid_base_ratio']

covid_prop_impacted

In [None]:
# Plot the covid-death proportion and the all-cause baseline side by side.
covid_prop_impacted.plot(y=['prop covid dead','all_cause_baseline'],style='o--')

In [None]:
### ASSUMING WORST CASE, I.E. COVID DEATHS ARE IN ADDITION TO NORMAL DEATHS ###
# Ratio of (baseline + covid) to baseline; a value of 1.0 would mean no increase.
rel_change = (covid_prop_impacted['prop covid dead'] + covid_prop_impacted['all_cause_baseline']) / covid_prop_impacted['all_cause_baseline']
rel_change

In [None]:
# The absolute change is simply the covid-death column itself.
abs_change = covid_prop_impacted['prop covid dead']
abs_change

In [None]:
# Reciprocal of the baseline value, i.e. a "1 in N" figure.
# NOTE(review): this is 1/p, not statistical odds p/(1-p) — confirm the naming is intended.
odds_without_covid = 1 / covid_prop_impacted['all_cause_baseline']

In [None]:
# Same "1 in N" reciprocal, with the covid-death column added to the baseline.
odds_with_covid = 1 / (covid_prop_impacted['all_cause_baseline'] + covid_prop_impacted['prop covid dead'])

In [None]:
# Put the two "1 in N" series side by side and add their difference and ratio.
odds = pd.concat([odds_without_covid,odds_with_covid],axis=1)
odds.columns = ['odds_dying_before_pandemic','odds_dying_during_pandemic']
odds['diff'] = odds['odds_dying_before_pandemic'] - odds['odds_dying_during_pandemic']
odds['rel_diff'] = odds['odds_dying_before_pandemic'] / odds['odds_dying_during_pandemic']
#odds = odds.apply(lambda x : np.round(x,0).astype(int))
odds 

In [None]:
# Scratch calculation: converts a "1 in 3587" figure to a probability.
# NOTE(review): 3587 is hard-coded — presumably read off the `odds` table; confirm or remove.
1 / 3587

In [None]:
# Bar chart of both "1 in N" columns on a logarithmic y-axis.
odds.plot(y=['odds_dying_before_pandemic','odds_dying_during_pandemic'],kind='bar',
          logy=True,figsize=(18,12))

In [None]:
# Scratch cell: a conditional expression that always evaluates to 1; nothing else uses it.
1 if True else 0

In [None]:


### simulate infected and true and the error rate of testing ###
# Draw a population with a known infection status, apply an imperfect test to
# everyone, and measure P(infected | positive) and P(healthy | negative) empirically.

incidence = 0.001
sensitivity = 0.95
specificity = 0.99

pop_size = 100000

pop = pd.DataFrame({'infected' : np.random.choice([0,1],p=[1-incidence,incidence],size=pop_size)})

# Per-person probability of a positive test: `sensitivity` for the infected,
# 1 - `specificity` for the healthy. A single vectorised uniform draw replaces
# the row-by-row apply(), which called np.random.choice once per person.
p_positive = np.where(pop['infected'] == 1,sensitivity,1-specificity)
pop['positive'] = (np.random.random(pop_size) < p_positive).astype(int)

# Confusion-matrix membership of every person (exactly one flag is True per row).
pop['false positive'] = (pop['infected'] == 0) & (pop['positive'] == 1)
pop['false negative'] = (pop['infected'] == 1) & (pop['positive'] == 0)
pop['true positive'] = (pop['infected'] == 1) & (pop['positive'] == 1)
pop['true negative'] = (pop['infected'] == 0) & (pop['positive'] == 0)

# Column totals, computed once and reused below.
counts = pop.sum()

p_inf_g_pos = counts['true positive'] / (counts['true positive'] + counts['false positive'])
p_ok_g_neg = counts['true negative'] / (counts['true negative'] + counts['false negative'])

print ('P(inf | positive) ', p_inf_g_pos)
print ('P(ok | negative) ', p_ok_g_neg)

print (counts)

pop

In [None]:
# Keep only the simulated people whose test came back positive.
positives = pop.loc[pop['positive'] == 1]
print (positives.sum())
positives

In [None]:
# Infer P(infected | positive) from the simulated positives: each positive test
# is one Bernoulli trial that succeeds when the person is a true positive.
# NOTE(review): `p_inf_g_pos` is rebound here to a PyMC variable, replacing the
# empirical value computed in the simulation cell.
with pm.Model() as model3:
    
    p_inf_g_pos = pm.Uniform('p_inf_g_pos',0,1)  # flat prior on [0, 1]
    obs = pm.Bernoulli('obs',p=p_inf_g_pos,observed=positives['true positive'].values)
    
    trace3 = pm.sample()

In [None]:
# Trace plots and summary table (89% HDI) for the samples in `trace3`.
with model3:
    az.plot_trace(trace3)
    print (az.summary(trace3,hdi_prob=0.89))

In [None]:
# Posterior plot for `trace3` with an 89% highest-density interval.
with model3:
    az.plot_posterior(trace3,hdi_prob=0.89)