# PySpark Load COVID Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Initialize a Spark session
spark = SparkSession.builder \
    .appName("PySpark Demo") \
    .getOrCreate()


In [20]:
file_path = "/home/jovyan/data/complete.csv"  
df = spark.read.option("header", "true").csv(file_path)

In [21]:
df.printSchema()

root
 |-- clDate: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [22]:
df.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|    clDate|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

## Convert all state names to lowercase.

In [24]:
from pyspark.sql.functions import lower, col

df = df.withColumn('Name of State / UT', lower(col('Name of State / UT')))
df.select('Name of State / UT').show()

+------------------+
|Name of State / UT|
+------------------+
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
|            kerala|
+------------------+
only showing top 20 rows



##  The day had a greater number of covid casesruary.

In [26]:
from pyspark.sql.functions import col, sum as spark_sum

daily_cases = df.groupBy('clDate').agg(spark_sum('Total Confirmed cases').alias('Total_Cases'))

day_with_max_cases = daily_cases.orderBy(col('Total_Cases').desc()).first()
print(f"Day with the greatest number of COVID cases: {day_with_max_cases['clDate']}")


Day with the greatest number of COVID cases: 2020-08-06


##  The day had a greater number of covid cases.ruary.

In [27]:
state_cases = df.groupBy('Name of State / UT').agg(spark_sum('Total Confirmed cases').alias('Total_Cases'))

second_largest_state = state_cases.orderBy(col('Total_Cases').desc()).collect()[1]
print(f"State with the second-largest number of COVID cases: {second_largest_state['Name of State / UT']}")


State with the second-largest number of COVID cases: tamil nadu


## The state has the second-largest number of covid cases.

In [29]:
second_largest_state = state_cases.orderBy(col('Total_Cases').desc()).collect()[2]
print(f"State with the second-largest number of COVID cases: {second_largest_state['Name of State / UT']}")


State with the second-largest number of COVID cases: delhi


##  Which Union Territory has the least number of death.

In [59]:
from pyspark.sql.functions import col, sum as spark_sum

# Filter for Union Territories
ut_df = df.filter(col('Name of State / UT').contains('union territory'))

# Aggregate total deaths by UT
ut_deaths = ut_df.groupBy('Name of State / UT').agg(spark_sum('Death').alias('Total_Deaths'))

# Find the UT with the least number of deaths
ut_least_deaths = ut_deaths.orderBy(col('Total_Deaths').asc()).first()

# Print the result
if ut_least_deaths:
    print(f"Union Territory with the least number of deaths: {ut_least_deaths['Name of State / UT']}")
else:
    print("No Union Territories found in the data.")


Union Territory with the least number of deaths: union territory of ladakh


## The state has the lowest Death to Total Confirmed cases ratio

In [47]:
from pyspark.sql.functions import expr

state_ratio = df.groupBy('Name of State / UT').agg(
    spark_sum('clDate').alias('Total_Deaths'),
    spark_sum('Total Confirmed cases').alias('Total_Confirmed')
).withColumn(
    'Death_to_Confirmed_Ratio', 
    expr('Total_Deaths / Total_Confirmed')
)

state_lowest_ratio = state_ratio.orderBy(col('Death_to_Confirmed_Ratio').asc()).first()
print(f"State with the lowest Death to Total Confirmed cases ratio: {state_lowest_ratio['Name of State / UT']}")


State with the lowest Death to Total Confirmed cases ratio: delhi


##  Find which month the more_Newer recovered cases.

In [51]:
from pyspark.sql.functions import month, to_date, sum as spark_sum

df = df.withColumn('clDate', to_date(col('clDate'), 'yyyy-MM-dd'))

monthly_recovered = df.groupBy(month('clDate').alias('Month')).agg(spark_sum('New recovered').alias('Total_New_Recovered'))

month_with_max_recovered = monthly_recovered.orderBy(col('Total_New_Recovered').desc()).first()
month_number = month_with_max_recovered['Month']

months = {
    1: 'January', 2: 'February', 3: 'March', 4: 'April', 5: 'May',
    6: 'June', 7: 'July', 8: 'August', 9: 'September', 10: 'October',
    11: 'November', 12: 'December'
}
month_name = months.get(month_number, 'Unknown')
print(f"Month with the most new recovered cases: {month_name}")


Month with the most new recovered cases: July
