In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("covid_spark").config("spark.sql.repl.eagerEval.enabled", True).getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/22 13:19:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

In [3]:
df = spark.read.csv("covid_data.csv", header=True, inferSchema=True)

In [4]:
df

Date,Name of State / UT,Latitude,Longitude,Total Confirmed cases,Death,Cured/Discharged/Migrated,New cases,New deaths,New recovered
1/30/2020,Kerala,10.8505,76.2711,1,0,0,0,0,0
1/31/2020,Kerala,10.8505,76.2711,1,0,0,0,0,0
2/1/2020,Kerala,10.8505,76.2711,2,0,0,1,0,0
2/2/2020,Kerala,10.8505,76.2711,3,0,0,1,0,0
2/3/2020,Kerala,10.8505,76.2711,3,0,0,0,0,0
2/4/2020,Kerala,10.8505,76.2711,3,0,0,0,0,0
2/5/2020,Kerala,10.8505,76.2711,3,0,0,0,0,0
2/6/2020,Kerala,10.8505,76.2711,3,0,0,0,0,0
2/7/2020,Kerala,10.8505,76.2711,3,0,0,0,0,0
2/8/2020,Kerala,10.8505,76.2711,3,0,0,0,0,0


# Convert all State names to lowercase

In [5]:
from pyspark.sql.functions import lower
df = df.withColumn("Name of State / UT", lower(df["Name of State / UT"]))

# Day with highest number of covid cases

In [6]:
day_with_highest_cases = df.orderBy(df["Total Confirmed cases"].desc()).first()
day_with_highest_cases['Date']

'8/6/2020'

# Second Largest covid Cases

In [7]:
from pyspark.sql.functions import sum as spark_sum
import plotly.express as px
state_cases = df.groupBy("Name of State / UT").agg(spark_sum("Total Confirmed cases").alias("Total Cases"))
second_largest_state = state_cases.orderBy(state_cases["Total Cases"].desc()).take(2)[1]
second_largest_state['Name of State / UT']

'tamil nadu'

In [8]:
import plotly.io as pio
pio.renderers.default = 'iframe'
state_cases_pd = state_cases.orderBy(state_cases["Total Cases"].desc()).toPandas()
fig = px.bar(state_cases_pd, x='Name of State / UT', y='Total Cases', title='COVID-19 Cases by State',
             labels={'Name of State / UT': 'State', 'Total Cases': 'Total COVID-19 Cases'}, 
             text='Total Cases')
fig.update_traces(texttemplate='%{text:.2s}', textposition='outside')
fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide', xaxis={'categoryorder':'total descending'})
fig.show()

# Union Territory with least number of deaths

In [9]:
ut_df = df.filter(df["Name of State / UT"].startswith("union territory of"))

In [10]:
ut_df

Date,Name of State / UT,Latitude,Longitude,Total Confirmed cases,Death,Cured/Discharged/Migrated,New cases,New deaths,New recovered
3/7/2020,union territory o...,34.2996,78.2932,2,0,0,0,0,0
3/8/2020,union territory o...,34.2996,78.2932,2,0,0,0,0,0
3/9/2020,union territory o...,33.7782,76.5762,1,0,0,0,0,0
3/9/2020,union territory o...,34.2996,78.2932,2,0,0,0,0,0
3/10/2020,union territory o...,33.7782,76.5762,1,0,0,0,0,0
3/10/2020,union territory o...,34.2996,78.2932,2,0,0,0,0,0
3/11/2020,union territory o...,33.7782,76.5762,1,0,0,0,0,0
3/11/2020,union territory o...,34.2996,78.2932,2,0,0,0,0,0
3/12/2020,union territory o...,33.7782,76.5762,1,0,0,0,0,0
3/12/2020,union territory o...,34.2996,78.2932,3,0,0,1,0,0


In [11]:
ut_deaths = ut_df.groupBy("Name of State / UT").agg(spark_sum("Death").alias("Total Deaths"))
least_deaths_ut = ut_deaths.orderBy(ut_deaths["Total Deaths"].asc()).first()
least_deaths_ut['Name of State / UT']

'union territory of ladakh'

In [12]:
# spark.stop()