### Unit Test Data Creation

This notebook contains the code used to create the unit test data. Some of it is mock data, some is real data that has been scrambled, and some is simply cuts of real data, used where time constraints became a concern. For brevity, most target tables are generated by running the functions under test themselves. Make sure any target dataframes you need are (re)generated *before* making code changes to those functions; otherwise the targets would simply reproduce the changed behaviour.

In [None]:
%load_ext autoreload
%autoreload 2

import os
import sys
import math
import itertools
import random

import pandas as pd
import pandas_gbq
import numpy as np

from google.cloud import bigquery

# Import from local data files
current_path = os.path.abspath('.')
sys.path.append(os.path.dirname(current_path))

from data_access.data_factory import DataFactory as factory
from data_access import prep_pipeline as pp
from utils import data as dt
from utils import config as cf

In [None]:
# Dataset location in GCP to save tables in.
# It is prefixed directly onto every table name (table_loc + 'unit_test_...'),
# so it must end in a full stop.
table_loc = 'review_ons.'
assert table_loc.endswith('.'), "table_loc must end in '.' as it is prefixed onto table names"

# Project ID within GCP to save tables in.
project_id = 'ons-hotspot-prod'

# Behaviour when a table already exists in GCP:
# 'replace' will replace it, 'fail' will raise an error.
if_exists = 'replace'

## Two-way fixed effects model

The model code is broadly split into two sets of functions: those used in the two-way fixed effects model, and those used in the time tranche model.

<b>NB:</b> While some of the data is mocked manually in this notebook, a lot of the dataframes use real data, either sampled, or scrambled. 

### Static Data

Here we create a mock-up of the individual tables that are fed into the read_data function when its table_type parameter is set to 'static'.

In [None]:
# Read in real data to scramble.
static_vars = factory.get('static_vars').create_dataframe()

region_cols = ['LSOA11CD', 'LSOA11NM', 'MSOA11CD', 'MSOA11NM', 'LTLA20CD', 'LTLA20NM',
       'UTLA20CD', 'UTLA20NM', 'RGN19CD', 'RGN19NM']

n_fake_rows = 1000
base_seed = 42

# Geography columns are sampled together as whole rows, so each fake row keeps a
# consistent set of LSOA/MSOA/LTLA/UTLA/region codes and names.
static_vars_fake = (
    static_vars[region_cols]
    .sample(n=n_fake_rows, random_state=base_seed)
    .reset_index(drop=True)
)

static_vars_filt = static_vars[static_vars['LSOA11CD'].str.startswith('E')]

# Sample every remaining column independently, so values are associated neither
# with their actual location nor with the rest of their original record.
# Each column gets its own seed. Re-using one seed for every column would draw
# the same rows each time, leaving the real records intact. If static_vars only
# holds English LSOAs, those records would also stay attached to their true
# region columns.
value_cols = [col for col in static_vars_filt.columns if col not in region_cols]
for offset, col in enumerate(value_cols, start=1):
    static_vars_fake[col] = (
        static_vars_filt[col]
        .sample(n=n_fake_rows, random_state=base_seed + offset)
        .reset_index(drop=True)
    )

static_vars_fake.to_gbq(table_loc + 'unit_test_static_vars', project_id=project_id, if_exists=if_exists)

In [None]:
# This dataset is publicly available, so it is used as-is. It is restricted to
# the LSOAs that appear in static_vars_fake.
mid_year_lsoa = factory.get('mid_year_lsoa').create_dataframe()

lsoa_list = static_vars_fake['LSOA11CD'].unique()
in_fake_sample = mid_year_lsoa['LSOA11CD'].isin(lsoa_list)
mid_year_lsoa = mid_year_lsoa.loc[in_fake_sample]

mid_year_lsoa.to_gbq(table_loc + 'unit_test_mid_year_lsoa', project_id=project_id, if_exists=if_exists)

In [None]:
mobility_clusters_processed = factory.get('mobility_clusters_processed').create_dataframe()

# Take 1000 real rows, then overwrite their LSOA codes (positionally) with the
# 1000 LSOAs used in static_vars_fake, as a proxy for shuffling the data.
mob = (
    mobility_clusters_processed
    .sample(n=1000, random_state=42)
    .reset_index(drop=True)
)
mob['LSOA11CD'] = lsoa_list

mob.to_gbq(table_loc + 'unit_test_mobility_clusters', project_id=project_id, if_exists=if_exists)

In [None]:
flow_to_work = factory.get('flow_to_work').create_dataframe()

# Sample 1000 real rows and relabel them (positionally) with the chosen LSOA
# list, in the same way as the mobility clusters.
flow = flow_to_work.sample(n=1000, random_state=42)
flow['LSOA11CD'] = lsoa_list

flow.to_gbq(table_loc + 'unit_test_flow_to_work', project_id=project_id, if_exists=if_exists)

In [None]:
# This dataset is also publicly available.
lsoa_2011 = factory.get('LSOA_2011').create_dataframe()

# .copy() so the geometry assignment below writes to an independent frame rather
# than a boolean-filtered slice (avoids SettingWithCopyWarning).
lsoa_2011 = lsoa_2011[lsoa_2011['LSOA11CD'].isin(lsoa_list)].copy()

# Geometry is converted to text before upload (presumably because to_gbq cannot
# serialise geometry objects directly - confirm).
lsoa_2011['geometry'] = lsoa_2011['geometry'].astype(str)

lsoa_2011.to_gbq(table_loc + 'unit_test_lsoa_2011', project_id=project_id, if_exists=if_exists)

In [None]:
# Reproduce by hand the joins that read_data performs for the 'static' table type.
# Overlapping columns from the right-hand tables get a '_drop' suffix and are discarded.
static_tables = [mid_year_lsoa, mob, flow, lsoa_2011]

df_final = static_vars_fake.copy()
for tbl in static_tables:
    df_final = df_final.merge(tbl, on='LSOA11CD', how='outer', suffixes=['', '_drop'])

keep_cols = ~df_final.columns.str.endswith('_drop')
df_final = df_final.loc[:, keep_cols]

# Keep English LSOAs only.
is_english = df_final['LSOA11CD'].str.startswith('E')
df_final = df_final[is_english]

df_final.to_gbq(table_loc + 'unit_test_static', project_id=project_id, if_exists=if_exists)

### Dynamic Data

Repeating the above steps, but for the dynamic datasets: in this case, just cases and mobility.

In [None]:
# Cases data: a small fixed set of LSOAs over ten consecutive weeks.
cases_df = factory.get('aggregated_tests_lsoa').create_dataframe()

lsoas = ['E01001994', 'E01014214', 'E01013400', 'E01002435', 'E01019632',
       'E01015272', 'E01030378', 'E01007603', 'E01022044', 'E01007712']

# 'W' is the weekly alias (anchored on Sunday). The lowercase 'w' is deprecated
# from pandas 2.2 onwards.
dates = pd.date_range(start='2021-01-10', periods=10, freq='W')

filt = cases_df['Date'].isin(dates) & cases_df['LSOA11CD'].isin(lsoas)

cases_df_sub = cases_df[filt]

cases_df_sub.to_gbq(table_loc + 'unit_test_cases', project_id=project_id, if_exists=if_exists)

In [None]:
# Mobility data
deimos_footfall_df = factory.get('lsoa_daily_footfall').create_dataframe()

# Add extra dates to account for future functions.
# These operate on the assumption that there will be more mobility data than case data.
date_list = dates.astype(str).to_list() + ['2021-03-21', '2021-03-28']

# Add a Welsh LSOA to test that it is dealt with appropriately.
# A dedicated name is used instead of reassigning lsoa_list. The earlier cells
# assign lsoa_list positionally to 1000-row frames, so they would fail if re-run
# after this cell.
deimos_lsoas = lsoas + ['W01001957']

filt = deimos_footfall_df['Date'].isin(date_list)
filt &= deimos_footfall_df['LSOA11CD'].isin(deimos_lsoas)

deimos_mock = deimos_footfall_df[filt]

deimos_mock.to_gbq(table_loc + 'unit_test_deimos', project_id=project_id, if_exists=if_exists)


In [None]:
# Target dataframe for the dynamic test of read_data: an outer join of cases
# and mobility on LSOA and date.
# NOTE(review): unlike the static target, columns suffixed '_drop' are not removed
# here - confirm this matches what read_data returns for the dynamic table type.
dynamic_mock = cases_df_sub.merge(
    deimos_mock,
    how='outer',
    on=['LSOA11CD', 'Date'],
    suffixes=['', '_drop'],
)
dynamic_mock.to_gbq(table_loc + 'unit_test_dynamic', project_id=project_id, if_exists=if_exists)


### Geo merge function

This function simply merges precalculated area data onto LSOA11CD, converting to the required units.

In [None]:
# Only the LSOA codes are needed as input; geo_merge_precalc merges the
# precalculated area data onto them.
geom_df = factory.get('LSOA_2011').create_dataframe()[['LSOA11CD']]

geo_df = pp.geo_merge_precalc(geom_df.copy())
geo_df.to_gbq(table_loc + 'unit_test_geo_precalc', project_id=project_id, if_exists=if_exists)


### Normalise function

Create a dataset to test the normalise function. The function can normalise columns either by their row-wise sum or by a separate column.

In [None]:
normalise_input = pd.DataFrame({
    'col1': range(10),
    'col2': range(10, 20),
    'col3': range(0, 20, 2),
    'col4': range(0, 30, 3),
    'std': 50
})

normalise_input.to_gbq(table_loc + 'unit_test_normalise', project_id=project_id, if_exists=if_exists)

# Build the expected output on a copy, so normalise_input still matches the
# uploaded input table after this cell runs.
# The instructions for this normalisation are in the normalise_dic dictionary
# within the config_unit_testing.py file.
normalise_result = normalise_input.copy()

# Normalise col1 and col2 by their row-wise sum (overwriting the originals).
col_sum = normalise_result['col1'] + normalise_result['col2']
normalise_result['col1'] = normalise_result['col1'] / col_sum
normalise_result['col2'] = normalise_result['col2'] / col_sum

# Normalise col3 and col4 by the std column, creating new columns.
normalise_result['col3_test'] = normalise_result['col3'] / normalise_result['std']
normalise_result['col4_test'] = normalise_result['col4'] / normalise_result['std']

normalise_result.to_gbq(table_loc + 'unit_test_normalise_result', project_id=project_id, if_exists=if_exists)

### Forward fill function
This handles cumulative data with incomplete dates: after joining to other datasets there may be gaps, which are forward-filled within each group.

In [None]:
ffill_input = pd.DataFrame({
    'col1':[np.nan,  1.,  1.,  2., np.nan,  3.,  4.,  1.,  2.,  3.,  4.,  5.,  6.,
        7.,  1., np.nan, np.nan, np.nan,  2.,  4.,  8.,  0.,  0., np.nan, np.nan,  1.,
        1., np.nan],
    'col2':[0.,  2.,  3., np.nan,  4.,  7.,  7., np.nan,  1.,  1.,  4.,  4., np.nan,
       np.nan, np.nan, np.nan,  1.,  1.,  1.,  1.,  1., np.nan, np.nan, np.nan, np.nan, np.nan,
       np.nan, np.nan],
    'col3':['a']*7 + ['b']*7 + ['c']*7 + ['d']*7,
    'date': pd.date_range(start='2020-01-01', periods=7).to_list() * 4
})

ffill_input['date'] = ffill_input['date'].dt.tz_localize(None)

# First target: only col1 is forward filled within each col3 group. Any gap with
# no earlier value in its group is set to 0.
# Assigning the result back (rather than fillna(inplace=True) on a column) avoids
# chained in-place assignment, which does not reliably update the frame.
ffill_output1 = ffill_input.copy()
ffill_output1['col1'] = ffill_output1.groupby('col3')['col1'].ffill().fillna(0)

# Second target: col1 and col2 are both forward filled within each group, and any
# remaining NaNs are set to 0.
# Multiple columns are selected with a list: the tuple form
# groupby(...)['col1', 'col2'] raises in pandas >= 2.0.
ffill_output2 = ffill_input.copy()
ffill_output2[['col1', 'col2']] = ffill_output2.groupby('col3')[['col1', 'col2']].ffill()
ffill_output2 = ffill_output2.fillna(0)

ffill_input.to_gbq(table_loc + 'unit_test_ffill_df', project_id=project_id, if_exists=if_exists)
ffill_output1.to_gbq(table_loc + 'unit_test_ffill_1', project_id=project_id, if_exists=if_exists)
ffill_output2.to_gbq(table_loc + 'unit_test_ffill_2', project_id=project_id, if_exists=if_exists)

### Sum features

This function sums the columns provided and creates a new column with this sum, dropping the original columns.

In [None]:
sum_feat = pd.DataFrame({
    'col1': range(5),
    'col2': range(5, 10),
    'col3': range(0, 10, 2),
    'col4': range(18, 3, -3),
    'col5': range(5, 14, 2),
})

# Expected output. The column groupings mirror the parameters in the config
# unit test file, config_unit_testing.py.
sum_result = pd.DataFrame({
    'newcol': sum_feat[['col1', 'col2']].sum(axis=1),
    'newcol2': sum_feat[['col3', 'col4', 'col5']].sum(axis=1),
})

for frame, table_name in [(sum_feat, 'unit_test_sum_features'),
                          (sum_result, 'unit_test_sum_features_result')]:
    frame.to_gbq(table_loc + table_name, project_id=project_id, if_exists=if_exists)

### Timelag function

In [None]:
dynamic_df = factory.get('lsoa_dynamic').create_dataframe()
dynamic_df_norm = factory.get('dynamic_raw_norm_chosen_geo').create_dataframe()

# Numeric columns to mock. Non-numeric columns (travel_cluster, RGN19NM, UTLA20NM,
# MSOA11NM, Country) are presumably excluded because they have no min/max range
# to sample from. travel_cluster is filled from the LSOA x date grid below.
dyn_cols = ['cases_cumsum', 'full_vacc_cumsum',
       'msoa_people',
       'worker_footfall_sqkm', 'visitor_footfall_sqkm',
       'resident_footfall_sqkm', 'total_footfall_sqkm',
       'worker_visitor_footfall_sqkm', 'ALL_PEOPLE', 'Area',
       'total_vaccinated_first_dose', 'total_vaccinated_second_dose',
       'pct_of_people_full_vaccinated', 'cases_per_person',
       'pct_infected_all_time', 'COVID_Cases', 'cumsum_divided_area']

# Build the LSOA x date cross-join first, so the number of mock rows comes from it.
# A dedicated name avoids clobbering `dates` from the cases cell, which the
# mobility cell relies on.
timelag_dates = pd.date_range(start='2020-01-01', periods=20)
dates_df = pd.DataFrame({'cj': 1, 'Date': timelag_dates})

subset = dynamic_df.sample(n=100, random_state=42)

# One row per sampled LSOA, keeping each LSOA's travel_cluster on the same row.
# The original mixed a unique-LSOA array with the full travel_cluster Series,
# which raised a length error if any LSOA was sampled twice.
lsoa_clusters = subset[['LSOA11CD', 'travel_cluster']].drop_duplicates(subset='LSOA11CD')

# Cross-join to get all combinations of the chosen dates and LSOAs.
df_cj = (
    lsoa_clusters
    .assign(cj=1)
    .merge(dates_df, on='cj')
    .drop(columns='cj')
)

# Generate fake data for the input table: for each column, sample uniformly
# between that column's real min and max.
# A dedicated seeded generator makes the mock reproducible.
rng = random.Random(42)
n_rows = len(df_cj)
col_bounds = {col: (dynamic_df[col].min(), dynamic_df[col].max()) for col in dyn_cols}

dynamic_df_mock = pd.DataFrame({
    col: [rng.uniform(lo, hi) for _ in range(n_rows)]
    for col, (lo, hi) in col_bounds.items()
})

dynamic_df_mock[['LSOA11CD', 'travel_cluster', 'Date']] = df_cj

In [None]:
# Perform the same operation for the normalised dynamic dataset.
# Non-numeric columns (RGN19NM, UTLA20NM, MSOA11NM, Country) are presumably
# excluded because they have no min/max range to sample from.
dyn_cols = ['COVID_Cases', 'cases_cumsum',
       'total_vaccinated_first_dose', 'total_vaccinated_second_dose',
       'full_vacc_cumsum',
       'msoa_people', 'worker_footfall_sqkm', 'visitor_footfall_sqkm',
       'resident_footfall_sqkm', 'total_footfall_sqkm',
       'worker_visitor_footfall_sqkm', 'ALL_PEOPLE', 'Area',
       'Area_chosen_geo',
       'Population_chosen_geo', 'total_vaccinated_first_dose_norm_lag_pop',
       'total_vaccinated_second_dose_norm_lag_pop',
       'full_vacc_cumsum_norm_lag_pop', 'COVID_Cases_norm_lag_pop',
       'cases_cumsum_norm_lag_pop', 'COVID_Cases_norm_lag_area',
       'cases_cumsum_norm_lag_area', 'worker_footfall_sqkm_norm_lag_area',
       'visitor_footfall_sqkm_norm_lag_area',
       'resident_footfall_sqkm_norm_lag_area',
       'total_footfall_sqkm_norm_lag_area',
       'worker_visitor_footfall_sqkm_norm_lag_area',
       'commute_inflow_sqkm_norm_lag_area', 'other_inflow_sqkm_norm_lag_area']

# One mock row per row of the LSOA x date grid (df_cj) built in the previous cell.
# Using its length instead of a hard-coded 2000 keeps the id columns aligned.
n_rows = len(df_cj)

# A dedicated seeded generator makes this mock reproducible regardless of the
# global `random` state.
rng = random.Random(43)
col_bounds = {col: (dynamic_df_norm[col].min(), dynamic_df_norm[col].max()) for col in dyn_cols}

dynamic_df_norm_mock = pd.DataFrame({
    col: [rng.uniform(lo, hi) for _ in range(n_rows)]
    for col, (lo, hi) in col_bounds.items()
})

dynamic_df_norm_mock[['LSOA11CD', 'travel_cluster', 'Date']] = df_cj

In [None]:
# Target dataframe: run apply_timelag on copies, so any in-place changes it makes
# cannot affect the mock inputs uploaded in the next cell.
# save_results=False presumably stops apply_timelag persisting its own output - confirm.
timelag_df = pp.apply_timelag(
    dynamic_df_mock.copy(),
    dynamic_df_norm_mock.copy(),
    save_results=False,
)

In [None]:
# Upload the two mock inputs and the apply_timelag target, in that order.
timelag_uploads = {
    'unit_test_timelag_dynamic': dynamic_df_mock,
    'unit_test_timelag_dynamic_norm': dynamic_df_norm_mock,
    'unit_test_timelag_result': timelag_df,
}
for table_name, frame in timelag_uploads.items():
    frame.to_gbq(table_loc + table_name, project_id=project_id, if_exists=if_exists)

## Time Tranche functions
### Static and case data subsets

In [None]:
static_df = factory.get('static_features').create_dataframe()

# Select the same set of LSOAs that are in the unit_test_cases table.
# .copy() so the column assignments below write to an independent frame rather
# than a filtered slice of static_df (avoids SettingWithCopyWarning).
static_df_sub = static_df[static_df['LSOA11CD'].isin(lsoas)].copy()

# The Area column was removed from the source dataframe as it is no longer
# required in the script, so it is added back here manually. The same applies to
# the LSOA population column, ALL_PEOPLE.
area_list = [x / 10 for x in range(1, 200, 5)]

# Size the samples from the actual row count: 10 when every LSOA is present.
# A hard-coded 10 raised a length mismatch otherwise.
n_lsoas = len(static_df_sub)

# Set seed for reproducibility.
random.seed(42)
static_df_sub['Area'] = random.sample(area_list, n_lsoas)
static_df_sub['ALL_PEOPLE'] = random.sample(range(1400, 2200), n_lsoas)

static_df_sub.to_gbq(table_loc + 'unit_test_static_for_cases', project_id=project_id, if_exists=if_exists)

cases_static = pp.join_cases_to_static_data(static_df_sub, 'unit_test_cases')

cases_static.to_gbq(table_loc + 'unit_test_cases_static', project_id=project_id, if_exists=if_exists)

cases_static_week = pp.derive_week_number(cases_static)
cases_static_week.to_gbq(table_loc + 'unit_test_cases_static_week', project_id=project_id, if_exists=if_exists)

### Vaccination data

In [None]:
vax_df = factory.get('lsoa_vaccinations').create_dataframe()

# Normalise Date to 'YYYY-MM-DD' strings. The isin filter below compares
# vaccination dates against these strings, and the same frame is passed to
# join_vax_data.
cases_static_week['Date'] = pd.to_datetime(cases_static_week['Date']).dt.date.astype(str)

# Keep vaccination rows for the case LSOAs and dates, plus the same two extra
# dates that were added to the mobility data.
case_lsoas = cases_static_week['LSOA11CD'].unique()
case_dates = list(cases_static_week['Date'].unique()) + ['2021-03-21', '2021-03-28']
filt = vax_df['LSOA11CD'].isin(case_lsoas) & vax_df['Date'].isin(case_dates)

# Sort, then delete the first row, so the fillna in join_vax_data has something
# to work on. A single chain avoids in-place operations on a filtered slice
# (SettingWithCopyWarning).
vax_df_sub = (
    vax_df[filt]
    .sort_values(by=['LSOA11CD', 'Date'])
    .iloc[1:]
    .reset_index(drop=True)
)

vax_processed_df, cases_all_weeks_df = pp.join_vax_data(cases_static_week, vax_df_sub)

vax_df_sub.to_gbq(table_loc + 'unit_test_vaccinations', project_id=project_id, if_exists=if_exists)
vax_processed_df.to_gbq(table_loc + 'unit_test_vaccinations_processed', project_id=project_id, if_exists=if_exists)
cases_all_weeks_df.to_gbq(table_loc + 'unit_test_cases_static_week_vax', project_id=project_id, if_exists=if_exists)

In [None]:
# Ensure the Date columns can be merged: convert deimos_mock's Date to the same
# 'YYYY-MM-DD' string form used for the case/vaccination data.
# Rebinding via .assign avoids writing into a filtered slice of
# deimos_footfall_df (SettingWithCopyWarning).
deimos_mock = deimos_mock.assign(
    Date=pd.to_datetime(deimos_mock['Date']).dt.date.astype(str)
)

joined = pp.join_tranches_mobility_data(cases_all_weeks_df.copy(), deimos_mock.copy())
joined.to_gbq(table_loc + 'unit_test_deimos_cases_vax', project_id=project_id, if_exists=if_exists)

In [None]:
# NB convert_units also changes the dataframe it is given, so it is always passed
# a copy of `joined` to keep the original intact.

# Target 1: convert_units overwrites the original column.
converted = pp.convert_units(joined.copy(), 'meat_and_fish_processing', 0.1)
convert_units_df = pp.sort_cols(converted, ['LSOA11CD', 'Date'])
convert_units_df.to_gbq(table_loc + 'unit_test_convert_unit', project_id=project_id, if_exists=if_exists)

# Target 2: convert_units writes the result to a new column instead.
converted_alt = pp.convert_units(
    joined.copy(),
    'meat_and_fish_processing',
    0.1,
    new_colname='meat_and_fish_processing_alt',
)
convert_units_df_alt = pp.sort_cols(converted_alt, ['LSOA11CD', 'Date'])
convert_units_df_alt.to_gbq(table_loc + 'unit_test_convert_unit_alt', project_id=project_id, if_exists=if_exists)

In [None]:
# Build the tranche test dataset from the unit-converted frame plus the static,
# mobility and processed vaccination inputs (positional order matters).
tranche_test_inputs = (convert_units_df, static_df_sub, deimos_mock, vax_processed_df)
test_data = pp.create_test_data(*tranche_test_inputs)

test_data.to_gbq(table_loc + 'unit_test_tranche_test', project_id=project_id, if_exists=if_exists)

In [None]:
# Split the converted data into the time tranches defined in the config module,
# then derive the tranche ordering from the result.
time_tranche_df = pp.create_time_tranches(
    convert_units_df,
    tranche_dates=cf.tranche_dates,
    tranche_description=cf.tranche_description,
)
time_tranche_df.to_gbq(table_loc + 'unit_test_time_tranche', project_id=project_id, if_exists=if_exists)

tranche_order = pp.derive_tranche_order(
    time_tranche_df,
    tranche_description=cf.tranche_description,
)
tranche_order.to_gbq(table_loc + 'unit_test_tranche_order', project_id=project_id, if_exists=if_exists)