<h1>Ontario COVID-19 Hospitalization and ICU Rates and Chart - Data Pipeline</h1>

<h3>Pipeline outline</h3>
<div class="alert alert-block alert-info" style="margin-top: 20px">
    <ol>
        <li><a href="#parse-ontario-health-website">Parse Public Health Ontario web-page to retrieve URLs to daily epidemiologic summaries pdf-reports</a></li>
        <li><a href="#dowload-and-parse-reports">Download the most recent three pdf-reports and extract Ever Hospitalized, Ever in ICU, Total Deaths metrics</a></li>
        <li><a href="#dowload-historical-data">Download accumulated historical data and manual override data</a></li>
        <li><a href="#combine-data">Combine newly extracted data with historical data and manual override data into a single historical dataset. Upload the dataset to S3</a></li>
        <li><a href="#find-missing-dates">Identify missing dates if any. Upload the list of missing dates to S3</a></li>
        <li><a href="#build-graphs">Build graphs for day-to-day totals and deltas. Upload the graphs to S3</a></li>
        <li><a href="#refresh-cf">Refreshing CloudFront cache</a></li>
    </ol>
</div>

<a id="parse-ontario-health-website"></a>
<h4>Parse Public Health Ontario web-page to retrieve URLs to daily epidemiologic summaries pdf-reports</h4>

In [None]:
# Debug switch: when True, every cell guarded by `if not DEBUG` is skipped,
# i.e. the S3 uploads and the CloudFront invalidation are not performed.
DEBUG = False

In [None]:
import re
import requests

# Fetch the page that lists the daily epidemiologic summary reports.
# The timeout keeps a stalled connection from hanging the whole pipeline.
response = requests.get(
    'https://covid-19.ontario.ca/covid-19-epidemiologic-summaries-public-health-ontario',
    timeout=60,
)
response.raise_for_status()

# The match is non-greedy and limited to characters that can appear inside a
# link, so several report URLs on the same HTML line are returned as separate
# matches instead of one long span running up to the last ".pdf" on that line.
# dict.fromkeys drops repeated links while preserving the page order.
urls = list(dict.fromkeys(
    re.findall(r'https://files\.ontario\.ca/moh-covid-19-report-en-202[^\s"\'<>]+?\.pdf', response.text)
))
print(f'Discovered {len(urls)} urls')
urls[:5]

<a id="dowload-and-parse-reports"></a>
<h4>Download the most recent three pdf-reports and extract Ever Hospitalized, Ever in ICU, Total Deaths metrics</h4>

In [None]:
from pdfminer.high_level import extract_text
from datetime import datetime, date
from io import BytesIO

def get_pdf_bytes_io_from_url(url, timeout=60):
    """Download ``url`` and return its body as an in-memory binary stream.

    Args:
        url: Address of the document to download.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the pipeline indefinitely.

    Returns:
        A ``BytesIO`` holding the raw response content.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return BytesIO(response.content)

def parse_report_fields(text):
    """Extract the report date and the three tracked counts from report text.

    Args:
        text: Text of one report with newlines already replaced by spaces.

    Returns:
        Tuple ``(report_date, deaths, icu, hospitalized)`` where ``report_date``
        is a ``datetime.date`` (the end date of the "COVID-19 in Ontario: ... to
        ..." range) and the counts are ``int`` with thousands separators removed.

    Raises:
        ValueError: If the text does not match the expected table layout, or
            the captured date is not a valid ``Month day, year`` date.
    """
    # Layouts this pattern was written against (whitespace condensed):
    #   Ever in ICU Ever hospitalized 2,840 1 11 122 765 1,941 1,083 5,108 5.6% <0.1% 0.1% 0.8% 9.1% 30.9% 2.1% 10.1% Note: Not all cases have an age reported. Data corrections or updates ... COVID-19 in Ontario: January 15, 2020 to September 27, 2020
    #   Ever in ICU Ever hospitalized 2,815 1 11 121 755 1,926 1,053 4,955 6.3% 0.0% 0.1% 0.9% 9.7% 31.4% 2.4% 11.1% Note: Not all cases have an age reported. Data corrections or updates ... COVID-19 in Ontario: January 15, 2020 to September 12, 2020
    #   Ever in ICU Ever hospitalized Data Source: iPHIS plus Cumulative case count as of August 1 Percentage of all cases 2,778 1 11 117 745 1,904 1,004 4,675 7.0% 0.0% 0.1% 1.0% 10.4% 31.5% 2.5% 11.9% COVID-19 in Ontario: January 15, 2020 to August 1, 2020
    #   Ever in ICU Ever hospitalized Data Source: iPHIS plus 2,665 1 11 104 710 1,839 936 4,342 7.6% 0.1% 0.1 % 1% 10.7 % 31.4% 2.7% 12.4% COVID-19 in Ontario: January 15, 2020 to June 28, 2020
    #   Ever in ICU Ever hospitalized Data Source: iPHIS plus 2,764 1 11 116 740 1,896 994 4,650 7.1 0.0 0.1 1.0 10.4 31.5 2.6 12.0 COVID-19 in Ontario: January 15, 2020 to July 26, 2020
    #
    # Raw strings keep regex escapes such as \s and \d intact; [A-Za-z] replaces
    # [A-z], which also matched the punctuation between 'Z' and 'a'.
    template = (
        r'Ever\s+in\s+ICU\s+Ever\s+hospitalized\s+'
        r'(?:Data\s+Source:\s+(?:iPHIS|CCM)\s+plus\s+)?'
        r'(?:Cumulative\s+case\s+count\s+as\s+of\s+[A-Za-z]+\s+\d+\s+Percentage\s+of\s+all\s+cases\s+)?'
        r'([\d,]+)\s+'                                     # group 1: deaths
        r'[\d,]+\s+[\d,]+\s+[\d,]+\s+[\d,]+\s+[\d,]+\s+'   # five counts that are not used
        r'([\d,]+)\s+([\d,]+)\s+'                          # groups 2-3: icu, hospitalized
        r'[\d\.]+\s*%?\s+<?[\d\.]+\s*%?\s+<?[\d\.]+\s*%?\s+<?[\d\.]+\s*%?\s+'   # eight percentage
        r'[\d\.]+\s*%?\s+[\d\.]+\s*%?\s+[\d\.]+\s*%?\s+[\d\.]+\s*%?\s+'         # values, skipped
        r'(?:Note: Not all cases have an age reported. Data corrections or updates.*?)?'
        r'COVID-19\s+in\s+Ontario:\s+[A-Za-z]+\s+\d+,\s+202\d\s+to\s+([A-Za-z]+)\s+(\d+),\s+(202\d)'
    )
    match = re.search(template, text)
    if match is None:
        # Fail with a clear message instead of an UnboundLocalError on report_date.
        raise ValueError('Report text does not match the expected summary table layout')

    deaths, icu, hospitalized = (int(value.replace(',', '')) for value in match.group(1, 2, 3))
    month, day, year = match.group(4, 5, 6)
    report_date = datetime.strptime(f'{month} {day}, {year}', '%B %d, %Y').date()
    return report_date, deaths, icu, hospitalized

def get_values(urls, num_back):
    """Yield the metrics parsed from each of the first ``num_back`` report URLs.

    The selected URLs are processed in reverse list order. A report that cannot
    be downloaded or parsed does not stop the run: the failure is printed and
    ``None`` is yielded in its place.

    Args:
        urls: Sequence of report URLs.
        num_back: How many URLs from the start of ``urls`` to process.

    Yields:
        ``{'date', 'hospitalized', 'icu', 'deaths'}`` dict for a parsed report,
        or ``None`` when processing that report failed.
    """
    for url in reversed(urls[:num_back]):
        try:
            bytes_io = get_pdf_bytes_io_from_url(url)
            text = extract_text(bytes_io).replace('\n', ' ')
            report_date, deaths, icu, hospitalized = parse_report_fields(text)
        except Exception as error:
            # `Exception` rather than a bare except, so Ctrl-C / kernel interrupt
            # still stops the run; the reason is printed to make failures diagnosable.
            print('Processing failed:', url, f'({type(error).__name__}: {error})')
            yield None
        else:
            yield {'date': report_date, 'hospitalized': hospitalized, 'icu': icu, 'deaths': deaths}

# Parse the first three discovered reports. Every outcome is echoed (a failed
# report shows up as None); only successful results are kept in new_data.
print('Parsing last pdf documents:')
new_data = []
for parsed in get_values(urls, 3):
    print(parsed)
    if parsed:
        new_data.append(parsed)

In [None]:
# debug: get extracted pdf text
# print(extract_text(get_pdf_bytes_io_from_url('https://files.ontario.ca/moh-covid-19-report-en-2020-09-28.pdf')).replace('\n',' '))

<a id="dowload-historical-data"></a>
<h4>Download accumulated historical data and manual override data</h4>

In [None]:
import pandas as pd
import numpy as np

In [None]:
print('Reading historical data')
# Previously published dataset, indexed by report date; the counts use the
# nullable Int32 dtype so missing values do not force a float conversion.
prev_df = pd.read_csv(
    'https://2020-ontario-covid19-severe.s3.amazonaws.com/ontario-covid19-severe.csv',
    usecols=['date', 'hospitalized', 'icu', 'deaths'],
    dtype={'hospitalized': 'Int32', 'icu': 'Int32', 'deaths': 'Int32'},
    parse_dates=['date'],
    index_col='date',
)
prev_df.tail() #noprod

In [None]:
import numpy as np

# Freshly parsed reports as a frame shaped like prev_df / override_df.
# Naming the columns explicitly keeps the frame well-formed when no report
# could be parsed (new_data == []), so the joins further down still work.
# pd.to_datetime is used for the date column because casting to the unit-less
# np.datetime64 dtype is rejected by recent pandas versions.
new_df = (
    pd.DataFrame(new_data, columns=['date', 'hospitalized', 'icu', 'deaths'])
    .assign(date=lambda frame: pd.to_datetime(frame['date']))
    .astype({'hospitalized': 'Int32', 'icu': 'Int32', 'deaths': 'Int32'})
    .set_index('date')
)
new_df.head() #noprod

In [None]:
print('Reading manual override data')
# Manually maintained corrections, loaded with the same layout and dtypes as
# the historical dataset.
override_df = pd.read_csv(
    'https://2020-ontario-covid19-severe.s3.amazonaws.com/ontario-covid19-severe-override.csv',
    usecols=["date", "hospitalized", "icu", "deaths"],
    dtype={'hospitalized': 'Int32', 'icu': 'Int32', 'deaths': 'Int32'},
    parse_dates=["date"],
    index_col='date',
)
override_df.tail() #noprod

<a id="combine-data"></a>
<h4>Combine newly extracted data with historical data and manual override data into a single historical dataset. Upload the dataset to S3</h4>

In [None]:
from itertools import product

columns = ('hospitalized', 'icu', 'deaths')

# Align the three sources on the report date. Each source's columns get a
# prefix so they can sit side by side in one frame.
joined_df = (
    override_df.add_prefix('override_').
    join(new_df.add_prefix('new_'), how='outer').
    join(prev_df.add_prefix('prev_'), how='outer')
)

# For every metric take the first available value in priority order:
# manual override, then newly parsed report, then historical data.
final_df = pd.DataFrame()
for col in columns:
    final_df[col] = (
        joined_df[f'override_{col}'].
        combine_first(joined_df[f'new_{col}']).
        combine_first(joined_df[f'prev_{col}'])
    )

# NOTE(review): drop_duplicates compares the metric values, not the date index,
# so a date whose three totals equal those of an earlier row is removed (and
# then reported as a missing date). Confirm this is intended; if the goal is
# one row per date, use final_df[~final_df.index.duplicated(keep='first')].
final_df = final_df.drop_duplicates(keep='first')
final_df #noprod

<a id="find-missing-dates"></a>
<h4>Identify missing dates if any. Upload the list of missing dates to S3</h4>

In [None]:
print('Identifying missing dates')
from datetime import date, timedelta
def find_missing_dates(df, today=None):
    """Return the sorted calendar dates that have no row in ``df``.

    Args:
        df: Frame whose index holds timestamps (one per report date).
        today: Reference date; defaults to ``date.today()``. Exposed so the
            check can be reproduced for a fixed day.

    Returns:
        Sorted list of ``datetime.date`` values between the earliest index date
        and two days before ``today`` (inclusive) that are absent from the
        index. An empty frame yields an empty list.
    """
    if len(df.index) == 0:
        # No earliest date to start from, hence nothing to report.
        return []
    if today is None:
        today = date.today()
    min_rpt_date = df.index.min().date()
    # current date is excluded; because range() also excludes its end, the last
    # date actually checked is two days before `today`.
    num_rpt_days = (today - timedelta(days=1) - min_rpt_date).days
    all_rpt_dates = {min_rpt_date + timedelta(days=x) for x in range(num_rpt_days)}
    fact_rpt_dates = {ts.date() for ts in df.index}
    return sorted(all_rpt_dates - fact_rpt_dates)
# Dates that have no row in the combined dataset.
missing_dates = find_missing_dates(final_df)
missing_dates #noprod

In [None]:
if not DEBUG:
    print('Uploading missing-dates.csv to S3')
    # Single-column CSV ("dates") with one ISO-formatted date per row.
    missing_dates_df = pd.DataFrame({'dates': list(map(str, missing_dates))})
    missing_dates_df.to_csv('s3://2020-ontario-covid19-severe/missing-dates.csv', index=False)

In [None]:
# Average change per row over the preceding 10 rows; rows without a complete
# window are dropped.
# NOTE(review): diff() steps by row position, not by calendar day, so gaps in
# the date index stretch the window beyond 10 days -- confirm this is acceptable.
delta10_df = final_df.diff(10).div(10).dropna()

# Row-to-row change of each cumulative total.
delta1_df = final_df.diff().dropna()

In [None]:
if not DEBUG:
    print('Uploading ontario-covid19-severe.csv to S3')
    # Totals, day-to-day deltas and 10-row averages side by side; the columns
    # are grouped per metric in the published file.
    delta_names = {'hospitalized':'hospitalized_delta','icu':'icu_delta','deaths':'deaths_delta'}
    mva10_names = {'hospitalized':'hospitalized_mva10','icu':'icu_mva10','deaths':'deaths_mva10'}
    export_columns = [
        'hospitalized','hospitalized_delta','hospitalized_mva10',
        'icu','icu_delta','icu_mva10',
        'deaths','deaths_delta','deaths_mva10',
    ]
    export_df = final_df.join(delta1_df.rename(columns=delta_names))
    export_df = export_df.join(delta10_df.rename(columns=mva10_names))
    export_df[export_columns].to_csv('s3://2020-ontario-covid19-severe/ontario-covid19-severe.csv')

<h4>Download total cases numbers</h4>

In [None]:
import pandas as pd

# Province-wide cumulative case counts, reduced to one column and renamed to
# the snake_case / 'date' index convention used by the other frames. Rows
# without a total are dropped.
covid_status_df = (
    pd.read_csv(
        'https://data.ontario.ca/dataset/f4f86e54-872d-43f8-8a86-3892fd3cb5e6/resource/ed270bb8-340b-41f9-a7c6-e8ef587e6d11/download/covidtesting.csv',
        usecols=['Reported Date', 'Total Cases'],
        dtype={'Total Cases': 'Int32'},
        parse_dates=['Reported Date'],
        index_col='Reported Date',
    )
    .dropna()
    .rename(columns={'Total Cases': 'total_cases'})
    .rename_axis(index='date')
)

In [None]:
# Average growth of total cases per row over the preceding 10 rows, computed
# the same way as delta10_df so the two can be compared.
covid_status_df['total_cases_mva10'] = covid_status_df['total_cases'].diff(10).div(10)
covid_status_df['total_cases_mva10'].plot.line() #noprod

In [None]:
# Correlation between the averaged case growth and each severe-outcome series
# when the case series is shifted forward by 0..44 days.
# The rows are collected in a list and concatenated once: DataFrame.append was
# removed in pandas 2.0, and appending inside a loop copies the frame on every
# iteration. Loop-local names are used so the combined `joined_df` built
# earlier in the notebook is not overwritten.
cases_mva10 = covid_status_df['total_cases_mva10']
offset_rows = []
for offset in range(0, 45):
    shifted_cases = cases_mva10.copy()
    shifted_cases.index = cases_mva10.index + pd.DateOffset(days=offset)
    lagged_df = delta10_df.join(shifted_cases, how='inner')
    offset_df = lagged_df.corr()[['hospitalized','icu','deaths']].loc[['total_cases_mva10']]
    offset_df.index = pd.Index([offset], name='offset')
    offset_rows.append(offset_df)
offset_corr_df = pd.concat(offset_rows)
offset_corr_df

<a id="build-graphs"></a>
<h4>Build graphs for day-to-day totals and deltas. Upload the graphs to S3</h4>

In [None]:
def fig2s3(fig, name):
    """Render ``fig`` to HTML and upload it to the S3 bucket as ``{name}.html``.

    Args:
        fig: Figure object exposing ``write_html`` (a plotly figure here).
        name: Object name inside the bucket, without the ``.html`` extension.
    """
    from io import StringIO
    import s3fs

    # Rendered with full_html=False and include_plotlyjs=False; presumably the
    # page embedding the result supplies the surrounding HTML and plotly.js.
    buffer = StringIO()
    fig.write_html(buffer, include_plotlyjs=False, full_html=False)
    payload = buffer.getvalue().encode('UTF-8')

    filesystem = s3fs.S3FileSystem(anon=False)  # uses default credentials
    with filesystem.open(f's3://2020-ontario-covid19-severe/{name}.html', 'wb') as target:
        target.write(payload)

In [None]:
def _calc_severe_yrange(max_from_severe, max_from_status):
    """Upper bound of the primary y-axis so that both axes fit their data.

    The charts draw the secondary (cases) axis at 10x the primary range, so the
    primary range must cover the larger of the severe maximum and one tenth of
    the cases maximum; 5% headroom is added on top.
    """
    return (max_from_severe if max_from_severe * 10 > max_from_status else max_from_status // 10) * 1.05


def calc_max_yrange_abs(final_df, covid_status_df):
    """Primary-axis upper bound for the totals chart.

    Args:
        final_df: Frame with 'hospitalized', 'icu' and 'deaths' columns.
        covid_status_df: Frame with a 'total_cases' column.

    Returns:
        Upper bound for the primary y-axis.
    """
    max_from_severe = final_df[['hospitalized', 'icu', 'deaths']].max().max()
    max_from_status = covid_status_df[['total_cases']].max().max()
    return _calc_severe_yrange(max_from_severe, max_from_status)


def calc_max_yrange_delta10(delta10_df, covid_status_df):
    """Primary-axis upper bound for the deltas chart.

    Args:
        delta10_df: Frame with 'hospitalized', 'icu' and 'deaths' columns.
        covid_status_df: Frame with a 'total_cases_mva10' column.

    Returns:
        Upper bound for the primary y-axis.
    """
    max_from_severe = delta10_df[['hospitalized', 'icu', 'deaths']].max().max()
    max_from_status = covid_status_df[['total_cases_mva10']].max().max()
    return _calc_severe_yrange(max_from_severe, max_from_status)

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Totals chart: cumulative hospitalized / ICU / deaths on the primary axis and
# total cases (dotted grey) on a secondary axis drawn at 10x the primary range.
fig = make_subplots(specs=[[{"secondary_y": True}]])

colors = {'Hospitalized': '#636efa', 'ICU': 'orange', 'Deaths': '#EF553B'}
names_in_df = {'Hospitalized': 'hospitalized', 'ICU': 'icu', 'Deaths': 'deaths'}
for col, df_col in names_in_df.items():
    severe_trace = go.Scatter(
        x=final_df.index, y=final_df[df_col], name=col, line={'color': colors[col]}
    )
    fig.add_trace(severe_trace, secondary_y=False)

# NOTE(review): the x-range is taken from delta10_df although the traces plot
# final_df -- presumably to line up with the deltas chart; confirm.
fig.update_xaxes(title_text="Date", range=[delta10_df.index.min(), delta10_df.index.max()])

# Both axes derive from the same bound so their gridlines coincide.
severe_yrange = calc_max_yrange_abs(final_df, covid_status_df)
fig.update_yaxes(title_text="Number of people", secondary_y=False, range=[0, severe_yrange])
fig.update_yaxes(title_text="Total cases", secondary_y=True, range=[0, 10 * severe_yrange])

total_cases_trace = go.Scatter(
    x=covid_status_df.index,
    y=covid_status_df['total_cases'],
    name='Total Cases',
    line={'color': '#AAAAAA', 'dash': 'dot'},
)
fig.add_trace(total_cases_trace, secondary_y=True)

# Horizontal legend placed just above the plot area.
fig.update_layout(
    legend={'orientation': 'h', 'yanchor': 'bottom', 'y': 1.02, 'xanchor': 'right', 'x': 0.9},
    hovermode='x',
)
fig.show() #noprod

In [None]:
# Publish the current `fig` (the totals chart) unless running in DEBUG mode.
if not DEBUG:
    print('Uploading graph-totals to S3')
    fig2s3(fig, 'graph-totals')

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Deltas chart: 10-row averaged changes of hospitalized / ICU / deaths on the
# primary axis and averaged case growth (dotted grey) on a secondary axis
# drawn at 10x the primary range.
fig = make_subplots(specs=[[{"secondary_y": True}]])

colors = {'Hospitalized Δ': '#636efa', 'ICU Δ': 'orange', 'Deaths Δ': '#EF553B'}
names_in_df = {'Hospitalized Δ': 'hospitalized', 'ICU Δ': 'icu', 'Deaths Δ': 'deaths'}
for col, df_col in names_in_df.items():
    delta_trace = go.Scatter(
        x=delta10_df.index, y=delta10_df[df_col], name=col, line={'color': colors[col]}
    )
    fig.add_trace(delta_trace, secondary_y=False)

fig.update_xaxes(title_text="Date", range=[delta10_df.index.min(), delta10_df.index.max()])

# Both axes derive from the same bound so their gridlines coincide.
delta_yrange = calc_max_yrange_delta10(delta10_df, covid_status_df)
fig.update_yaxes(title_text="Number of people", secondary_y=False, range=[0, delta_yrange])
fig.update_yaxes(title_text="Total cases Δ", secondary_y=True, range=[0, 10*delta_yrange])

total_cases_delta_trace = go.Scatter(
    x=covid_status_df.index,
    y=covid_status_df['total_cases_mva10'],
    name='Total Cases Δ',
    line={'color': '#AAAAAA', 'dash': 'dot'},
)
fig.add_trace(total_cases_delta_trace, secondary_y=True)

# Horizontal legend placed just above the plot area.
fig.update_layout(
    legend={'orientation': 'h', 'yanchor': 'bottom', 'y': 1.02, 'xanchor': 'right', 'x': 0.9},
    hovermode='x',
)
fig.show() #noprod

In [None]:
# Publish the current `fig` (the deltas chart) unless running in DEBUG mode.
if not DEBUG:
    print('Uploading graph-deltas to S3')
    fig2s3(fig, 'graph-deltas')

<a id="refresh-cf"></a>
<h4>Refreshing CloudFront cache</h4>

In [None]:
print('Refreshing CloudFront cache')

In [None]:
def refresh_cf_cache():
    """Request a CloudFront invalidation of all paths ('/*') and print the result.

    The HTTP status code and the invalidation status from the response are
    printed; nothing is returned.
    """
    import datetime
    import boto3

    # The current timestamp serves as the caller reference of this request.
    invalidation_batch = {
        'Paths': {'Quantity': 1, 'Items': ['/*']},
        'CallerReference': datetime.datetime.now().isoformat(),
    }

    cloudfront = boto3.client('cloudfront')
    response = cloudfront.create_invalidation(
        DistributionId='ECSSCPAYIJPXV',
        InvalidationBatch=invalidation_batch,
    )

    http_status = response['ResponseMetadata']['HTTPStatusCode']
    invalidation_status = response['Invalidation']['Status']
    print(
        "The invalidation request response HTTPStatusCode={}, Invalidation.Status={}"
        .format(http_status, invalidation_status)
    )

# The cache is only invalidated on real (non-debug) runs.
if not DEBUG:
    refresh_cf_cache()

In [None]:
print('Done!')