# Import data

## Local Files

List the datasets available on the Databricks filesystem.

In [0]:
# Show what has been uploaded under FileStore/tables before reading anything;
# as the last expression of the cell, the listing is rendered as its output.
dbutils.fs.ls('FileStore/tables')

Out[74]: [FileInfo(path='dbfs:/FileStore/tables/demographics.csv', name='demographics.csv', size=1544607, modificationTime=1700869951000),
 FileInfo(path='dbfs:/FileStore/tables/economy.csv', name='economy.csv', size=10228, modificationTime=1700868829000),
 FileInfo(path='dbfs:/FileStore/tables/epidemiology.csv', name='epidemiology.csv', size=520931512, modificationTime=1700870705000),
 FileInfo(path='dbfs:/FileStore/tables/geography.csv', name='geography.csv', size=1005065, modificationTime=1700869953000),
 FileInfo(path='dbfs:/FileStore/tables/government_response.csv', name='government_response.csv', size=17676542, modificationTime=1700869979000),
 FileInfo(path='dbfs:/FileStore/tables/health.csv', name='health.csv', size=122072, modificationTime=1700869980000),
 FileInfo(path='dbfs:/FileStore/tables/mobility.csv', name='mobility.csv', size=234579795, modificationTime=1700870325000),
 FileInfo(path='dbfs:/FileStore/tables/vaccination_access.csv', name='vaccination_access.csv', size=7


## Read Files

The datasets are read as CSV files, the first row of each providing the column headers. The data type of each column is inferred from the non-null values it contains.

In [0]:
def _read_table(name):
    """Read ``/FileStore/tables/<name>.csv`` into a DataFrame.

    The first row is used as the header, fields are comma-delimited, and
    column types are inferred by Spark.
    """
    reader = spark.read.options(header=True, delimiter=',', inferSchema=True)
    return reader.csv(f'/FileStore/tables/{name}.csv')


demographics = _read_table('demographics')
economy = _read_table('economy')
epidemiology = _read_table('epidemiology')
geography = _read_table('geography')
government_response = _read_table('government_response')
health = _read_table('health')
mobility = _read_table('mobility')
# TODO remove this.
vaccination_access = _read_table('vaccination_access')
vaccinations = _read_table('vaccinations')


# Cleaning and Transformation

## Demographics

For every dataset, we keep only rows that specify countries (not regions therein). We characterize a population as 'young', 'mid', or 'old' based on the number of people who belong to each of a set of age ranges.

In [0]:
from functools import reduce
from itertools import chain
from operator import add

from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col, create_map, greatest, lit, max, posexplode, when

# Drop every row whose location_key contains an underscore, which is how the
# notebook separates regions from countries. The raw string passes the SQL
# LIKE escape `\_` through unchanged without relying on Python tolerating an
# invalid escape sequence.
demographics = demographics.filter(r'location_key NOT LIKE "%\_%"')

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
demographics_imputer_columns = [name for name in demographics.columns if name != 'location_key']
demographics_imputer = Imputer(inputCols=demographics_imputer_columns, outputCols=demographics_imputer_columns)
demographics = demographics_imputer.fit(demographics).transform(demographics)

young_columns = ['population_age_00_09', 'population_age_10_19', 'population_age_20_29']
mid_columns = ['population_age_30_39', 'population_age_40_49', 'population_age_50_59']
old_columns = ['population_age_60_69', 'population_age_70_79', 'population_age_80_and_older']

# Collapse the nine age-range counts into three band totals.
demographics = demographics.withColumn('young', reduce(add, [col(x) for x in young_columns])) \
                           .withColumn('mid', reduce(add, [col(x) for x in mid_columns])) \
                           .withColumn('old', reduce(add, [col(x) for x in old_columns])) \
                           .drop(*(young_columns + mid_columns + old_columns))

# Label each location with its largest band. Only the three band totals are
# compared, so an unrelated column that happens to equal the maximum can never
# become a label, and the string location_key is never mixed with numeric
# values. A tie is resolved in favour of the younger band, so every location
# yields exactly one row.
largest_band = greatest('young', 'mid', 'old')
demographics = demographics.withColumn('population_age',
                                       when(col('young') == largest_band, lit('young'))
                                       .when(col('mid') == largest_band, lit('mid'))
                                       .when(col('old') == largest_band, lit('old'))) \
                           .drop('young', 'mid', 'old')


## Economy

Locations finer than country are once again filtered out, and missing values are imputed.

In [0]:
# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
economy = economy.filter(r'location_key NOT LIKE "%\_%"')

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
economy_imputer_columns = [name for name in economy.columns if name != 'location_key']
economy_imputer = Imputer(inputCols=economy_imputer_columns, outputCols=economy_imputer_columns)
economy = economy_imputer.fit(economy).transform(economy)


## Epidemiology

We aggregate time series data by average or maximum, according to whether the input is a daily measure or a cumulative one respectively. Each aggregate is then divided by the associated population; the resulting `percentage_*` columns therefore hold fractions of the population (they are not multiplied by 100).

In [0]:
from pyspark.sql.functions import avg

# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
epidemiology = epidemiology.filter(r'location_key NOT LIKE "%\_%"')

epidemiology_daily = ['new_confirmed', 'new_deceased', 'new_recovered', 'new_tested']
epidemiology_cumulative = ['cumulative_confirmed', 'cumulative_deceased', 'cumulative_recovered', 'cumulative_tested']
epidemiology_measures = epidemiology_daily + epidemiology_cumulative

# One row per location: daily measures are averaged, cumulative ones take their
# maximum. `max` is pyspark.sql.functions.max (imported earlier in the
# notebook), not the Python builtin. Aggregates are aliased back to the source
# column name so later steps need not reference Spark's generated
# 'avg(...)'/'max(...)' names.
epidemiology = epidemiology.groupBy('location_key').agg(
    *[avg(name).alias(name) for name in epidemiology_daily],
    *[max(name).alias(name) for name in epidemiology_cumulative],
)

# Full outer join: locations present in only one of the two frames are kept,
# with nulls that the imputer below fills in.
epidemiology = epidemiology.join(demographics.select('location_key', 'population'), on='location_key', how='fullouter')

# Express each aggregate relative to the population (a fraction, not x100).
for name in epidemiology_measures:
    epidemiology = epidemiology.withColumn(f'percentage_{name}', col(name) / col('population'))
epidemiology = epidemiology.drop(*epidemiology_measures, 'population')

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
epidemiology_imputer_columns = [name for name in epidemiology.columns if name != 'location_key']
epidemiology_imputer = Imputer(inputCols=epidemiology_imputer_columns, outputCols=epidemiology_imputer_columns)
epidemiology = epidemiology_imputer.fit(epidemiology).transform(epidemiology)


## Geography

We remove an unnecessary column, and replace absolute measures of rural and urban area with their fractions of total area.

In [0]:
# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
geography = geography.filter(r'location_key NOT LIKE "%\_%"')

geography = geography.drop('openstreetmap_id')

# TODO remove these keys from other datasets.

geography = geography.filter('location_key NOT IN ("IO", "SJ", "MC", "VA")')

# Replace the absolute rural/urban areas with their share of the total area
# (a fraction, not multiplied by 100).
for kind in ('rural', 'urban'):
    geography = geography.withColumn(f'percentage_area_{kind}_sq_km', col(f'area_{kind}_sq_km') / col('area_sq_km'))
geography = geography.drop('area_rural_sq_km', 'area_urban_sq_km')

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
geography_imputer_columns = [name for name in geography.columns if name != 'location_key']
geography_imputer = Imputer(inputCols=geography_imputer_columns, outputCols=geography_imputer_columns)
geography = geography_imputer.fit(geography).transform(geography)


## Government Response

Time series of ordinal-scaled features are aggregated by taking the mode, and missing values are likewise imputed with the mode.

In [0]:
from pyspark.sql.functions import mode, when

# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
government_response = government_response.filter(r'location_key NOT LIKE "%\_%"')

# Source column -> name of the aggregated column, in output order.
government_response_aliases = {
    'school_closing': 'school_closing_severity',
    'workplace_closing': 'workplace_closing_severity',
    'cancel_public_events': 'cancel_public_events_severity',
    'restrictions_on_gatherings': 'restrictions_on_gatherings_severity',
    'public_transport_closing': 'public_transport_closing_severity',
    'stay_at_home_requirements': 'stay_at_home_requirements_severity',
    'restrictions_on_internal_movement': 'restrictions_on_internal_movement_severity',
    'international_travel_controls': 'international_travel_controls_severity',
    'debt_relief': 'debt_relief_extent',
    'public_information_campaigns': 'public_information_campaigns_extent',
    'testing_policy': 'testing_policy_severity',
    'contact_tracing': 'contact_tracing_severity',
    'facial_coverings': 'facial_coverings_severity',
    'vaccination_policy': 'vaccination_policy_extent',
}

# One row per location, each feature reduced to its most frequent value.
government_response = government_response.groupBy('location_key').agg(
    *[mode(source).alias(alias) for source, alias in government_response_aliases.items()]
)

# Missing values are filled with the column mode rather than the mean, in line
# with the mode aggregation used above.
government_response_imputer_columns = [name for name in government_response.columns if name != 'location_key']
government_response_imputer = Imputer(
     strategy='mode', inputCols=government_response_imputer_columns, outputCols=government_response_imputer_columns)
government_response = government_response_imputer.fit(government_response).transform(government_response)


## Health

Apart from keeping only country-level rows, the only transformation applied is to impute missing values.

In [0]:
# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
health = health.filter(r'location_key NOT LIKE "%\_%"')

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
health_imputer_columns = [name for name in health.columns if name != 'location_key']
health_imputer = Imputer(inputCols=health_imputer_columns, outputCols=health_imputer_columns)
health = health_imputer.fit(health).transform(health)


## Mobility

We aggregate time series data with average values.

In [0]:
# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
mobility = mobility.filter(r'location_key NOT LIKE "%\_%"')

# Each source column is named 'mobility_<category>'; the averaged output column
# drops that prefix.
mobility_categories = [
    'retail_and_recreation',
    'grocery_and_pharmacy',
    'parks',
    'transit_stations',
    'workplaces',
    'residential',
]
mobility = mobility.groupBy('location_key').agg(
    *[avg(f'mobility_{category}').alias(category) for category in mobility_categories]
)

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
mobility_imputer_columns = [name for name in mobility.columns if name != 'location_key']
mobility_imputer = Imputer(inputCols=mobility_imputer_columns, outputCols=mobility_imputer_columns)
mobility = mobility_imputer.fit(mobility).transform(mobility)


## Vaccinations

We select a small subset of the features from this dataset, and aggregate values by the same method described under epidemiology.

In [0]:
# Keep only rows whose location_key has no underscore (country-level rows).
# Raw string: the SQL LIKE escape `\_` is not a valid Python escape sequence.
vaccinations = vaccinations.filter(r'location_key NOT LIKE "%\_%"')

vaccinations_daily = ['new_persons_vaccinated', 'new_persons_fully_vaccinated']
vaccinations_cumulative = ['cumulative_persons_vaccinated', 'cumulative_persons_fully_vaccinated']
vaccinations_measures = vaccinations_daily + vaccinations_cumulative

# One row per location: daily measures are averaged, cumulative ones take their
# maximum. `max` is pyspark.sql.functions.max (imported earlier in the
# notebook), not the Python builtin. Aggregates are aliased back to the source
# column name so later steps need not reference Spark's generated
# 'avg(...)'/'max(...)' names.
vaccinations = vaccinations.groupBy('location_key').agg(
    *[avg(name).alias(name) for name in vaccinations_daily],
    *[max(name).alias(name) for name in vaccinations_cumulative],
)

# Full outer join: locations present in only one of the two frames are kept,
# with nulls that the imputer below fills in.
vaccinations = vaccinations.join(demographics.select(['location_key', 'population']), on='location_key', how='fullouter')

# Express each aggregate relative to the population (a fraction, not x100).
for name in vaccinations_measures:
    vaccinations = vaccinations.withColumn(f'percentage_{name}', col(name) / col('population'))
vaccinations = vaccinations.drop(*vaccinations_measures, 'population')

# Fill missing values in every feature column with that column's mean
# (Imputer's default strategy); each column is overwritten in place.
vaccinations_imputer_columns = [name for name in vaccinations.columns if name != 'location_key']
vaccinations_imputer = Imputer(inputCols=vaccinations_imputer_columns, outputCols=vaccinations_imputer_columns)
vaccinations = vaccinations_imputer.fit(vaccinations).transform(vaccinations)