# Pyspark task

In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window

In [29]:
spark = SparkSession.builder.appName("covid").getOrCreate()
filepath = "complete.csv"

df_csv = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

# Print the first few rows of the DataFrame
df_csv.show(truncate=False)


+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|Date      |Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|Kerala            |10.8505 |76.2711  |1.0                  |0    |0.0                      |0        |0         |0            |
|2020-01-31|Kerala            |10.8505 |76.2711  |1.0                  |0    |0.0                      |0        |0         |0            |
|2020-02-01|Kerala            |10.8505 |76.2711  |2.0                  |0    |0.0                      |1        |0         |0            |
|2020-02-02|Kerala            |10.8505 |76.2711  |3.0                  |0    |0.0                      |1        |0         |0            |
|2020-02-03|Kerala  

In [20]:
# Convert necessary columns to appropriate types
df_csv = df_csv.withColumn("total_case", df_csv["Total Confirmed cases"].cast("long"))
df_csv = df_csv.withColumn("total_newly_recovered", df_csv["New recovered"].cast("long"))
df_csv = df_csv.withColumn("state", F.lower(df_csv["Name of State / UT"]))
df_csv = df_csv.withColumn("death_Case", df_csv["Death"].cast("long"))
df_csv = df_csv.withColumn("date", F.to_date(df_csv["Date"], "yyyy-MM-dd"))

# Convert month number to name
month_names = {
    1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
    7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
}

df_csv.printSchema()

root
 |-- date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)



# Task1: Convert state names to lowercase

In [30]:
df_csv = df_csv.withColumn("state", F.lower(df_csv["Name of State / UT"]))
df_csv.select("state").distinct().show(truncate=False)

+----------------------------------------+
|state                                   |
+----------------------------------------+
|delhi                                   |
|maharashtra                             |
|meghalaya                               |
|odisha                                  |
|haryana                                 |
|west bengal                             |
|goa                                     |
|punjab                                  |
|jammu and kashmir                       |
|dadra and nagar haveli and daman and diu|
|karnataka                               |
|andhra pradesh                          |
|telangana                               |
|nagaland                                |
|bihar                                   |
|madhya pradesh                          |
|jharkhand                               |
|assam                                   |
|kerala                                  |
|tamil nadu                              |
+----------

# Task 2: Find the day with the greatest number of COVID cases

In [25]:
daily_cases = df_csv.groupBy("date").agg(F.sum("total_case").alias("total_cases"))
day_max_cases = daily_cases.orderBy(F.desc("total_cases")).first()
day_max_cases_date = day_max_cases["date"]
day_max_cases_total = day_max_cases["total_cases"]
print(f"Day with the greatest number of COVID cases: {day_max_cases_date} with {day_max_cases_total} cases")

Day with the greatest number of COVID cases: 2020-08-06 with 1964536 cases


# Task 3: Find the state with the second-largest number of COVID cases

In [26]:
state_cases = df_csv.groupBy("state").agg(F.sum("total_case").alias("total_cases"))
second_largest_state = state_cases.orderBy(F.desc("total_cases")).collect()[1]
second_largest_state_name = second_largest_state["state"]
second_largest_state_total = second_largest_state["total_cases"]
print(f"State with the second-largest number of COVID cases: {second_largest_state_name} with {second_largest_state_total} cases")

State with the second-largest number of COVID cases: tamil nadu with 7847083 cases


# Task 4: Find the Union Territory with the least number of deaths

In [28]:
# Assuming a list of Union Territories
union_territories = ["delhi", "puducherry", "chandigarh", "andaman and nicobar islands"]
ut_deaths = df_csv.filter(F.col("state").isin(union_territories)) \
                   .groupBy("state") \
                   .agg(F.sum("death_Case").alias("total_deaths"))
min_deaths_ut = ut_deaths.orderBy(F.asc("total_deaths")).first()
min_deaths_ut_name = min_deaths_ut["state"]
min_deaths_ut_total = min_deaths_ut["total_deaths"]
print(f"Union Territory with the least number of deaths: {min_deaths_ut_name} with {min_deaths_ut_total} deaths")

Union Territory with the least number of deaths: andaman and nicobar islands with 64 deaths


# Task 5: Find the state with the lowest Death to Total Confirmed cases ratio

In [14]:
state_ratios = df_csv.groupBy("state") \
                     .agg((F.sum("death_Case") / F.sum("total_case")).alias("death_ratio"))
lowest_ratio_state = state_ratios.orderBy(F.asc("death_ratio")).first()
lowest_ratio_state_name = lowest_ratio_state["state"]
lowest_ratio_state_ratio = lowest_ratio_state["death_ratio"]
print(f"State with the lowest Death to Total Confirmed cases ratio: {lowest_ratio_state_name} with a ratio of {lowest_ratio_state_ratio}")

State with the lowest Death to Total Confirmed cases ratio: union territory of jammu and kashmir with a ratio of 0.0


# Task 6: Find the month with the most New Recovered cases

In [27]:
df_csv = df_csv.withColumn("month", F.month(df_csv["date"]))
monthly_recovered = df_csv.groupBy("month").agg(F.sum("total_newly_recovered").alias("total_new_recovered"))
max_recovered_month = monthly_recovered.orderBy(F.desc("total_new_recovered")).first()
max_recovered_month_number = max_recovered_month["month"]
max_recovered_month_total = max_recovered_month["total_new_recovered"]
max_recovered_month_name = month_names[max_recovered_month_number]
print(f"Month with the most new recovered cases: {max_recovered_month_name} with {max_recovered_month_total} cases")

Month with the most new recovered cases: July with 722983 cases


In [27]:
df_csv = df_csv.withColumn("month", F.month(df_csv["date"]))
monthly_recovered = df_csv.groupBy("month").agg(F.sum("total_newly_recovered").alias("total_new_recovered"))
max_recovered_month = monthly_recovered.orderBy(F.desc("total_new_recovered")).first()
max_recovered_month_number = max_recovered_month["month"]
max_recovered_month_total = max_recovered_month["total_new_recovered"]
max_recovered_month_name = month_names[max_recovered_month_number]
print(f"Month with the most new recovered cases: {max_recovered_month_name} with {max_recovered_month_total} cases")

Month with the most new recovered cases: July with 722983 cases
