In [1]:
# In this Spark ETL process we are loading the Covid19 data set(downloaded from - https://github.com/owid/covid-19-data/blob/master/public/data/README.md).

In [27]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from pyspark.sql.functions import *
import os

In [28]:
cwd = os.getcwd()
spark = SparkSession.builder.appName('Covid19Data').getOrCreate()

In [29]:
# Step 1 - Load the Covid19 data set

# Covid19 
file = f'{cwd}/data/raw_data/covid19/owid-covid-data.csv'
cdf = spark.read.csv(path=file, header=True)
cdf.createOrReplaceTempView('CDF')
# cdf.printSchema()

# owid-covid-data.csv (downloaded from - https://github.com/owid/covid-19-data/blob/master/public/data/README.md).

covid19 = spark.sql("""

select  iso_code                 as country_iso_3_code,
        date                     as report_date,
        year(date)               as report_year,        
        coalesce(total_cases, 0) as total_cases,
        population               as population,
        coalesce(cast(total_cases as double)/cast(population as double)*100,0) as cases_population_ratio
from    CDF
where   1=1
        and weekofyear(date)=14
       
""")
covid19.createOrReplaceTempView('covid19')

In [20]:
 spark.sql("""

 select  year(report_date), count(*)
 from    covid19
 group by 1
 """).show(10,False)

# +-------------------------------+--------+
# |year(CAST(report_date AS DATE))|count(1)|
# +-------------------------------+--------+
# |2020                           |1322    |
# |2021                           |1545    |
# +-------------------------------+--------+

In [21]:
# Step 2 - Check Null percentage for each attribute

# spark.sql("""

# select  count(*) as row_count,
#         cast(sum( case when  country_iso_3_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_iso_code,
#         cast(sum( case when  report_date            is not null then 1 else 0 end) as double)/count(*) *100 as NN_date,
#         cast(sum( case when  total_cases            is not null then 1 else 0 end) as double)/count(*) *100 as NN_total_cases,
#         cast(sum( case when  population             is not null then 1 else 0 end) as double)/count(*) *100 as NN_population,
#         cast(sum( case when  cases_population_ratio is not null then 1 else 0 end) as double)/count(*) *100 as NN_cases_population_ratio
# from    covid19

# """).show(10,False)


# +---------+-----------+-------+--------------+----------------+-------------------------+
# |row_count|NN_iso_code|NN_date|NN_total_cases|NN_population   |NN_cases_population_ratio|
# +---------+-----------+-------+--------------+----------------+-------------------------+
# |2867     |100.0      |100.0  |100.0         |99.2675270317405|100.0                    |
# +---------+-----------+-------+--------------+----------------+-------------------------+

# As we can see the population attribute has some null values (lets remove all rows withou population per country)

In [22]:
# Step 3 - Load Countries data set
countries_file = f'{cwd}/data/dims/dim_countries.csv'
countries_df = spark.read.csv(path=countries_file, header=True)
countries_df.createOrReplaceTempView('Countries')
# countries_df.printSchema()

# countries.csv (downloaded from - https://github.com/google/dspl/blob/master/samples/google/canonical/countries.csv).
# Schema:
# root
#  |-- country_id: string (nullable = true)
#  |-- continent_name: string (nullable = true)
#  |-- country_iso_3_code: string (nullable = true)
#  |-- country_name: string (nullable = true)
#  |-- country_lat: string (nullable = true)
#  |-- country_lng: string (nullable = true)

spark.conf.set("spark.sql.crossJoin.enabled", "true")

countries_df = spark.sql("""

with D
as (    SELECT 2019 as report_year
        union all
        SELECT 2020
        union all
        SELECT 2021),
C as
    (select  country_id                  as country_iso_2_code,
             country_iso_3_code          as country_iso_3_code,
             country_name                as country_name,
             continent_name              as continent_name,        
             country_lat                 as country_lat,
             country_lng                 as country_lng,
             2018                          as join_year
    from Countries)

select  C.country_iso_2_code          as country_iso_2_code,
        C.country_iso_3_code          as country_iso_3_code,
        C.country_name                as country_name,
        C.continent_name              as continent_name,        
        C.country_lat                 as country_lat,
        C.country_lng                 as country_lng,
        cast(D.report_year as string) as report_year
from C join D 
     ON C.join_year <= D.report_year


""")
countries_df.createOrReplaceTempView('Countries')

# countries_df.printSchema()

In [23]:
# spark.sql("""

# select  count(*) as row_count,
#         count(distinct country_iso_2_code) as cd,
#         cast(sum( case when  country_iso_2_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_iso_2_code,
#         cast(sum( case when  country_iso_3_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_iso_3_code,
#         cast(sum( case when  country_name            is not null then 1 else 0 end) as double)/count(*) *100 as country_name,
#         cast(sum( case when  continent_name             is not null then 1 else 0 end) as double)/count(*) *100 as continent_name
# from    Countries

# """).show(10,False)

# +---------+---+---------------------+---------------------+------------+-----------------+
# |row_count|cd |NN_country_iso_2_code|NN_country_iso_3_code|country_name|continent_name   |
# +---------+---+---------------------+---------------------+------------+-----------------+
# |245      |245|100.0                |98.77551020408163    |100.0       |99.18367346938776|
# +---------+---+---------------------+---------------------+------------+-----------------+


# spark.sql("""

# select  country_iso_2_code,
#         count(*) as row_count
# from    Countries
# group by 1
# having count(*)>1
# """).show(10,False)

In [24]:
# TESTS - check that countries are synced in both data freams
# -------------------------------------------------------------
# spark.sql("""

# select  count(distinct country_iso_3_code) as cd_country_iso_3_code,
#         'covid19' as source_
# from    covid19

# union all

# select  count(distinct country_iso_3_code) as cd_country_iso_3_code,
#         'dimCountries' as source_
# from    countries

# """).show(100,False)

# +---------------------+------------+
# |cd_country_iso_3_code|source_     |
# +---------------------+------------+
# |222                  |covid19     |
# |242                  |dimCountries|
# +---------------------+------------+

#  As we can see we have more countries in dim countries, lets check which countries are missing in each data set


# spark.sql("""
# select  c19.iso_3,
#         con.iso_3
# from    (select  distinct lower(country_iso_3_code) as iso_3
#          from    covid19) as c19
#         full join
#         (select  distinct lower(country_iso_3_code) as iso_3
#         from    countries) as con
#         ON c19.iso_3 = con.iso_3
# where   c19.iso_3 is null or con.iso_3 is null
#         and length(c19.iso_3)=3 --Remove invalid iso_3 codes
# """).show(100,False)

#  As we can see we have invalid iso_3 codes lets remove them
#  We still have countries missing in both data sets, for now I will create a fully joined data frame

In [25]:
# Step 4 - Join both data sets and create dim_country enriched with covid 19 data

countries_and_covid_19_week_14_data = spark.sql("""

select  C.country_iso_2_code          as country_iso_2_code,
        C.report_year                 as report_year,
        C19.report_date               as covid19_report_date,
        C19.total_cases               as total_cases,
        C19.population                as population,
        C19.cases_population_ratio    as cases_population_ratio,
        C.country_iso_3_code          as country_iso_3_code,
        C.country_name                as country_name,
        C.continent_name              as continent_name,        
        C.country_lat                 as country_lat,
        C.country_lng                 as country_lng      
from Countries C left join covid19 C19
    ON lower(C19.country_iso_3_code) = lower(C.country_iso_3_code)
       and C.report_year = C19.report_year
  
  
""")
countries_and_covid_19_week_14_data.createOrReplaceTempView('countries_and_covid_19_week_14_data')


In [26]:
# spark.sql("""
       
# select  report_year,
#         count(*) as row_count,
#         count(distinct country_iso_2_code) as country_id,
#         count(distinct country_iso_3_code) as country_iso_3_code,
#         count(distinct country_name) as country_name                      
# from    countries_and_covid_19_week_14_data
# group by 1

# """).show(10,False)


In [11]:
# Step 4 - Join both data sets and create dim_country enriched with covid 19 data

countries_covid_19_week_14_agg = spark.sql("""

select  C.country_iso_2_code          as country_iso_2_code,
        C.report_year                 as report_year,
        C.country_iso_3_code          as country_iso_3_code,
        C.country_name                as country_name,
        C.continent_name              as continent_name,        
        C.country_lat                 as country_lat,
        C.country_lng                 as country_lng,
        max(C19.total_cases)          as total_cases,
        max(C19.population)           as population,
        max(C19.cases_population_ratio) as cases_population_ratio,
        max(C19.cases_population_ratio) as covid19_percentage,
        cast(cast(max(C19.cases_population_ratio) as decimal(3, 2))as string) || "%" as covid19_percentage_label
from Countries C left join covid19 C19
    ON lower(C19.country_iso_3_code) = lower(C.country_iso_3_code)
       and C.report_year = C19.report_year
group by 1,2,3,4,5,6,7

""")
countries_covid_19_week_14_agg.createOrReplaceTempView('countries_covid_19_week_14_agg')


In [12]:
# Test agg data

# spark.sql("""
       
# select  report_year,
#         count(*) as row_count,
#         count(distinct country_iso_2_code) as country_id,
#         count(distinct country_iso_3_code) as country_iso_3_code,
#         count(distinct country_name) as country_name                      
# from    countries_covid_19_week_14_agg
# group by 1

# """).show(10,False)



In [13]:
# Step 5 - Check Null percentage for each attribute

# spark.sql("""

# select  count(*) as row_count,
#         cast(sum( case when  country_iso_2_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_iso_2_code,
#         cast(sum( case when  covid19_report_date    is not null then 1 else 0 end) as double)/count(*) *100 as NN_covid19_report_date,
#         cast(sum( case when  total_cases            is not null then 1 else 0 end) as double)/count(*) *100 as NN_total_cases,
#         cast(sum( case when  population             is not null then 1 else 0 end) as double)/count(*) *100 as NN_population,
#         cast(sum( case when  cases_population_ratio is not null then 1 else 0 end) as double)/count(*) *100 as NN_cases_population_ratio,
#         cast(sum( case when  country_iso_3_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_iso_3_code,
#         cast(sum( case when  country_name           is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_name,
#         cast(sum( case when  country_lat            is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_lat,
#         cast(sum( case when  country_lng            is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_lng
# from    countries_and_covid_19_week_14_data

# """).show(10,False)

# No Null values

In [14]:
# # Step 5 - Check Null percentage for each attribute

# spark.sql("""

# select  count(*) as row_count,
#         cast(sum( case when  country_iso_2_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_iso_2_code,
#         cast(sum( case when  total_cases            is not null then 1 else 0 end) as double)/count(*) *100 as NN_total_cases,
#         cast(sum( case when  population             is not null then 1 else 0 end) as double)/count(*) *100 as NN_population,
#         cast(sum( case when  cases_population_ratio is not null then 1 else 0 end) as double)/count(*) *100 as NN_cases_population_ratio,
#         cast(sum( case when  country_iso_3_code     is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_iso_3_code,
#         cast(sum( case when  country_name           is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_name,
#         cast(sum( case when  country_lat            is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_lat,
#         cast(sum( case when  country_lng            is not null then 1 else 0 end) as double)/count(*) *100 as NN_country_lng
# from    countries_covid_19_week_14_agg

# """).show(10,False)

# No Null values

In [15]:
countries_and_covid_19_week_14_data.repartition(1).write \
.mode('overwrite') \
.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quoteAll", "true") \
.save("countries_covid19")

!mkdir -p ./data/stg/covid19/
# Copy new file into proper data folder location
!cp ./countries_covid19/*.csv ./data/stg/covid19/countries_and_covid_19_week_14_data.csv
# Delete Spark output folder
!rm -rf ./countries_covid19
# Check that the folder deleted
!ls ./countries_covid19/*.csv

zsh:1: no matches found: ./countries_covid19/*.csv


In [16]:
countries_covid_19_week_14_agg.repartition(1).write \
.mode('overwrite') \
.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quoteAll", "true") \
.save("./data/stg/countries_covid_19_week_14_agg")

# Copy new file into proper data folder location
!cp ./data/stg/countries_covid_19_week_14_agg/*.csv ./data/stg/covid19/countries_covid_19_week_14_agg.csv
# Delete Spark output folder
!rm -rf ./data/stg/countries_covid_19_week_14_agg
# Check that the folder deleted
!ls ./data/stg/countries_covid_19_week_14_agg/*.csv

zsh:1: no matches found: ./data/stg/countries_covid_19_week_14_agg/*.csv
