# In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 

# Obtain the Spark session for this analysis (an existing one is reused)
spark = SparkSession.builder.appName("covid").getOrCreate()

# Path of the input dataset
filepath = "complete.csv"

# Read the CSV: first row is the header, records may span multiple lines,
# surrounding whitespace is trimmed, backslash escapes, double-quote quoting.
# No schema is supplied or inferred here; columns are cast explicitly later.
df_csv = (
    spark.read.format("csv")
    .options(
        header=True,
        multiLine=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        escape="\\",
        quote="\"",
    )
    .load(filepath)
)

# Add typed, short-named copies of the raw CSV columns used by the queries
# below. The original columns are kept; the new ones are appended in this order.
df_csv = (
    df_csv
    .withColumn("total_case", F.col("Total Confirmed cases").cast(types.LongType()))
    .withColumn("total_newly_recovered", F.col("New recovered").cast(types.LongType()))
    .withColumn("state", F.col("Name of State / UT").cast(types.StringType()))
    .withColumn("death_Case", F.col("Death").cast(types.LongType()))
)

# 1. Day with the most COVID cases
# Sum total_case over all rows of each Date and keep the single largest sum.
# NOTE(review): the source column is named "Total Confirmed cases"; if it is a
# running total rather than a daily count, confirm this sum is what is wanted.
output_df_1 = (
    df_csv.groupBy("Date")
    .agg(F.sum("total_case").alias("sum_total_case"))
    .orderBy("sum_total_case", ascending=False)
    .limit(1)
)
output_df_1.show()

# 2. State with the second-largest number of COVID cases
# Take the two largest per-state sums, then the smaller of those two.
# If fewer than two states exist, the remaining row is the largest one.
output_df_2 = (
    df_csv.groupBy("state")
    .agg(F.sum("total_case").alias("sum_total_case"))
    .orderBy("sum_total_case", ascending=False)
    .limit(2)
    .orderBy("sum_total_case", ascending=True)
    .limit(1)
)
output_df_2.show()

# 3. Union Territory with the least number of deaths
# Keep rows whose state name starts with "Union Territory", sum death_Case per
# territory and take the smallest sum.
# NULL sums (a territory where every death_Case is NULL, e.g. after a failed
# cast) are sorted last: a plain ascending sort places NULL first in Spark,
# which would report such a territory as the answer.
output_df_3 = (
    df_csv.filter(F.col('state').like("Union Territory%"))
    .groupBy("state")
    .agg(F.sum("death_Case").alias("sum_total_death"))
    .orderBy(F.col("sum_total_death").asc_nulls_last())
    .limit(1)
)
output_df_3.show(truncate=False)

# 4. State with the lowest Death-to-Confirmed-Cases ratio
# Compute death_Case / total_case per row, average it per state, and take the
# smallest average.
# - The division is guarded: rows with total_case == 0 (or NULL) get a NULL
#   ratio explicitly instead of relying on the engine's divide-by-zero
#   behaviour, which raises an error when ANSI mode is enabled.
# - NULL averages (states with no usable ratio) are sorted last; a plain
#   ascending sort places NULL first and would report such a state as lowest.
# NOTE(review): this is the mean of per-row ratios, not sum(deaths)/sum(cases);
# confirm that is the intended definition.
output_df_4 = (
    df_csv.withColumn(
        "ratio",
        F.when(
            F.col("total_case") != 0,
            F.col("death_Case") / F.col("total_case"),
        ),
    )
    .groupBy("state")
    .agg(F.avg("ratio").alias("avg_ratio"))
    .orderBy(F.col("avg_ratio").asc_nulls_last())
    .limit(1)
)
output_df_4.show(truncate=False)

# 5. Convert state names to lowercase
# One row per distinct lower-cased state name.
output_df_5 = (
    df_csv
    .select(F.lower(df_csv["state"]).alias('state_lower'))
    .distinct()
)
output_df_5.show()

# 6. Find the month with the most newly recovered cases
# Extract the month number from Date, sum total_newly_recovered per month and
# keep the month with the largest sum. The month name is then produced by
# formatting a synthetic "2020-<month>-01" date with the "MMMM" pattern; the
# year and day in that string are placeholders only.
# NOTE(review): grouping is by month number alone, so equal months from
# different years (if present in the data) are merged — confirm.
output_df_6 = (
    df_csv.withColumn("month", F.month(df_csv["Date"]).cast(types.IntegerType()))
    .groupBy("month")
    .agg(F.sum("total_newly_recovered").alias("sum_newly_recovered"))
    .orderBy("sum_newly_recovered", ascending=False)
    .limit(1)
    .withColumn(
        "month_name",
        F.date_format(
            F.concat(F.lit("2020-"), F.col("month"), F.lit("-01")),
            "MMMM",
        ),
    )
)
output_df_6.select("month_name", "sum_newly_recovered").show()
