# COVID-19 Data EDA

*This document performs EDA on a dataset containing data from COVID-19 event.*

- **Author:** Sakthi Santhosh
- **Created on:** 23/08/2024

In [None]:
print("Attack Helicopter")

## Creating a Spark Session

In [None]:
from calendar import month_name

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, regexp_replace, date_format,
    to_date, when, lower, month
)
from pyspark.sql.functions import sum as pyspark_sum
from pyspark.sql.types import (
    StructType, StructField, FloatType,
    IntegerType, StringType
)

spark = SparkSession.builder \
    .appName("covid-data-eda") \
    .getOrCreate()

In [None]:
schema = StructType(fields=[
    StructField(name="date", dataType=StringType()),
    StructField(name="name_state_ut", dataType=StringType()),
    StructField(name="latitude", dataType=FloatType()),
    StructField(name="longitude", dataType=FloatType()),
    StructField(name="total_confirmed_cases", dataType=IntegerType()),
    StructField(name="death", dataType=IntegerType()),
    StructField(name="cured_discharged_migrated", dataType=IntegerType()),
    StructField(name="new_cases", dataType=IntegerType()),
    StructField(name="new_deaths", dataType=IntegerType()),
    StructField(name="new_recovered", dataType=IntegerType())
])

df = spark.read.csv("./data.csv", header=True, schema=schema)

df.show()

## Data Cleaning

In this section, let us clean the data and ensure all columns are in the format we want for processing.

### Date Field Cleaning

Some date fields are in the format `M/dd/yyyy` and some are in the format `MM-dd-yyyy`. Let's first convert them all to `MM-dd-yyyy` format and convert the field to a date-type field.

In [None]:
df = df.withColumn(
    colName="date",
    col=regexp_replace(col("date"), pattern='/', replacement='-')
)
df = df.withColumn(
    colName="date",
    col=when(
        col("date").rlike(r"^\d{1}-\d{2}-\d{4}$"),
        date_format(to_date(col("date"), "M-dd-yyyy"), "MM-dd-yyyy")
    ).otherwise(
        col("date")
    )
)

df = df.withColumn(
    colName="date",
    col=to_date(col("date"), format="MM-dd-yyyy")
)

df.show()

## Tasks


### Task-1: Convert All State Names to Lowercase

To do this, let us transform the `names_state_ut` field's value to lowercase. Note that we're replacing the column.

In [None]:
task1_df = df.withColumn(colName="name_state_ut", col=lower(col("name_state_ut")))

task1_df.show()

### Task-2: Union Teritory with Least Number of Deaths

Let us find the Union Territory with least number of deaths with the following steps:

1. **Filter:** Let's first filter the data frame by name of the state by choosing only the state names that stats with `Union`.
2. **Sum:** Let us next group the result by state name and sum the number of deaths in each grouped states.
3. **Sorting and Limiting:** Finally, let's sort the result by `total_deaths` column and limit the result to one which gives the union territory with least death cases.

In [None]:
task2_df = df.filter(col("name_state_ut").like("%Union%")) \
    .groupBy("name_state_ut") \
    .sum("death") \
    .withColumnRenamed("sum(death)", "total_deaths") \
    .orderBy("sum(death)") \
    .limit(1)

task2_df.show(truncate=False)

### Task-3: State with Lowest Death to Total Confirmed Cases Ratio

In order to find the state with the lowest death to confirmed cases ratio, let us perform the following steps:

1. **Group Data:** Groups the data by state or union territory names.
2. **Aggregations:** Calculates the total number of deaths and the total number of confirmed cases for each state or union territory.
3. **Calculate Ratio:** Computes the ratio of deaths to confirmed cases for each state or union territory.
4. **Sorting and Limiting:** Sorts the data based on this ratio to find the state or union territory with the lowest ratio.

In [None]:
task3_df = df.groupBy("name_state_ut") \
    .agg(
        pyspark_sum("death").alias("total_deaths"),
        pyspark_sum("total_confirmed_cases").alias("total_total_confirmed_cases")
    ).withColumn(
        colName="death_confirmed_cases_ratio",
        col=when(
            col("total_total_confirmed_cases") != 0,
            col("total_deaths") / col("total_total_confirmed_cases")
        )
    ).orderBy("death_confirmed_cases_ratio").first()

print(task3_df)

### Task-4: Find the Month with Most Newly Recorded Cases

To determine which month experienced the highest number of recoveries, we must go through a few important steps:

1. First, we extract the month from each date in the data. This will help us organize the data by month, making it easier to compare the number of recoveries across different time periods.
2. Next, we sum-up the number of recoveries for each month. By adding up the daily recovery numbers within each month, we were able to see the total number of recoveries that occurred during that month.
3. Finally, we compare the monthly totals to identify which month had the highest number of recoveries. This allows us to pinpoint the specific time period when the most people recovered, providing valuable insights into the recovery trends over time.

In [None]:
task4_df = df.withColumn("month", month(col("date"))) \
    .groupBy("month") \
    .agg(pyspark_sum("new_recovered").alias("total_recovered")) \
    .orderBy("total_recovered", ascending=False) \
    .first()

print(month_name[task4_df["month"]], task4_df, sep=", ")

### Task-5: The Day with the Most Number of Cases

In [None]:
task5_df = df.groupBy("date") \
    .agg(pyspark_sum("total_confirmed_cases").alias("total_total_confirmed_cases")) \
    .orderBy("total_total_confirmed_cases", ascending=False) \
    .first()

print(task5_df["date"])

# End Notes

The `death` column contains only zero, why?