In [37]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types 
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder.appName("covid").getOrCreate()
filepath = "/Users/SRIKANTH/Downloads/Data/complete.csv"

In [5]:
df_csv = spark.read.format("csv") \
            .option("header", True) \
            .option("multiLine", True) \
            .option("ignoreLeadingWhiteSpace",True) \
            .option("ignoreTrailingWhiteSpace",True) \
            .option("escape", "\\") \
            .option("quote", "\"") \
            .load(filepath)

In [6]:
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)



In [112]:
df_csv = df_csv.withColumn("total_case", df_csv["Total Confirmed cases"].cast(types.LongType()))
df_csv = df_csv.withColumn("total_newly_recovered", df_csv["New recovered"].cast(types.LongType()))
df_csv = df_csv.withColumn("state", df_csv["Name of State / UT"].cast(types.StringType()))
df_csv = df_csv.withColumn("death_Case", df_csv["Death"].cast(types.LongType()))
df_csv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Total Confirmed cases: string (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- total_case: long (nullable = true)
 |-- state: string (nullable = true)
 |-- death_Case: long (nullable = true)
 |-- total_newly_recovered: long (nullable = true)



# The day had more number of covid cases.

In [56]:
output_df_1 = df_csv.groupBy("Date").agg(F.sum("total_case").alias("sum_total_case"))

In [57]:
window_spec = Window.orderBy(F.col("sum_total_case").desc())

In [62]:
output_df_1 = output_df_1.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [63]:
output_df_1.show()

+----------+--------------+
|      Date|sum_total_case|
+----------+--------------+
|2020-08-06|       1964536|
+----------+--------------+



# The state has the second-largest number of covid cases.

In [64]:
output_df_2 = df_csv.groupBy("state").agg(F.sum("total_case").alias("sum_total_case"))

In [65]:
window_spec = Window.orderBy(F.col("sum_total_case").desc())

In [67]:
output_df_2 = output_df_2.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 2).drop('recency')

In [68]:
output_df_2.show()

+----------+--------------+
|     state|sum_total_case|
+----------+--------------+
|Tamil Nadu|       7847083|
+----------+--------------+



# Which Union Territory has the least number of death.

In [94]:
output_df_3 = df_csv.where(F.col('state').like("Union Territory%"))

In [95]:
output_df_3 = output_df_3.groupBy("state").agg(F.sum("death_Case").alias("sum_total_death"))

In [96]:
window_spec = Window.orderBy(F.col("sum_total_death"))

In [97]:
output_df_3 = output_df_3.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [98]:
output_df_3.show(truncate=False)

+------------------------------------+---------------+
|state                               |sum_total_death|
+------------------------------------+---------------+
|Union Territory of Jammu and Kashmir|0.0            |
+------------------------------------+---------------+



# The state has the Lowest Death to Total Confirmed cases ratio.

In [100]:
output_df_4 = df_csv.withColumn("ratio", F.col("death_Case")/F.col("total_case"))

In [102]:
output_df_4 = output_df_4.groupBy("state").agg(F.avg("ratio").alias("avg_ratio"))

In [104]:
window_spec = Window.orderBy(F.col("avg_ratio"))

In [105]:
output_df_4 = output_df_4.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [106]:
output_df_4.show(truncate=False)

+-------+---------+
|state  |avg_ratio|
+-------+---------+
|Mizoram|0.0      |
+-------+---------+



# Convert all state names to lowercase.

In [107]:
output_df_5 = df_csv.withColumn('state_lower', F.lower(F.col("state")))

In [109]:
output_df_5.select("state_lower").distinct().show()

+--------------------+
|         state_lower|
+--------------------+
|               delhi|
|         maharashtra|
|           meghalaya|
|              odisha|
|             haryana|
|         west bengal|
|                 goa|
|              punjab|
|   jammu and kashmir|
|dadra and nagar h...|
|           karnataka|
|      andhra pradesh|
|           telangana|
|            nagaland|
|               bihar|
|      madhya pradesh|
|           jharkhand|
|               assam|
|              kerala|
|          tamil nadu|
+--------------------+
only showing top 20 rows



# Find which month the Newer recovered cases.

In [155]:
import calendar
from datetime import datetime

def get_month_name(month_number):
    return calendar.month_name[int(month_number)]

def get_month(date):
    print(date)
    date = datetime.strptime(date, "%Y-%m-%d")
    return date.month

get_month_udf = F.udf(lambda a : get_month(a), types.StringType())
get_month_name_udf = F.udf(lambda a : get_month_name(a), types.StringType())

In [156]:
output_df_6 = df_csv.withColumn("month", get_month_udf(F.col("date")))

In [157]:
output_df_6 = output_df_6.groupBy("month").agg(F.sum("total_newly_recovered").alias("sum_newly_recovered"))

In [158]:
window_spec = Window.orderBy(F.col("sum_newly_recovered").desc())

In [159]:
output_df_6 = output_df_6.withColumn("recency", F.row_number().over(window_spec)).filter(F.col('recency') == 1).drop('recency')

In [160]:
output_df_6 = output_df_6.withColumn("month", get_month_name_udf(F.col("month")))

In [161]:
output_df_6.show()

+-----+-------------------+
|month|sum_newly_recovered|
+-----+-------------------+
| July|             722983|
+-----+-------------------+

