<a href="https://colab.research.google.com/github/pde/private-contact-tracing/blob/master/v2_Copy_of_Predicted_effectiveness_of_privacy_friendly_mobile_contact_tracing_for_COVID_19.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook attempts to predict the effectiveness of different types of apps for epidemiological contact tracing. Initial (and still very much work-in-progress) version by Peter Eckersley peter.eckersley@gmail.com, Lewis Mitchell lewis.mitchell@adelaide.edu.au, and James McCaw jamesm@unimelb.edu.au. This version currently models the United States, as a large and high-stakes setting, but only a few variables (especially N, I0/E0, testmap) need to be changed to model other countries. The model is also likely too pessimistic because it assumes willpower for lockdowns just runs out at some point (rather than being incrementally and carefully phased out).

Please feel free to reuse and repurpose this notebook, but please cite this URL and indicate that (as of this version) results are not peer reviewed, and do not imply endorsement of any conclusion or policy!


In [0]:
import numpy as np
import numpy.random as npr
import bokeh.plotting as bp
import bokeh.io as bi
import bokeh.util.hex as bh
import bokeh.transform as bt
import bokeh.models as bm
import bokeh.colors as bc
import bokeh.layouts as bl
import matplotlib.pyplot as plt
import scipy.stats as ss
from tqdm.notebook import tnrange
from tqdm.notebook import tqdm

import os
import requests
npa = np.array

**Links to key results below**:
* [Comparison of bluetooth and GPS/wifi location apps](#scrollTo=sgCgKqKxkgus&uniqifier=1) based on the presmise that bluetooth requires both parties to have the app installed, while sometimes mobile location apps can be obtained retrospectively at install time.
* [Impact of apps on lockdowns and mortality as a function of adoption](#scrollTo=OjMKiefMkgvd&uniqifier=1)


**Things that are done**:

*   Monte Carlo simulation of app adoption & effectiveness for two types of app:
  * A "bluetooth" model that is more accurate, but requires both infected and exposed individuals to have the app installed in order to anonymously measure exposure using a [protocol like this one](https://docs.google.com/document/d/1f65V3PI214-uYfZLUZtm55kdVwoazIMqGJrxcYNI4eg/edit#heading=h.6q40wl39kcs8).
  * A less accurate retrospective mobile location (GPS+Wifi) model that assumes that when patients are diagnosed, they can be sent an onboarding link which uses their Google Maps Timeline or iOS on-device location records to identify (using [private set intersection](https://en.wikipedia.org/wiki/Private_set_intersection) or similar methods) and send them notifications. For the model, the important point is that patients do not need to have had the app installed at the time of contact.
*   Differential equation model of pandemic for each simulation, including:
  * A lockdown policy that kicks in during periods of high infection
  * App launching at some date (Monte Carlo distributed) that diverts some exposed individuals into a quarantine state that's separate from the usual exposed -> infected -> recovered path and does not cause further transmission
*   Analysis of impact of the app on fatalities and lockdown duration, as a function of adoption
* Test SQUEE step size + accuracy
* Estimate lives saved & days of lockdown averted for the retrospective GPS model
* Ground policy intervention effect sizes better
   * Social distancing from observational data?
   * And/or from [other modelling approaches](https://www.sciencedirect.com/science/article/pii/S1473309920301444) 
* Discount the effect of contact tracing for the proportion of traces that arrive too late
* Graph against every MC variable, and compute partially ranked correlation coefficients
* Set a "model done" deadline, and pivot to paper at that point
    * Thursday morning!
* SEEIIR



**Things to consider doing**:
* Model eradication policies?
* Consider modelling false positive alerts explicitly
* Model household transmission, which continues after some forms of isolation into state Q
* Adjust the lockdown model to be non-binary. Presently the model turns lockdowns on and off more frequently than is realistic, since these policies are inherently slow to stop and start (though it may be reasonable to view the total number of days of binary lockdown as an approximation to the effect and burden of a more nuanced and slow-changing set of policies)

In [0]:
def franken_dist(mean, high95, low95, size=1):
    "Create an asymmetrically stretched normal distribution"
    dist = npr.normal(size=size)
    dist = np.where(dist < 0, dist * (mean - low95) / 2, dist * (high95 - mean) / 2)
    dist += mean
    return dist

def franken_dist_fs(mean, high, low, size=1):
    "Create a custom distribution with finite support"

def log_uniform(min, max, size=1):
    # The reciprocal aka log-uniform
    bounds = np.log([min, max])
    return np.e ** npr.uniform(*bounds, size=size)

In [0]:
# we use a Monte Carlo approach to handle uncertainty in parameters; we call down to an SEIRQL 
# differential equation pandemic model to estimate # of lives saved
samples = 250000
npr.seed(0)

scenario = "US"  # "US"

# model the United States
#N = 327 * 10**6
if scenario == "AU":
    N = 25 * 1e6
elif scenario == "US":
    N = 327 * 1e6
else:
    raise ValueError("Unknown scenario")
t = np.linspace(0, 364, 365)
# start date is beginning of February 2020

# Variables for our model

data_raw = dict(
  # These are for the app
  pop_adoption = npr.uniform(0, 1.0, size=samples),  # independent variable, explore all adoption levels
  #pop = 1 / npr.pareto(1e6, size=samples),  # scale of population, roughly cities to countries
  tester_adoption = npr.uniform(0.8, 1.0, size=samples) if scenario == "AU"
               else npr.uniform(0.5, 1.0),  # when a test is positive, does that wind up in the app?
  #testing_rate = np.clip(npr.pareto(1, size=samples), 0, 1),  # XXX BUGGY Fraction of infections that get tested. Typical range from 0.001 (US or Indonesia) to above 0.5 (Singapore)
  #testing_rate = franken_dist(0.12, 0.15, 0.022, samples),  # approximate https://cmmid.github.io/topics/covid19/severity/global_cfr_estimates.html#current-estimates US numbers
  catch_rate_bt = npr.uniform(0.4, 0.8, size=samples),  # how often does the software detect the contact between two of its users XXX needs more modelling
  catch_rate_gps = npr.uniform(0.3, 0.7, size=samples),  # assume GPS is less precise and might fail worse, though best case is as good because of intertemporal fomite risk detection
  app_launch_date = npr.randint(84, 120, size=samples),
  onboarding_loss = npr.uniform(0, 0.3, size=samples),  # patients who refuse or fail to install the app at diagnosis time
  test_growth = 0.5 + npr.lognormal(size=samples),  # multiplier on linear extrapolation from recent test levels
  #test_supply = 10 ** npr.uniform(4, 4.5, size=samples)  # alternative model of test supply
  test_supply = npr.uniform(8000, 20000, size=samples) if scenario == "AU"
           else log_uniform(1e5, 1e6, size=samples),  # This lower bound is around US actual tests in May
  stranger_contact_rate = npr.uniform(1/3, 2/3, size=samples),  # what fraction of close contacts are not nameable by index patients?
)

data_raw["onboarding_loss"] = np.minimum(data_raw["onboarding_loss"], 1 - data_raw["pop_adoption"])  # but if 100% of users have the app, diagnosed patients do too

pandemic_params = dict(
  # these are for the pandemic
  r0_raw = (1.6 + npr.beta(2, 5, size=samples) * 3),  # TODO: ground better in https://github.com/midas-network/COVID-19/tree/master/parameter_estimates/2019_novel_coronavirus#basic-reproduction-number
  #infection_fatality_rate = npr.uniform(0.11, 4.3, size=samples)/100,  # follow https://www.medrxiv.org/content/medrxiv/early/2020/03/09/2020.03.05.20031773.full.pdf
                                                                        # but err a little higher due to subsequent Diamond Princess deaths & ICU overload risks
  infection_fatality_rate = npr.beta(2, 5, size=samples)*3./100,
  # could use npr.lognormal(-0.1, 0.9, size=samples)
  #lockdown_threshold = 1 / npr.pareto(100, size=samples),  # 99% CI~ 20-10,000 new cases per day for lockdown
  lockdown_threshold = 1 / npr.pareto(50, size=samples) if scenario == "AU" else 1 / npr.pareto(100, size=samples),
  #lockdown_effect = npr.uniform(0.2, 0.9, size=samples),
  lockdown_r0 = npr.uniform(0.4, 0.7, size=samples) if scenario == "AU" # roughly https://www.doherty.edu.au/uploads/content_doc/Estimating_changes_in_the_transmission_of_COVID-19_April14-public-release.pdf
          else npr.uniform(0.4, 1.3, size=samples),  # same lower bound as Australia, but a lot less clear that the US lock down has been quite as effective or has as much community support (https://rt.live/)

  #lockdown_limit = npr.randint(20, 200, size=samples)  # hard limit before politics or economics makes lockdown impossible
  #lockdown_limit = npr.lognormal(4, size=samples),
  lockdown_limit = npr.uniform(800, 1000, size=samples) if scenario == "AU"  # effectively infinity but setting everything to a single value breaks some analysis code
              else log_uniform(30, 365, size=samples),  # political willingness for lockdowns in the US unclear
  lockdown_length = npr.uniform(7, 30, size=samples),
  tests_positive = franken_dist(0.018, 0.077, 0.0074, size=samples).clip(0.0074, None),  # What fraction of tests are positive? Mean Australia, HK - Germany
                                                       # (Conservatively assume it's as hard to find a next case as it is in those
                                                       # countries)
)




data_raw.update(pandemic_params)
data_raw["tester_adoption"] = np.maximum(data_raw["tester_adoption"], data_raw["pop_adoption"])  # assume that if you have very high population adoption, testers probably use this too
#data_raw["time_test_to_diagnosis_max"] = data_raw["time_test_to_diagnosis_min"] + npr.uniform(0, 3, size=samples), # 0 to 3 days more

In [0]:
# trace race parameters
# for these we run a separate set of MC simulations, and then for each scenario in the main MC model, draw from this distribution
tr_samples = 2000
tr_dist = dict(
  generation_time_wshape = franken_dist(2.826, 4.7, 1.75, size=tr_samples),  # https://science.sciencemag.org/content/early/2020/04/09/science.abb6936/tab-figures-data
  generation_time_wscale = franken_dist(5.665, 6.9, 4.7, size=tr_samples),
  # time_symptoms_to_doctor_min = np.zeros(samples), 0
  time_symptoms_to_doctor_max = npr.uniform(2, 10, size=tr_samples),
  time_test_to_diagnosis_min = npr.uniform(0, 3, size=tr_samples), # instant to three days
  incubation_time_lognormal_lmean = franken_dist(1.644, 1.798, 1.495, size=tr_samples),
  incubation_time_lognormal_lsd = franken_dist(0.363, 0.521, 0.201, size=tr_samples),
  human_trace_time = log_uniform(2/24., 3, size=tr_samples),  # human contact tracing takes 2 hours ... 3 days
                                                              # (quick interview up to weekend + overnight lab->tracer delay + people who don't answer phones)
)
tr_dist["time_test_to_diagnosis_max"] = npr.uniform(0, 3, size=tr_samples) + tr_dist["time_test_to_diagnosis_min"]

In [0]:
# simulate the success of the tracing race in a specific tracing scenario
def tracing_race_win_rate(tparams, ssize=1000):
    # now how many of those catchable cases did we get to in time to prevent future transmission
    global tr_samples
    s = (ssize, tr_samples)   # (samples per scenario world, scenario worlds)
    time_symptoms_to_doctor = npr.uniform(tparams["time_symptoms_to_doctor_max"], size=s)
    time_test_to_diagnosis = npr.uniform(tparams["time_test_to_diagnosis_min"], tparams["time_test_to_diagnosis_max"], size=s)
    #time_diagnosis_to_trace = npr.uniform(0, 0.01, size=ssize)
    incubation_time = npr.lognormal(tparams["incubation_time_lognormal_lmean"], tparams["incubation_time_lognormal_lsd"], size=s)
    time_from_primary_exposure_to_primary_diagnosis = incubation_time + time_symptoms_to_doctor + time_test_to_diagnosis
    #time_from_primary_exposure_to_primary_diagnosis = 1/infect_rate + time_symptoms_to_doctor + time_test_to_diagnosis
    w_shape = tparams["generation_time_wshape"]
    w_scale = tparams["generation_time_wscale"]
    time_from_primary_exposure_to_secondary_exposure  = w_scale * npr.weibull(w_shape, size=s) # = 1/infect_rate # generation time
    time_from_exposure_to_tertiary_exposure = time_from_primary_exposure_to_secondary_exposure + w_scale * npr.weibull(w_shape, size=s)
    time_primary_exposure_to_trace = time_from_primary_exposure_to_primary_diagnosis # + time_diagnosis_to_trace

    won = np.count_nonzero(time_primary_exposure_to_trace < time_from_exposure_to_tertiary_exposure, axis=0)
    won_race_proportion = won / ssize

    human_won = np.count_nonzero(time_primary_exposure_to_trace + tparams["human_trace_time"]< time_from_exposure_to_tertiary_exposure, axis=0)
    human_race_proportion = human_won / ssize
    ret = {"app_race_win_rate": won_race_proportion,
           "human_trace_race_win_rate": human_race_proportion}

    return ret

ret = tracing_race_win_rate(tr_dist)
tr_dist.update(ret)

In [0]:
for i in range(5):
    print([(k, v[i]) for k,v in tr_dist.items()])

# stretch our smaller trace win distribution out to the full monte carlo size
resample = npr.choice(np.arange(tr_samples), replace=True, size=samples)
print(tr_dist["app_race_win_rate"][resample])

tr_dist_resampled = {name: vals[resample] for name, vals in tr_dist.items()}
data_raw.update(tr_dist_resampled)

[('generation_time_wshape', 3.6128988866250795), ('generation_time_wscale', 5.615010537793989), ('time_symptoms_to_doctor_max', 8.068625845827174), ('time_test_to_diagnosis_min', 1.7161494990236132), ('incubation_time_lognormal_lmean', 1.7038490061111202), ('incubation_time_lognormal_lsd', 0.375458055927706), ('human_trace_time', 1.1749181181424235), ('time_test_to_diagnosis_max', 1.792107765976867), ('app_race_win_rate', 0.301), ('human_trace_race_win_rate', 0.204)]
[('generation_time_wshape', 2.816488412188316), ('generation_time_wscale', 6.161597871917034), ('time_symptoms_to_doctor_max', 9.14481796557903), ('time_test_to_diagnosis_min', 1.845351987243564), ('incubation_time_lognormal_lmean', 1.629898218827544), ('incubation_time_lognormal_lsd', 0.3055110724219292), ('human_trace_time', 2.0155243397762552), ('time_test_to_diagnosis_max', 2.3046134429810454), ('app_race_win_rate', 0.372), ('human_trace_race_win_rate', 0.215)]
[('generation_time_wshape', 2.8063435167454753), ('generat

In [0]:
# This craziness is somehow necessary to export Bokeh figures as SVG for a paper (!)
export_figures = False
to_drive = False
if export_figures:
    !npm install phantomjs-prebuilt
    !pip3 install selenium
    os.environ["BOKEH_PHANTOMJS_PATH"] = "/content/node_modules/phantomjs-prebuilt/lib/phantom/bin/phantomjs"
    if to_drive:
        from google.colab import drive
        drive.mount("/content/drive")
        figpath=("/content/drive/My Drive/figures")
        if not os.path.isdir(figpath):
            os.mkdir(figpath)
    else:
        figpath ="/tmp"

In [0]:

def set_svg(figure_or_layout):
    if isinstance(figure_or_layout, bp.Figure):
        print("Switching", figure_or_layout.output_backend, "to svg")
        figure_or_layout.output_backend = "svg"
        fig = figure_or_layout
        title = fig.title.text if hasattr(fig, "title") else fig.yaxis.axis_label.title()
        title += ".svg"
        return title
    else:
        return [set_svg(c) for c in figure_or_layout.children][0]

def display(plot):
    bi.curdoc().add_root(plot)
    bp.output_notebook()
    bi.show(plot)
    if export_figures:
        filename = set_svg(plot)
        if not to_drive:
            filename = "/tmp/fig.svg"
        paths = bi.export_svgs(plot, filename=os.path.join(figpath, filename))
        print("Plot saved to", paths)

In [0]:
use_US_test_data = False

if use_US_test_data:
    import datetime as dt
    import requests
    import json

    j = requests.get("https://covidtracking.com/api/us/daily").json()
    #print(json.dumps(j[0], indent=4))
    #j_is = requests.get("https://covidtracking.com/api/is/daily").json()
    N_is = 3.4 * 1e5

In [0]:
if use_US_test_data:
    def get_testmap(json_tests):
        testmap = np.zeros(366)
        last = 0
        assert not np.any(np.isnan(testmap))

        for entry in json_tests:
            date = dt.datetime.strptime(str(entry["date"]), "%Y%m%d")
            start = dt.datetime(2020, 2, 1)
            day = (date - start).days
            if day > last: last = day
            if day >= 0:
                val = entry.get("totalTestResultsIncrease", 0)
                try:
                   val = int(val)
                   testmap[day] = val
                except:
                   pass
                   #print("Couldn't coerce %r to int" % val)

        assert not np.any(np.isnan(testmap))
        return testmap, last

    testmap, last = get_testmap(j)
    #testmap_is = get_testmap(j_is)
    plot = bp.figure(plot_width=600, plot_height=400, y_axis_type="linear", y_range=[0, 1.1 * 1e6],
                    x_axis_label="Day", y_axis_label="Tests in the US")

    plot.scatter(t[:last], testmap[:last], legend_label="actual US tests")
    #plot.scatter(t[:last], testmap_is[:last] * N/N_is, label="iceland equivalent")
    fitstart = last - 30
    nppp = np.polynomial.polynomial  # really!
    for g in data_raw["test_growth"][:50]:
        extrapolation = nppp.polyfit(t[fitstart:last], testmap[fitstart:last], 1)
        p = np.polynomial.Polynomial(extrapolation)
        
        plot.line(t[fitstart:fitstart+100], g * p(t[fitstart:fitstart+100]), legend_label="extrapolation", line_color="red", line_alpha=0.3)
        #p(np.arange(fitstart,365)))
    testmap[last:365] = p(t[last:365])
    display(plot)


In [0]:
globals().update(data_raw)  # refactorme out
scattersize = 3000
print([k for k, v in data_raw.items() if type(v) == float])
data = bm.ColumnDataSource({k: v[:scattersize] for k, v in data_raw.items()})
for k, v in data_raw.items():
  desc = ss.describe(v)
  print("%25s" % k, desc)

[]
             pop_adoption DescribeResult(nobs=250000, minmax=(3.3105544573475143e-06, 0.9999990120313899), mean=0.5004072618970965, variance=0.08344647118764248, skewness=-0.0036138267076261505, kurtosis=-1.2022056474149432)
          tester_adoption DescribeResult(nobs=250000, minmax=(0.8000005900832556, 0.9999990120313899), mean=0.9064909506178667, variance=0.003289526746022724, skewness=-0.1362741046201672, kurtosis=-1.1681456461820756)
            catch_rate_bt DescribeResult(nobs=250000, minmax=(0.4000002828481269, 0.799997000217794), mean=0.6004855431009937, variance=0.013330054234985959, skewness=-0.0036428231002065253, kurtosis=-1.1996123967375827)
           catch_rate_gps DescribeResult(nobs=250000, minmax=(0.3000015060683224, 0.6999998883062533), mean=0.5002839311612699, variance=0.013333654723764706, skewness=-0.001948235929288432, kurtosis=-1.2023086886782552)
          app_launch_date DescribeResult(nobs=250000, minmax=(84, 119), mean=101.494852, variance=107.919893177

In [0]:
# summary of each simulation for the tooltip
tooltips_bt = [
    ("societal adoption", "@pop_adoption"),
    ("tester adoption", "@tester_adoption"),
    ("testing rate", "@testing_rate"),
    ("app contact detection rate", "@catch_rate_bt"),  # needs to change
    ("trace success rate", "$y")
]

tooltips_gps = tooltips_bt[:]
tooltips_gps[-2] = ("app contact detection rate", "@catch_rate_gps")

In [0]:
def coverage_bt(adoption, tester_adoption, catch_rate):
  """
  Tramsmission event coverage for an app that needs to be on both users' phones, at the time of
  exposure, eg by bluetooth matching. *NOTE* this assumes iOS and Android can see each other.
  If not, coverage is roughly halved :(
  """
  intersection = adoption * adoption
  return intersection * tester_adoption * catch_rate

cov_bt = coverage_bt(pop_adoption, tester_adoption, catch_rate_bt)
data.add(cov_bt[:scattersize], "cov_bt")
data_raw["cov_bt"] = cov_bt

# 0.2 
# 0.8 -> second bite
#
# * (1-friendly_stranger_rate) * human_tracing_success_rate    [applied at +]


def trace_rate(t, incidence, bt_gps, r0, tests_conducted, params):

    launched = np.heaviside(t - params["app_launch_date"], 1)
    if bt_gps is not None:
        appw = params["app_race_win_rate"] * params["cov_" + bt_gps]
    else:
        appw = 0
    humw = (1 - params["stranger_contact_rate"]) * params["human_trace_race_win_rate"]

    # when the app is active, we get two bites at the cherry: one with the app, and a second
    # slower attempt with human contact tracers, when the app fails
    # XXX note this incorrectly assumes indepdence of the two tracing races
    win_rate = np.where(launched,  appw + (1 - appw) * humw, humw)
    tests_positive = params["tests_positive"]
    # what fraction of new cases in a day can we potentially catch with a pool of tests
    max_detection = tests_conducted * tests_positive # account for negative tests
    max_detection *= r0 * r0  # we assume that finding an index case requires a test, but that
                              # tracing prevents transmission from this number of secondary to tertiary cases
    assert not np.any(np.isnan(max_detection)), "MD is nan {0} {1}".format(tests_conducted, tests_positive)
    catchable_incidence = incidence.clip(None, max_detection)
    catchable = catchable_incidence * win_rate  # TODO adjust for r0 as above
    test_shortfall = (incidence - catchable_incidence) / tests_positive
    rate = np.where(incidence > 0, catchable / incidence, 0)
    assert np.all(rate <= 1.0), "Trace rate shouldn't be above 1!"

    return rate, test_shortfall

In [0]:
step = 0.03
incs = np.arange(0, 1, step)

def bin_stats(variable, statistic, group_by=pop_adoption):
  bins = [[] for n in incs]

  for adoption, v in zip(group_by, variable):
    i = int(adoption // step)
    bins[i].append(v)

  binned = np.array([statistic(b) for b in bins])
  return binned

bin_averages = lambda data: bin_stats(data, np.average)
bin_25 = lambda data: bin_stats(data, lambda x: np.quantile(x, 0.25))
bin_75 = lambda data: bin_stats(data, lambda x: np.quantile(x, 0.75))

avgs = bin_averages(cov_bt)

Now compare to an app where the assumption is that diagnosed patients contribute to an anonymous redzone map based on retrospective location history

In [0]:

def coverage_gps(adoption, tester_adoption, catch_rate2):
  return adoption * tester_adoption * catch_rate2 * (1 - onboarding_loss)

cov_gps = coverage_gps(pop_adoption, tester_adoption, catch_rate_gps)
data_raw["cov_gps"] = cov_gps
data.add(cov_gps[:scattersize], "cov_gps")
avgs2 = bin_averages(cov_gps)

In [0]:
bin_05 = lambda data: bin_stats(data, lambda x: np.quantile(x, 0.05))
bin_95 = lambda data: bin_stats(data, lambda x: np.quantile(x, 0.95))
bin_50 = lambda data: bin_stats(data, lambda x: np.quantile(x, 0.50))
bin_q = lambda q, data: bin_stats(data, lambda x: np.quantile(x, q))

def graph_distribution(plot, variable, name, colors, xvar=incs, alpha=0.5, fill=True, twosigma=True):
    cols = bm.LinearColorMapper(palette=colors).palette
    pot = bm.ColumnDataSource({
        "xvar": xvar, 
        "b05": bin_05(variable),
        "b25": bin_25(variable),
        "b75": bin_75(variable),
        "b95": bin_95(variable)
    })
    plot.line(xvar, bin_50(variable), line_color=cols[0], legend_label="{0} (median)".format(name), line_width=2.0, line_dash="dotted")

    if fill:
        band = bm.Band(base="xvar", lower="b05", upper="b95",  line_width=1.6, source=pot, fill_color=cols[2], fill_alpha=alpha)
        plot.add_layout(band)
        band = bm.Band(base="xvar", lower="b25", upper="b75",  line_width=1.6, source=pot, fill_color=cols[1], fill_alpha=alpha)
        plot.add_layout(band)
    plot.line(xvar, bin_averages(variable), line_color=cols[0], legend_label="{0} (mean)".format(name), line_width=2.0)

    if twosigma:
        plot.line(xvar, bin_05(variable), line_color=cols[2], legend_label="{0} 5/95% quantiles".format(name), line_width=1.2)
        plot.line(xvar, bin_95(variable), line_color=cols[2], line_width=1.2)
    plot.line(xvar, bin_25(variable), line_color=cols[1], legend_label="{0} 25/75% quantiles".format(name), line_width=1.6)
    plot.line(xvar, bin_75(variable), line_color=cols[1], line_width=1.6)
    plot.line(xvar, bin_averages(variable), line_color=cols[0], legend_label="{0} (mean)".format(name), line_width=2.0)
    plot.line(xvar, bin_50(variable), line_color=cols[0], legend_label="{0} (median)".format(name), line_width=2.0, line_dash="dotted")

In [0]:
if samples > 2000:
  plots = []
  for i, scale in enumerate(("linear", "log")):
    plot = bp.figure(x_range=[0,1], y_range=[0.0001, 1], plot_width=600, plot_height=500,
                     x_axis_label="Proportion of population using app",
                     y_axis_label="Proportion of infections traced", y_axis_type=scale,
                     title="Effectiveness of different app types",
                     output_backend="svg")
    
    plot.scatter(pop_adoption[:scattersize], cov_bt[:scattersize], radius=0.005, fill_alpha=0.20, line_color=None, 
                 legend_label="bluetooth simulation")
    plot.scatter(pop_adoption[:scattersize], cov_gps[:scattersize], radius=0.005, fill_alpha=0.15, line_color=None,
                 fill_color="#30ff00", legend_label="retrospective gps simulation")
    graph_distribution(plot, cov_gps, "retospective GPS matching", "Greens4", fill=False, twosigma=False)
    graph_distribution(plot, cov_bt, "prospective bluetooth matching", "Blues4", fill=False, twosigma=False)
    plot.legend.location = "top_left" if i==0 else "bottom_right"

    plots.append(plot)

  display(bl.row(plots))
  print("Monte Carlo simulation of "
        "effectiveness as a function of adoption for bluetooth matching apps (blues) and apps that\n"
        "use retrospective location records such as Google Maps Timeline or iOS's on-device encrypted\n"
        "location records (greens). This chart assumes sufficient test coverage.")

Monte Carlo simulation of effectiveness as a function of adoption for bluetooth matching apps (blues) and apps that
use retrospective location records such as Google Maps Timeline or iOS's on-device encrypted
location records (greens). This chart assumes sufficient test coverage.


In [0]:
dir(plot)
p = bp.figure

plot.yaxis.axis_label.title()

'Proportion Of Infections Traced'

In [0]:
def human_format(num):
    num = float('{:.3g}'.format(num))
    magnitude = 0
    while abs(num) >= 1000:
        magnitude += 1
        num /= 1000.0
    return '{}{}'.format('{:f}'.format(num).rstrip('0').rstrip('.'), ['', 'K', 'M', 'B', 'T'][magnitude])

In [0]:
# adapted from https://scipython.com/book/chapter-8-scipy/additional-examples/the-sir-epidemic-model/
t = np.linspace(0, 364, 365)

# observations are measurements of the simulation that are not its ODE state variables
obssize = 4
conditions = 3
iR0r, iTr, iIncidence, iTestshort = range(obssize)
iBaseline, iBt, iGps = range(conditions)
observations = - np.ones((samples, conditions, obssize, len(t)+1), dtype=np.float32)  # lower precision is fine for simulation recordkeeping

In [0]:
# adapted from https://scipython.com/book/chapter-8-scipy/additional-examples/the-sir-epidemic-model/
t = np.linspace(0, 364, 365)
isnumpy = lambda x: isinstance(x, (np.ndarray, np.generic))
# work in progress on running many simulations at once
def deriv_vectorised(y, t, N, infect_rate, recovery_rate, bt_gps, params, obs_records):
    "Compute derivatives for variables in the ODE pandemic at time t"
    S, E1, E2, I1, I2, R, Q, L = y

    I =  I1 + I2
    incidence = 2*infect_rate * E2
    
    it = t.astype(int) if isnumpy(t) else int(t)
    dLdt = np.zeros(L.shape)
    sh1, sh2 = L.shape, params["lockdown_limit"].shape
    assert sh1 == sh2, "shapes do not match {0} {1}".format(sh1, sh2)
    r0 = np.copy(params["r0_raw"])

    # pseudocode (superceded by numpy vectorised version )
    # if incidence > params["lockdown_threshold"] and L <= params["lockdown_limit"]:
    #     r0 = params["lockdown_r0"]
    #     dLdt = 1.
    #     if t > 0:
    #         length = int(np.clip(np.clip(7, None, params["lockdown_limit"] - L), 0, None))
    #         if length > 0:
    #             lockdown_map[it:it + length] = np.ones(length)
    #         lockdown_map[it:it + length] = np.ones(length)
    # elif lockdown_map[it]:   #
    #     r0 = params["lockdown_r0"]
    #     dLdt = 1.
    # else:
    #     r0 = params["r0_raw"]
    #     dLdt = 0.

    lockdown_starting = np.argwhere(
        np.logical_and(
            incidence > params["lockdown_threshold"],
            L <= params["lockdown_limit"]))
    r0[lockdown_starting] = params["lockdown_r0"][lockdown_starting]
    dLdt[lockdown_starting] = 1
    # lockdowns always continue for lockdown_length days past any time when their conditions
    # are met, unless they accumulate to the limit
    llength = np.clip(np.clip(params["lockdown_length"], None, params["lockdown_limit"] - L), 0, None)
    llength = llength.astype(int)
    where_growing = np.intersect1d(np.argwhere(llength > 0), lockdown_starting, assume_unique=True)

    for i in where_growing:
        # walk through each simulation to update lockdown map     
        sim_map = deriv_vectorised.lockdown_map[:, i]
        sim_map[int(it):int(it + llength[i])] = np.ones(int(llength[i]))

    locked = np.argwhere(deriv_vectorised.lockdown_map[it])
    r0[locked] = params["lockdown_r0"][locked]
    dLdt[locked] = 1.
    if use_US_test_data:
        tr, test_shortfall = trace_rate(t, incidence, bt_gps, r0, params["test_growth"] * testmap[it], params)
    else:
        tr, test_shortfall = trace_rate(t, incidence, bt_gps, r0, params["test_supply"], params)

    if t == np.round(t):
        #print(obs_records.shape)
        obs_records[:, iR0r, it] = r0
        obs_records[:, iTr, it] = tr
        obs_records[:, iTestshort, it] = test_shortfall
        assert np.max(obs_records[:, iTr, it]) < 1.0, "unpopular"
        obs_records[:, iIncidence, it] = incidence

    contact_tracing_rate = tr #* contact_tracing_rate
    #contact_tracing_rate = contact_tracing_rate * np.heaviside(t - params["app_launch_date"], 1)
    contact_rate = recovery_rate * r0
    
    dSdt = -contact_rate * S * I / N
    dE1dt = (1 - contact_tracing_rate) * contact_rate * S * I / N - 2 * infect_rate * E1
    dE2dt = 2 * infect_rate * E1 - incidence
    dI1dt = incidence - 2*recovery_rate*I1
    dI2dt = 2 * recovery_rate * (I1 - I2)
    dRdt = 2 * recovery_rate * I2
    # Q represents people who are quarantined *due to the app intervention* ; other
    # types of qurantine should be reflected in the value of R0
    dQdt = contact_tracing_rate*contact_rate * S * I / N
    ret = np.array([dSdt, dE1dt, dE2dt, dI1dt, dI2dt, dRdt, dQdt, dLdt])
    
    return ret

if False:
    # vectorise this...
    params = {name:val[0:2] for name, val in data_raw.items()}
    print(np.ones((8,2)).shape)
    x = npa([1/2, 1/7])
    deriv_vectorised(np.ones((8,2)), 17, npa([N, N]), x, x, x, params, observations[:2, iBaseline])

In [0]:

def simulate_quarantined_epidemic_rk4_vectorised(bt_gps, params, obs_records):
  "Run a simple SEIQR model of an epidemic using a friggin simple Euler method, except it's actually a RK4 scheme!"
  # solved for infection_rate using https://science.sciencemag.org/content/early/2020/03/24/science.abb3221.full
  # recovery rate from https://www.doherty.edu.au/uploads/content_doc/McVernon_Modelling_COVID-19_07Apr1_with_appendix.pdf
  infect_rate, recovery_rate = 1./3, 1./5
  batch_size = len(params['app_launch_date'])
  #print("Batch size", batch_size)
  # Total population, N.
  #N = 25*10**6
  # Initial number of infected and recovered individuals, I0 and R0.
  E10, E20, I10, I20, R_init, Q0 = 20, 0, 0, 0, 0, 0
  L0 = 0.  # days of lockdown
  # Everyone else, S0, is susceptible to infection initially.
  S0 = N - I10 - I20 - R_init - E10 - E20 - Q0
  # Contact rate, beta, and mean recovery rate, gamma, (in 1/days).
  # A grid of time points (in days)
  deriv_vectorised.lockdown_map = np.zeros((365 + 30, batch_size))

  # Initial conditions vector
  y0 = npa((S0, E10, E20, I10, I20, R_init, Q0, L0))
  nparams = len(y0)
  #print("y0", y0.shape,  y0)

  y0 = np.tile(y0, (batch_size, 1))
  #print("y0", y0.shape,  y0)

  # Integrate the SIR equations over the time grid, t.
  # ret = odeint(deriv, y0, t, args=(N, r0, infect_rate, recovery_rate, contact_tracing_rate,
  #                                  lockdown_threshold, lockdown_r0))#, rtol=1e-12, atol=1e-12)
  ret = np.zeros((len(t), nparams, batch_size))
  ret[0, ...] = y0.T
  n_inner = 1 # number of steps in inner euler loop (reducing timestep by 1/n_inner)
  for i, ti in enumerate(tqdm(t[1:], desc="Running {0} epidemic simulations".format(batch_size))):
      dt = ti - t[i]
      ## Modern and totally sweet RK4 scheme!
      k1 = deriv_vectorised(ret[i], ti, N, infect_rate, recovery_rate, bt_gps, params, obs_records)
      k2 = deriv_vectorised(ret[i] + dt*k1/2., ti + dt/2., N, infect_rate, recovery_rate, bt_gps, params, obs_records)
      k3 = deriv_vectorised(ret[i] + dt*k2/2., ti + dt/2., N, infect_rate, recovery_rate, bt_gps, params, obs_records)
      k4 = deriv_vectorised(ret[i] + dt*k3, ti + dt, N, infect_rate, recovery_rate, bt_gps, params, obs_records)
      ret[i+1,:] = ret[i,:] + 1./6*dt*(k1 + 2*k2 + 2*k3 + k4)

  # collapse SEEIIRQL -> SEIRQL
  ret = np.array([ret[:,0],ret[:,1] + ret[:,2],ret[:,3] + ret[:,4],ret[:,5],ret[:,6],ret[:,7]]).T
  
  # S, E, I, R, Q, L = ret.T
  return ret.T

squerkv = simulate_quarantined_epidemic_rk4_vectorised


In [0]:
# Explore how the app launch day affects dynamics

pobj = {name:np.copy(val[:1]) for name, val in data_raw.items()}

pobj["r0_raw"][0] = 2.4
#pobj["lockdown_threshold"][0] = 1000000

test_solvers = True
test_slow_solvers = False
if test_solvers and test_slow_solvers:
    test_r0 = 2.4

    color_mapper = bm.LinearColorMapper(palette="Viridis256", low=20, high=150)
    try:
      if use_US_test_data:
          tm = np.copy(testmap)
          testmap *= 50
      for llimit, ld in ((0, "no lockdowns"), (100, "lockdowns")):
      #testmap.clip(None, 1.0)
          plot = bp.figure(plot_width=768, plot_height=400, y_axis_type="linear", #y_range=[0, 0.1],
                      x_axis_label="Day", y_axis_label="Infected",
                      title="Illustrative solver dynamics ({0}, varied app launch date, extensive testing)".format(ld))
          plot2 = bp.figure(plot_width=768, plot_height=400, y_axis_type="linear",
                      x_axis_label="Day", y_axis_label="Quarantined via app")
          for launch_date in range(20, 150, 3):
            #(S, E, I, R, Q, L), _r0r = simulate_quarantined_epidemic2(test_r0, 0.2, launch_date, 0.001, 0.8, 10)
            pobj["app_launch_date"][0] = launch_date
            pobj["lockdown_limit"][0] = llimit
            #ret = squerkv([0.2], pobj, observations[:1, iBt])[..., 0]
            ret = squee("bt", pobj, observations[0, iBt])
            (S1, E1, I1, R1, Q1, L1) = ret 
            kwargs = {"legend_label" : "days of lockdown"} if launch_date == 20 else {}
            #plot.line(t, 0.05 + I/N, line_color=color_mapper.palette[int((launch_date-20)*255/130.)], alpha=0.6)
            plot.line(t, I1/N, line_color=color_mapper.palette[int((launch_date-20)*255/130.)], alpha=0.6)
            scalef = max(I1/N) / max(L1/365)
            plot.line(t, scalef * L1/365., line_color="red", alpha=0.2, **kwargs)
            plot2.line(t, Q1/N, line_color=color_mapper.palette[int((launch_date-20)*255/130.)], alpha=0.6)

          color_bar = bm.ColorBar(color_mapper=color_mapper, label_standoff=12, border_line_color=None, location=(0,0))
          plot.add_layout(color_bar, 'right')
          plot2.add_layout(color_bar, 'right')

          display(bl.row(plot, plot2))
    finally:
      if use_US_test_data:
          testmap = tm

In [0]:
if test_solvers:
    color_mapper = bm.LinearColorMapper(palette="Viridis256", low=20, high=150)
    try:
      if use_US_test_data:
          tm = np.copy(testmap)
          testmap *= 50
      
      for llimit, ld in ((0, "no lockdowns"), (100, "lockdowns")):
      #testmap.clip(None, 1.0)
          plot = bp.figure(plot_width=768, plot_height=400, y_axis_type="linear", #y_range=[0, 0.1],
                      x_axis_label="Day", y_axis_label="Infected",
                      title="Illustrative vectorised solver dynamics ({0}, varied app launch date)".format(ld))
          plot2 = bp.figure(plot_width=768, plot_height=400, y_axis_type="linear",
                      x_axis_label="Day", y_axis_label="Quarantined via app")
          
          ldates = np.arange(20, 150, 3)
          runs = len(ldates)
          pobjs = {name: np.repeat(pobj[name], runs) for name in data_raw}
          pobjs["app_launch_date"] = ldates
          pobjs["lockdown_limit"] = np.repeat(llimit, runs)
          pobjs["cov_bt"] = np.repeat(0.2, runs)
          ret = squerkv("bt", pobjs, observations[:runs, iBt])

          for i, launch_date in enumerate(ldates):
              (S1, E1, I1, R1, Q1, L1) = ret[..., i]
              kwargs = {"legend_label" : "days of lockdown"} #if launch_date == 20 else {}
              plot.line(t, I1/N, line_color=color_mapper.palette[int((launch_date-20)*255/130.)], alpha=0.6)
              scalef = max(I1/N) / max(L1/365)
              plot.line(t, scalef * L1/365., line_color="red", alpha=0.2, **kwargs)
              plot2.line(t, Q1/N, line_color=color_mapper.palette[int((launch_date-20)*255/130.)], alpha=0.6)

          color_bar = bm.ColorBar(color_mapper=color_mapper, label_standoff=12, border_line_color=None, location=(0,0))
          plot.add_layout(color_bar, 'right')
          plot2.add_layout(color_bar, 'right')

          display(bl.row(plot, plot2))
    finally:
        if use_US_test_data:
            testmap = tm

HBox(children=(FloatProgress(value=0.0, description='Running 44 epidemic simulations', max=364.0, style=Progre…






HBox(children=(FloatProgress(value=0.0, description='Running 44 epidemic simulations', max=364.0, style=Progre…




In [0]:
statesize = 6
iS, iE, iI, iR, iQ, iL = range(statesize)

def run_simulations(obs_records, params=data_raw):
    details = - np.ones((conditions, statesize, len(t), samples), dtype=np.float32)
    details[iBaseline, ...] = squerkv(None, params, obs_records[:, iBaseline])
    details[iBt, ...] = squerkv("bt", params, obs_records[:, iBt])
    details[iGps, ...] = squerkv("gps", params, obs_records[:, iBt])
    details = np.moveaxis(details, -1, 0)  # samples, conditions, statesize, time
    R = details[:,:,iR,:]
    Q = details[:,:,iQ,:]
    ifr_stack = np.tile(infection_fatality_rate, (conditions, 1)).T
    assert infection_fatality_rate[10] == ifr_stack[10][iGps]
    fatalities = np.round(ifr_stack * (R[...,-1] + Q[...,-1]))
    ldays = details[..., iL, -1]  # lockdown days
    lresults = npa([ldays[:,iBt], ldays[:,iBaseline] - ldays[:,iBt], 
                    ldays[:,iGps], ldays[:,iBaseline] - ldays[:,iGps]])
    results = npa([fatalities[:,iBt], fatalities[:,iBaseline] - fatalities[:,iBt], 
                   fatalities[:,iGps], fatalities[:,iBaseline] - fatalities[:,iGps]])
    return results, lresults, details


In [0]:
try:
    del results, lockdown_results, details  # allow garbage collector to free up memory
except NameError: 
    pass
results, lockdown_results, details = run_simulations(observations)

HBox(children=(FloatProgress(value=0.0, description='Running 250000 epidemic simulations', max=364.0, style=Pr…






HBox(children=(FloatProgress(value=0.0, description='Running 250000 epidemic simulations', max=364.0, style=Pr…




HBox(children=(FloatProgress(value=0.0, description='Running 250000 epidemic simulations', max=364.0, style=Pr…




In [0]:
print(details.shape)

(250000, 3, 6, 365)


In [0]:
# plot state variables for some example runs from the monte carlo distribution
number = 5

def plot_variables(varspec, plot_range=range(number), width=1024, title=None, include_r0s=False, data=details):
    """
    Plot a set of simulation state variables (in side-by side charts to avoid clutter)

    varspec: [(varcolumn, "varname")]
    plot_range: which runs to plot
    """
    number = len(plot_range)
    if title is None:
        title = "{0} epidemics from the monte carlo distribution".format(number)
    plots = []
    for var, name in varspec:
        top = np.clip(np.max(data[plot_range, iBt, var]/N), 0, 1)
        kwargs = {"plot_width": width, "plot_height": int(width * 3./4), "y_axis_type": "log", # if var == iR else "log",
                  "y_range": [1/N, top * 1.01], # log compatible
                  "x_axis_label": "Day", "y_axis_label": "{0} proportion (lines) vs after app launch (dashed)".format(name),
                  "title": title}
        
        plot = bp.figure(**kwargs)
        if include_r0s:
            plot.extra_y_ranges = {"r0": bm.Range1d(start=0.4, end=4.5)}
            plot.add_layout(bm.LinearAxis(y_range_name="r0", axis_label="r0 (stripes)"), "right")
        color_mapper = bm.LinearColorMapper(palette="Turbo256", low=0, high=30)
        for i, n in enumerate(plot_range):
            ldate = app_launch_date[n]
            plot.scatter(ldate, data[n, iBaseline, var, ldate] / N, line_color="black", marker="x", legend_label="app launches")
            sub_n = i % number
            col = color_mapper.palette[sub_n * (255 // (1 + number))]
            w = 1.0 if i % 2 == 0 else 2.0
            plot.line(t, np.clip(data[n, iBaseline, var] / N, -0.01, 1.0), color=col, alpha=0.8, legend_label="run {0}".format(n), line_width=w)
            plot.line(t[ldate:], np.clip(data[n, iBt, var, ldate:] / N, -0.01, 1.0), color=col, alpha=0.8, line_dash="dashed", line_width=w)
            if include_r0s:
                r0rs = observations[n][iBt][iR0r][:-1]
                plot.scatter(t, r0rs, color=col, alpha=0.2, y_range_name="r0")

        plot.legend.background_fill_alpha = 0.85
        plots.append(plot)
    return plots

In [0]:
vspec = [(iE, "exposed"), (iI, "infected"), (iQ, "quarantined positive patients"), (iR, "recovered"), (iL, "cumulative days of lockdown")]
plots1 = bl.column(plot_variables(vspec))
plots2 = bl.column(plot_variables(vspec, plot_range=range(number, 2*number)))
display(bl.row(plots1, plots2))

In [0]:
vspec = [(iR0r, 'R0 (after lockdown, before tracing)'), (iIncidence, 'Incidence'), (iTestshort, '"shortfall" from optimal testing')]

plots1 = bl.column(plot_variables(vspec, data=observations[..., :-1]))
plots2 = bl.column(plot_variables(vspec, data=observations[..., :-1], plot_range=range(number, 2*number)))
display(bl.row(plots1, plots2))

In [0]:
p1 = plot_variables([(iI, "infected")], plot_range=[4, 9])[0]
p2 = plot_variables([(iI, "infected")], plot_range=[8, 19])[0]
print(p1, p2)
r =bl.row([p1, p2])
print(r)
set_svg(r)
display(r)
#print("Left: two scenarios where the app was very helpful"

Figure(id='112936', ...) Figure(id='113045', ...)
Row(id='113154', ...)
Switching canvas to svg
Switching canvas to svg


In [0]:
plot = bp.figure(plot_width=768, plot_height=768, x_axis_label="Day", y_axis_label="Proportion of infections traced", x_range=[2, 365])
plot2 = bp.figure(plot_width=768, plot_height=768, x_axis_label="Day", y_axis_label="Proportion of infections traced")
color_mapper = bm.LinearColorMapper(palette="Turbo256", low=0, high=30)

for i in range(number):
    p = plot if i < (number // 2) else plot2
    trs = observations[i][iBt][iTr]

    sub_n = i % (number // 2)  # restart colours for the second graph
    col = color_mapper.palette[sub_n * (255 // (1 + number // 2))]
    w = 2.0 if i % 2 == 0 else 1.0
    p.line(t[1:], trs[1:-1] , color=col, alpha=0.8, legend_label=str(i), line_width=w)

display(bl.row(plot, plot2))   

In [0]:
# which of our ODE simulators is numerically stable?
# Look for the most numerically pathological run in the batch

#x = np.argmin(details[:, 1, iR, -1])
test_solvers = False
if test_solvers:
    x = 41
    print(x, details[x, 1, iR, -1])
    I, R = details[x, 1, iI:iR+1]
    plot = bp.figure(plot_width=768, plot_height=400, y_axis_type="linear",
                    x_axis_label="Day", y_axis_label="Infected")

    #print((r0_raw[x], cov_bt[x], app_launch_date[x], lockdown_threshold[x], lockdown_r0[x]))
    t, I1, R1, L = simulate_quarantined_epidemic(r0_raw[x], cov_bt[x], app_launch_date[x], lockdown_threshold[x], lockdown_r0[x])
    (S, E, I, R, Q, L), _r0r = simulate_quarantined_epidemic2(r0_raw[x], cov_bt[x], app_launch_date[x], lockdown_threshold[x], lockdown_r0[x], lockdown_limit[x])
    (S2, E2, I2, R2, Q2, L), _r0r = simulate_quarantined_epidemic_euler(r0_raw[x], cov_bt[x], app_launch_date[x], lockdown_threshold[x], lockdown_r0[x], lockdown_limit[x])
    plot.line(t, I/N, legend_label="I (sqe2)", line_color="navy")
    plot.line(t, R/N, legend_label="R (sqe2)", line_color="red")
    #plot.line(t, I1/N, legend_label="I (sqe)", line_color="green")
    #plot.line(t, R1/N, legend_label="R (sqe)", line_color="blue")
    plot.line(t, I2/N, legend_label="I (squee)", line_color="purple")
    plot.line(t, R2/N, legend_label="R (squee)", line_color="orange")
    display(plot)
    print('Probe for numerical instability (the "squee" Euler simulator is looking okay)')

In [0]:
plot = bp.figure(x_range=[0,1], plot_width=600, plot_height=600,
                 x_axis_label="Proportion of population using app",
                 y_axis_label="Mortality", y_axis_type="log")
intervention, diff, intervention_gps, gps_diff = results

graph_distribution(plot, intervention, "deaths", "OrRd4", alpha=0.2)

#plot.scatter(pop_adoption, gps_diff, radius=0.005, fill_alpha=0.2, fill_color="#500050", line_color=None,
#             legend_label="gps lives saved (1 simulation pair)")

plot2 = bp.figure(x_range=[0,1], plot_width=600, plot_height=600,
                 x_axis_label="Proportion of population using app",
                 y_axis_label="Mortality reduction", y_axis_type="linear",
                 y_range=[np.quantile(diff, 0.01), np.quantile(diff, 0.99)])
graph_distribution(plot2, diff, "bluetooth lives saved", "Blues4", alpha=0.4)

plot2.scatter(pop_adoption[:scattersize], diff[:scattersize], radius=0.005, fill_alpha=0.2, fill_color="#50b050", line_color=None,
             legend_label="bt lives saved (1 simulation pair)")


#plot.line(incs, avgs, line_color="#a000f0", legend_label="prospective bluetooth matching",
#          line_width=2.0)  
plot.legend.background_fill_alpha = 0.75
display(bl.row(plot, plot2))

In [0]:
plot1 = bp.figure(x_range=[0,1], plot_width=768, plot_height=768,
                 x_axis_label="Proportion of population using app",
                 y_axis_label="Full lockdown equivalent days in first year")
plot2 = bp.figure(x_range=[0,1], plot_width=768, plot_height=768,
                 x_axis_label="Proportion of population using app",
                 y_axis_label="Averted full lockdown equivalent days in first year")
lintervention, ldiff, lintervention_gps, ldiff_gps = lockdown_results

graph_distribution(plot1, lintervention, "days of lockdown", "OrRd4")
graph_distribution(plot2, ldiff, "lockdown days averted by bluetooth app", "Blues4")

plot1.scatter(pop_adoption[:scattersize], lintervention[:scattersize], radius=0.005, fill_alpha=0.2, fill_color="#a0a0a0", line_color=None,
             legend_label="(1 simulation w/ bluetooth)")
plot2.scatter(pop_adoption[:scattersize], ldiff[:scattersize], radius=0.005, fill_alpha=0.2, fill_color="#50b050", line_color=None,
             legend_label="(1 counterfactual simulation pair)")
plot1.legend.background_fill_alpha = 0.95

#plot.line(incs, avgs, line_color="#a000f0", legend_label="prospective bluetooth matching",
#          line_width=2.0)
plot1.output_backend = plot2.output_backend = "svg"
display(bl.row(plot1, plot2))

In [0]:
def display_tile(plots, width):
    display(bl.layout([plots[n:n+width] for n in list(range(0, len(plots), width))]))


In [0]:

def qbin_stats(variable, group_by):
    step = 1 / len(incs)
    quantiles = np.arange(step, 1, step)

    qstarts = np.quantile(group_by, quantiles)
    xyview = np.stack([group_by, variable], axis=-1)
    bin_indexes = np.searchsorted(qstarts, group_by)
   
    bins = [[] for i in incs]
    for i, val in enumerate(variable):
        bins[bin_indexes[i]].append(val)
    
    qstarts, bins = npa([(q, b) for q, b in zip(qstarts, bins) if len(b) != 0]).T

    #qs, bs = [], []
    #for i, b in enumerate(bins):
    #    if len(b) > 0:
    #        qs.append(qstarts[i])
    #        bs.append(b)
    #qstarts, bins = qs, bs
    return qstarts, bins

def var_breakdown(score, label):
    irrelevant = ("testing_rate", "catch_rate_gps")
    plots = []
    for name, dist in data_raw.items():
        if name in irrelevant: continue
        qstarts, bins = qbin_stats(score, dist)
        plot = bp.figure(plot_width=600, plot_height=400,
                    x_axis_label=name,
                    y_axis_label="Mean {0} (bluetooth)".format(label))
        plot.output_backend = "svg"
        #print(qstarts, list(map(np.average, bins)))
        
        #bins = bins[:-1
        #if name == "app_launch_date":
        #    #print([len(b) for b in bins])
        #    qstarts, bins = npa([(q,b) for q,b in zip(qstarts,bins) if len(b) != 0]).T
        plot.line(qstarts, list(map(np.average, bins)), line_width=2., line_color="#000080", legend_label="mean " + label)
        plot.line(qstarts, [np.quantile(d, 0.05) for d in bins], line_color="lightblue", line_width=1.3, legend_label="05 / 95%",line_alpha=0.8)
        plot.line(qstarts, [np.quantile(d, 0.95) for d in bins], line_color="lightblue", line_width=1.3, line_alpha=0.8)
        plot.line(qstarts, [np.quantile(d, 0.25) for d in bins], line_color="#4040d0",   line_width=1.7, legend_label="25 / 75%", line_alpha=0.8)
        plot.line(qstarts, [np.quantile(d, 0.75) for d in bins], line_color="#4040d0",    line_width=1.7, line_alpha=0.8)
        plot.line(qstarts, list(map(np.median, bins)), line_width=1., line_color="black", line_dash="dotted", legend_label="median")
        #pot = bm.ColumnDataSource(dict(qs=qstarts, b95=[np.quantile(d, 0.95) for d in bins], b05=[np.quantile(d, 0.05) for d in bins]))
        #band = bm.Band(base="qa", lower="b05", upper="b95",  line_width=1.6, source=pot, fill_color="lightblue", fill_alpha=0.5)
        #plot.add_layout(band)
        
        plot.legend.background_fill_alpha = 0.75

        if name != "r0_raw": plot.legend.location = "top_left"
        #ydist = npa(list(map(np.average, bins)))[:-1]
        #graph_distribution(plot, ydist, name, "Blues4", xvar=qstarts)
        plots.append(plot)
    display_tile(plots, 3)

In [0]:
var_breakdown(ldiff, "days of lockdown averted")

In [0]:
print("")




In [0]:
var_breakdown(diff, "mortality averted")

In [0]:
# Partial ranked correlation coefficient 

In [0]:
p = bp.figure(tools="", match_aspect=True, background_fill_color='black', y_range=[-1,1])
p.grid.visible = False
hexes=bh.hexbin(pop_adoption, diff/N, 0.005, aspect_scale=1/12.)
print(np.min(pop_adoption), np.max(pop_adoption))
p.hex_tile(q="q", r="r", size=0.1, line_color=None, source=hexes,
           fill_color=bt.linear_cmap('counts', 'Viridis256', 0, max(hexes.counts)))
#bi.show(p)

3.3105544573475143e-06 0.9999990120313899


In [0]:
# try density maps. FIXME: these need to be normalised to not imply that most of the points lie on the
# left...
p = bp.figure(tools="", match_aspect=True, background_fill_color='black')
p.grid.visible = False

hexes=bh.hexbin(pop_adoption, cov_bt, 0.01)
p.hex_tile(q="q", r="r", size=0.1, line_color=None, source=hexes,
           fill_color=bt.linear_cmap('counts', 'Viridis256', 0, max(hexes.counts)/10))
bi.show(p)
p = bp.figure(tools="", match_aspect=True, background_fill_color='#440154')
p.grid.visible = False

hexes=bh.hexbin(pop_adoption, cov_gps, 0.01)
p.hex_tile(q="q", r="r", size=0.1, line_color=None, source=hexes,
           fill_color=bt.linear_cmap('counts', 'Viridis256', 0, max(hexes.counts)/10))
bi.show(p)

In [0]:
# Neither beta nor lognormal distributions fit Kucharski's
plot = bp.figure(y_axis_label="PDF", y_range=[0,1])
alt_ifr = npr.lognormal(-0.1, 0.9, size=samples).clip(0,8) / 100.

r0_dist = (1.5 + npr.beta(2, 5, size=samples) * 3)/100.

print ("mean", np.mean(r0_dist))


for name, dist, col in [("lognormal IFR", alt_ifr, "red"), ("beta IFR", infection_fatality_rate, "navy"),
                        ("r0", r0_raw/100, "green")]:
    hist, edges = np.histogram(100 * dist, density=True, bins=100)
    plot.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
              fill_color=col, line_color="white", alpha=0.5, legend_label=name)
    
display(plot)

mean 0.02356490195523718


In [0]:
bcols = ["red", "green", "blue"]
plots = []
color_mapper = bm.LinearColorMapper(palette="Plasma256", low=0, high=len(data_raw))

for i, (var, dist) in enumerate(data_raw.items()):
    plot = bp.figure(y_axis_label="PDF", width=400, height=300)
    hist, edges = np.histogram(dist, density=True, bins=100)
    
    plot.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
              fill_color=color_mapper.palette[i* (256 // len(data_raw))], line_color="white", legend_label=var)
    plots.append(plot)

layout = bl.layout([plots[n:n+4] for n in list(range(0, len(plots), 4))])
display(layout)
    

In [0]:
print(layout)
dir(layout.children)
print([p for p in layout.children])

Column(id='173003', ...)
[Row(id='172996', ...), Row(id='172997', ...), Row(id='172998', ...), Row(id='172999', ...), Row(id='173000', ...), Row(id='173001', ...), Row(id='173002', ...)]
