# Coronavirus Daily Report


Running this notebook each day refreshes the report of confirmed cases and deaths by country/region and recomputes the Pearson correlation between the two. The notebook uses Spark SQL and Spark MLlib.

### Import Packages




In [1]:
import sys.process._
import spark.{sparkContext => sc}
import org.apache.spark.sql.types._

## Dataset Formation

### Downloading the Kaggle Datasets using Kaggle API


The datasets are downloaded with the Kaggle command-line tool, which is invoked from Scala through sys.process. To use the API I first had to download my account's kaggle.json file and move it into ~/.kaggle. The cells below run the necessary commands; the trailing 0 in each output is the command's exit code.




In [3]:
"kaggle datasets files sudalairajkumar/novel-corona-virus-2019-dataset" !

name                                     size  creationDate         
-------------------------------------  ------  -------------------  
COVID19_open_line_list.csv                3MB  2020-04-30 05:26:54  
covid_19_data.csv                         1MB  2020-04-30 05:26:54  
time_series_covid_19_confirmed_US.csv  1013KB  2020-04-30 05:26:54  
time_series_covid_19_deaths.csv          65KB  2020-04-30 05:26:54  
time_series_covid_19_deaths_US.csv      974KB  2020-04-30 05:26:54  
COVID19_line_list_data.csv              359KB  2020-04-30 05:26:54  
time_series_covid_19_recovered.csv       70KB  2020-04-30 05:26:54  
time_series_covid_19_confirmed.csv       84KB  2020-04-30 05:26:54  


0

For this report, we need to download two of the CSV files and later join them using SQL.




In [5]:
"kaggle datasets download sudalairajkumar/novel-corona-virus-2019-dataset -f time_series_covid_19_confirmed.csv -p /home/waldenr1_gmail_com/polynote/notebooks/finalProject/data" !

time_series_covid_19_confirmed.csv: Skipping, found more recently modified local copy (use --force to force download)


0

In [6]:
"kaggle datasets download sudalairajkumar/novel-corona-virus-2019-dataset -f time_series_covid_19_deaths.csv -p /home/waldenr1_gmail_com/polynote/notebooks/finalProject/data" !

time_series_covid_19_deaths.csv: Skipping, found more recently modified local copy (use --force to force download)


0
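The download cells above pass the whole command as a single string. As a minimal sketch, the same command can be built as a sequence of arguments, which avoids quoting problems if a path contains spaces. The helper name kaggleDownloadCmd and the "data" directory are hypothetical and not part of the original notebook; the --force flag is the one named in the CLI output above.

```scala
import scala.sys.process._

// Hypothetical helper (not in the original notebook): builds the argument
// list for `kaggle datasets download` without going through a shell string.
def kaggleDownloadCmd(dataset: String,
                      file: String,
                      dir: String,
                      force: Boolean = false): Seq[String] = {
  val base = Seq("kaggle", "datasets", "download", dataset, "-f", file, "-p", dir)
  // --force re-downloads even when a newer local copy exists
  if (force) base :+ "--force" else base
}

val cmd = kaggleDownloadCmd(
  "sudalairajkumar/novel-corona-virus-2019-dataset",
  "time_series_covid_19_confirmed.csv",
  "data",          // placeholder target directory (assumption)
  force = true)

println(cmd.mkString(" "))

// To actually run it (needs the kaggle CLI and ~/.kaggle/kaggle.json):
// val exitCode = cmd.!
// require(exitCode == 0, s"kaggle exited with code $exitCode")
```

Checking the exit code explicitly makes a failed download stop the daily run instead of silently reusing yesterday's files.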

### Loading the Dataset


Here I read the .csv files into separate Spark DataFrames. Since these files are only updated once a day, they can be treated as static. I first read one file with only the header option to obtain the column names (no types are inferred at this stage, so every column comes back as a string). I then re-read both files while programmatically enforcing a custom schema that casts the column values to SQL DataTypes.




In [8]:
val path = "/home/waldenr1_gmail_com/polynote/notebooks/finalProject/data/"

// First pass: read the header only to learn the column names.
val test_df = spark
                .read
                .option("header", "true")
                .csv(path + "time_series_covid_19_deaths.csv")

// Columns 0-1 are strings, columns 2-3 are doubles,
// and every remaining (date) column is an integer count.
val schemaUpdate = StructType(
                test_df.columns.slice(0,2).map(StructField(_ , StringType, true)) ++ 
                test_df.columns.slice(2,4).map(StructField(_ , DoubleType, true)) ++ 
                test_df.columns.drop(4).map(StructField(_ , IntegerType, true))
                )

// Second pass: read both files with the custom schema enforced.
// The deaths header is reused for the confirmed file.
val confirmed_df = spark
                    .read
                    .option("header", "true")
                    .schema(schemaUpdate)
                    .csv(path + "time_series_covid_19_confirmed.csv")

val deaths_df = spark
                    .read
                    .option("header", "true")
                    .schema(schemaUpdate)
                    .csv(path + "time_series_covid_19_deaths.csv")
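The positional rule used to build the schema can be illustrated without Spark. The sketch below applies the same slice and drop calls to a made-up header and uses plain type names in place of Spark DataTypes. The column names and dates are assumptions for illustration, not read from the files.

```scala
// Illustrative header only; column names and dates are assumptions.
val sampleCols = Array("Province/State", "Country/Region", "Lat", "Long", "1/22/20", "1/23/20")

// Same slicing as the schema cell, with type names as plain strings
// so the sketch runs without Spark.
val typed: Array[(String, String)] =
  sampleCols.slice(0, 2).map(c => (c, "string")) ++   // indices 0 and 1
  sampleCols.slice(2, 4).map(c => (c, "double")) ++   // indices 2 and 3
  sampleCols.drop(4).map(c => (c, "integer"))         // index 4 onward

typed.foreach { case (name, tpe) => println(s"$name: $tpe") }
```

Because the rule is purely positional, it keeps working as a new date column is appended each day, but it would need revisiting if the leading columns ever changed.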

### Preparing Dataset for SQL Interpretation

The .csv files I downloaded are not SQL friendly because their column names contain the "/" character. Replacing every "/" with "_" lets SQL interpret the DataFrames. Lastly, I use Scala to grab the last column name, which holds the newest data (this lookup is not practical in plain SQL).



In [10]:
val newCols = deaths_df.columns.map(_.replace("/", "_"))
val deaths_df_newCols = deaths_df.toDF(newCols:_*)
val confirmed_df_newCols = confirmed_df.toDF(newCols:_*)
val newest = deaths_df_newCols.columns.last
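As a small Spark-free sketch of the renaming step, the same replace and last calls can be run on a plain array. The header below is made up for illustration; the real files have one column per date.

```scala
// Illustrative header only; the real files contain many more date columns.
val rawCols = Array("Province/State", "Country/Region", "Lat", "Long", "4/28/20", "4/29/20")

// Replace "/" with "_" so the names are usable as SQL identifiers.
val sqlCols = rawCols.map(_.replace("/", "_"))

// The last column is the most recent date.
val newestCol = sqlCols.last

println(sqlCols.mkString(", "))  // Province_State, Country_Region, Lat, Long, 4_28_20, 4_29_20
println(newestCol)               // 4_29_20
```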

### Make Data Visible to SQL 

The following cell simply exposes the DataFrames to the SQL interpreter as "deaths" and "confirmed" respectively.



In [12]:
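// createOrReplace avoids an error when the notebook is rerun in the same Spark session.
deaths_df_newCols.createOrReplaceGlobalTempView("deaths")
confirmed_df_newCols.createOrReplaceGlobalTempView("confirmed")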

## Generating the Daily Report (Spark SQL)


Here we build the daily report from the two views.


### Join and Query

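This SQL query sums the deaths and cases for each country/region in the dataset for the most recent day. The two views are joined row by row on both country/region and province/state, so that a country reported as several province rows is not counted more than once.




In [14]:
val agg_df = spark.sql( " SELECT deaths.Country_Region AS Country_Region," + 
                        " SUM(confirmed." + newest + ") AS Cases," +
                        " SUM(deaths." + newest + ") AS Deaths" +
                        " FROM global_temp.deaths deaths" +
                        " JOIN global_temp.confirmed confirmed" +
                        " ON deaths.Country_Region = confirmed.Country_Region" +
                        // <=> is null-safe equality, so rows without a province still pair up.
                        // Province_State is assumed to be the renamed "Province/State" column.
                        " AND deaths.Province_State <=> confirmed.Province_State" +
                        " GROUP BY deaths.Country_Region" +
                        " ORDER BY Cases DESC"
                        )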
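The choice of join key matters when a country is reported as several province rows: a join keyed on country alone pairs every deaths row with every confirmed row of that country, which inflates both sums. The sketch below shows the effect with plain Scala collections. The Rec class and all numbers are made up for illustration; Option equality stands in for a null-safe comparison of the province column.

```scala
// Made-up toy rows, not real data: (province, country, count).
case class Rec(province: Option[String], country: String, n: Int)

val toyConfirmed = Seq(Rec(Some("A"), "X", 10), Rec(Some("B"), "X", 20), Rec(None, "Y", 5))
val toyDeaths    = Seq(Rec(Some("A"), "X", 1),  Rec(Some("B"), "X", 2),  Rec(None, "Y", 1))

// Join on country only: each deaths row meets every confirmed row of that country.
val byCountryOnly = for {
  d <- toyDeaths
  c <- toyConfirmed if d.country == c.country
} yield (d.country, c.n, d.n)

// Join on country and province: each deaths row meets exactly one confirmed row.
val byCountryAndProvince = for {
  d <- toyDeaths
  c <- toyConfirmed if d.country == c.country && d.province == c.province
} yield (d.country, c.n, d.n)

// Sum cases and deaths per country.
def totals(rows: Seq[(String, Int, Int)]): Map[String, (Int, Int)] =
  rows.groupBy(_._1).map { case (k, v) => (k, (v.map(_._2).sum, v.map(_._3).sum)) }

println(totals(byCountryOnly)("X"))         // (60,6): doubled, because X has two provinces
println(totals(byCountryAndProvince)("X"))  // (30,3): the true totals
```

Country Y has a single row, so both joins agree on it; only multi-row countries are affected.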

### Summarizing the Results


Here we compute summary statistics (count, mean, standard deviation, min, and max) for the Cases and Deaths columns of the SQL query output.




In [16]:
val summary_df = agg_df.describe("Cases", "Deaths")
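For reference, the statistics that describe reports can be sketched in plain Scala. The summarize helper and the toy values are hypothetical, and the sketch assumes the standard deviation is the sample form (dividing by n - 1), which is my understanding of what Spark's stddev computes.

```scala
// Hypothetical helper mirroring the five statistics reported by describe.
def summarize(xs: Seq[Double]): Map[String, Double] = {
  require(xs.size > 1, "need at least two values for a sample standard deviation")
  val n = xs.size
  val mean = xs.sum / n
  // Sample variance: divide by n - 1 (assumed to match Spark's stddev).
  val variance = xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)
  Map(
    "count"  -> n.toDouble,
    "mean"   -> mean,
    "stddev" -> math.sqrt(variance),
    "min"    -> xs.min,
    "max"    -> xs.max)
}

// Made-up values, not taken from the dataset.
val toyCounts = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
summarize(toyCounts).foreach { case (k, v) => println(s"$k = $v") }
```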

### Calculate Deaths Pearson Correlation (Spark MLlib) 

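Here we use the VectorAssembler transformer to combine the Cases and Deaths columns into a single feature vector column, then calculate the Pearson correlation matrix of that column. The result is a single Row holding a 2x2 matrix whose off-diagonal entry is the correlation between Cases and Deaths.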



In [18]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.stat.Correlation

val assembler = new VectorAssembler()
                    .setInputCols(Array("Cases", "Deaths"))
                    .setOutputCol("features")

val features = assembler.transform(agg_df)

val pearson_corr_mat = Correlation.corr(features, "features").head
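To make the quantity being computed concrete, here is a minimal plain-Scala sketch of the Pearson correlation coefficient for two equal-length sequences. The pearson helper and the toy values are hypothetical and are not part of the notebook or of MLlib; they only illustrate the formula behind the off-diagonal entry of the correlation matrix.

```scala
// Hypothetical helper: Pearson correlation of two equal-length sequences.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.size == ys.size && xs.size > 1, "need two sequences of equal length > 1")
  val mx = xs.sum / xs.size
  val my = ys.sum / ys.size
  // Sum of products of deviations from the means (unnormalized covariance).
  val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
  // Square roots of the sums of squared deviations.
  val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
  val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
  cov / (sx * sy)
}

// Made-up values, not taken from the dataset.
val toyCasesSeq  = Seq(100.0, 200.0, 300.0, 400.0)
val toyDeathsSeq = Seq(5.0, 9.0, 16.0, 20.0)

println(pearson(toyCasesSeq, toyDeathsSeq))
```

A value close to 1 indicates that countries with more cases also tend to report more deaths; the coefficient measures linear association only.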