'''
\
@Author: Jayesh Patil \
@Date: 2024-10-13 \
@Last Modified by: Jayesh Patil \
@Title: Covid data set problem Using pyspark 

'''

import the pyspark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


Create Spark Session

In [2]:
spark = SparkSession.builder.appName('covid_problem').getOrCreate()

Read The Data From CSV File

In [3]:

schema = StructType([
    StructField("province_state", StringType(), True),  
    StructField("country_region", StringType(), False),  
    StructField("latitude", DecimalType(10, 7), True),
    StructField("longitude", DecimalType(10, 7), True),
    StructField("date", DateType(), False),              
    StructField("confirmed", IntegerType(), True),
    StructField("deaths", IntegerType(), True),
    StructField("recovered", IntegerType(), True),
    StructField("active", IntegerType(), True),
    StructField("who_region", StringType(), True)
])
df_covid_19_clean = spark.read.csv("C:\\Users\\Jayesh\\Downloads\\covid-kaggle-dataset\\covid_19_clean_complete.csv",header = True ,schema = schema)

In [4]:
df_covid_19_clean.show()

+--------------------+-------------------+-----------+-----------+----------+---------+------+---------+------+--------------------+
|      province_state|     country_region|   latitude|  longitude|      date|confirmed|deaths|recovered|active|          who_region|
+--------------------+-------------------+-----------+-----------+----------+---------+------+---------+------+--------------------+
|                NULL|        Afghanistan| 33.9391100| 67.7099530|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|                NULL|            Albania| 41.1533000| 20.1683000|2020-01-22|        0|     0|        0|     0|              Europe|
|                NULL|            Algeria| 28.0339000|  1.6596000|2020-01-22|        0|     0|        0|     0|              Africa|
|                NULL|            Andorra| 42.5063000|  1.5218000|2020-01-22|        0|     0|        0|     0|              Europe|
|                NULL|             Angola|-11.2027000| 17.8739000|202

In [5]:
schema = """
    country_region STRING,
    continent STRING,
    population BIGINT,
    total_cases BIGINT,
    new_cases BIGINT,
    total_deaths BIGINT,
    new_deaths BIGINT,
    total_recovered BIGINT,
    new_recovered BIGINT,
    active_cases BIGINT,
    serious_critical INT,
    tot_cases_per_million FLOAT,
    deaths_per_million FLOAT,
    total_tests BIGINT,
    tests_per_million FLOAT,
    who_region STRING
"""
df_worldometer = spark.read.csv("C:\\Users\\Jayesh\\Downloads\\covid-kaggle-dataset\\worldometer_data.csv", schema=schema,header=True)

In [6]:
df_worldometer.show()

+--------------+-------------+----------+-----------+---------+------------+----------+---------------+-------------+------------+----------------+---------------------+------------------+-----------+-----------------+--------------------+
|country_region|    continent|population|total_cases|new_cases|total_deaths|new_deaths|total_recovered|new_recovered|active_cases|serious_critical|tot_cases_per_million|deaths_per_million|total_tests|tests_per_million|          who_region|
+--------------+-------------+----------+-----------+---------+------------+----------+---------------+-------------+------------+----------------+---------------------+------------------+-----------+-----------------+--------------------+
|           USA|North America| 331198130|    5032179|     NULL|      162804|      NULL|        2576668|         NULL|     2292707|           18296|              15194.0|             492.0|   63139605|         190640.0|            Americas|
|        Brazil|South America| 212710692

1.To find out the death percentage locally and globally

A.Global Death Percentage

In [7]:
global_deaths = df_covid_19_clean.agg(sum("deaths")).first()[0]
global_cases = df_covid_19_clean.agg(sum("confirmed")).first()[0]
global_death_percentage = (global_deaths / global_cases) * 100 if global_cases > 0 else 0

print(f"Global Death Percentage: {global_death_percentage:.2f}%")

Global Death Percentage: 5.24%


B.local Death Percentage by country

In [8]:
local_death_percentage = df_covid_19_clean \
    .groupBy("country_region") \
    .agg(
        sum("deaths").alias("local_deaths"),
        (sum("deaths") * 100.0 / sum("confirmed")).alias("local_death_percentage")
    ) \
    .filter(col("local_deaths") > 0)

local_death_percentage.show()


+--------------+------------+----------------------+
|country_region|local_deaths|local_death_percentage|
+--------------+------------+----------------------+
|          Chad|        5523|     8.599321147198953|
|      Paraguay|        1663|    1.0634828263191216|
|        Russia|      619385|    1.3640314346168159|
|         Yemen|       17707|    26.357546888955046|
|       Senegal|        7177|    1.5353283831454017|
|    Cabo Verde|         854|    1.0322487066673114|
|        Sweden|      448913|     9.026715408311818|
|        Guyana|        1346|     7.051181308607052|
|   Philippines|      110892|     3.730457836561864|
|         Burma|         639|    2.5369223439733206|
|      Djibouti|        3011|    0.8955552382991886|
|      Malaysia|       12971|    1.4792319078909855|
|     Singapore|        2441|   0.06969363352512169|
|        Turkey|      466056|    2.6031783446054355|
|        Malawi|        1640|    1.8290098811143578|
|Western Sahara|          63|     6.9922308546

2.To find out the infected population percentage locally and globally


A.Global Infected Population Percentage

In [9]:
global_population = df_worldometer.agg(sum("population")).first()[0]
global_infected_population_per = (
    (global_cases / global_population) * 100
    if global_population > 0 else 0 
) 
print(f"Global Infected Population Percentage: {global_infected_population_per:.2f}%")

Global Infected Population Percentage: 13.10%


B.Local Infected Population Percentage by Country

In [10]:
local_infected_population_percentage = df_worldometer \
    .filter(col("total_cases") > 0) \
    .select(
        col("country_region"),
        col("total_cases"),
        col("population"),
        (col("total_cases") * 100.0 / col("population")).alias("local_infected_population_percentage")
    )

local_infected_population_percentage.show()


+--------------+-----------+----------+------------------------------------+
|country_region|total_cases|population|local_infected_population_percentage|
+--------------+-----------+----------+------------------------------------+
|           USA|    5032179| 331198130|                  1.5193862960518527|
|        Brazil|    2917562| 212710692|                  1.3716104125127853|
|         India|    2025409|1381344997|                 0.14662586134519442|
|        Russia|     871894| 145940924|                  0.5974294091765515|
|  South Africa|     538184|  59381566|                  0.9063149328193871|
|        Mexico|     462690| 129066160|                 0.35849056019021563|
|          Peru|     455409|  33016319|                  1.3793451656436928|
|         Chile|     366671|  19132514|                  1.9164810228284688|
|      Colombia|     357710|  50936262|                  0.7022698289089215|
|         Spain|     354530|  46756648|                  0.7582451162880623|

3.To find out the countries with the highest infection rates

In [11]:
highest_infection_rates = df_worldometer \
    .filter(col("total_cases") > 0) \
    .select(
        col("country_region"),
        col("total_cases"),
        col("population"),
        (col("total_cases") * 100.0 / col("population")).alias("infection_rate")
    ) \
    .orderBy(col("infection_rate").desc()) \
    .limit(10)

highest_infection_rates.show()


+--------------+-----------+----------+------------------+
|country_region|total_cases|population|    infection_rate|
+--------------+-----------+----------+------------------+
|         Qatar|     112092|   2807805|3.9921575750452756|
| French Guiana|       8127|    299385|2.7145648579588157|
|       Bahrain|      42889|   1706669|2.5130239079751258|
|    San Marino|        699|     33938| 2.059638163710295|
|         Chile|     366671|  19132514|1.9164810228284688|
|        Panama|      71418|   4321282|1.6527039892328248|
|        Kuwait|      70045|   4276658|1.6378443167538765|
|          Oman|      80713|   5118446|1.5769043963734306|
|           USA|    5032179| 331198130|1.5193862960518527|
|  Vatican City|         12|       801|1.4981273408239701|
+--------------+-----------+----------+------------------+



4.To find out the countries and continents with the highest death counts

A. Highest Death Counts by Country

In [12]:
highest_death_counts_country = df_covid_19_clean \
    .groupBy("country_region") \
    .agg(sum("deaths").alias("total_deaths")) \
    .orderBy(col("total_deaths").desc())
highest_death_counts_country.show()

+--------------+------------+
|country_region|total_deaths|
+--------------+------------+
|            US|    11011411|
|United Kingdom|     3997775|
|        Brazil|     3938034|
|         Italy|     3707717|
|        France|     3048524|
|         Spain|     3033030|
|        Mexico|     1728277|
|         India|     1111831|
|          Iran|     1024136|
|       Belgium|      963679|
|       Germany|      871322|
|        Canada|      699566|
|         China|      672413|
|          Peru|      652113|
|   Netherlands|      622314|
|        Russia|      619385|
|        Turkey|      466056|
|        Sweden|      448913|
|       Ecuador|      346618|
|         Chile|      322480|
+--------------+------------+
only showing top 20 rows



B. Highest Death Counts by Continent

In [13]:
highest_death_counts_continent = df_covid_19_clean \
    .groupBy("who_region") \
    .agg(sum("deaths").alias("total_deaths")) \
    .orderBy(col("total_deaths").desc())
highest_death_counts_continent.show()    

+--------------------+------------+
|          who_region|total_deaths|
+--------------------+------------+
|            Americas|    19359292|
|              Europe|    19271040|
|Eastern Mediterra...|     1924029|
|     South-East Asia|     1458134|
|     Western Pacific|      932430|
|              Africa|      439978|
+--------------------+------------+



5.Average Number of Deaths by Day (Continents and Countries)

A.Average Number of Deaths by Day (Continents )

In [14]:
average_daily_deaths_continents = df_covid_19_clean \
    .groupBy("who_region","date") \
    .agg(sum("deaths").alias ("daily_deaths")) \
    .groupBy("who_region") \
    .agg(avg("daily_deaths").alias("average_daily_deaths"))\
    .orderBy(col("average_daily_deaths").desc())
average_daily_deaths_continents.show()      

+--------------------+--------------------+
|          who_region|average_daily_deaths|
+--------------------+--------------------+
|            Americas|   102974.9574468085|
|              Europe|  102505.53191489361|
|Eastern Mediterra...|  10234.196808510638|
|     South-East Asia|   7756.031914893617|
|     Western Pacific|   4959.734042553191|
|              Africa|   2340.308510638298|
+--------------------+--------------------+



B.Average Number of Deaths by Day (Countries)

In [15]:
average_daily_deaths_continents = df_covid_19_clean \
    .groupBy("country_region","date") \
    .agg(sum("deaths").alias ("daily_deaths")) \
    .groupBy("country_region") \
    .agg(avg("daily_deaths").alias("average_daily_deaths"))\
    .orderBy(col("average_daily_deaths").desc())
average_daily_deaths_continents.show()   

+--------------+--------------------+
|country_region|average_daily_deaths|
+--------------+--------------------+
|            US|  58571.335106382976|
|United Kingdom|  21264.760638297874|
|        Brazil|  20946.989361702126|
|         Italy|   19721.89893617021|
|        France|  16215.553191489362|
|         Spain|   16133.13829787234|
|        Mexico|   9192.962765957447|
|         India|   5913.994680851064|
|          Iran|   5447.531914893617|
|       Belgium|   5125.952127659574|
|       Germany|   4634.691489361702|
|        Canada|   3721.095744680851|
|         China|  3576.6648936170213|
|          Peru|   3468.686170212766|
|   Netherlands|    3310.18085106383|
|        Russia|   3294.601063829787|
|        Turkey|   2479.021276595745|
|        Sweden|  2387.8351063829787|
|       Ecuador|   1843.712765957447|
|         Chile|  1715.3191489361702|
+--------------+--------------------+
only showing top 20 rows



6.Average of cases divided by the number of population of each country (TOP 10)

In [16]:
average_cases_per_population = df_covid_19_clean \
    .join(df_worldometer, "country_region") \
    .groupBy("country_region") \
    .agg((sum("confirmed") * 1.0 / sum("population")).alias("average_cases_per_population")) \
    .orderBy(col("average_cases_per_population").desc()) \
    .limit(10)

average_cases_per_population.show()


+--------------+----------------------------+
|country_region|average_cases_per_population|
+--------------+----------------------------+
|         Qatar|        0.012081836697833968|
|    San Marino|        0.011746858790058969|
|       Andorra|        0.006497954607653055|
|       Bahrain|        0.005470423455080...|
|         Chile|         0.00470838625584943|
|    Luxembourg|        0.004253566325482042|
|        Kuwait|        0.003880739527144993|
|       Iceland|        0.003446367357671...|
|     Singapore|        0.003181961842215...|
|         Spain|        0.003117550188980...|
+--------------+----------------------------+



: 