In [1]:
import datetime
import numpy as np
from scipy.stats import poisson


In [2]:
#testing
import random
from bokeh.plotting import figure
from bokeh.io import show
from bokeh.io import output_notebook


In [3]:
def get_poisson_for_day(date, cases, loc = 2, window = 14, mu = 5):
    """
    Calculate when the infections occurred for a single day

    The ``cases`` reported on ``date`` are spread backwards in time: the day
    lying ``k`` days before ``date`` is assigned
    ``cases * poisson.pmf(k, mu, loc=loc)``.

    Parameters
    ----------
    date : datetime.date or datetime.datetime
        Day on which the cases were reported.
    cases : number
        Number of cases reported on ``date``.
    loc : int, optional
        Shift forwarded to ``poisson.pmf``; offsets smaller than ``loc``
        receive a weight of zero. Defaults to 2.
    window : int, optional
        Number of days to look back, counting ``date`` itself. Defaults to 14.
    mu : float, optional
        Mean of the Poisson distribution. Defaults to 5.

    Returns
    -------
    list of (date, float)
        ``window`` tuples ordered from ``date`` backwards (newest first).
        The weights are not renormalised, so the values can sum to less than
        ``cases`` when the window cuts off the tail of the distribution.
        An empty list is returned when ``window`` is not positive.
    """
    if window <= 0:
        return []
    # One vectorised pmf evaluation instead of one scipy call per day.
    weights = poisson.pmf(np.arange(window), mu, loc = loc)
    one_day = datetime.timedelta(days = 1)
    return [(date - offset * one_day, weight * cases)
            for offset, weight in enumerate(weights)]

def test_get_poisson_for_day():
    """
    Check that one reported day is spread over 14 days and that the spread
    adds up to roughly the reported number of cases.
    """
    report_day = datetime.datetime(2020, 9, 1)
    reported = 100
    spread = get_poisson_for_day(report_day, reported)
    assert len(spread) == 14
    # The 14-day window truncates the distribution, hence the loose tolerance.
    estimated_total = sum(amount for _, amount in spread)
    assert abs(estimated_total - reported) < 1
test_get_poisson_for_day()

In [4]:
def get_poisson_for_all_days(dates, cases):
    """
    Estimate daily infections from a series of daily reported cases.

    Every reported day is spread backwards in time with
    ``get_poisson_for_day`` and the contributions that fall on the same
    date are added together.

    Parameters
    ----------
    dates : sequence of datetime.date or datetime.datetime
        Reporting days, one per entry of ``cases``.
    cases : sequence of numbers
        Cases reported on the corresponding entry of ``dates``.

    Returns
    -------
    list of (date, float)
        One tuple per distinct infection date, sorted by date ascending.

    Raises
    ------
    ValueError
        If ``dates`` and ``cases`` differ in length.
    """
    if len(dates) != len(cases):
        # Misaligned inputs would silently attribute cases to the wrong days.
        raise ValueError(
            'dates and cases must have the same length, got {d} and {c}'.format(
                d = len(dates), c = len(cases)))

    totals = {}
    for report_date, reported in zip(dates, cases):
        for infection_date, estimate in get_poisson_for_day(report_date, reported):
            totals[infection_date] = totals.get(infection_date, 0) + estimate
    # Keys are unique, so sorting the (date, value) pairs orders them by date.
    return sorted(totals.items())

def test_get_poisson_for_all_days():
    """
    Check the date range and the approximate total produced when two
    consecutive reporting days are combined.
    """
    report_days = [datetime.datetime(2020, 9, day) for day in (1, 2)]
    reported = [100, 200]
    series = get_poisson_for_all_days(report_days, reported)
    earliest = series[0][0]
    assert earliest == datetime.datetime(2020, 8, 19)
    latest = series[-1][0]
    assert latest == datetime.datetime(2020, 9, 2)
    estimated_total = sum(amount for _, amount in series)
    assert abs(estimated_total - sum(reported)) < 2
test_get_poisson_for_all_days()

In [13]:
# Tell Bokeh to render figures passed to show() inline in the notebook.
# NOTE(review): the cell that defines test_all() calls output_notebook() again;
# one call near the imports would be enough -- confirm and consolidate.
output_notebook()

In [5]:
# not used
def get_new_dates(last_date):
    """
    Disabled helper: every call raises ``NotImplementedError``.

    The statement after the ``raise`` is unreachable. It is kept to show the
    intended result: the 13 days that follow ``last_date``.
    """
    raise NotImplementedError('')

    return [last_date + datetime.timedelta(days = offset) for offset in range(1, 14)]

In [6]:
#not used
def get_pred_slope(x, conf_i):
    """
    calculate slope and intercepts for confidence intervals

    Disabled helper: every call raises ``NotImplementedError`` and the
    statements after the ``raise`` are unreachable.

    The unreachable code divides differences between the first and last rows
    of ``conf_i`` by the span of ``x`` and returns ``(lower, upper)`` slopes.
    NOTE(review): presumably each row of ``conf_i`` is ``(lower, upper)`` --
    confirm before re-enabling.
    """
    raise NotImplementedError('')

    first_row, last_row = conf_i[0], conf_i[-1]
    run = x[-1] - x[0]
    upper_slope = (last_row[1] - first_row[0]) / run
    lower_slope = (last_row[0] - first_row[1]) / run
    return lower_slope, upper_slope

#not used
def get_projected_cases(dates, cases):
    """
    Disabled helper: every call raises ``NotImplementedError``.

    The statements after the ``raise`` are unreachable. They fit an ordinary
    least squares line to the last 14 entries of ``cases`` and extend the fit
    and its confidence band over the dates from ``get_new_dates``, returning
    ``(new_dates, y_hat, proj_lower, proj_upper)``.
    NOTE(review): ``sm`` is not imported in the visible part of this notebook;
    presumably it is ``statsmodels.api`` -- confirm before re-enabling.
    """
    raise NotImplementedError('')

    assert len(dates) == len(cases)
    x = range(len(dates[-14:]))
    # One single-column row per observation, as add_constant expects.
    design = sm.add_constant([(step,) for step in x])
    fit = sm.OLS(cases[-14:], design).fit()
    prediction = fit.get_prediction(design)
    print('p value is {p}'.format(p = fit.pvalues[1]))
    frame = prediction.summary_frame(alpha=0.05)
    new_dates = get_new_dates(last_date = dates[-1])
    slope_low, slope_high = get_pred_slope(x, prediction.conf_int())
    steps = range(len(new_dates))
    last_upper = frame.mean_ci_upper.tolist()[-1]
    last_lower = frame.mean_ci_lower.tolist()[-1]
    proj_upper = [last_upper + slope_high * step for step in steps]
    proj_lower = [last_lower + slope_low * step for step in steps]
    y_hat = [step * fit.params[1] + cases[-1] for step in steps]
    return new_dates, y_hat, proj_lower, proj_upper

#not used
def test_get_projected_cases():
    """
    Disabled test: returns immediately, so nothing below the ``return`` runs.

    The unreachable code builds 14 days of noisy, rising case counts, checks
    the lengths returned by ``get_projected_cases`` and plots the result.
    """
    return

    start = datetime.datetime(2020, 8, 1)
    dates = [start + datetime.timedelta(days = x) for x in range(14)]
    cases = [1.3 * x * 10 + random.gauss(40, 30) for x in range(14)]
    n_dates, y_hat, lower, upper = get_projected_cases(dates, cases)
    assert len(lower) == 13
    assert all(len(series) == len(lower) for series in (upper, n_dates, y_hat))

    plot = figure(x_axis_type = 'datetime')
    plot.line(x = dates, y = cases)
    plot.line(x = n_dates, y = y_hat)
    for band, colour in ((upper, 'red'), (lower, 'green')):
        plot.line(x = n_dates, y = band, color = colour)
    show(plot)

test_get_projected_cases()

In [7]:
def make_poisson_data(dates, cases):
    """
    Disabled wrapper: every call raises ``NotImplementedError``.

    The statements after the ``raise`` are unreachable. They check that
    ``dates`` and ``cases`` have equal length and return the result of
    ``get_poisson_for_all_days``.
    """
    raise NotImplementedError('')

    assert len(dates) == len(cases)
    return get_poisson_for_all_days(dates, cases)
    

In [8]:
def get_test_dates():
    """
    Return the fixture dates: 40 consecutive midnights from 2020-07-30
    through 2020-09-07, as a new list of ``datetime.datetime`` objects.
    """
    first_day = datetime.datetime(2020, 7, 30, 0, 0)
    return [first_day + datetime.timedelta(days = offset) for offset in range(40)]

In [9]:
def get_test_cases():
    """
    Return the fixture case counts: 40 fixed daily values, one for each date
    returned by ``get_test_dates``, as a new list.
    """
    counts = (
        160, 158, 202, 111, 124, 145, 167, 192, 134, 200,
        129, 148, 81, 149, 225, 136, 174, 129, 181, 165,
        141, 183, 105, 152, 108, 146, 53, 172, 128, 184,
        119, 74, 111, 46, 108, 96, 158, 92, 99, 56,
    )
    return list(counts)


In [10]:
def get_lin_vals(cases):
    """
    Fit a straight line to ``cases`` against their position (0, 1, 2, ...)
    and report the p-value of the slope.

    The p-value tests the null hypothesis that the slope is zero, i.e. that
    ``cases`` shows no linear trend.

    Parameters
    ----------
    cases : sequence of numbers
        Observations in order; at least two are required by ``linregress``.

    Returns
    -------
    float
        Two-sided p-value of the slope. The value is also printed.
    """
    # The fit previously went through an `sm` object that is never imported
    # in this notebook, so any call failed with NameError. scipy is already a
    # dependency here and linregress yields the same slope t-test.
    from scipy.stats import linregress

    positions = list(range(len(cases)))
    fit = linregress(positions, cases)
    print(fit.pvalue)
    return fit.pvalue

In [12]:
output_notebook()
def test_all():
    """
    Plot the fixture case counts (default colour) together with the
    back-dated infection estimates (red) on a datetime axis.

    The last 14 entries of the estimate series are dropped before plotting.
    NOTE(review): presumably because the most recent days are still missing
    contributions from reports that have not arrived yet -- confirm.
    """
    dates = get_test_dates()
    cases = get_test_cases()
    estimates = get_poisson_for_all_days(dates, cases)[0:-14]
    infection_days = [day for day, _ in estimates]
    infection_counts = [count for _, count in estimates]
    # Optional trend check: get_lin_vals(infection_counts[-7:])

    plot = figure(x_axis_type = 'datetime')
    plot.line(x = dates, y = cases)
    plot.line(x = infection_days, y = infection_counts, color ='red')
    show(plot)


test_all()