In [1]:
# Imports
import numpy as np
import pandas as pd

In [2]:
# Read in usafacts data (wide format: one row per county, one column per date)
case_df = (
    pd.read_csv(
        "https://usafactsstatic.blob.core.windows.net/"
        "public/data/covid-19/covid_confirmed_usafacts.csv",
    )
    # Remove unallocated cases (FIPS 0 or 1). A boolean filter is used rather
    # than set_index().drop(), which raises KeyError when one of the labels
    # is absent from the downloaded file.
    .loc[lambda wide: ~wide["countyFIPS"].isin([0, 1])]
    # Melt dataframe (wide to long format): every non-id column header
    # becomes a value in "date" and its cell a value in "confirmed"
    .melt(
        id_vars=["countyFIPS", "County Name", "State", "stateFIPS"],
        value_name="confirmed",
        var_name="date",
    )
    # Parse the former column headers into timestamps. pd.to_datetime is used
    # because casting to the unit-less "datetime64" dtype via astype() is
    # rejected with a TypeError by pandas >= 2.0.
    .assign(date=lambda long: pd.to_datetime(long["date"]))
)

In [3]:
# Create a day variable from the date variable (days since the earliest date)
case_df = case_df.assign(
    days=(case_df["date"] - case_df["date"].min()).dt.days
)

# Trim up to but not including day 30
case_df = case_df[case_df["days"].ge(30)]

# Reverse order (highest to lowest day)
case_df = case_df.sort_values(["countyFIPS", "days"], ascending=False)

# Replace incorrect values with previous values.
# Confirmed counts are cumulative, so walking backwards in time they must
# never increase. A running minimum per county (in this reversed order)
# overwrites every earlier-day value that exceeds a later-day value with that
# later value. For non-negative counts this is the state the former
# "flag increases as NaN until none remain, then forward-fill" loop converged
# to, but it is a single pass, does not rely on pct_change's implicit
# NaN padding, and - unlike the previous un-assigned `case_df.assign(...)`
# call - the result is actually stored.
# The grouped ffill afterwards only matters if the source itself contains
# missing counts; grouping keeps one county's value from leaking into another.
case_df = case_df.assign(
    confirmed=lambda d: d.groupby("countyFIPS")["confirmed"].cummin()
).assign(
    confirmed=lambda d: d.groupby("countyFIPS")["confirmed"].ffill()
)

# Restore the original order (lowest to highest day)
case_df = case_df.sort_values(["countyFIPS", "days"])

case_df = case_df.assign(
    # Calculate new cases from confirmed cases; the first retained day of
    # each county has no predecessor, so its difference is set to 0
    new_cases=case_df.groupby("countyFIPS")["confirmed"]
    .diff()
    .fillna(0)
)

In [4]:
# Load the Census Bureau county population estimates, keeping only the
# identifier columns and the 2019 estimate, then derive a numeric FIPS key:
# the state code followed by the county code left-padded to three digits.
cens_df = pd.read_csv(
    "https://www2.census.gov/programs-surveys/popest/datasets/"
    "2010-2019/counties/totals/co-est2019-alldata.csv",
    encoding="latin-1",
    usecols=["STATE", "COUNTY", "STNAME", "CTYNAME", "POPESTIMATE2019"],
).assign(
    county_fips=lambda census: census["STATE"]
    .astype(str)
    .add(census["COUNTY"].astype(str).str.zfill(3))
    .astype(int)
)

In [5]:
# Attach the census columns to every case row. The left join keeps all case
# rows even when no census row shares the FIPS code.
df = pd.merge(
    left=case_df,
    right=cens_df,
    how="left",
    left_on="countyFIPS",
    right_on="county_fips",
)

In [6]:
# Calculate model inputs
# Recovery rate used to turn growth rates into beta and Rt
GAMMA = 1/7

df = (
    df
    # Growth rate: per-county relative change in confirmed cases divided by
    # the number of days elapsed since the previous row
    .assign(
        gr=lambda d: d.groupby("countyFIPS")["confirmed"].pct_change()
        / d.groupby("countyFIPS")["days"].diff()
    )
    # 7-row moving average of the growth rate within each county; dropping
    # the group level restores the original row index so assign() aligns
    .assign(
        smooth_gr=lambda d: d.groupby("countyFIPS")["gr"]
        .rolling(window=7, min_periods=1)
        .mean()
        .reset_index(level=0, drop=True)
    )
    # Doubling time and beta, from both the raw and the smoothed growth rate
    .assign(
        dt=lambda d: np.log(2) / np.log(d["gr"] + 1),
        smooth_dt=lambda d: np.log(2) / np.log(d["smooth_gr"] + 1),
        beta=lambda d: (d["gr"] + GAMMA) / d["POPESTIMATE2019"],
        smooth_beta=lambda d: (d["smooth_gr"] + GAMMA) / d["POPESTIMATE2019"],
    )
    # Rt, derived from the betas computed in the previous step
    .assign(
        rt=lambda d: d["beta"] / GAMMA * d["POPESTIMATE2019"],
        smooth_rt=lambda d: d["smooth_beta"] / GAMMA * d["POPESTIMATE2019"],
    )
)

In [7]:
# Define sir function
from typing import Iterator, Tuple, Dict

import numpy as np

def sir(
        susceptible: int,
        total_infected: int,
        total_recovered: int,
        new_infected: int,
        new_recovered: int,
        beta: float,
        doubling_time: float,
        growth_rate: float,
        r_naught: float,
        gamma: float = 1 / 7,
        case_adjustment_factor: float = 10,
        start: int = 0,
        stop: int = 90
) -> Iterator[Tuple[
    int,
    float,
    float,
    float,
    float,
    float,
    float,
    float,
    float,
    float
]]:
    """
    Calculate the number of susceptible people and incidence and prevalence
    of infection and recovery in a population based on initial values.

    Core SIR model function that forecasts new
    - susceptible (S),
    - infected (I), and
    - recovered (R) values

    The first yielded row echoes the supplied initial values unchanged.
    Every following row is computed from the previous day's S, I and R.
    The ``beta`` argument drives every projected day as given; the beta,
    doubling time, growth rate and rt in each projected row are recalculated
    for reporting only and are not fed back into the projection.

    S + I + R stays constant from day to day for any
    ``case_adjustment_factor >= 1``.

    Arguments:
        susceptible: Initial number of susceptible people.
        total_infected: Initial number of total confirmed infections.
        total_recovered: Initial number of people who have recovered.
        new_infected: Initial number of new infections.
        new_recovered: Initial number of newly recovered individuals.
        beta: The effective contact rate during social distancing.
            BETA = (GROWTH_RATE + GAMMA) / SUSCEPTIBLE
        growth_rate: The rate of growth of total_cases.
            GROWTH_RATE = BETA * SUSCEPTIBLE - GAMMA
            GROWTH_RATE = 2 ** (1 / DOUBLING_TIME) - 1
        doubling_time: The time required for total_cases to double.
             DOUBLING_TIME = ln(2)/ln(GROWTH_RATE + 1)
        r_naught: The number of people a newly infected person will infect.
            R_NAUGHT = (GROWTH_RATE + GAMMA) / GAMMA
            R_NAUGHT = BETA / GAMMA * SUSCEPTIBLE
        gamma: The inverse of the recovery length, e.g.
            RECOVERY_TIME = 7
            GAMMA = 1 / RECOVERY_TIME
        case_adjustment_factor: A multiplier to estimate the true case count.
            Under-reporting of cases is common because many are asymptomatic.
            The case adjustment factor compensates for under-reporting:
            susceptibles fall by factor * new confirmed cases, and the
            (factor - 1) unconfirmed share is moved straight to recovered.
        start: The day for which initial values are provided, e.g. day 0.
        stop: The last day to forecast, inclusive. Rows are produced for
            days start, start + 1, ..., stop (stop - start + 1 rows).

    Yields:
        Collected values across each day from start to stop.
        1. day
        2. susceptible
        3. total infected (infected prevalence)
        4. total recovered (recovered prevalence)
        5. new cases (infected incidence)
        6. newly recovered individuals (recovered incidence)
        7. beta (recalculated)
        8. doubling time (recalculated; nan/inf when it is undefined)
        9. growth rate (recalculated)
        10. rt (recalculated)
    """
    # Yield initial values
    yield (
        start,
        susceptible,
        total_infected,
        total_recovered,
        new_infected,
        new_recovered,
        beta,
        doubling_time,
        growth_rate,
        r_naught
    )
    # Share of each new confirmed case that represents additional, unconfirmed
    # infections. Floored at 0 (not 1): with a factor below 2 a floor of 1
    # would add people to "recovered" who were never removed from
    # "susceptible", so S + I + R would grow every day.
    unconfirmed_share = max(0., case_adjustment_factor - 1)
    # Iterate over days and yield model outputs
    for day in range(start + 1, stop + 1):
        # New confirmed infections (new positive tests)
        infected_incidence = beta * total_infected * susceptible
        # Number of infected who have just recovered
        recovered_incidence = gamma * total_infected
        # Number of people who are susceptible as of this time point
        susceptible_new = (
                susceptible
                - case_adjustment_factor
                * infected_incidence
        )
        # Total number of people who are infected as of this time point
        infected_prevalence = (
                infected_incidence
                + total_infected
                - recovered_incidence
        )
        # Total number of people who have recovered as of this time point
        recovered_prevalence = (
                recovered_incidence
                + total_recovered
                + infected_incidence * unconfirmed_share
        )
        # Derived rates. The arithmetic is done on numpy floats with
        # divide/invalid warnings silenced so that degenerate days (no
        # infected, no change in infected, no susceptibles left) produce
        # nan/inf identically for Python and numpy inputs instead of either
        # raising ZeroDivisionError or emitting RuntimeWarning.
        with np.errstate(divide="ignore", invalid="ignore"):
            prevalence_ratio = (
                    np.float64(infected_prevalence)
                    / np.float64(total_infected)
            )
            doubling_time_new = np.log(2) / np.log(prevalence_ratio)
            growth_rate_new = 2 ** (1 / doubling_time_new) - 1
            beta_new = (growth_rate_new + gamma) / susceptible_new
            rt = beta_new / gamma * susceptible_new
        yield (
            day,
            susceptible_new,
            infected_prevalence,
            recovered_prevalence,
            infected_incidence,
            recovered_incidence,
            beta_new,
            doubling_time_new,
            growth_rate_new,
            rt
        )
        # Today's totals become tomorrow's starting point
        susceptible, total_infected, total_recovered = (
            susceptible_new, infected_prevalence, recovered_prevalence
        )

In [8]:
# Boolean masks for the county (by FIPS code) and the start date to model
fips = df["countyFIPS"].eq(36061)
date = df["date"].eq(pd.to_datetime("2020-04-11"))

# Rows matching both masks; the first match seeds the model
subset = df.loc[fips & date]

# Pull the model inputs out of that row: population, confirmed cases, and the
# smoothed beta, doubling time, growth rate and Rt
s, i, b, dt, gr, rt = (
    subset[column].iloc[0]
    for column in (
        "POPESTIMATE2019",
        "confirmed",
        "smooth_beta",
        "smooth_dt",
        "smooth_gr",
        "smooth_rt",
    )
)

In [9]:
# Column names for the output dataframe, in the order sir() yields its values
output_columns = (
    "days", "susceptible",
    "total_infected", "total_recovered",
    "new_infected", "new_recovered",
    "beta", "doubling_time", "growth_rate", "r_naught",
)

# Run the projection from day 0 through day 100. Recovered counts start at
# zero and the initial new infections are derived from beta * I * S.
sir_df = pd.DataFrame(
    data=sir(
        susceptible=s,
        total_infected=i,
        total_recovered=0,
        new_infected=b * i * s,
        new_recovered=0,
        beta=b,
        doubling_time=dt,
        growth_rate=gr,
        r_naught=rt,
        gamma=GAMMA,
        start=0,
        stop=100,
    ),
    columns=output_columns,
)

# Save df as excel file
sir_df.to_excel("sir.xlsx")