# SA coding assessment: Data Engineering, COVID
## Version 2022.09

What you'll do:
* We provide the dataset. You will load it into dataframes, and perform some data cleansing, transformation, and analytics tasks.
* You will answer a series of questions to show insights from the data.
* There are also some written-answer questions.

*We care about the process, not the result.*  I.e., we're looking for proper use of data engineering techniques and understanding of the code you've written.  

This Data Engineering section is scored out of 65 points.

**Important:** Do not use pandas or single node APIs. Please use PySpark or Spark SQL to answer this question set.

In [0]:
# This folder is for you to write any data as needed. Write access is restricted elsewhere. You can always read from dbfs.
aws_role_id = "AROAUQVMTFU2DCVUR57M2"
user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
userhome = f"s3a://e2-interview-user-data/home/{aws_role_id}:{user}"
print(userhome)

s3a://e2-interview-user-data/home/AROAUQVMTFU2DCVUR57M2:rashiranjith@gmail.com


## JHU Covid Data Set
The following questions use the covid data set from https://github.com/CSSEGISandData/COVID-19/. Specifically, the data in https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports

For your convenience, we've preloaded the data set to `dbfs:/interview-datasets/sa/covid-19-daily-cleaned/`. Please do not download the latest version of the dataset, as there are schema irregularities.

For all questions relying on timestamps, use the `Last_Update` column as the source of truth.

In [0]:
%scala

import java.net.URL
import java.io.File
import org.apache.commons.io.FileUtils

val tmpFile = new File("/tmp/rows.csv")
FileUtils.copyURLToFile(new URL("https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/01-01-2022.csv?accessType=DOWNLOAD"), tmpFile)

In [0]:
# https://docs.python.org/3/library/hashlib.html#blake2
from hashlib import blake2b

user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
h = blake2b(digest_size=4)
h.update(user.encode("utf-8"))
display_name = "user_" + h.hexdigest()
print("Display Name: " + display_name)

# dbutils.fs.cp('file:/tmp/rows.json', userhome + '/rows.json')
# dbutils.fs.cp(userhome + '/rows.json' ,f"dbfs:/tmp/{display_name}/rows.json")
dbutils.fs.cp('file:/tmp/rows.csv',f"dbfs:/tmp/{display_name}/rows.csv")
covid_data_path = f"dbfs:/tmp/{display_name}/rows.csv"

print("Covid Data Path: " + covid_data_path)
dbutils.fs.head(covid_data_path)

Display Name: user_bbfa5ce0
Covid Data Path: dbfs:/tmp/user_bbfa5ce0/rows.csv
[Truncated to first 65536 bytes]


'FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key,Incident_Rate,Case_Fatality_Ratio\n,,,Afghanistan,2022-01-02 04:20:52,33.93911,67.709953,158107,7356,,,Afghanistan,406.1488261213084,4.652545428096163\n,,,Albania,2022-01-02 04:20:52,41.1533,20.1683,210224,3217,,,Albania,7305.024671624156,1.5302724712687419\n,,,Algeria,2022-01-02 04:20:52,28.0339,1.6596,218818,6284,,,Algeria,499.00295416006406,2.871792996919815\n,,,Andorra,2022-01-02 04:20:52,42.5063,1.5218,23740,140,,,Andorra,30725.425483724845,0.5897219882055602\n,,,Angola,2022-01-02 04:20:52,-11.2027,17.8739,82398,1772,,,Angola,250.70689498424343,2.150537634408602\n,,,Antigua and Barbuda,2022-01-02 04:20:52,17.0608,-61.7964,4283,119,,,Antigua and Barbuda,4373.621436157177,2.7784263366798974\n,,,Argentina,2022-01-02 04:20:52,-38.4161,-63.6167,5674428,117181,,,Argentina,12555.217271737578,2.0650715807831204\n,,,Armenia,2022-01-02 04:20:52,40.0691,45.0382,344980,7975,,,Armeni

In [0]:
dbutils.fs.ls(covid_data_path)

[FileInfo(path='dbfs:/tmp/user_bbfa5ce0/rows.csv', name='rows.csv', size=559258, modificationTime=1706259528000)]

### JHU Question 1 [10 Points]
Write code that uses the DataFrame API to read in the JHU Covid Data set with clearly named columns, and appropriate data types. For example, `Last_Update` should be a timestamp. Ensure you ignore files that are not `.csv`. Do not drop any null values from the data set at this point.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# The *.csv glob keeps only CSV files; display_name comes from the setup cell above.
covid_data_path = f"dbfs:/tmp/{display_name}/*.csv"

# Coordinates and rates are numeric, so they are read as doubles rather than strings.
schema = StructType([
    StructField("FIPS", IntegerType()),
    StructField("Admin2", StringType()),
    StructField("Province_State", StringType()),
    StructField("Country_Region", StringType()),
    StructField("Last_Update", TimestampType()),
    StructField("Lat", DoubleType()),
    StructField("Long_", DoubleType()),
    StructField("Confirmed", IntegerType()),
    StructField("Deaths", IntegerType()),
    StructField("Recovered", IntegerType()),
    StructField("Active", IntegerType()),
    StructField("Combined_Key", StringType()),
    StructField("Incident_Rate", DoubleType()),
    StructField("Case_Fatality_Ratio", DoubleType())
])


q1_df = spark.read.schema(schema)\
    .format("csv")\
    .option("header", "true")\
    .option("ignoreLeadingWhiteSpace", "true")\
    .option("ignoreTrailingWhiteSpace", "true")\
    .load(covid_data_path)

In [0]:
# display(df_covid_dataset)
q1_df.count()
# q1_df.printSchema()

4016

### JHU Question 2 [5 Points]
Using the `Last_Update` column and without dropping any rows or nulls, how many distinct month values are possible in the dataset? Create a dataframe with column `month` that contains the unique values and show using `display(...)`

In [0]:
from pyspark.sql import functions as F

df_month = q1_df.withColumn("month", F.month("Last_Update"))

# Find distinct month values
q2_df = df_month.select("month").distinct()

# Show the DataFrame
display(q2_df)

month
12
1
4
8
7
10
11


### JHU Question 3 [10 Points]
Which are the top 3 countries with the most confirmed COVID cases?

In [0]:
from pyspark.sql import functions as F

# Use F.sum instead of importing `sum`, which would shadow Python's built-in sum.
q3_df = (q1_df
         .groupBy("Country_Region")
         .agg(F.sum("Confirmed").alias("TotalConfirmedCases"))
         .orderBy(F.col("TotalConfirmedCases").desc())
)

display(q3_df)

Country_Region,TotalConfirmedCases
US,55099969
India,34889132
Brazil,22295621
United Kingdom,13174530
Russia,10340011
France,10296909
Turkey,9519281
Germany,7115992
Spain,6294745
Italy,6266939


### JHU Question 4 [15 Points]
You've been asked to create a summary table for all countries that shows the average change in the ratio of new daily recoveries compared to the change in new daily cases. Create a new column in this new dataframe called `Convalescence_Rate` that contains the grouped averages to provide a per country breakdown of new recoveries compared to number of new cases per country.

To accomplish this:
1. Create a window spec that partitions by country and orders by `Last_Update`.
2. Use the window spec to calculate `Previous_Cases` and `Previous_Recovered`, the confirmed cases and recoveries for the previous row. Hint: you will need to use an additional Spark SQL function to calculate this.
3. Then, calculate `New_Cases` and `New_Recovered`, the differences between the current row's cases and recoveries and those of the previous row.
4. Finally, create a summary table grouped by `Country_Region` with column `Convalescence_Rate` that shows the average ratio of `New_Recovered` to `New_Cases`.

What are the top 3 countries with the highest `Convalescence_Rate` values? Use pyspark functions to sort the displayed table in descending order.

In [0]:
from pyspark.sql.functions import col,lag,when,avg
from pyspark.sql.window import Window

window_spec = Window.partitionBy("Country_Region").orderBy("Last_Update")

windowed_df = (q1_df
       .select("Last_Update", "Country_Region", "Confirmed", "Recovered")
       .dropna() # without this you'll get incorrect answers
       .withColumn("Previous_Cases", lag("Confirmed").over(window_spec))
       .withColumn("Previous_Recovered", lag("Recovered").over(window_spec))
       .withColumn("New_Cases", col("Confirmed") - col("Previous_Cases"))
       .withColumn("New_Recovered", col("Recovered") - col("Previous_Recovered"))
)

q4_df = (windowed_df
        .withColumn("Recovery_Ratio", when(col("New_Cases") != 0, col("New_Recovered") / col("New_Cases")).otherwise(0))
        .groupBy("Country_Region")
        .agg(avg("Recovery_Ratio").alias("Convalescence_Rate"))
        .select("Country_Region", "Convalescence_Rate")
        .orderBy(col("Convalescence_Rate").desc())
        # Already sorted in descending order; add .limit(3) to keep only the top 3 countries.
)

display(q4_df)

Country_Region,Convalescence_Rate
Antarctica,0.0
"Korea, North",0.0
Nauru,0.0
New Zealand,0.0
Tuvalu,0.0
Ukraine,0.0
United Kingdom,0.0
Winter Olympics 2022,0.0


### JHU Question 5 [15 Points]

Using a sliding 3 month window, what was the average recovery rate (`Recovered` divided by `Confirmed`) for each country?

To calculate:
1. Use `pyspark.sql.functions.window` to create the 3 month (91 day) window.
1. Create a summary dataframe grouped by `Country_Region` with a column, `Recovery_Rate`, that contains the average recovery rate.
1. Use pyspark functions to sort the displayed table in descending order.

In [0]:
from pyspark.sql.functions import col,avg,window

window_spec = window("Last_Update", "91 days")

q5_df = (q1_df
       .select("Last_Update", "Country_Region", "Confirmed", "Recovered")
       .withColumn("Recovery_Rate", col("Recovered") / col("Confirmed"))
       .groupBy("Country_Region", window_spec)
       .agg(
         avg("Recovery_Rate").alias("Recovery_Rate")
        )
       .select("Country_Region","Recovery_Rate")
       .orderBy(col("Recovery_Rate").desc())
       # Sorted in descending order above; Spark places null rates last.
)

display(q5_df)

Country_Region,Recovery_Rate
United Kingdom,0.0
Antarctica,0.0
Kyrgyzstan,
Burundi,
Cyprus,
Dominica,
Panama,
Burkina Faso,
Equatorial Guinea,
Saudi Arabia,


### JHU Question 6 [10 Points]

Save the table from Question 4 into a folder in your `userhome` with the expectation that your table will be used for analytics. Choose a file format that makes sense for an analytics use case.

Why did you choose the file format you did?

In [0]:
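# q4_df is the summary DataFrame from Question 4.
q6_df = q4_df

# Parquet is columnar and stores the schema with the data, which suits analytics reads.
# Write under `userhome` (defined in the first cell), the folder with write access.
q6_df.write.mode("overwrite").parquet(f"{userhome}/q4_output")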

# Display the DataFrame to confirm its content
display(q6_df)


Country_Region,Convalescence_Rate
Antarctica,0.0
"Korea, North",0.0
Nauru,0.0
New Zealand,0.0
Tuvalu,0.0
Ukraine,0.0
United Kingdom,0.0
Winter Olympics 2022,0.0


In [0]:
import os
username = os.environ['USER']
print(username)

root
