In [168]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window

In [169]:
# creating a session
spark = SparkSession.builder.appName('CovidCaseAnalysis').getOrCreate()

In [170]:
spark_context = spark.sparkContext

In [171]:
spark_context

In [172]:
#load the csv file,
df_coviddata = spark.read.csv('complete.csv', header=True, inferSchema=True)

In [173]:
#to see the structure of the dataset
df_coviddata.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



In [174]:
df_coviddata = df_coviddata.withColumn("Death", df_coviddata["Death"].cast(types.IntegerType()))

In [175]:
df_coviddata.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: integer (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



In [176]:
numeric_cols = ['Total Confirmed cases','Death','Cured/Discharged/Migrated','New cases','New deaths','New recovered']
df_coviddata_f = df_coviddata.fillna(0, subset=numeric_cols)

In [177]:
string_cols = ['Date','Name of State / UT']
df_coviddata_f = df_coviddata_f.dropna(subset=string_cols)

In [178]:
df_coviddata_f = df_coviddata.drop(F.col('Latitude'))
df_coviddata_f = df_coviddata_f.drop(F.col('Longitude'))
df_coviddata_f.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: integer (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



In [179]:
df_coviddata_f.show()

+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|            Kerala|                  3.0|    0|                      0.0|        0|         0|            0|
|2020-02-04|            Kerala| 

Q1. To Convert all state names to LOWERCASE 

In [180]:
states_in_lowercase = df_coviddata_f.select(F.lower(F.col('Name of State / UT')).alias("State_InLowerCase"))
states_in_lowercase.distinct().show(truncate=False)

+----------------------------------------+
|State_InLowerCase                       |
+----------------------------------------+
|delhi                                   |
|maharashtra                             |
|meghalaya                               |
|odisha                                  |
|haryana                                 |
|west bengal                             |
|goa                                     |
|punjab                                  |
|jammu and kashmir                       |
|dadra and nagar haveli and daman and diu|
|karnataka                               |
|andhra pradesh                          |
|telangana                               |
|nagaland                                |
|bihar                                   |
|madhya pradesh                          |
|jharkhand                               |
|assam                                   |
|kerala                                  |
|tamil nadu                              |
+----------

Qn2. Day with high covid cases

In [181]:
df_coviddata_f = df_coviddata_f.withColumn("TotalCases", F.col('Total Confirmed cases')+F.col('New cases')-F.col('New recovered')-F.col('Death')-F.col('New deaths'))
df_coviddata_f.show()

+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+
|      Date|Name of State / UT|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|TotalCases|
+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+
|2020-01-30|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|
|2020-01-31|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|
|2020-02-01|            Kerala|                  2.0|    0|                      0.0|        1|         0|            0|       3.0|
|2020-02-02|            Kerala|                  3.0|    0|                      0.0|        1|         0|            0|       4.0|
|2020-02-03|            Kerala|                  3.0|    0|                 

In [182]:
total_case_by_date = df_coviddata_f.groupBy('Date').agg(F.sum('TotalCases').alias('TotalCase'))
total_case_by_date

DataFrame[Date: date, TotalCase: double]

In [183]:
max_covid_case = total_case_by_date.orderBy(F.col('TotalCase').desc()).limit(1)
max_covid_case.show()

+----------+---------+
|      Date|TotalCase|
+----------+---------+
|2020-08-06|1933998.0|
+----------+---------+



Qn3. State that has second largest covid test cases

In [184]:
total_case_by_state = df_coviddata_f.groupBy('Name of State / UT').agg(F.sum('TotalCases').alias('TotalCase'))
total_case_by_state.show(truncate=False)

+----------------------------------------+---------+
|Name of State / UT                      |TotalCase|
+----------------------------------------+---------+
|Nagaland                                |46756.0  |
|Karnataka                               |2759449.0|
|Odisha                                  |840908.0 |
|Kerala                                  |605976.0 |
|Ladakh                                  |57501.0  |
|Dadra and Nagar Haveli and Daman and Diu|26568.0  |
|Tamil Nadu                              |7793962.0|
|Telengana                               |102604.0 |
|Chhattisgarh                            |257817.0 |
|Andhra Pradesh                          |2793600.0|
|Madhya Pradesh                          |1253710.0|
|Punjab                                  |533977.0 |
|Manipur                                 |85188.0  |
|Goa                                     |152020.0 |
|Mizoram                                 |13585.0  |
|Himachal Pradesh                        |8097

In [185]:
window_spec = Window.orderBy(F.col('TotalCase').desc())
ranked_covid_cases = total_case_by_state.withColumn('rank', F.row_number().over(window_spec))
second_largest_df = ranked_covid_cases.filter(F.col('rank') == 2)

In [186]:
second_largest_df.drop('rank').show()

+------------------+---------+
|Name of State / UT|TotalCase|
+------------------+---------+
|        Tamil Nadu|7793962.0|
+------------------+---------+



Qn4. Which union teritory has least number of deaths

In [187]:
ut_df = df_coviddata_f.filter(F.col('Name of State / UT').like('Union Territory of%'))
ut_df.show()

+----------+--------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+
|      Date|  Name of State / UT|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|TotalCases|
+----------+--------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+
|2020-03-07|Union Territory o...|                  2.0|    0|                      0.0|        0|         0|            0|       2.0|
|2020-03-08|Union Territory o...|                  2.0|    0|                      0.0|        0|         0|            0|       2.0|
|2020-03-09|Union Territory o...|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|
|2020-03-09|Union Territory o...|                  2.0|    0|                      0.0|        0|         0|            0|       2.0|
|2020-03-10|Union Territory o...|                  1.0|    0| 

In [188]:
ut_covid_death_cases = ut_df.groupBy('Name of State / UT').agg(F.sum('Death').alias('TotalDeaths'))
ut_with_least_death = ut_covid_death_cases.orderBy(F.col('TotalDeaths')).limit(1)
ut_with_least_death.show(truncate=False)

+------------------------------------+-----------+
|Name of State / UT                  |TotalDeaths|
+------------------------------------+-----------+
|Union Territory of Jammu and Kashmir|0          |
+------------------------------------+-----------+



Qn5. State with lowest death to confirmed case ratio

In [189]:
df_coviddata_f = df_coviddata_f.withColumn('Ratio', (F.col('Death') / F.col('Total Confirmed cases')).cast(types.DoubleType()))
df_coviddata_f.show()

+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+-----+
|      Date|Name of State / UT|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|TotalCases|Ratio|
+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+-----+
|2020-01-30|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|  0.0|
|2020-01-31|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|  0.0|
|2020-02-01|            Kerala|                  2.0|    0|                      0.0|        1|         0|            0|       3.0|  0.0|
|2020-02-02|            Kerala|                  3.0|    0|                      0.0|        1|         0|            0|       4.0|  0.0|
|2020-02-03|            Kerala|   

In [190]:
df = df_coviddata_f.groupBy('Name of State / UT').agg(F.avg('Ratio').alias('Ratio'))
df.show(truncate=False)

+----------------------------------------+---------------------+
|Name of State / UT                      |Ratio                |
+----------------------------------------+---------------------+
|Nagaland                                |5.148898929658259E-4 |
|Karnataka                               |0.031754025497581745 |
|Odisha                                  |0.005638267232158746 |
|Kerala                                  |0.004188629705022502 |
|Ladakh                                  |0.0014909269205045839|
|Dadra and Nagar Haveli and Daman and Diu|6.431815799792035E-4 |
|Tamil Nadu                              |0.010024093518879517 |
|Telengana                               |0.019244516020895063 |
|Chhattisgarh                            |0.002177624004674632 |
|Andhra Pradesh                          |0.01398904872015251  |
|Madhya Pradesh                          |0.04409446528702753  |
|Punjab                                  |0.038667648638451915 |
|Manipur                 

In [191]:
df = df.orderBy(F.col('Ratio')).limit(1)
df.show()

+------------------+-----+
|Name of State / UT|Ratio|
+------------------+-----+
|           Mizoram|  0.0|
+------------------+-----+



Qn6. Month with more recored cases

In [192]:
month_mapping = {
    1: 'January', 2: 'February', 3: 'March', 4: 'April', 5: 'May', 6: 'June',
    7: 'July', 8: 'August', 9: 'September', 10: 'October', 11: 'November', 12: 'December'
}

In [193]:
def get_month_name(month_number):
    return month_mapping.get(month_number, 'None')

In [194]:
get_month_name_udf = F.udf(get_month_name, types.StringType())

In [195]:
df_coviddata_f = df_coviddata_f.withColumn("month", F.month(F.col("Date")))

In [196]:
df_coviddata_f.show()

+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+-----+-----+
|      Date|Name of State / UT|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|TotalCases|Ratio|month|
+----------+------------------+---------------------+-----+-------------------------+---------+----------+-------------+----------+-----+-----+
|2020-01-30|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|  0.0|    1|
|2020-01-31|            Kerala|                  1.0|    0|                      0.0|        0|         0|            0|       1.0|  0.0|    1|
|2020-02-01|            Kerala|                  2.0|    0|                      0.0|        1|         0|            0|       3.0|  0.0|    2|
|2020-02-02|            Kerala|                  3.0|    0|                      0.0|        1|         0|            0|       4.0|  0.0

In [197]:
monthly_recovery_df = df_coviddata_f.groupBy("month").agg(F.sum("New recovered").alias("total_recovered_cases_by_month"))
monthly_recovery_df.show()

+-----+------------------------------+
|month|total_recovered_cases_by_month|
+-----+------------------------------+
|    1|                             0|
|    6|                        247662|
|    3|                           124|
|    5|                         78659|
|    4|                          8201|
|    8|                        270531|
|    7|                        722983|
|    2|                             0|
+-----+------------------------------+



In [198]:
month_with_max_recovered = monthly_recovery_df.orderBy(F.col('total_recovered_cases_by_month').desc()).limit(1)
month_with_max_recovered.show()

+-----+------------------------------+
|month|total_recovered_cases_by_month|
+-----+------------------------------+
|    7|                        722983|
+-----+------------------------------+



In [200]:
max_recovered_row = month_with_max_recovered.collect()[0]
month = month_mapping.get(max_recovered_row['month'], 'Unknown')
month

'July'