In [95]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [96]:
# Get the notebook's Spark session, reusing an already running one if present.
# Hive support is enabled on the builder, and eager evaluation is turned on so
# that a DataFrame left as the last expression of a cell is rendered directly.
session_builder = SparkSession.builder.enableHiveSupport()
session_builder = session_builder.config('spark.sql.repl.eagerEval.enabled', 'true')
spark = session_builder.getOrCreate()

In [180]:
# Download the CSSE COVID-19 repository archive below ~/data, then unpack every
# *.zip found under ~/data back into ~/data.
#   wget  -c continue a partial download, -N timestamping, -r recursive (files are
#         stored under a host/path directory tree), -P target directory prefix
#   unzip -a auto-convert text files, -u only extract new or newer files
# The unzip step only runs when wget exits successfully (&&).
# NOTE(review): in the recorded run the server answered "416 Requested Range Not
# Satisfiable" and wget reported the file as already fully retrieved, i.e. the
# existing archive was NOT refreshed — confirm that stale data is acceptable here.
!wget -cNr -P ~/data/ https://github.com/CSSEGISandData/COVID-19/archive/master.zip && find ~/data -name "*.zip" -exec unzip -au {} -d ~/data \;

--2020-03-15 22:13:10--  https://github.com/CSSEGISandData/COVID-19/archive/master.zip
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/CSSEGISandData/COVID-19/zip/master [following]
--2020-03-15 22:13:10--  https://codeload.github.com/CSSEGISandData/COVID-19/zip/master
Resolving codeload.github.com (codeload.github.com)... 140.82.114.9
Connecting to codeload.github.com (codeload.github.com)|140.82.114.9|:443... connected.
HTTP request sent, awaiting response... 416 Requested Range Not Satisfiable

    The file is already fully retrieved; nothing to do.

Archive:  /home/jovyan/data/github.com/CSSEGISandData/COVID-19/archive/master.zip
d5eab1b3657e70f55b9eb9eb7e475475dac445fb


In [98]:
# Sanity check of the unpacked archive: print the line count of every daily report CSV.
!wc -l ~/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/*.csv

    39 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-22-2020.csv
    46 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-23-2020.csv
    41 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-24-2020.csv
    44 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-25-2020.csv
    47 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-26-2020.csv
    51 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-27-2020.csv
    52 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-28-2020.csv
    54 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-29-2020.csv
    58 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/01-30-2020.csv
    63 /home/jovyan/data/COVID-19-master/csse_covid_19_data/csse_covid_19

In [100]:
# Recreate the raw table `csse_covid_19_daily_reports` from the DDL stored
# under ./sql. The DDL is read with an explicit encoding so the result does not
# depend on the platform default, and the file is closed before any Spark work.
with open("./sql/csse_covid_19_daily_reports/create.sql", encoding="utf-8") as f_in:
    text = f_in.read()

# Echo the DDL so the notebook records exactly what was executed.
print(text)

# Drop first: the statement that follows is executed as read from the file, and
# the drop makes sure it runs against a clean catalog on every re-run.
spark.sql("DROP TABLE IF EXISTS csse_covid_19_daily_reports")
# Trailing semicolon keeps the cell from displaying the statement's result.
spark.sql(text);

CREATE TABLE IF NOT EXISTS csse_covid_19_daily_reports(
    `Last Update` STRING,
    `Country/Region` STRING,
    `Province/State` STRING,
    Confirmed STRING,
    Recovered STRING,
    Deaths STRING,
    Latitude STRING,
    Longitude STRING
)


In [101]:
# Show the tables currently registered in the catalog to confirm the table was created.
spark.catalog.listTables()

[Table(name='csse_covid_19_daily_reports', database='default', description=None, tableType='MANAGED', isTemporary=False)]

In [102]:
# Glob pattern matching every daily report CSV of the unpacked archive,
# with the leading "~" expanded to the current user's home directory.
input_files = os.path.expanduser(
    '~/data/COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports/*.csv'
)

In [103]:
# Read all daily report files matched by the glob into one DataFrame.
# The first line of each file is used as the header; no schema is supplied.
df = spark.read.csv(path=input_files, header=True)

In [154]:
# Tag every row with the path of the file it was read from.
# The left-hand `input_file_name` is the new column's name, the right-hand one is
# the Spark SQL function of the same name.
df = df.withColumn('input_file_name', input_file_name())

In [155]:
# Print "<column> <TYPE>" for every column of df, one per line and comma
# separated, as a starting point for a CREATE TABLE column list.
column_ddl = [f'{name} {dtype.upper()}' for name, dtype in df.dtypes]
print(',\n'.join(column_ddl))

Province/State STRING,
Country/Region STRING,
Last Update STRING,
Confirmed STRING,
Deaths STRING,
Recovered STRING,
Latitude STRING,
Longitude STRING,
input_file_name STRING


In [157]:
# (Re)create the typed target table for the cleaned daily reports.
# The statements are named first and executed afterwards; the table is dropped
# before creation so every run starts from an empty table.
drop_enriched_sql = """
DROP TABLE IF EXISTS csse_covid_19_daily_reports_enriched
"""
create_enriched_sql = """
CREATE TABLE IF NOT EXISTS csse_covid_19_daily_reports_enriched(
    `Last Update` TIMESTAMP,
    `Province/State` STRING,
    `Country/Region` STRING,
    Confirmed INT,
    Deaths INT,
    Recovered INT,
    Active INT,
    Latitude STRING,
    Longitude STRING,
    input_file_name STRING
)
"""
spark.sql(drop_enriched_sql)
spark.sql(create_enriched_sql)

In [158]:
# List the catalog tables again to confirm the enriched table now exists as well.
spark.catalog.listTables()

[Table(name='csse_covid_19_daily_reports', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='csse_covid_19_daily_reports_enriched', database='default', description=None, tableType='MANAGED', isTemporary=False)]

In [160]:
# Give the raw DataFrame the alias 'a'; the projection below refers to its columns through it.
a = df.alias('a')

In [161]:
# Project the raw string columns into the layout of
# csse_covid_19_daily_reports_enriched (same column order as its DDL).
#
# The counters are trimmed and cast once and reused, so `Active` is derived
# from exactly the values stored in Confirmed / Deaths / Recovered.
confirmed_int = trim(a.Confirmed).cast('INT')
deaths_int = trim(a.Deaths).cast('INT')
recovered_int = trim(a.Recovered).cast('INT')

b = a.select(
    # NOTE(review): no format is passed to to_timestamp, so any `Last Update`
    # string the default timestamp cast does not understand becomes NULL —
    # confirm all source files use a compatible format.
    to_timestamp(trim(a['Last Update'])).alias('Last Update'),
    trim(a['Province/State']).alias('Province/State'),
    trim(a['Country/Region']).alias('Country/Region'),
    confirmed_int.alias('Confirmed'),
    deaths_int.alias('Deaths'),
    recovered_int.alias('Recovered'),
    # Active = Confirmed - (Deaths + Recovered). A missing Deaths/Recovered
    # value is treated as 0 here, otherwise a single blank cell would turn
    # Active into NULL. A missing Confirmed still yields NULL on purpose.
    (
        confirmed_int
        - (coalesce(deaths_int, lit(0)) + coalesce(recovered_int, lit(0)))
    ).alias('Active'),
    trim(a.Latitude).alias('Latitude'),
    trim(a.Longitude).alias('Longitude'),
    a.input_file_name,
)

In [162]:
# Load the projected rows into the enriched table, replacing whatever it held before.
# insertInto resolves columns by position (see the PySpark DataFrameWriter docs),
# so the column order of `b` has to match the table definition.
b.write.insertInto('csse_covid_19_daily_reports_enriched', overwrite=True)

In [163]:
# Read the enriched table back from the catalog as a DataFrame for the export and checks below.
csse_covid_19_daily_reports_enriched = spark.table('csse_covid_19_daily_reports_enriched')

In [178]:
# Export the de-duplicated enriched reports as CSV (with header), sorted by
# `Last Update` with NULL timestamps last.
#
# Rows that differ only in the file they came from count as duplicates, so
# `input_file_name` is excluded from the comparison by name rather than by
# position. De-duplication runs BEFORE the sort: dropDuplicates does not
# preserve row order, so sorting first would not guarantee a sorted output.
dedup_keys = [
    name
    for name in csse_covid_19_daily_reports_enriched.columns
    if name != 'input_file_name'
]
(
    csse_covid_19_daily_reports_enriched
    .dropDuplicates(subset=dedup_keys)
    .orderBy(asc_nulls_last('Last Update'))
    .write
    .mode('overwrite')
    .csv('./samples/output/csse_covid_19_daily_reports_enriched.csv', header=True)
)

In [176]:
# Count the distinct reports, ignoring which source file a row was read from.
distinct_report_columns = [
    'Last Update',
    'Province/State',
    'Country/Region',
    'Confirmed',
    'Deaths',
    'Recovered',
    'Active',
    'Latitude',
    'Longitude',
]
csse_covid_19_daily_reports_enriched.dropDuplicates(subset=distinct_report_columns).count()

3520