Spark

In [1]:
import pyspark as ps
print(ps.__version__)

3.5.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower
spark = SparkSession.builder.appName('ReadCSV').getOrCreate()

csv_path = "./assets/complete.csv"
df = spark.read.csv(csv_path, header=True, inferSchema=True)

df.show()

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            Kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            Kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            Kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [3]:
df.select("Name of State / UT").show()

+------------------+
|Name of State / UT|
+------------------+
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
|            Kerala|
+------------------+
only showing top 20 rows



1.showing lowercase of state name

In [4]:
df_lowercase = df.withColumn("Name of State / UT", lower(df["Name of State / UT"]))
df_lowercase.select("Name of State / UT").show(truncate=False)
df_lowercase.show(truncate=False,n=df_lowercase.count())

+------------------+
|Name of State / UT|
+------------------+
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
|kerala            |
+------------------+
only showing top 20 rows

+----------+----------------------------------------+------------------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|Date      |Name of State / UT                      |Latitude          |Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+----------------------------------------+------------------+---------+---------------------+-----+----------------

In [None]:
print(df.columns)

2.Day had high covid cases

In [5]:
from pyspark.sql.functions import col, sum

df_high_cases = df.withColumn("Date", col("Date").cast("date"))

#sum of new cases
new_cases_by_day = df_high_cases.groupBy("Date").agg(sum("New cases").alias("total_new_cases"))

#finding date
day_with_highest_new_cases = new_cases_by_day.orderBy(col("total_new_cases").desc()).limit(1)

# Show the result
print("Day with more new cases:")
# new_cases_by_day.show(n=new_cases_by_day.count())
day_with_highest_new_cases.show(truncate=False)

Day with more new cases:
+----------+---------------+
|Date      |total_new_cases|
+----------+---------------+
|2020-07-18|70962          |
+----------+---------------+



3. 2nd State with no of cases

In [6]:
from pyspark.sql.functions import col, sum

cases_by_state = df.groupBy("Name of State / UT").agg(sum("Total Confirmed cases").alias("total_confirmed_cases"))

# state with 2nd largest no of total cases
states_sorted = cases_by_state.orderBy(col("total_confirmed_cases").desc())
state_with_second_largest_cases = states_sorted.limit(2).collect()[1]

print("State with 2nd largest no of cases:")
print(f"State: {state_with_second_largest_cases['Name of State / UT']}, Total Confirmed Cases: {state_with_second_largest_cases['total_confirmed_cases']}")

State with 2nd largest no of cases:
State: Tamil Nadu, Total Confirmed Cases: 7847083.0


4. UT with less deaths

In [7]:
union_territories = ['Delhi', 'Chandigarh', 'Puducherry', 'Ladakh', 'Lakshadweep']
df_ut = df.filter(col("Name of State / UT").isin(union_territories))

# total deaths
deaths_by_ut = df_ut.groupBy("Name of State / UT").agg(sum("Death").alias("total_deaths"))

# filter
ut_with_least_deaths = deaths_by_ut.orderBy(col("total_deaths").asc()).limit(1)


print("UT with least no of deaths:")
deaths_by_ut.show(truncate=False)
ut_with_least_deaths.show(truncate=False)

UT with least no of deaths:
+------------------+------------+
|Name of State / UT|total_deaths|
+------------------+------------+
|Ladakh            |127.0       |
|Puducherry        |1186.0      |
|Delhi             |171177.0    |
|Chandigarh        |628.0       |
+------------------+------------+

+------------------+------------+
|Name of State / UT|total_deaths|
+------------------+------------+
|Ladakh            |127.0       |
+------------------+------------+



5.state has lowest death to total confirmed ratio

In [8]:
from pyspark.sql.functions import expr

aggregated_data = df.groupBy("Name of State / UT").agg(
    sum("Death").alias("total_deaths"),
    sum("Total Confirmed cases").alias("total_confirmed_cases")
)

# Calculate the death-to-confirmed cases ratio
# Use a conditional expression to avoid division by zero


aggregated_data = aggregated_data.withColumn(
    "death_to_confirmed_ratio",
    expr("total_deaths / total_confirmed_cases")
)

# Find the state with the lowest death-to-confirmed cases ratio
state_with_lowest_ratio = aggregated_data.orderBy(col("death_to_confirmed_ratio").asc()).limit(1)

# Show the result
print("State with the lowest death-to-confirmed cases ratio:")
# aggregated_data.show()
state_with_lowest_ratio.show(truncate=False)

State with the lowest death-to-confirmed cases ratio:
+------------------+------------+---------------------+------------------------+
|Name of State / UT|total_deaths|total_confirmed_cases|death_to_confirmed_ratio|
+------------------+------------+---------------------+------------------------+
|Mizoram           |0.0         |13335.0              |0.0                     |
+------------------+------------+---------------------+------------------------+



6.month with more cases

In [9]:
from pyspark.sql.functions import month

df_date = df.withColumn("Date", col("Date").cast("date"))

df_monthly = df_date.groupBy(month("Date").alias("Month")).agg(sum("New recovered").alias("total_recovered_cases"))

# month
month_with_most_recovered = df_monthly.orderBy(col("total_recovered_cases").desc()).limit(1)


month_names = {
    1: 'January', 2: 'February', 3: 'March', 4: 'April', 5: 'May', 6: 'June',
    7: 'July', 8: 'August', 9: 'September', 10: 'October', 11: 'November', 12: 'December'
}

# Print name
result = month_with_most_recovered.collect()[0]
month_number = result["Month"]
month_name = month_names[month_number]

print(f"Month with the highest number of recovered cases: {month_name}")

Month with the highest number of recovered cases: July
