**Installing the Spark Dependencies**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar -xzf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark==3.0.3

In [None]:
import os

# Tell the Spark launcher where the JDK and the unpacked Spark distribution live
# (paths are the Colab install locations used by the install cell).
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.0-bin-hadoop3",
})

Copy the Dataset into a local store.

Download from: https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv

(**In Colab:** the downloaded file is stored under "/content" folder)

Dataset description can be found here: https://github.com/nytimes/covid-19-data

In [None]:
! wget https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv

**Start your program**

In [None]:
# findspark.init() makes the Spark installation's Python libraries importable
# (presumably the installation SPARK_HOME points at, set in an earlier cell — confirm),
# so it must run before the pyspark imports below.
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

Create a Spark Session

In [None]:
# Spark UI port used by this notebook.
conf = SparkConf().set('spark.ui.port', '4050')

# Build the session through the builder with the configuration attached.
# getOrCreate() returns the already-running session when this cell is executed again,
# whereas constructing SparkContext(conf=conf) directly raises
# "Cannot run multiple SparkContexts at once" on a second run.
spark = SparkSession.builder.master('local[*]').config(conf=conf).getOrCreate()

# Keep `sc` available for code that expects the raw SparkContext.
sc = spark.sparkContext

Load the dataset into a dataframe

In [None]:
# Read the county-level CSV: the first line is a header and column types are inferred.
# NOTE: Fix your dataset location in case you run locally on your machine
csv_reader = spark.read.format('csv').options(inferSchema=True, header=True)
data = csv_reader.load('/content/us-counties.csv')

# Show the inferred column names and types
data.printSchema()

In [None]:
# Number of rows in the dataset (triggers a full scan of the loaded dataframe)
data.count()

In [None]:
# Print the first 10 rows of the dataset as a text table
data.show(10)

In [None]:
# Display the dataframe's schema object (same information as printSchema(), as a StructType)
data.schema

**Task 0. Find the daily new cases across the entire US and plot**
(*you do NOT need to do this; this code is given for your assistance*)

In [None]:
from pyspark.sql import functions as F

# Total the county-level `cases` values per calendar date, in chronological order
daily_cumulative = (
    data.groupby('date')
        .agg(F.sum('cases').alias('total_cases'))
        .sort('date')
)
daily_cumulative.show(10)

# Bring the (small) aggregated result into pandas so it can be plotted
plot_data = daily_cumulative.toPandas()
dates, values = plot_data['date'], plot_data['total_cases']

# The totals are cumulative, so each day's new cases is the difference from the day
# before; the first date has no predecessor and is therefore left out.
n_days = len(values)
daily_cases = [values[k] - values[k - 1] for k in range(1, n_days)]
ddates = [dates[k] for k in range(1, n_days)]

In [None]:
# Line chart of US daily new cases over time
import matplotlib.pyplot as plt
import numpy as np

fig, ax = plt.subplots()
ax.plot(ddates, daily_cases)
ax.set_xlabel('Date')
ax.set_ylabel('Daily new cases')
# Vertical tick labels so the dates stay readable
ax.tick_params(axis='x', labelrotation=90)
plt.show()

*Now, solve the following tasks.*

Feel free to add more code blocks within each task to separate the code for better clarity and understanding.

**Task 1: Find the total number of new cases added in the entire US in the  month of March 2020.**

In [None]:
# Register the dataframe as a temp view so it can be queried with Spark SQL.
# createOrReplaceTempView replaces the deprecated registerTempTable and is safe to re-run.
data.createOrReplaceTempView("counties")

# `cases` is a cumulative count per county (Task 0 derives daily values by differencing
# it), so adding it up over every day of March would count the same cases many times.
# New cases in March 2020 = US cumulative total on the last day of March
#                         - US cumulative total on the last day of February.
# to_date() keeps the comparison valid whether inferSchema typed `date` as a string
# or as a timestamp.
data_march = spark.sql("""
  SELECT SUM(CASE WHEN to_date(date) = DATE'2020-03-31' THEN cases ELSE 0 END)
       - SUM(CASE WHEN to_date(date) = DATE'2020-02-29' THEN cases ELSE 0 END)
         AS new_cases_march_2020
  FROM counties
  WHERE to_date(date) IN (DATE'2020-02-29', DATE'2020-03-31')
""")
data_march.show()

**Task 2: Calculate the total new cases added in three consecutive months of June, July, and August of 2020 in Jackson county, Missouri (fips code 29095).**

Output will be like this:

June 2020 `cases`

July 2020 `cases`

August 2020 `cases`

In [None]:
# `cases` is cumulative, so the new cases of a month are the county's cumulative count
# on the last day of that month minus the count on the last day of the previous month.
# month_end : cumulative cases for fips 29095 on the four month-end dates needed
#             (end of May is the baseline for June).
# monthly   : month-over-month difference via LAG; a missing baseline counts as 0.
# The final SELECT labels the three requested months and orders them chronologically
# (ORDER BY is on the outer query because ordering inside a CTE is not guaranteed to
# survive to the result).
data_Jackson = spark.sql("""
  WITH month_end AS (
    SELECT to_date(date) AS day, SUM(cases) AS cum_cases
    FROM counties
    WHERE fips = 29095
      AND to_date(date) IN (DATE'2020-05-31', DATE'2020-06-30',
                            DATE'2020-07-31', DATE'2020-08-31')
    GROUP BY to_date(date)
  ),
  monthly AS (
    SELECT MONTH(day) AS Month,
           cum_cases - LAG(cum_cases, 1, 0) OVER (ORDER BY day) AS TotalSUM
    FROM month_end
  )
  SELECT CASE
           WHEN Month = 6 THEN 'June 2020 cases'
           WHEN Month = 7 THEN 'July 2020 cases'
           ELSE 'August 2020 cases'
         END AS MonthName,
         TotalSUM
  FROM monthly
  WHERE Month IN (6, 7, 8)
  ORDER BY Month
""")

data_Jackson.show()

**Task 3: Find the daily new cases per month per 1000 population in Missouri state (MO) since the beginning of the pandemic (assume MO's population is 6,154,913). [Plot the data]**

In [None]:
# Missouri cumulative cases per 1000 residents at the END of each month.
# `cases` is cumulative, so summing it over all days of a month (the previous approach)
# inflates the figure by roughly the number of days; the month-end value is the correct
# cumulative level. The plotting-preparation cell differences consecutive rows of this
# result, which turns these month-end levels into new cases per month per 1000.
# The column keeps the name `new_cases` because that cell reads it by this name.
# state_daily : state-wide cumulative total per day
# ranked      : rn = 1 marks the last reported day of each (year, month)
# 6154913 is the assumed Missouri population given in the task statement.
data_Missouri = spark.sql("""
  WITH state_daily AS (
    SELECT to_date(date) AS day, SUM(cases) AS cum_cases
    FROM counties
    WHERE state = 'Missouri'
    GROUP BY to_date(date)
  ),
  ranked AS (
    SELECT day, cum_cases,
           ROW_NUMBER() OVER (PARTITION BY YEAR(day), MONTH(day) ORDER BY day DESC) AS rn
    FROM state_daily
  )
  SELECT ROUND(cum_cases * 1000 / 6154913, 2) AS new_cases,
         MONTH(day) AS Month,
         YEAR(day) AS Year
  FROM ranked
  WHERE rn = 1
  ORDER BY Year, Month
""")
data_Missouri.show(30)

In [None]:
# Convert Spark dataframe to a Pandas dataframe to plot
plot_data = data_Missouri.toPandas()
Month = plot_data['Month']
Year = plot_data['Year']
NewCases = plot_data['new_cases']

# Month-over-month change of the per-1000 series; this equals the new cases of a month
# when the `new_cases` column holds a cumulative level. The first row has no predecessor
# and is dropped.
new_cases = [NewCases[i+1] - NewCases[i] for i in range(len(NewCases)-1)]

# Label every point with year AND month ("2020-04"). A bare month number repeats across
# years, which made points from different years share one x position and the line fold
# back on itself.
month = [f"{int(Year[i+1])}-{int(Month[i+1]):02d}" for i in range(len(NewCases)-1)]


In [None]:
# Line chart of the Missouri monthly series prepared in the previous cell
import matplotlib.pyplot as plt
import numpy as np

fig, ax = plt.subplots()
ax.plot(month, new_cases)
ax.set_xlabel('Month')
ax.set_ylabel('New Cases')
# Vertical tick labels so the month labels do not overlap
ax.tick_params(axis='x', labelrotation=90)
plt.show()

**Task 4: On which date did all 50 US states have at least 100 cases? At least one death?**

In [None]:
# The thresholds apply to STATE totals, so county rows are first summed per (day, state);
# filtering individual county rows would ignore states whose cases are spread over many
# small counties.
# NOTE(review): the file is believed to also list non-state jurisdictions (District of
# Columbia and US territories); they are excluded so that "50" means the 50 states —
# verify the names below against the dataset README.
# per_day : for each day, how many states meet each threshold.
# The final SELECT answers the two questions separately: the earliest day on which all
# 50 states had >= 100 cases, and the earliest day on which all 50 had >= 1 death.
data_atleast = spark.sql("""
  WITH state_daily AS (
    SELECT to_date(date) AS day, state,
           SUM(cases) AS cases, SUM(deaths) AS deaths
    FROM counties
    WHERE state NOT IN ('District of Columbia', 'Puerto Rico', 'Guam', 'Virgin Islands',
                        'Northern Mariana Islands', 'American Samoa')
    GROUP BY to_date(date), state
  ),
  per_day AS (
    SELECT day,
           SUM(CASE WHEN cases >= 100 THEN 1 ELSE 0 END) AS states_100_cases,
           SUM(CASE WHEN deaths >= 1 THEN 1 ELSE 0 END) AS states_1_death
    FROM state_daily
    GROUP BY day
  )
  SELECT MIN(CASE WHEN states_100_cases = 50 THEN day END) AS first_day_all_states_100_cases,
         MIN(CASE WHEN states_1_death = 50 THEN day END) AS first_day_all_states_1_death
  FROM per_day
""")
data_atleast.show()

**Task 5: Which single day in the year 2020 and 2021 had the largest number of deaths in the entire US (if there are multiple such dates, choose the earliest one)?**

In [None]:
# `deaths` is cumulative, so ranking SUM(deaths) per date always picks the last day of
# each year. The number of deaths ON a day is the US cumulative total minus the previous
# day's total.
# us_daily   : US cumulative deaths per day
# new_deaths : day-over-day difference (LAG runs over the full history BEFORE the year
#              filter, so 1 January is compared with 31 December of the prior year)
# ranked     : per year, highest daily deaths first; ties broken by the earliest date
# Only 2020 and 2021 are reported, as the task asks.
data_deaths = spark.sql("""
  WITH us_daily AS (
    SELECT to_date(date) AS day, SUM(deaths) AS cum_deaths
    FROM counties
    GROUP BY to_date(date)
  ),
  new_deaths AS (
    SELECT day,
           cum_deaths - LAG(cum_deaths, 1, 0) OVER (ORDER BY day) AS deaths
    FROM us_daily
  ),
  ranked AS (
    SELECT day, deaths,
           ROW_NUMBER() OVER (PARTITION BY YEAR(day) ORDER BY deaths DESC, day ASC) AS rn
    FROM new_deaths
    WHERE YEAR(day) IN (2020, 2021)
  )
  SELECT day AS date, YEAR(day) AS year, deaths
  FROM ranked
  WHERE rn = 1
  ORDER BY day
""")
data_deaths.show()

Your programming assignment ends here.
Thank you.