In [1]:
import collections
import os
import glob
from typing import List
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F

# Create the Spark session used by every cell below; getOrCreate() hands back
# an already-running session instead of building a second one.
spark = (
    SparkSession.builder
    .master("local")
    .appName("covid19-dashboard-etl")
    .enableHiveSupport()
    .getOrCreate()
)
display(spark)

22/08/30 00:20:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
class ParquetEnumerator:
    """Enumerate ``*.parquet`` files stored under ``key=value`` shard directories.

    The expected layout is
    ``<payload_root>/<shard_1>=<value>/.../<shard_n>=<value>/<file>.parquet``
    with exactly one directory level per entry of ``shards``, in that order.
    """

    def __init__(self, payload_root:str, shards:List[str]):
        """
        Args:
            payload_root: Directory that contains the shard directories.
            shards: Shard key names, outermost first. An empty sequence means
                the parquet files sit directly inside ``payload_root``.
        """
        self._payload_root = payload_root
        # Copy so that later changes to the caller's sequence cannot alter
        # the search pattern (this also accepts tuples).
        self._shards = list(shards)

    def list_all(self, path_validation:bool=True) -> List[str]:
        """Return every parquet file found under the shard layout.

        Args:
            path_validation: When True, re-check that each directory between
                ``payload_root`` and the file is named ``<shard>=...`` for the
                shard expected at that depth.

        Returns:
            The matching paths exactly as produced by ``glob``, sorted so the
            result does not depend on directory listing order.

        Raises:
            AssertionError: If ``path_validation`` is True and a path does not
                follow the shard layout.
        """
        pattern = os.path.join(
            self._payload_root,
            *["{}=*".format(shard) for shard in self._shards],
            "*.parquet",
        )

        found = []
        for path_ in sorted(glob.glob(pattern)):
            if path_validation:
                self._check_shard_dirs(path_)
            found.append(path_)
        return found

    def _check_shard_dirs(self, path_:str) -> None:
        """Raise AssertionError unless ``path_`` follows the shard layout."""
        # relpath copes with trailing separators and with roots whose text
        # happens to reappear further down the path.
        relative = os.path.relpath(path_, self._payload_root)
        shard_dirs = relative.split(os.sep)[:-1]  # drop the file name

        if len(shard_dirs) != len(self._shards):
            raise AssertionError(
                "expected {} shard directories in {!r}, found {}".format(
                    len(self._shards), path_, len(shard_dirs)))

        for shard, shard_dir in zip(self._shards, shard_dirs):
            if shard_dir.partition("=")[0] != shard:
                raise AssertionError(
                    "directory {!r} of {!r} is not a {!r} shard".format(
                        shard_dir, path_, shard))

    @property
    def payload_root(self) -> str:
        """Root directory given at construction time."""
        return self._payload_root

In [3]:
# Raw input files: ./source is searched without any shard directories,
# hence the empty shard list.
path_parquets = ParquetEnumerator(payload_root="./source", shards=[]).list_all()

# Day to process. While this is None the later cells skip their `updated`
# filter and work on every day. Example: target_date = "2022-08-25"
target_date = None

# Load Daily-Based Data: `daily`
----
* Source: original data (`bing_covid-19_data.parquet`)
* Target: `daily`
    * daily-based data,
    * excluding rows whose `country_region` is **"Worldwide"**

In [4]:
# Read the raw files and keep only the rows that belong in the `daily` data set.
ds = spark.read.parquet(*path_parquets)
if target_date is not None:
    ds = ds.filter(F.col("updated") == target_date)

# NOTE(review): a NULL change value makes the comparison below NULL rather than
# true, so those rows are dropped together with the negative ones; fillna(0)
# only fills what survives the filter. Confirm that this is intended.
ds = (
    ds.filter(F.col("country_region") != "Worldwide")
      .filter((F.col("confirmed_change") >= 0) & (F.col("deaths_change") >= 0))
      .fillna(0)
)

# Write one directory per `updated` day. Dynamic partition overwrite replaces
# only the days present in `ds`, so re-running this cell (for one day or for
# the full history) no longer appends duplicate rows, while days that are not
# part of this run stay untouched. The per-write option needs Spark >= 2.4.
# The former "header" option was dropped: it is a CSV setting and does nothing
# for Parquet output.
(
    ds.sort(F.col("updated"), F.col("country_region"), F.col("admin_region_1"), F.col("admin_region_2"))
      .write
      .option("maxRecordsPerFile", 262144)
      .option("partitionOverwriteMode", "dynamic")
      .partitionBy("updated")
      .mode("overwrite")
      .parquet("daily")
)

                                                                                

In [5]:
# Show the first 100 filtered rows as a pandas table, then the Spark schema.
daily_preview = ds.limit(100).toPandas()
display(daily_preview)
ds.printSchema()

Unnamed: 0,id,updated,confirmed,confirmed_change,deaths,deaths_change,recovered,recovered_change,latitude,longitude,iso2,iso3,country_region,admin_region_1,iso_subdivision,admin_region_2,load_time
0,340556,2020-02-25,1,0,0,0,0,0,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
1,340557,2020-02-26,1,0,0,0,0,0,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
2,340558,2020-02-27,1,0,0,0,0,0,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
3,340559,2020-02-28,1,0,0,0,0,0,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
4,340560,2020-02-29,1,0,0,0,0,0,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,24830580,2020-05-30,15750,545,257,8,1303,44,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
96,24830581,2020-05-31,16509,759,265,8,1328,25,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
97,24830582,2020-06-01,17267,758,270,5,1428,100,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466
98,26297178,2020-06-02,18054,787,294,24,1450,22,33.83114,66.02471,AF,AFG,Afghanistan,,,,2022-08-28 08:04:17.466


root
 |-- id: integer (nullable = true)
 |-- updated: date (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- confirmed_change: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- deaths_change: short (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- recovered_change: integer (nullable = true)
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- iso2: string (nullable = true)
 |-- iso3: string (nullable = true)
 |-- country_region: string (nullable = true)
 |-- admin_region_1: string (nullable = true)
 |-- iso_subdivision: string (nullable = true)
 |-- admin_region_2: string (nullable = true)
 |-- load_time: timestamp (nullable = true)



# Load Region-Based Data: `regional_based`
----
* Source: `daily`
* Target: `regional_based`

In [6]:
# Locate the files written to ./daily, which is sharded by `updated`.
parquet_enumerator = ParquetEnumerator(payload_root="./daily", shards=["updated"])
path_parquets = parquet_enumerator.list_all()

# basePath points at the root of the sharded directory tree while the files
# themselves are passed explicitly.
daily_reader = spark.read.option("basePath", parquet_enumerator.payload_root)
ds = daily_reader.parquet(*path_parquets)

                                                                                

In [7]:
# Sum the daily changes per day and location, label each location with a
# readable region name, and append the result to the `regional_based` table.
daily_rows = ds if target_date is None else ds.filter(F.col("updated") == target_date)

# Region label: the country followed by whichever admin levels are present.
has_admin_1 = F.col("admin_region_1").isNotNull()
has_admin_2 = F.col("admin_region_2").isNotNull()
separator = F.lit(", ")
region_label = (
    F.when(has_admin_1 & has_admin_2,
           F.concat(F.col("country_region"), separator, F.col("admin_region_1"), separator, F.col("admin_region_2")))
     .when(has_admin_1, F.concat(F.col("country_region"), separator, F.col("admin_region_1")))
     .when(has_admin_2, F.concat(F.col("country_region"), separator, F.col("admin_region_2")))
     .otherwise(F.col("country_region"))
)

ds_ = (
    daily_rows
    .groupBy("updated", "country_region", "admin_region_1", "admin_region_2", "latitude", "longitude")
    .agg(F.sum("deaths_change").alias("death"),
         F.sum("confirmed_change").alias("confirmed"))
    .withColumn("region", region_label)
    .select("updated", "region", "latitude", "longitude", "death", "confirmed")
    .sort("updated")
)

# NOTE(review): the JDBC URL embeds the database user and password; consider
# supplying them from the environment instead.
# NOTE(review): Spark documents `truncate` as an overwrite-mode option, so with
# mode("append") it presumably has no effect and re-running this cell adds the
# same rows again - confirm the intended write mode.
regional_writer = (
    ds_.write
    .format("jdbc")
    .mode("append")
    .option("driver", "org.mariadb.jdbc.Driver")
    .option("url", "jdbc:mariadb://127.0.0.1:3306/airflow_demo_covid19?user=airflow&password=airflow")
    .option("truncate", "true")
    .option("dbtable", "regional_based")
)
regional_writer.save()


                                                                                

In [8]:
# Show the first 100 regional rows as a pandas table, then the Spark schema.
regional_preview = ds_.limit(100).toPandas()
display(regional_preview)
ds_.printSchema()

                                                                                

Unnamed: 0,updated,region,latitude,longitude,death,confirmed
0,2020-01-22,"United States, Washington",47.41130,-120.55630,0,0
1,2020-01-22,South Korea,36.45116,127.86110,0,0
2,2020-01-22,Taiwan,23.74899,120.97130,0,0
3,2020-01-22,Thailand,15.13067,101.01780,0,2
4,2020-01-22,China (mainland),36.56311,103.73580,0,131
...,...,...,...,...,...,...
95,2020-01-30,Singapore,1.29018,103.85200,0,3
96,2020-01-30,"United States, Arizona",34.29308,-111.66470,0,0
97,2020-01-30,Thailand,15.13067,101.01780,0,0
98,2020-01-30,China (mainland),36.56311,103.73580,0,1981


root
 |-- updated: date (nullable = true)
 |-- region: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- death: long (nullable = true)
 |-- confirmed: long (nullable = true)



# Load Country-Based Data: `country_based`
----
* Source: `daily`
* Target: `country_based`

In [9]:
# Sum the daily changes per day and country and append the result to the
# `country_based` table.
daily_rows = ds if target_date is None else ds.filter(F.col("updated") == target_date)

ds_ = (
    daily_rows
    .groupBy("updated", "country_region")
    .agg(F.sum("deaths_change").alias("death"),
         F.sum("confirmed_change").alias("confirmed"))
    .withColumnRenamed("country_region", "country")
    .select("updated", "country", "death", "confirmed")
    .sort("updated")
)

# NOTE(review): the JDBC URL embeds the database user and password; consider
# supplying them from the environment instead.
# NOTE(review): Spark documents `truncate` as an overwrite-mode option, so with
# mode("append") it presumably has no effect and re-running this cell adds the
# same rows again - confirm the intended write mode.
country_writer = (
    ds_.write
    .format("jdbc")
    .mode("append")
    .option("driver", "org.mariadb.jdbc.Driver")
    .option("url", "jdbc:mariadb://127.0.0.1:3306/airflow_demo_covid19?user=airflow&password=airflow")
    .option("truncate", "true")
    .option("dbtable", "country_based")
)
country_writer.save()

                                                                                

In [10]:
# Spot-check a single country: show all of its rows, then the Spark schema.
taiwan_rows = ds_.where(F.col("country") == "Taiwan")
display(taiwan_rows.toPandas())
ds_.printSchema()

                                                                                

Unnamed: 0,updated,country,death,confirmed
0,2020-01-22,Taiwan,0,0
1,2020-01-25,Taiwan,0,0
2,2020-01-26,Taiwan,0,1
3,2020-01-27,Taiwan,0,1
4,2020-01-28,Taiwan,0,3
...,...,...,...,...
935,2022-08-22,Taiwan,23,26343
936,2022-08-23,Taiwan,16,28531
937,2022-08-24,Taiwan,18,27227
938,2022-08-25,Taiwan,37,26718


root
 |-- updated: date (nullable = true)
 |-- country: string (nullable = true)
 |-- death: long (nullable = true)
 |-- confirmed: long (nullable = true)

