In [6]:
from calendar import month_name

import pyspark
from pyspark.sql.functions import col, date_format, lower

In [None]:
from pyspark.sql import SparkSession

# When this cell is re-run in a live kernel, shut down the session left over
# from the previous run before a new one is requested.
if "spark" in globals():
    spark.stop()

# Build (or fetch) the session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Covid Data Analysis")
    .getOrCreate()
)

In [None]:
# Load the raw CSV: the first row provides the column names, and column types
# are inferred from the data rather than all being read as strings.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("covid_data.csv")
)

In [None]:
# Replace the state / UT name column with its lower-cased form (presumably so
# that differently-cased spellings of one name end up in the same group in the
# aggregations below), then preview the frame.
df = df.withColumn(
    colName="Name of State / UT",
    col=lower(col("Name of State / UT")),
)
df.show()

In [None]:
# Total the "New cases" reported on each date, sort the totals from largest to
# smallest, and keep only the top row: the date with the most new cases.
max_cases_day = (
    df.groupBy("Date")
    .agg({"New cases": "sum"})
    .orderBy("sum(New cases)", ascending=False)
    .first()
)
print(f"Day with the most cases: {max_cases_day['Date']} with {max_cases_day['sum(New cases)']} cases")

In [None]:
# Rank states / UTs by their summed "Total Confirmed cases" and report the
# runner-up.
#
# NOTE(review): the data also has a separate "New cases" column, which suggests
# "Total Confirmed cases" may be a running total. If so, summing it across
# dates over-counts and max() per state would be the right aggregate —
# confirm against the dataset before relying on the absolute numbers.
top_two_states = (
    df.groupBy("Name of State / UT")
    .sum("Total Confirmed cases")
    .orderBy(col("sum(Total Confirmed cases)").desc())
    .limit(2)  # only the two leading rows are needed on the driver
    .collect()
)

# Guard against data with fewer than two groups instead of failing with a bare
# IndexError on the list lookup.
second_largest_state = top_two_states[1] if len(top_two_states) > 1 else None

if second_largest_state is None:
    print("Fewer than two states / UTs in the data; no second-largest state to report")
else:
    print(f"State with the second-largest number of cases: {second_largest_state['Name of State / UT']} with {second_largest_state['sum(Total Confirmed cases)']} cases")

In [None]:
# Sum the "Death" column per state / UT and pick the group with the smallest
# total.
#
# asc_nulls_last() is used instead of asc(): an ascending sort puts NULLs
# first by default, so a group whose death total is NULL (no usable values)
# would otherwise be reported as the minimum.
#
# NOTE(review): the message says "Union Territory", but the query ranks every
# value of "Name of State / UT" — states included. Restricting it to union
# territories needs an explicit list of UT names; confirm the intent.
min_death_ut = (
    df.groupBy("Name of State / UT")
    .sum("Death")
    .orderBy(col("sum(Death)").asc_nulls_last())
    .first()
)
print(f"Union Territory with the least deaths: {min_death_ut['Name of State / UT']} with {min_death_ut['sum(Death)']} deaths")

In [None]:
# Per-row ratio of deaths to confirmed cases. The ratio is NULL whenever either
# operand is NULL (and, with Spark's non-ANSI arithmetic, when the denominator
# is zero).
df = df.withColumn("death_ratio", col("Death") / col("Total Confirmed cases"))

# Take each state's smallest ratio and pick the overall minimum.
#
# asc_nulls_last() is used instead of asc(): an ascending sort puts NULLs
# first by default, so a state with no computable ratio would otherwise be
# reported as the "lowest" with a ratio of None.
#
# NOTE(review): this ranks states by the minimum of their per-row ratios, not
# by the ratio of their final totals — confirm that is the intended metric.
lowest_ratio_state = (
    df.groupBy("Name of State / UT")
    .min("death_ratio")
    .orderBy(col("min(death_ratio)").asc_nulls_last())
    .first()
)
print(f"State with the lowest death to confirmed cases ratio: {lowest_ratio_state['Name of State / UT']} with a ratio of {lowest_ratio_state['min(death_ratio)']}")


In [None]:
# Derive a two-digit month string ("01".."12") from the Date column. Rows whose
# Date cannot be interpreted as a date get a NULL month.
df = df.withColumn("month", date_format(col("Date"), "MM"))

# Sum "New recovered" per calendar month and keep the largest total. NULL
# months are excluded so an unparseable-date bucket can never win and then
# break the int(...) conversion below.
# NOTE(review): months are pooled across years (e.g. every January together);
# confirm that is intended if the data spans more than one year.
max_recovered_month = (
    df.filter(col("month").isNotNull())
    .groupBy("month")
    .sum("New recovered")
    .orderBy(col("sum(New recovered)").desc())
    .first()
)

# calendar.month_name is indexed 1..12 (index 0 is an empty string), which
# matches the integer value of the "MM" string.
month_name_str = month_name[int(max_recovered_month['month'])]
print(f"Month with the most newer recovered cases: {month_name_str} with {max_recovered_month['sum(New recovered)']} recovered cases")


In [None]:
# Shut down the Spark session created near the top of the notebook now that
# the analysis is finished.
spark.stop()