In [None]:
from load_data import pull_from_postgres
from psycopg2.sql import SQL
import pandas as pd

def age(answer_id):
    return 85 - (answer_id - 807) * 5

# pull covid test results and test types from postgres
test_data = pull_from_postgres(SQL("""
    SELECT
        features.f10 test_result, features.f133 age, features.f127 sex, features.test_week_start, features.user_id, answers.element test_type
    FROM
        datenspende_derivatives.homogenized_features features, datenspende.answers answers
    WHERE
        features.test_week_start >= '2021-10-01' AND features.questionnaire_session = answers.questionnaire_session AND answers.question = 91
    """)).replace(
    {
        'test_type': {547: 'PCR', 548: 'Antigen', 549: 'Antibody', 550: 'Unknown', 1436: 'PCR'},
        'sex': {773: 'Female', 774: 'Male', 775: 'Divers', None: 'Unknown', float('nan'): 'Unknown'},
        'age': {float(answer_id): age(answer_id) for answer_id in list(range(807,822))}
    })

# convert date to pandas datetime
test_data['date'] = pd.to_datetime(test_data['test_week_start'])
test_data.drop(columns=['test_week_start'], inplace=True)
print(f"{len(test_data['user_id'].unique())} unique users")
print(f"{test_data['test_result'].sum()} positive test results")
# print number of negative test results
print(f"{test_data[test_data['test_result']==0]['test_result'].count() - test_data['test_result'].sum()} negative test results")
test_data

In [None]:
# shares of sexesb
sex = test_data.groupby('user_id').first().groupby('sex').count()[['test_result']].rename(columns={'test_result': 'count'})
sex['share'] = sex['count'] / sex['count'].sum()
sex['share']

In [None]:
# average age
test_data.groupby('user_id').first()['age'].mean()

In [None]:
# count first positive test in a row as infection
infections = test_data.sort_values(['user_id', 'date']).ffill()
infections['infection'] = infections.replace({True: 1, False: 0}).groupby(['user_id'])['test_result'].diff()
infections['infection'] = infections['infection'].fillna(0).replace({-1: 0})

In [None]:
# count reinfections (users with more than one sequence of positive tests)
df = infections.groupby(['user_id']).sum()['infection']
print(f'we have {df[df > 1].count()} reinfections in our data')

In [None]:
infections['infection'].sum()

In [None]:
# aggregate by user_id
df = infections.groupby(['test_type', 'date']).agg({'infection': ['sum', 'count']}).reset_index().sort_values(['date', 'test_type'])
df.columns = ['test_type', 'date', 'sum', 'count']

# count all reports (with or without test) for a given day
all_tests = df.groupby(['date']).agg({'sum': 'sum', 'count': 'sum'}).reset_index()
all_tests['test_type'] = 'all'
sum_of_daily_tests = pd.concat([df, all_tests]).reset_index(drop=True).sort_values(['date', 'test_type'])
sum_of_daily_tests

In [None]:
# generate dataframe with sum of positive tests per type per day
positive_tests = sum_of_daily_tests[['test_type', 'sum', 'date']].rename(columns={'sum': 'positive_tests'}).set_index(['date', 'test_type']).unstack('test_type')
positive_tests.columns = positive_tests.columns.droplevel(0)

# generate dataframe with sum of all reports per day
all_daily_reports = sum_of_daily_tests[['test_type', 'count', 'date']].rename(columns={'count': 'all_daily_reports'}).set_index(['date', 'test_type']).unstack('test_type')
all_daily_reports.columns = all_daily_reports.columns.droplevel(0)
all_daily_reports = all_daily_reports[['all']].rename(columns={'all': 'all_daily_reports'})

daily_aggregate_test_data = positive_tests.merge(all_daily_reports, on='date', how='outer')
daily_aggregate_test_data

In [None]:
import matplotlib.pyplot as plt

all_daily_reports = sum_of_daily_tests[['test_type', 'count', 'date']].rename(columns={'count': 'all_daily_reports'}).set_index(['date', 'test_type']).unstack('test_type')
all_daily_reports.columns = all_daily_reports.columns.droplevel(0)

# normalize by all daily reports
for column in all_daily_reports.columns:
    all_daily_reports[column] = all_daily_reports[column] / all_daily_reports['all']

fig, ax = plt.subplots(figsize=(10, 6))

# plot relative share of test types among all reported tests per day
all_daily_reports[['Antibody', 'Antigen', 'PCR', 'Unknown']].plot.area(ax=ax)
ax.set_ylabel('Relative share of test types')
ax.set_xlabel('Date')
ax.set_title('Relative share of test types among all reported tests per day')
ax.set_xlim(pd.to_datetime('2021-10-01'), pd.to_datetime('2022-07-2'))
ax.set_ylim(0, 1)

all_daily_reports.to_csv('all_daily_reports.csv')

In [None]:
import matplotlib.pyplot as plt

df = daily_aggregate_test_data.copy()

# divide by all
for column in df.columns:
    df[column] = df[column] / df['all']

df.dropna(subset=['all'], inplace=True)

# take 7 day rolling mean
df = df.rolling(7).mean()

fig, ax = plt.subplots(figsize=(10, 5))

# plot relative share of antibody, antigen, PCR, unknown tests
df[['PCR', 'Antigen']].plot.area(ax=ax)
ax.set_xlabel('Date')
ax.set_ylabel('Relative share of tests')
ax.legend(loc='lower left')
ax.set_title('Relative share of test types among positive tests per day')
ax.set_xlim(pd.to_datetime('2021-10-20'), pd.to_datetime('2022-07-2'))
ax.set_ylim(0, 1)
# ax.text(0.92, 0.95, 'A)', transform=ax.transAxes, fontsize=24, verticalalignment='top')
fig.savefig('relative_shares_of_test_type_among_positive_tests_per_day.png')

df.to_csv('relative_shares_of_test_type_among_positive_tests_per_day.csv')

In [None]:
# plot test positivity rate for each test type per day
sum_of_daily_tests['rate'] = sum_of_daily_tests['sum'] / sum_of_daily_tests['count']

df = sum_of_daily_tests[['rate', 'date', 'test_type']].set_index(['date', 'test_type']).unstack('test_type')
df.columns = df.columns.droplevel(0)
df = df.rolling(7).mean()

fig, ax = plt.subplots(figsize=(10, 8))
df[['PCR', 'Antigen', 'all']].plot(ax=ax)

ax.set_xlabel('Date')
ax.set_ylabel('Test positivity rate')
ax.legend(loc='upper left')
ax.set_title('Test positivity rate per day per test type')
plt.show()

In [None]:
daily_aggregate_test_data

In [None]:
# calcualte 7 day incidence per test type

df = daily_aggregate_test_data.copy()
incidence = pd.DataFrame()
for column in df.columns:
    print(column, type(column))
    incidence[column + ' incidence'] = df[column] / df['all_daily_reports'] * 100_000

incidence = incidence[['Antigen incidence', 'PCR incidence', 'all incidence']].dropna().rolling('7D').mean()

fig, ax = plt.subplots(figsize=(10, 8))
incidence.plot(ax=ax)

In [None]:
incidence_official = pull_from_postgres(SQL(
    """
    SELECT date_of_report date, incidence_7d_per_100k official_incidence FROM coronacases.german_counties_incidence WHERE location_level = 0;
    """))
incidence_official

In [None]:
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

columns = incidence.columns.drop('all incidence')

df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20220101').drop(columns=['all incidence'])

fig, ax = plt.subplots(figsize=(12, 6))
axb = ax.twinx()

ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)
ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)

colors = ['red', 'green', 'black']
incidence_lines = []
for column, color in zip(columns, colors):
    incidence_lines.append(Line2D([0], [0], color=color, label=column))
    ax2 = df.plot(x="date", y=column, ax=axb, label="incidence from surveys", color=color, legend=False)

# create legend manually
labels = ['incidence as officially reported', *incidence.columns.values]
blue_line = Line2D([0], [0], color='blue', label='official incidence')
red_line = Line2D([0], [0], color='red', label='self reported incidence')
ax.legend([blue_line, *incidence_lines], labels, loc='upper left')

# label axes
ax.set_xlabel('Date')
ax.set_ylabel('Incidence calculated from surveys as 7 day average per 100000')
axb.set_ylabel('Incidence as officially reported')

# label plot as B)
ax.text(0.92, 0.95, 'B)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

fig.savefig('incidence_pcr_vs_antigen_vs_reported.png')

In [None]:
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

columns = incidence.columns

df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20220101')

fig, ax = plt.subplots(figsize=(12, 6))
axb = ax.twinx()

ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)
ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)

colors = ['red', 'green', 'black']
incidence_lines = []
for column, color in zip(columns, colors):
    incidence_lines.append(Line2D([0], [0], color=color, label=column))
    ax2 = df.plot(x="date", y=column, ax=axb, label="incidence from surveys", color=color, legend=False)

# create legend manually
labels = ['incidence as officially reported', *incidence.columns.values]
blue_line = Line2D([0], [0], color='blue', label='official incidence')
red_line = Line2D([0], [0], color='red', label='self reported incidence')
ax.legend([blue_line, *incidence_lines], labels, loc='upper left')

# label axes
ax.set_xlabel('Date')
ax.set_ylabel('Incidence calculated from surveys as 7 day average per 100000')
axb.set_ylabel('Incidence as officially reported')


ax.text(0.9, 0.95, 'C)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

fig.savefig('incidence_all.png')

In [None]:
columns = incidence.columns.drop('all incidence')

df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20220101').drop(columns=['all incidence'])

fig, ax = plt.subplots(figsize=(12, 6))

ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)

colors = ['red', 'green', 'black']
incidence_lines = []
for column, color in zip(columns, colors):
    incidence_lines.append(Line2D([0], [0], color=color, label=column))
    ax2 = df.plot(x="date", y=column, ax=ax, label="incidence from surveys", color=color, legend=False)

# create legend manually
labels = ['incidence as officially reported', *incidence.columns.values]
blue_line = Line2D([0], [0], color='blue', label='official incidence')
red_line = Line2D([0], [0], color='red', label='self reported incidence')
ax.legend([blue_line, *incidence_lines], labels, loc='upper left')

# label axes
ax.set_xlabel('Date')
ax.set_ylabel('Incidence as 7 day average per 100000')
axb.set_ylabel('Incidence as officially reported')

ax.text(0.92, 0.95, 'D)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

fig.savefig('incidence_pcr_vs_antigen_vs_reported_on_same_ax.png')

In [None]:
columns = incidence.columns.drop(['PCR incidence', 'Antigen incidence'])

df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20220101').drop(columns=['PCR incidence', 'Antigen incidence'])

fig, ax = plt.subplots(figsize=(7, 5))

axb = ax.twinx()

ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)

colors = ['red', 'green', 'black']
incidence_lines = []
for column, color in zip(columns, colors):
    print(column)
    incidence_lines.append(Line2D([0], [0], color=color, label=column))
    ax2 = df.plot(x="date", y=column, ax=axb, label="incidence from surveys", color=color, legend=False)

# create legend manually
labels = ['incidence as officially reported', 'incidence from self reported test results']
blue_line = Line2D([0], [0], color='blue', label='official incidence')
red_line = Line2D([0], [0], color='red', label='self reported incidence')
ax.legend([blue_line, *incidence_lines], labels, loc='upper left')

# label axes
ax.set_xlabel('Date')
axb.set_ylabel('Incidence as 7 day average per 100000')
ax.set_ylabel('Incidence as officially reported')
ax.set_ylim(0, ax.get_ylim()[1])
axb.set_ylim(0, axb.get_ylim()[1])

ax.text(0.92, 0.95, 'A)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

fig.savefig('official_vs_reported_on_different_ax.png')

In [None]:
# divide official incidence by initial value
df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20211115').drop(columns=['PCR incidence', 'Antigen incidence'])
df['official_incidence'] = df['official_incidence'] / df['official_incidence'].iloc[0]
df['all incidence'] = df['all incidence'] / df['all incidence'].iloc[0]
df

In [None]:
fig, ax = plt.subplots(figsize=(12, 6))

ax1 = df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)

colors = ['red', 'green', 'black']
incidence_lines = []
for column, color in zip(columns, colors):
    print(column)
    incidence_lines.append(Line2D([0], [0], color=color, label=column))
    ax2 = df.plot(x="date", y=column, ax=ax, label="incidence from surveys", color=color, legend=False)

# create legend manually
labels = ['incidence as officially reported', 'incidence from self reported test results']
blue_line = Line2D([0], [0], color='blue', label='official incidence')
red_line = Line2D([0], [0], color='red', label='self reported incidence')
ax.legend([blue_line, *incidence_lines], labels, loc='upper left')

# label axes
ax.set_xlabel('Date')
ax.set_ylabel('Incidence divided by initial value')
ax.set_ylim(0, ax.get_ylim()[1])
axb.set_ylim(0, axb.get_ylim()[1])

ax.text(0.92, 0.95, 'B)', transform=ax.transAxes, fontsize=24, verticalalignment='top')
fig.savefig('incidence_divided_by_initial_value.png')

In [None]:
# plot all incidence divided by official incidence
df['self_reported_incidence_ratio'] =df['all incidence'] / df['official_incidence']

fig, ax = plt.subplots(figsize=(12, 6))
df.plot(x="date", y="self_reported_incidence_ratio", ax=ax, label="incidence as officially reported divided by self reported incidence", color='blue', legend=False)

ax.set_xlabel('Date')
ax.set_ylabel('Self reported incidence divided by official incidence')
ax.text(0.92, 0.95, 'C)', transform=ax.transAxes, fontsize=24, verticalalignment='top')
fig.savefig('incidence_ratio.png')

In [None]:
df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20220101').drop(columns=['PCR incidence', 'Antigen incidence'])
df['official R_t'] = df['official_incidence'].shift(-4) / df['official_incidence']
df['self reported R_t'] = df['all incidence'].shift(-4) / df['all incidence']
# set type of date to pandas datetime
df['date'] = pd.to_datetime(df['date'])
df = df.rolling(7, on='date', win_type='triang', center=True).mean()
df

In [None]:
fig, ax = plt.subplots(figsize=(12, 6))

ax1 = df.plot(x="date", y="official R_t", ax=ax, label="R estimated from officially reported incidence", color='blue', legend=False)
ax2 = df.plot(x='date', y='self reported R_t', ax=ax, label="R estimated from self reported incidence", color='red', legend=False)

# create legend manually
labels = ['R estimated from officially reported incidence', 'R estimated from self reported test results']
blue_line = Line2D([0], [0], color='blue', label='official incidence')
red_line = Line2D([0], [0], color='red', label='self reported incidence')
ax.legend([blue_line, *incidence_lines], labels, loc='upper left')

# label axes
ax.set_xlabel('Date')
ax.set_ylabel('R value')

ax.text(0.92, 0.95, 'D)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

fig.savefig('R_estimated_from_official_vs_reported.png')

In [None]:
df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20220101').drop(columns=['PCR incidence', 'Antigen incidence'])
df['split'] = 'before May 2022'
df['split'][df['date'] > pd.to_datetime('20220501')] = 'after May 2022'

import seaborn as sns
# scatterplot of official vs self reported incidence with split by date
fig, ax = plt.subplots(figsize=(4, 4))
sns.scatterplot(x='official_incidence', y='all incidence', hue='split', data=df, ax=ax, alpha=0.5)

# plot line from (0, 0) to (5000, 5000)
factor = 4
length = 2000
ax.plot([0, length], [0, length * factor], color='black', alpha=0.5)
ax.plot([0, length], [0, length], color='black', alpha=0.5)
ax.fill_between([0, length], [0, length * factor], [0, length], color='black', alpha=0.2)
ax.set_xlim(0, length)
ax.set_ylim(0, length * factor)

ax.set_ylabel('Self reported incidence')
ax.set_xlabel('Official incidence')

# annotate with E in upper left corner
ax.text(0.05, 0.95, 'E)', transform=ax.transAxes, fontsize=24, verticalalignment='top')
fig.savefig('official_vs_self_reported_incidence_with_split.png')

In [None]:
from typing import TypedDict
from datetime import timedelta

Date = TypedDict('Date', {'date': pd.Timestamp, 'name': str})

def plot_official_vs_self_reported_incidence(ax, legend_location, split_date: Date, df):
    axb = ax.twinx()

    df.plot(x="date", y="official_incidence", ax=ax, label="incidence as officially reported", color='blue', legend=False)

    colors = ['red']
    incidence_lines = []
    for column, color in zip(columns, colors):
        print(column)
        incidence_lines.append(Line2D([0], [0], color=color, label=column))
        df.plot(x="date", y=column, ax=axb, label="incidence from surveys", color=color, legend=False)

    # plot vertical line at split_date date and label with split date name
    ax.axvline(split_date['date'], color='black', alpha=0.5)
    ax.text(split_date['date'] + timedelta(days=5), ax.get_ylim()[1]*0.95 , split_date['name'], fontsize=12, verticalalignment='top', rotation=90)

    # create legend manually
    labels = ['incidence as officially \nreported', 'incidence from self \nreported test results']
    blue_line = Line2D([0], [0], color='blue', label='official incidence')
    ax.legend([blue_line, *incidence_lines], labels, bbox_to_anchor=legend_location)

    # label plot with A) in upper left corner
    axb.text(0.05, 0.95, 'A)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

    # label axes
    ax.set_xlabel('Date')
    axb.set_ylabel('Self reported incidence')
    ax.set_ylabel('Official incidence')
    ax.set_ylim(0, ax.get_ylim()[1])
    axb.set_ylim(0, axb.get_ylim()[1])


def scatter_plot_with_split(ax, legend_location, split_date_1: Date, split_date_2: Date, df):

    df['split'] = f'before {split_date_1["name"]}'
    df['split'][df['date'] > split_date_1['date']] = f'after {split_date_1["name"]}'
    df['split'][df['date'] > split_date_2['date']] = f'after {split_date_2["name"]}'

    import seaborn as sns
    # scatterplot of official vs self reported incidence with split by date
    sns.scatterplot(x='official_incidence', y='all incidence', hue='split', data=df, ax=ax, alpha=0.5)

    # plot line from (0, 0) to (5000, 5000)
    factor = 4
    length = 2000

    ax.plot([0, length], [0, length * factor], label='x=5y', color='black', alpha=0.5)
    ax.plot([0, length], [0, length * 2], '-.', label='x=2y', color='black', alpha=0.5)
    ax.plot([0, length], [0, length], '--', label='x=y', color='black', alpha=0.5)

    ax.fill_between([0, length], [0, length * factor], [0, length], color='black', alpha=0.2)
    ax.set_xlim(0, length)
    ax.set_ylim(0, length * factor)

    # set legend position outside of plot on lower right
    ax.legend(loc='upper left', bbox_to_anchor=legend_location)

    # remove axis tics
    ax.tick_params(axis='both', which='both', length=0)
    # remove axis labels
    # ax.set_xticklabels([])
    # ax.set_yticklabels([])

    ax.set_ylabel('Self reported incidence')
    ax.set_xlabel('Official incidence')

    # label plot with B) in upper left corner
    ax.text(0.05, 0.95, 'B)', transform=ax.transAxes, fontsize=24, verticalalignment='top')

gridspecs = dict(width_ratios=[1, .9], height_ratios=[1, 1])
fig, axs = plt.subplot_mosaic([['upper left', 'upper right'],
                               ['lower', 'lower']], gridspec_kw=gridspecs, figsize=(7, 7))
# delete upper left subplot
axs['upper left'].remove()

df = incidence_official.merge(incidence, on='date').sort_values(by="date").reset_index(drop=True).query('date>20211115').drop(columns=['PCR incidence', 'Antigen incidence'])

scatter_plot_with_split(
    ax=axs['upper right'],
    legend_location=(-1.23, 0.93),
    split_date_1=dict(date=pd.to_datetime('20220101'), name='January 2022'),
    split_date_2=dict(date=pd.to_datetime('20220501'), name='May 2022'),
    df=df
)

plot_official_vs_self_reported_incidence(
    ax=axs['lower'],
    legend_location=(.4, 1.48),
    split_date=dict(date=pd.to_datetime('20220501'), name='May 2022'),
    df=df)
fig.savefig('official_vs_self_reported_incidence_with_scatter_subplot.png')