In [1]:
#COVID 19 Data Transformation/JDBC Connectivity to Azure SQL Database/Widget creation
#Author: Pranav Thaenraj
#Date: 08/10/20

# Source data: covidtracking.org
import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import count, dayofmonth, month, year

# Load the raw COVID CSV from DBFS with a header row and inferred column types.
covid_Path = "dbfs:/FileStore/tables/covid19.csv"
covid_df = (
    spark.read.format('csv')
    .option('inferschema', 'true')
    .option('header', 'true')
    .load(covid_Path)
)

# Derive the time columns in one pass:
#   dateCast - the `date` value cast to a string
#   Date     - dateCast parsed with the yyyyMMdd pattern and cast to a timestamp
#   Month    - month number extracted from Date
covid_df = (
    covid_df
    .withColumn("dateCast", F.col("date").cast("String"))
    .withColumn('Date', F.unix_timestamp(F.col("dateCast"), 'yyyyMMdd').cast('timestamp'))
    .withColumn("Month", month(F.col("Date")))
)

# Sum of positiveIncrease for every (Month, State) pair, sorted by month then state.
State_Positive_df = (
    covid_df.groupBy("Month", "State")
    .sum("positiveIncrease")
    .orderBy("Month", "State")
    .toDF("Month", "State", "positiveIncrease")
)

# Sum of totalTestsViral per state; rows containing nulls are dropped afterwards.
tests_df = (
    covid_df.groupBy("State")
    .sum("totalTestsViral")
    .orderBy("State")
    .na.drop()
    .toDF("State", "sum(totalTestsViral)")
)

# Sum of onVentilatorCurrently per state (nulls are kept here).
ventilator_df = (
    covid_df.groupBy("State")
    .sum("onVentilatorCurrently")
    .orderBy("State")
)

# Preview the first 25 rows of the enriched frame.
covid_df.show(25)


In [2]:
%scala 
// Look up the Microsoft SQL Server JDBC driver class by name. Class.forName throws
// ClassNotFoundException when the class is not on the classpath, so this cell acts
// as an early check before the JDBC reads/writes in the following cells.
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

In [3]:
# Example of a full Azure SQL JDBC connection string (reference only, not used below):
# jdbc:sqlserver://azuresqlservermy.database.windows.net:1433;database=azuresqlmy;user=<username>@azuresqlservermy;password=<password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30

# Credentials are fetched from a Databricks secret scope instead of being typed
# into the notebook, so they never end up in the saved source or its revisions.
# NOTE(review): the scope and key names are placeholders - create them in your
# workspace or change them to match an existing scope.
jdbc_user = dbutils.secrets.get(scope="azure-sql", key="username")
jdbc_password = dbutils.secrets.get(scope="azure-sql", key="password")

# Read the `test` table from the Azure SQL database over JDBC.
azuresqljdbcDF = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://azuresqlservermy.database.windows.net:1433;database=azuresqlmy")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("dbtable", "test")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .load()
)
display(azuresqljdbcDF)

In [4]:
# Persist the COVID dataframe to the Azure SQL database as table `covidData`.
#
# The original cell wrote a variable named `df`, which is never defined in this
# notebook and raises NameError on a fresh run. `covid_df` is the frame that
# carries the state / positive / negative / hospitalizedCurrently columns that
# are queried back from `covidData` afterwards.
# NOTE(review): confirm covid_df is the intended source frame.
#
# NOTE(review): no save mode is set, so Spark's default applies; re-running
# this cell is expected to fail once the table exists - add .mode(...) if
# overwrite or append semantics are wanted.

# Credentials are fetched from a Databricks secret scope instead of being typed
# into the notebook.
# NOTE(review): the scope and key names are placeholders - adjust to your workspace.
jdbc_user = dbutils.secrets.get(scope="azure-sql", key="username")
jdbc_password = dbutils.secrets.get(scope="azure-sql", key="password")

(
    covid_df.write.format("jdbc")
    .option("url", "jdbc:sqlserver://azuresqlservermy.database.windows.net:1433;database=azuresqlmy")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("dbtable", "covidData")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .save()
)

In [5]:
# Query the persisted `covidData` table in Azure SQL, keeping only rows where
# more than 100 people are currently hospitalized. The filter is part of the
# `query` option, so it is sent to the database rather than applied in Spark.

# Credentials are fetched from a Databricks secret scope instead of being typed
# into the notebook.
# NOTE(review): the scope and key names are placeholders - adjust to your workspace.
jdbc_user = dbutils.secrets.get(scope="azure-sql", key="username")
jdbc_password = dbutils.secrets.get(scope="azure-sql", key="password")

azuresqljdbcDF = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://azuresqlservermy.database.windows.net:1433;database=azuresqlmy")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("query", "select state, positive, negative, hospitalizedCurrently from covidData where (hospitalizedCurrently > 100) ")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .load()
)
azuresqljdbcDF.show(5)

In [6]:
%fs ls dbfs:/FileStore/tables

path,name,size
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121
dbfs:/FileStore/tables/covid19.csv,covid19.csv,2178036
dbfs:/FileStore/tables/covid_aggregate-1.csv,covid_aggregate-1.csv,10404
dbfs:/FileStore/tables/covid_aggregate-2.csv,covid_aggregate-2.csv,18688
dbfs:/FileStore/tables/covid_aggregate-3.csv,covid_aggregate-3.csv,18688
dbfs:/FileStore/tables/covid_aggregate.csv,covid_aggregate.csv,10431
dbfs:/FileStore/tables/covid_aggregate_v1.csv,covid_aggregate_v1.csv,34169
dbfs:/FileStore/tables/data-1.xml,data-1.xml,22464
dbfs:/FileStore/tables/data.csv,data.csv,386
dbfs:/FileStore/tables/data.parquet/,data.parquet/,0


In [7]:
# Load the pre-aggregated COVID CSV with the DataFrameReader API.
covid_proportiondata_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    # The CSV reader's option is `inferSchema`. The previous spelling,
    # `infer_schema`, is not a recognised option, so every column was loaded
    # as a string and numeric sorts below compared text instead of numbers.
    .option("inferSchema", "true")
    .load("dbfs:/FileStore/tables/covid_aggregate_v1.csv")
)

# Expose the frame to Spark SQL under the name `covid_aggregate_data`.
covid_proportiondata_df.createOrReplaceTempView("covid_aggregate_data")

# List the distinct states. spark.sql() only builds a DataFrame, so .show() is
# needed for the result to actually be computed and printed.
spark.sql('''
    select distinct state
    from covid_aggregate_data
''').show()

# Show positive / death / hospitalized / recovered figures, highest statewide
# death percentage first (ties broken by state, descending).
spark.sql('''
    select covidmonth, state, total_positive, total_death, total_hospitalized,
           total_recovered, statewide_positive_percentage,
           statewide_recovered_percentage, usa_total_recovered,
           usa_total_positive, statewide_death_percentage
    from covid_aggregate_data
    order by statewide_death_percentage desc, State desc
''').show()

covid_proportiondata_df.show(25)


In [8]:
# Widgets for exploring covid_proportiondata_df by state and month.

# Dropdown of states. Nulls are dropped first so that a literal "None" never
# appears as a choice; the list is sorted for a stable dropdown order.
states = (
    covid_proportiondata_df.select("state")
    .na.drop()
    .distinct()
    .orderBy("State")
    .collect()
)
dbutils.widgets.dropdown(
    "Widget_CovidStates",
    "AK",
    [str(c.state) for c in states],
)

# Dropdown of months. Sorted (alphabetically, since these are month names) so
# the choice order does not vary between runs, matching the states list above.
covidmonths = (
    covid_proportiondata_df.select('covidmonth')
    .na.drop()
    .distinct()
    .orderBy('covidmonth')
    .collect()
)
dbutils.widgets.dropdown(
    "Widget_Covidmonths",
    "August",
    [str(c.covidmonth) for c in covidmonths],
)

# Drop rows with nulls in the selected columns, then filter by the widget values.
# dbutils.widgets.get replaces the deprecated getArgument helper.
df2 = covid_proportiondata_df.select('total_positive', 'total_recovered', 'covidmonth', 'state').na.drop()
display(
    df2.filter(
        (df2.covidmonth == dbutils.widgets.get('Widget_Covidmonths'))
        & (df2.state == dbutils.widgets.get('Widget_CovidStates'))
    )
)


total_positive,total_recovered,covidmonth,state
2627,0,March,CO


In [9]:
# Widgets for exploring covid_df by state and month.
# This cell reloads and re-derives covid_df itself, so it does not rely on the
# frame built at the top of the notebook.
import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import count, dayofmonth, month, year

covid_Path = "dbfs:/FileStore/tables/covid19.csv"
covid_df = (
    spark.read.format('csv')
    .option('inferschema', 'true')
    .option('header', 'true')
    .load(covid_Path)
)

# Cast `date` to a string, parse it with the yyyyMMdd pattern into a timestamp,
# then extract the month number.
covid_df = covid_df.withColumn("dateCast", covid_df['date'].cast("String"))
covid_df = covid_df.withColumn('Date', F.unix_timestamp(covid_df['dateCast'], 'yyyyMMdd').cast('timestamp'))
covid_df = covid_df.withColumn("Month", month(covid_df["Date"]))
display(covid_df)

# Dropdown of states. Nulls are dropped so "None" is never offered as a choice.
states = (
    covid_df.select("state")
    .na.drop()
    .distinct()
    .orderBy("State")
    .collect()
)
dbutils.widgets.dropdown(
    "States",
    "AK",
    [str(c.state) for c in states],
)

# Dropdown of month numbers, with nulls dropped for the same reason.
covidmonths = (
    covid_df.select('Month')
    .na.drop()
    .distinct()
    .orderBy("Month")
    .collect()
)
dbutils.widgets.dropdown(
    "Month",
    "1",
    [str(c.Month) for c in covidmonths],
)

# Drop rows with nulls in the selected columns, then filter by the widget values.
# dbutils.widgets.get replaces the deprecated getArgument helper. The widget
# value is a string and is compared against the Month column as in the original.
df2 = covid_df.select('positiveIncrease', 'recovered', 'Month', 'state').na.drop()
display(
    df2.filter(
        (df2.Month == dbutils.widgets.get('Month'))
        & (df2.state == dbutils.widgets.get('States'))
    )
)


positiveIncrease,recovered,Month,state
