### PART 1 - Data pipeline on a Spark cluster with one master and one worker node. Run the `__main__` cell to start the ETL.

In [2]:
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import datetime, timedelta
import numpy as np
import pandas as pd


In [3]:
## ETL - work out the date range of Covid data that still has to be fetched
def end_date():
    """
    Return the last day to fetch, formatted as 'MM-DD-YYYY'.

    The day is one day before the date reported by Spark's current_date().
    """
    today = spark.sql(""" select current_date() as cdate """).collect()[0]['cdate']
    today_text = today.strftime("%m-%d-%Y")
    # Round-trip through the text form so the arithmetic is done on a datetime
    yesterday = datetime.strptime(today_text, '%m-%d-%Y') - timedelta(days=1)
    return yesterday.strftime('%m-%d-%Y')

def start_date():
    """
    Return the first day to fetch, formatted as 'MM-DD-YYYY'.

    The log directory holds one 'MM-DD-YYYY' value per day whose data has been
    downloaded successfully. The result is the day after the latest logged date.
    When the log is missing, unreadable or holds no parsable date, the fixed
    start "01-22-2020" is returned.
    """
    default_start = "01-22-2020"
    try:
        dbutils.fs.ls("dbfs:/Covid_datasets/log")
        rows = spark.sql("SELECT * FROM csv.`dbfs:/Covid_datasets/log`").collect()
    except Exception:
        # No log yet (or it cannot be read): begin from the fixed start date
        return default_start

    # Compare real dates, not strings: 'MM-DD-YYYY' text sorts by month first,
    # so a textual max() picks '12-31-2020' over '01-05-2021'.
    logged_dates = []
    for row in rows:
        try:
            logged_dates.append(datetime.strptime(row[0], '%m-%d-%Y'))
        except (TypeError, ValueError):
            # Skip empty or malformed log entries
            continue

    if not logged_dates:
        return default_start
    return (max(logged_dates) + timedelta(days=1)).strftime('%m-%d-%Y')

In [4]:
# date range for which the data will be pulled from the GitHub repository
def dateRange():
    """Return a pandas date range running from start_date() to end_date(), both inclusive."""
    first_day = start_date()
    last_day = end_date()
    return pd.date_range(start=first_day, end=last_day)

In [5]:
# Transform data: aggregate the case counts per country for every downloaded day
def transform():
    """
    Build the per-country tables for the current run.

    Reads one staged CSV per day in the global `date_range` from
    file:/tmp/Covid_datasets/, sums Confirmed, Deaths and Recovered per
    Country_Region, and full-joins each day's sums as a new column named after
    the date ('MM-DD-YYYY') onto the globals `df_conf`, `df_death` and
    `df_recover`.

    Relies on the globals `spark`, `schema` and `date_range` being set by the
    caller. A file without the expected columns is reported and skipped.
    """
    global df_conf
    global df_death
    global df_recover

    # Start from empty frames that only carry the join key
    df_conf = spark.createDataFrame([], schema)
    df_death = spark.createDataFrame([], schema)
    df_recover = spark.createDataFrame([], schema)

    print("IN transform")

    for d in date_range:
        date = d.strftime("%m-%d-%Y")
        file = "file:/tmp/Covid_datasets/"+str(date)+".csv"
        print('Transform file: ',file)
        df_data = spark.read.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat') \
                 .option('header','true')\
                 .option('inferSchema','true')\
                 .load(file)

        # Some files use 'Country/Region' instead of 'Country_Region'; normalise the name
        if 'Country/Region' in df_data.columns:
            df_data = df_data.withColumnRenamed('Country/Region','Country_Region')

        try:
            df_clean = df_data['Country_Region','Confirmed','Deaths','Recovered']
        except Exception as e:
            # Skip this file: continuing would either hit an undefined df_clean
            # or silently reuse the previous day's data under this date.
            print(e)
            continue

        # group by country column and sum the case columns to give one row per country
        df_table = df_clean.groupBy('Country_Region').sum()

        # join this day's sums onto the tables built so far in the current run
        conf_table = df_table['Country_Region','sum(Confirmed)'].withColumnRenamed('sum(Confirmed)',date)
        df_conf = df_conf.join(conf_table, on="Country_Region", how='full')

        death_table = df_table['Country_Region','sum(Deaths)'].withColumnRenamed('sum(Deaths)',date)
        df_death = df_death.join(death_table, on="Country_Region", how='full')

        recover_table = df_table['Country_Region','sum(Recovered)'].withColumnRenamed('sum(Recovered)',date)
        df_recover = df_recover.join(recover_table, on="Country_Region", how='full')

In [6]:
# renaming columns in the dataframe
def rename_columns(df):
    """
    Undo the 'sum(<name>)' column names produced by an aggregation.

    Every column named exactly 'sum(<name>)' is renamed to '<name>'; all other
    columns are left untouched.

    :param df: object exposing `.columns` and `.withColumnRenamed(old, new)`
    :return: the frame with the wrappers removed
    """
    prefix, suffix = 'sum(', ')'
    for old in df.columns:
        # Slice off the exact wrapper. str.strip() would treat 'sum(' as a set
        # of characters and also eat leading/trailing s, u, m, ( of the name.
        if old.startswith(prefix) and old.endswith(suffix):
            df = df.withColumnRenamed(old, old[len(prefix):-len(suffix)])
    return df

In [7]:
# store data in some dbfs and add to it the daily updates
def load():
    """
    Persist the tables built by transform() as Delta tables on DBFS.

    Each table holds one row per country and one column per date. The frames
    from the current run are appended first; because appending adds a second
    row for a country that already exists, every table is then re-read, summed
    per Country_Region and written back in place.
    """
    print("IN LOAD")

    # (absolute target path, frame produced by the current run)
    targets = [
        ("dbfs:/Covid_datasets/Covid_Confirmed", df_conf),
        ("dbfs:/Covid_datasets/Covid_Deaths", df_death),
        ("dbfs:/Covid_datasets/Covid_Recovered", df_recover),
    ]

    # Step 1: append the new date columns, letting Delta merge the schema
    for path, frame in targets:
        frame.write.option("mergeSchema","true").format("delta").mode("append").save(path)

    # Step 2: collapse the duplicated country rows and overwrite each table
    for path, _ in targets:
        merged = spark.read.format("delta").load(path).groupby('Country_Region').sum()
        # the aggregation wraps column names in sum(...); restore the plain names
        merged = rename_columns(merged)
        merged.write.format("delta").option("overwriteSchema","true").mode("overwrite").save(path)

    print('Covid_Confirmed, Covid_Deaths and Covid_Recovered tables created')


We are extracting data from the daily reports at https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/
instead of the time series at https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_time_series
because the daily reports provide cumulative numbers rather than just day-by-day counts.

In [9]:
def extract():
    """
    Download the daily-report CSV for every day in the global `date_range`.

    Files are fetched from the CSSEGISandData GitHub repository and stored as
    /tmp/Covid_datasets/<MM-DD-YYYY>.csv on the local filesystem. A failed
    download is reported and the loop carries on with the next day.
    """
    import os
    import urllib.request

    local_dir = "/tmp/Covid_datasets"
    os.makedirs(local_dir, exist_ok=True)

    print('Fetching data to local cluster........')
    for d in date_range:
        dated = d.strftime("%m-%d-%Y")
        file_dated = str(dated) +'.csv'
        url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/" + file_dated
        try:
            # urlretrieve raises on HTTP/network errors, so success is only
            # reported when the file was actually written.
            urllib.request.urlretrieve(url, os.path.join(local_dir, file_dated))
            print('Written file to databricks local filesystem: ', file_dated)
        except Exception as e:
            print('Failed to download', url, ':', e)

In [10]:
## Initial ETL pipeline 
def ETL_Pipeline():
    """
    Run one full extract / transform / load cycle.

    Sets the global `date_range`, downloads and processes the files for it,
    appends the processed dates to the CSV log at dbfs:/Covid_datasets/log and
    finally removes the local staging directory.
    """
    global date_range
    date_range = dateRange()

    # download every daily report in the range from GitHub
    extract()

    # aggregate each downloaded file per country
    transform()

    # persist the aggregated tables
    load()

    # record every date handled by this run in the log
    processed_dates = [day.strftime("%m-%d-%Y") for day in date_range]
    log_frame = spark.createDataFrame(processed_dates, StringType())
    log_frame.write.format("csv").mode("append").save("dbfs:/Covid_datasets/log")

    # the staged downloads are no longer needed once the tables are written
    print("Finally, freeing the space by deleting all the files download from git")
    dbutils.fs.rm("file:/tmp/Covid_datasets",True)

In [11]:
if __name__ == "__main__":
    # Entry point: prepares the DBFS target and the local staging directory,
    # then runs the ETL pipeline (extract, transform, load).

    # Schema of the empty frames transform() starts from: just the join key.
    # df_conf, df_death, df_recover and date_range are created as globals by
    # transform() and ETL_Pipeline().
    schema = StructType([StructField("Country_Region", StringType(), True)])

    ## create a separate folder in the dbfs filesystem for the Covid19 dataset
    try:
        dbutils.fs.ls("dbfs:/Covid_datasets")
    except Exception:
        # ls fails when the folder does not exist yet; create the same path
        # that was checked above
        dbutils.fs.mkdirs("dbfs:/Covid_datasets")

    # start from an empty staging directory on the local filesystem
    try:
        dbutils.fs.rm("file:/tmp/Covid_datasets",True)
    except Exception:
        # best effort: nothing to clean up
        pass

    dbutils.fs.mkdirs("file:/tmp/Covid_datasets/")

    # Extract, transform and load data into DBFS from the GitHub Covid data master branch
    ETL_Pipeline()

    # make sure the staging directory is gone even if the pipeline left it behind
    dbutils.fs.rm("file:/tmp/Covid_datasets",True)
    
    

### Clean up space
### Set the Spark setting spark.databricks.delta.retentionDurationCheck.enabled = False to allow VACUUM with a retention period shorter than the default 7 days (168 hours)

In [13]:

%sql
%md #VACUUM 'dbfs:/Covid_datasets/Covid_Confirmed' DRY RUN

-- Remove data files no longer referenced by the Delta tables, keeping 48 hours of history.
-- Path-based tables are addressed as delta.`<path>` (backticks, not single quotes),
-- and the statements are separated with semicolons.
VACUUM delta.`dbfs:/Covid_datasets/Covid_Confirmed` RETAIN 48 HOURS;
VACUUM delta.`dbfs:/Covid_datasets/Covid_Deaths` RETAIN 48 HOURS;
VACUUM delta.`dbfs:/Covid_datasets/Covid_Recovered` RETAIN 48 HOURS;
-- The log directory is written as plain CSV by ETL_Pipeline, so it is not a Delta table
-- and VACUUM does not apply to it:
-- VACUUM delta.`dbfs:/Covid_datasets/log` RETAIN 48 HOURS;

#### Display the disk usage of files in Databricks

In [15]:
%sh
# Close stdin and discard error output for the rest of the script
exec 2> /dev/null <&-

# Apparent sizes under /dbfs, two levels deep, skipping mounts and databricks-* folders
printf '%s\n' "=Look for big files:"
du -h -d 2 --apparent-size --exclude='/dbfs/mnt' --exclude='/dbfs/databricks-*' /dbfs
printf '\n'

# Top-level directories of the local filesystem, skipping /dbfs
printf '%s\n' "=Look for big local files:"
du -h -d 1 --exclude='/dbfs' /

## PART 2 - Basic Analysis

In [17]:
# load data into dataframes
# Read the three Delta tables, replacing missing values with 0
df_confirm, df_death, df_recover = (
    spark.read.format("delta").load(table_path).fillna(0)
    for table_path in (
        "dbfs:/Covid_datasets/Covid_Confirmed",
        "dbfs:/Covid_datasets/Covid_Deaths",
        "dbfs:/Covid_datasets/Covid_Recovered",
    )
)

In [18]:
##### import libraires
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pandas import Series

In [19]:
# get all the countries in COuntry_Region column in a list
def getCountryList(df):
    """Return the values of the 'Country_Region' column of `df` as a Python list."""
    rows = df.select('Country_Region').collect()
    return [row[0] for row in rows]

In [20]:
# Date columns: every column of the confirmed-cases table except the country key.
# (SQL alternative kept for reference:
#  spark.sql("select * from df_confirm").drop('Country_Region').columns)
dates_list = df_confirm.drop('Country_Region').columns

# All countries present in the confirmed-cases table, collected into a Python list
countries_list = getCountryList(df_confirm)

In [21]:
# create a differenced series with custom interval
def difference(dataset):
    """
    Return the change between consecutive values of the first row of `dataset`.

    The 'Country_Region' column is dropped first. The first element of the
    result is NaN because it has no predecessor.
    """
    row_values = dataset.drop('Country_Region').collect()[0][:]
    # shift() moves every value one position to the right
    previous_values = Series(row_values).shift()
    return row_values - previous_values

In [22]:
def plotCases(df_c, df_d):
    """
    Plot the day-to-day change in confirmed cases and deaths.

    One line is drawn for every country found in `df_c`: confirmed cases on the
    left axes and deaths on the right. The x values come from the global
    `dates_list`, with a tick every seventh date.

    :param df_c: confirmed-cases table (one row per country, one column per date)
    :param df_d: deaths table with the same layout
    """
    date_labels = dates_list
    country_rows = df_c.select('Country_Region').collect()

    fig, axes = plt.subplots(1, 2, sharex=True, figsize=(12,6))
    confirmed_ax, deaths_ax = axes
    both_axes = (confirmed_ax, deaths_ax)

    legend_names = []
    for row in country_rows:
        name = row[0][:]
        legend_names.append(name)
        # difference() turns the single row of that country into per-day changes
        new_confirmed = difference(df_c[df_c['Country_Region']== name])
        new_deaths = difference(df_d[df_d['Country_Region']== name])
        confirmed_ax.plot(date_labels, new_confirmed)
        deaths_ax.plot(date_labels, new_deaths)

    confirmed_ax.set_title("Confirmed Cases")
    deaths_ax.set_title("Deaths Cases")

    # one tick per week; the axes share x, so ticks are set on both before labels
    weekly_positions = np.arange(0, len(date_labels), step=7)
    for axis in both_axes:
        axis.set_xticks(weekly_positions)
    for axis in both_axes:
        axis.set_xticklabels(date_labels[::7], rotation=90)
    for axis in both_axes:
        axis.set_xlabel('Weekly dates')
    for axis in both_axes:
        axis.set_ylabel('Population affected')

    fig.legend(legend_names, loc= 'lower left' ,fontsize=16, title = "Countries" )
    fig.tight_layout()
    fig.suptitle('Daily Covid19 NEW Cases')
    display(fig)
    

In [23]:
# Plot the daily new confirmed cases and deaths for a hand-picked set of countries
disp_country = ['US','India','Spain','Italy','United Kingdom']
# Keep only the rows of the selected countries in each table
df_c = df_confirm.filter(df_confirm.Country_Region.isin(disp_country))
df_d = df_death.filter(df_death.Country_Region.isin(disp_country))
plotCases(df_c,df_d)

In [24]:
# Pie charts: share of the latest totals held by the ten countries with the most confirmed cases

# Take the last column of the confirmed table and sort the countries by it, largest first.
# NOTE(review): assumes the last column is the most recent date - confirm the column order of the Delta table.
recent_col = df_confirm[-1]
recent_date = df_confirm.columns[-1]
df_sorted = df_confirm.select('Country_Region',recent_col).sort(recent_col,ascending=False)

# total confirmed cases over all countries for that column
cum_confCase = df_sorted.select(recent_col).groupby().sum().collect()[0][0]

# the 10 countries with most confirmed cases and their percentage of the total;
# limit(10) keeps the full sorted table off the driver
most_caseConf = df_sorted.limit(10).collect()
per_caseConf = {c[0]:round(c[1]*100/cum_confCase,2) for c in most_caseConf}
country_label = list(per_caseConf.keys())


# death cases for the same countries, taken from the last column of the deaths table
cum_deathCase = df_death.select(df_death[-1]).groupby().sum().collect()[0][0]
df_caseDeath = df_death.select('Country_Region',df_death[-1]) \
                .filter(df_death.Country_Region.isin(country_label)).collect()
per_caseDeath = {c[0]:round(c[1]*100/cum_deathCase,2) for c in df_caseDeath}


# plot both pies with the slices in the same country order
fig, (ax1, ax2) = plt.subplots(1, 2, sharex=True, figsize=(12,6))
ax1.pie([per_caseConf[i] for i in country_label], radius =1.5, autopct='%1.1f%%', shadow=True, startangle=90)
# a country missing from the deaths result counts as 0 instead of raising KeyError
ax2.pie([per_caseDeath.get(i, 0) for i in country_label], radius = 1.0, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.set_title("Highest percent of confirmed cases")
ax2.set_title("death cases")
fig.legend(country_label, loc = 'center')
# use the column name directly rather than the private Column._jc handle
fig.suptitle("Countries with most percentage of cases on %s"%recent_date, fontweight='bold')
display(fig)