In [1]:
import pyspark
from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType, IntegerType, LongType


STORAGEACCOUNTURL = "https://trainingbatchaccount.blob.core.windows.net"
STORAGEACCOUNTKEY = "2QPPHsAtQ8/fh33VE7wqg/ZaeJoxdq/pnevAEmCh0n32tC5eXa8dTEEwMHdD9Ff5k1/wVh97aubqgKzQSwOLnQ=="
CONTAINERNAME = "datasets"
# HOSPITALIZATION = "economy.csv"
# INDEX = "index.csv"


spark = SparkSession.builder.appName('azure').getOrCreate()
spark.conf.set(
        "fs.azure.account.key.trainingbatchaccount.blob.core.windows.net",
        STORAGEACCOUNTKEY
    
)

#-----------------------------Schema for hospitalizations dataset-------------------------------

hospitalizations_schema = StructType([StructField("date",StringType(), True),\
    StructField("location_key",StringType(), True),\
    StructField("new_hospitalized_patients", IntegerType(), True),\
    StructField("cumulative_hospitalized_patients", IntegerType(), True),\
    StructField("current_hospitalized_patients", IntegerType(), True),\
    StructField("new_intensive_care_patients", IntegerType(), True),\
    StructField("cumulative_intensive_care_patients", IntegerType(), True),\
    StructField("current_intensive_care_patients", IntegerType(), True),\
    StructField("new_ventilator_patients", StringType(), True),\
    StructField("cumulative_ventilator_patients", StringType(), True),\
    StructField("current_ventilator_patients", IntegerType(), True)])
#-----------------------------Schema for vaccination dataset-------------------------------   
vaccination_schema = StructType([StructField("date",StringType(), True),\
    StructField("location_key",StringType(), True),\
    StructField("new_persons_vaccinated", IntegerType(), True),\
    StructField("cumulative_persons_vaccinated", IntegerType(), True),\
    StructField("new_persons_fully_vaccinated", IntegerType(), True),\
    StructField("cumulative_persons_fully_vaccinated", IntegerType(), True),\
    StructField("new_vaccine_doses_administered", IntegerType(), True),\
    StructField("cumulative_vaccine_doses_administered", LongType(), True),\
    StructField("new_persons_vaccinated_pfizer", IntegerType(), True),\
    StructField("cumulative_persons_vaccinated_pfizer", IntegerType(), True),\
    StructField("new_persons_fully_vaccinated_pfizer", IntegerType(), True),\
    StructField("cumulative_persons_fully_vaccinated_pfizer", IntegerType(), True),\
    StructField("new_vaccine_doses_administered_pfizer", IntegerType(), True),\
    StructField("cumulative_vaccine_doses_administered_pfizer", IntegerType(), True),\
    StructField("new_persons_vaccinated_moderna", IntegerType(), True),\
    StructField("cumulative_persons_vaccinated_moderna", IntegerType(), True),\
    StructField("new_persons_fully_vaccinated_moderna", IntegerType(), True),\
    StructField("cumulative_persons_fully_vaccinated_moderna", IntegerType(), True),\
    StructField("new_vaccine_doses_administered_moderna", IntegerType(), True),\
    StructField("cumulative_vaccine_doses_administered_moderna", IntegerType(), True),\
    StructField("new_persons_vaccinated_janssen", IntegerType(), True),\
    StructField("cumulative_persons_vaccinated_janssen", IntegerType(), True),\
    StructField("new_persons_fully_vaccinated_janssen", IntegerType(), True),\
    StructField("cumulative_persons_fully_vaccinated_janssen", IntegerType(), True),\
    StructField("new_vaccine_doses_administered_janssen", IntegerType(), True),\
    StructField("cumulative_vaccine_doses_administered_janssen", IntegerType(), True),\
    StructField("new_persons_vaccinated_sinovac", IntegerType(), True),\
    StructField("total_persons_vaccinated_sinovac", IntegerType(), True),\
    StructField("new_persons_fully_vaccinated_sinovac", StringType(), True),\
    StructField("total_persons_fully_vaccinated_sinovac", StringType(), True),\
    StructField("new_vaccine_doses_administered_sinovac", StringType(), True),\
    StructField("total_vaccine_doses_administered_sinovac", StringType(), True)])
  #-----------------------------Schema for epidemiology dataset-------------------------------

epidemiology_schema = StructType([StructField("date",StringType(), True),\
    StructField("location_key",StringType(), True),\
    StructField("new_confirmed", IntegerType(), True),\
    StructField("new_deceased", IntegerType(), True),\
    StructField("new_recovered", IntegerType(), True),\
    StructField("new_tested", IntegerType(), True),\
    StructField("cumulative_confirmed", IntegerType(), True),\
    StructField("cumulative_deceased", IntegerType(), True),\
    StructField("cumulative_recovered", IntegerType(), True),\
    StructField("cumulative_tested", IntegerType(), True)])

  

hosp_df = spark.read.format('csv').option('header',True).schema(hospitalizations_schema).load("wasbs://datasets@trainingbatchaccount.blob.core.windows.net/hospitalizations.csv")
vacc_df = spark.read.format('csv').option('header',True).schema(vaccination_schema).load("wasbs://datasets@trainingbatchaccount.blob.core.windows.net/vaccinations.csv")
epi_df = spark.read.format('csv').option('header',True).schema(epidemiology_schema).load("wasbs://datasets@trainingbatchaccount.blob.core.windows.net/epidemiology.csv")



In [76]:
# to find shape of dataframe 
def sparkShape(dataFrame):
    return (dataFrame.count(), len(dataFrame.columns))
pyspark.sql.dataframe.DataFrame.shape = sparkShape

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [41]:
epi_df.shape()

(10909296, 10)

In [42]:
vacc_df.shape()

(1783850, 8)

In [40]:
hosp_df.shape()

(1048563, 9)

In [30]:
from pyspark.sql.functions import count, when,col,expr, udf, avg,to_date,broadcast,regexp_replace,last,lpad,concat_ws,date_format,year,month
from pyspark.sql import Window
import sys

In [20]:
#---------------------------------Opening hospitalization dataset and cleaning it----------------

hosp_df = hosp_df.withColumn('Date',to_date(hosp_df['date'],'dd-mm-yyyy'))
#fill empty values with zero
hosp_df = hosp_df.fillna(value = 0, subset = ['current_hospitalized_patients','current_intensive_care_patients','new_ventilator_patients','cumulative_ventilator_patients','current_ventilator_patients'])
#drop unwanted column
hosp_df = hosp_df.drop('cumulative_ventilator_patients','new_ventilator_patients')
  

In [25]:
 #---------------------------------Opening epidemology dataset and cleaning it----------------
    
epi_df = epi_df.withColumn('date',to_date(epi_df['date'],format='yyyy-mm-dd'))
epi_df = epi_df.na.fill(value=0)

In [32]:
 #---------------------------------Opening vaccination dataset and cleaning it----------------
    
#selecting needed columns only
vacc_df = vacc_df.select("date","location_key","new_persons_vaccinated","cumulative_persons_vaccinated","new_persons_fully_vaccinated","cumulative_persons_fully_vaccinated","new_vaccine_doses_administered","cumulative_vaccine_doses_administered")

        #Drop rows if all the values are null
vacc_df = vacc_df.na.drop(subset=["new_persons_vaccinated","cumulative_persons_vaccinated","new_persons_fully_vaccinated","cumulative_persons_fully_vaccinated","new_vaccine_doses_administered","cumulative_vaccine_doses_administered"] ,how="all")

        #fill cumulative value with previous field value (Forward Fill)
vacc_df = vacc_df.withColumn("cumulative_persons_vaccinated", last('cumulative_persons_vaccinated', True).over(Window.partitionBy('location_key').rowsBetween(-sys.maxsize, 0)))
vacc_df = vacc_df.withColumn("cumulative_persons_fully_vaccinated",last('cumulative_persons_fully_vaccinated', True).over(Window.partitionBy('location_key').rowsBetween(-sys.maxsize, 0)))
vacc_df = vacc_df.withColumn("cumulative_vaccine_doses_administered",last('cumulative_vaccine_doses_administered', True).over(Window.partitionBy('location_key').rowsBetween(-sys.maxsize, 0)))

        #replace null with zero
vacc_df = vacc_df.na.fill(value=0)

        #correct date format
vacc_df = vacc_df.withColumn('Date',to_date(vacc_df['Date'],format='yyyy-mm-dd'))
    

In [34]:
hosp_df.printSchema()
vacc_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- location_key: string (nullable = true)
 |-- new_hospitalized_patients: integer (nullable = true)
 |-- cumulative_hospitalized_patients: integer (nullable = true)
 |-- current_hospitalized_patients: integer (nullable = true)
 |-- new_intensive_care_patients: integer (nullable = true)
 |-- cumulative_intensive_care_patients: integer (nullable = true)
 |-- current_intensive_care_patients: integer (nullable = true)
 |-- current_ventilator_patients: integer (nullable = true)

root
 |-- Date: date (nullable = true)
 |-- location_key: string (nullable = true)
 |-- new_persons_vaccinated: integer (nullable = true)
 |-- cumulative_persons_vaccinated: integer (nullable = true)
 |-- new_persons_fully_vaccinated: integer (nullable = true)
 |-- cumulative_persons_fully_vaccinated: integer (nullable = true)
 |-- new_vaccine_doses_administered: integer (nullable = true)
 |-- cumulative_vaccine_doses_administered: long (nullable = true)



In [61]:
# adding a date string column to each dataframe 

hosp_df = hosp_df.withColumn("Date_string",hosp_df.Date.cast(StringType()))
hosp_df = hosp_df.where(hosp_df.location_key == 'AR')
vacc_df = vacc_df.withColumn("Date_string",vacc_df.Date.cast(StringType()))
vacc_df = vacc_df.where(vacc_df.location_key == 'AR')

In [81]:
# aggregating date field

hosp_df= hosp_df.groupby("Date").sum()
vacc_df= vacc_df.groupby("Date").sum()
hosp_df.shape()


(141, 8)

In [82]:
vacc_df.shape()

(65, 7)

In [78]:
hosp_df.toPandas().to_csv('hosp_df.csv')


In [66]:
vacc_df.shape()

(433, 9)

In [84]:
# joining 
# hosp_df
# vacc_df


cond = ((hosp_df['Date'] == vacc_df['Date']))
df_hosp_vacc = hosp_df.join(vacc_df, on = cond ,how = 'inner')


In [85]:
df_hosp_vacc.shape()

(65, 15)

In [87]:
df_hosp_vacc.toPandas().to_csv("df_hosp_vacc.csv")

In [75]:
df_hosp_vacc.show()

+----------+------------+-------------------------+--------------------------------+-----------------------------+---------------------------+----------------------------------+-------------------------------+---------------------------+-----------+----------+----------------------+-----------------------------+----------------------------+-----------------------------------+------------------------------+-------------------------------------+-----------+
|      Date|location_key|new_hospitalized_patients|cumulative_hospitalized_patients|current_hospitalized_patients|new_intensive_care_patients|cumulative_intensive_care_patients|current_intensive_care_patients|current_ventilator_patients|Date_string|      Date|new_persons_vaccinated|cumulative_persons_vaccinated|new_persons_fully_vaccinated|cumulative_persons_fully_vaccinated|new_vaccine_doses_administered|cumulative_vaccine_doses_administered|Date_string|
+----------+------------+-------------------------+-----------------------------

In [37]:
df_hosp_vacc.printSchema()



root
 |-- Date: date (nullable = true)
 |-- location_key: string (nullable = true)
 |-- new_hospitalized_patients: integer (nullable = true)
 |-- cumulative_hospitalized_patients: integer (nullable = true)
 |-- current_hospitalized_patients: integer (nullable = true)
 |-- new_intensive_care_patients: integer (nullable = true)
 |-- cumulative_intensive_care_patients: integer (nullable = true)
 |-- current_intensive_care_patients: integer (nullable = true)
 |-- current_ventilator_patients: integer (nullable = true)
 |-- new_persons_vaccinated: integer (nullable = true)
 |-- cumulative_persons_vaccinated: integer (nullable = true)
 |-- new_persons_fully_vaccinated: integer (nullable = true)
 |-- cumulative_persons_fully_vaccinated: integer (nullable = true)
 |-- new_vaccine_doses_administered: integer (nullable = true)
 |-- cumulative_vaccine_doses_administered: long (nullable = true)



In [None]:
# joining 
# df_hosp_vacc
# epi_df