# Init SparkContext

In [1]:
import os
from datetime import datetime
from pyspark import SparkContext, HiveContext
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as sf

In [4]:
spark = (
    SparkSession.builder.appName("covid19-benchmark-{}".format(datetime.today()))
                .master("spark://spark-master:7077")
                .getOrCreate()
)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")

In [4]:
sqlContext = SQLContext(spark)



In [7]:
spark.sql("SHOW databases").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [8]:
spark.sql("SHOW tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



# Load and create Catalog table

In [9]:
spark_covid19_cases_position = spark.read.parquet("s3a://warehouse/bronze/medical/covid19_cases_position.pq")
spark_covid19_cases_position.createOrReplaceTempView("covid19_cases_position")

In [10]:
spark_covid19_country_wise = spark.read.parquet("s3a://warehouse/bronze/medical/covid19_country_wise.pq")
spark_covid19_country_wise.createOrReplaceTempView("covid19_country_wise")

In [11]:
spark_covid19_time_series = spark.read.parquet("s3a://warehouse/bronze/medical/covid19_time_series.pq")
spark_covid19_time_series.createOrReplaceTempView("covid19_time_series")

In [12]:
spark_covid19_worldometer = spark.read.parquet("s3a://warehouse/bronze/medical/covid19_worldometer.pq")
spark_covid19_worldometer.createOrReplaceTempView("covid19_worldometer")

In [13]:
spark.sql("SHOW TABLES").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|         |covid19_cases_pos...|      false|
|         |covid19_country_wise|      false|
|         | covid19_time_series|      false|
|         | covid19_worldometer|      false|
+---------+--------------------+-----------+



# Benchmark

## 1. covid19_daily_stats

In [12]:
sql_stm1 = """
-- 1
SELECT 
    t.`date`, 
    t.country_region, 
    t.confirmed, 
    t.deaths, 
    t.recovered
FROM 
    covid19_time_series AS t
JOIN 
    covid19_cases_position AS c 
ON 
    t.country_region = c.country_region
JOIN 
    covid19_country_wise AS w 
ON 
    t.country_region = w.country_region
WHERE 
    t.confirmed > 0 OR 
    t.deaths > 0 OR 
    t.recovered > 0;
"""

In [7]:
%%timeit -r 4
spark.sql(sql_stm1).toPandas()

1.07 s ± 252 ms per loop (mean ± std. dev. of 4 runs, 1 loop each)


In [13]:
covid19_daily_stats = spark.sql(sql_stm1)
covid19_daily_stats.show()

+----------+--------------+---------+------+---------+
|      date|country_region|confirmed|deaths|recovered|
+----------+--------------+---------+------+---------+
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-22|         China|      548|    17|       28|
|2020-01-2

In [20]:
covid19_daily_stats.repartition(1).write.mode("overwrite").parquet("s3a://warehouse/silver/ecom/covid19_daily_stats.pq")

## 2. covid19_continent_stats

In [6]:
sql_stm2 = """
-- 1
SELECT 
    w.continent AS Continent, 
    SUM(cw.confirmed) AS TotalCases, 
    SUM(cw.deaths) AS TotalDeaths, 
    SUM(cw.recovered) AS TotalRecovered 
FROM 
    covid19_worldometer AS w 
JOIN 
    covid19_cases_position AS c 
ON 
    w.country_region = c.country_region 
JOIN 
    covid19_country_wise AS cw 
ON 
    w.country_region = cw.country_region 
GROUP BY 
    w.continent;
"""

In [12]:
%%timeit -r 4
spark.sql(sql_stm2).toPandas()

735 ms ± 240 ms per loop (mean ± std. dev. of 4 runs, 1 loop each)


In [7]:
covid19_continent_stats = spark.sql(sql_stm2)
covid19_continent_stats.show()

+-----------------+----------+-----------+--------------+
|        Continent|TotalCases|TotalDeaths|TotalRecovered|
+-----------------+----------+-----------+--------------+
|           Europe|   4860471|     476988|       2430932|
|           Africa|    828239|      17759|        487793|
|Australia/Oceania|    124070|       1358|         76031|
|    North America|   2049543|     157599|        428613|
|    South America|   3780484|     135506|       2714173|
|             Asia|   3799878|      85740|       2722251|
+-----------------+----------+-----------+--------------+



In [21]:
covid19_continent_stats.repartition(1).write.mode("overwrite").parquet("s3a://warehouse/silver/ecom/covid19_continent_stats.pq")

In [8]:
type(covid19_continent_stats)

pyspark.sql.dataframe.DataFrame

## TEST

In [15]:
sql_stm = """
    SELECT 
        cw.country_region,
        w.continent,
        w.population,
        cw.confirmed,
        cw.deaths,
        cw.recovered,
        cw.who_region
    FROM 
        covid19_country_wise AS cw 
    JOIN covid19_worldometer AS w
    ON w.country_region = cw.country_region
    WHERE 
        cw.confirmed > 0 OR
        cw.deaths > 0 OR
        cw.recovered > 0;
"""
sparkDF = spark.sql(sql_stm)
sparkDF.show()

+-------------------+-----------------+----------+---------+------+---------+--------------------+
|     country_region|        continent|population|confirmed|deaths|recovered|          who_region|
+-------------------+-----------------+----------+---------+------+---------+--------------------+
|        Afghanistan|             Asia| 3.90094E7|    36263|  1269|    25198|Eastern Mediterra...|
|            Albania|           Europe| 2877470.0|     4880|   144|     2745|              Europe|
|            Algeria|           Africa| 4.39261E7|    27973|  1163|    18837|              Africa|
|            Andorra|           Europe|   77278.0|      907|    52|      803|              Europe|
|             Angola|           Africa| 3.29563E7|      950|    41|      242|              Africa|
|Antigua and Barbuda|    North America|   98010.0|       86|     3|       65|            Americas|
|          Argentina|    South America| 4.52369E7|   167416|  3059|    72575|            Americas|
|         