In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, max, min, count, round

In [6]:
spark = SparkSession.builder.appName("CovidAnalysis").getOrCreate()

In [70]:
country_wise_df = spark.read.option("header", True).option("inferSchema", True).csv("hdfs://localhost:9000/user/yash1122/DataSet/Covid-19-dataset/country_wise_latest.csv")
full_grouped_df = spark.read.option("header", True).option("inferSchema", True).csv("hdfs://localhost:9000/user/yash1122/DataSet/Covid-19-dataset/full_grouped.csv")
day_wise_df = spark.read.option("header", True).option("inferSchema", True).csv("hdfs://localhost:9000/user/yash1122/DataSet/Covid-19-dataset/day_wise.csv")
worldometer_df = spark.read.option("header", True).option("inferSchema", True).csv("hdfs://localhost:9000/user/yash1122/DataSet/Covid-19-dataset/worldometer_data.csv")


print("country wise Schema: ")
country_wise_df.printSchema()

print("worldometer Schema: ")
worldometer_df.printSchema()

country wise Schema: 
root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)

worldometer Schema: 
root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- TotalCases: integer (nullable = true)
 |-- NewCases: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-

In [12]:
worldometer_df.describe().show()

+-------+--------------+-------------+--------------------+----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+--------------+
|summary|Country/Region|    Continent|          Population|      TotalCases|          NewCases|       TotalDeaths|        NewDeaths|    TotalRecovered|     NewRecovered|       ActiveCases|  Serious,Critical|  Tot Cases/1M pop|    Deaths/1M pop|        TotalTests|      Tests/1M pop|    WHO Region|
+-------+--------------+-------------+--------------------+----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+--------------+
|  count|           209|          208|                 208|             209|                 4|           

In [86]:
country_wise_df.createOrReplaceTempView("country_wise")
full_grouped_df.createOrReplaceTempView("full_grouped")
day_wise_df.createOrReplaceTempView("day_wise")
worldometer_df.createOrReplaceTempView("worldometer")

In [14]:
#Q1: Regional Summary

regional_summary = worldometer_df.groupBy("Continent").agg(
    sum("TotalCases").alias("Total_Confirmed"),
    sum("TotalDeaths").alias("Total_Deaths"),
    sum("TotalRecovered").alias("Total_Recovered")
)

regional_summary.show()

+-----------------+---------------+------------+---------------+
|        Continent|Total_Confirmed|Total_Deaths|Total_Recovered|
+-----------------+---------------+------------+---------------+
|           Europe|        2982576|      205232|        1587302|
|           Africa|        1011867|       22114|         693620|
|             NULL|            712|          13|            651|
|Australia/Oceania|          21735|         281|          12620|
|    North America|        5919209|      229855|        3151678|
|    South America|        4543273|      154885|        3116150|
|             Asia|        4689794|      100627|        3508170|
+-----------------+---------------+------------+---------------+



In [16]:
clean_df = worldometer_df.na.drop(subset=["Continent"])

In [18]:
#Q1: Regional Summary

regional_summary = clean_df.groupBy("Continent").agg(
    sum("TotalCases").alias("Total_Confirmed"),
    sum("TotalDeaths").alias("Total_Deaths"),
    sum("TotalRecovered").alias("Total_Recovered")
)

regional_summary.show()

+-----------------+---------------+------------+---------------+
|        Continent|Total_Confirmed|Total_Deaths|Total_Recovered|
+-----------------+---------------+------------+---------------+
|           Europe|        2982576|      205232|        1587302|
|           Africa|        1011867|       22114|         693620|
|Australia/Oceania|          21735|         281|          12620|
|    North America|        5919209|      229855|        3151678|
|    South America|        4543273|      154885|        3116150|
|             Asia|        4689794|      100627|        3508170|
+-----------------+---------------+------------+---------------+



In [20]:
null_count_in_Continent = worldometer_df.filter(col("Continent").isNull()).count()
null_count_in_Total_Confirmed = worldometer_df.filter(col("NewRecovered").isNull()).count()

print(f"Number of nulls in 'Continent': {null_count_in_Continent}")
print(f"Number of nulls in 'NewRecovered': {null_count_in_Total_Confirmed}")

Number of nulls in 'Continent': 1
Number of nulls in 'NewRecovered': 206


In [21]:
print("country wise Schema: ")
country_wise_df.printSchema()

country wise Schema: 
root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)



In [24]:
#Q2: Top 10 Countries by COVID-19 Recovery Rates

top_10_recovery = country_wise_df.orderBy(col("Recovered / 100 Cases").desc()).select("Country/Region", "Recovered / 100 Cases").limit(10)
top_10_recovery.show()

+--------------+---------------------+
|Country/Region|Recovered / 100 Cases|
+--------------+---------------------+
|      Holy See|                100.0|
|       Grenada|                100.0|
|      Dominica|                100.0|
|      Djibouti|                98.38|
|       Iceland|                98.33|
|        Brunei|                97.87|
|   New Zealand|                97.24|
|         Qatar|                97.02|
|      Malaysia|                 96.6|
|     Mauritius|                96.51|
+--------------+---------------------+



In [26]:
#Q3 Comparison of Recovery and Fatality Rates by Country

recovery_fatality_comparison = country_wise_df.select(
    "Country/Region",
    "Recovered / 100 Cases",
    "Deaths / 100 Cases"
).orderBy(col("Recovered / 100 Cases").desc())

recovery_fatality_comparison.show()


+--------------+---------------------+------------------+
|Country/Region|Recovered / 100 Cases|Deaths / 100 Cases|
+--------------+---------------------+------------------+
|      Dominica|                100.0|               0.0|
|       Grenada|                100.0|               0.0|
|      Holy See|                100.0|               0.0|
|      Djibouti|                98.38|              1.15|
|       Iceland|                98.33|              0.54|
|        Brunei|                97.87|              2.13|
|   New Zealand|                97.24|              1.41|
|         Qatar|                97.02|              0.15|
|      Malaysia|                 96.6|              1.39|
|     Mauritius|                96.51|              2.91|
|        Norway|                95.84|              2.79|
|       Taiwan*|                95.24|              1.52|
|          Laos|                 95.0|               0.0|
|         Malta|                94.86|              1.28|
|       Estoni

In [28]:
#Q:4  Countries with the Lowest Number of COVID-19 Deaths

lowest_deaths = country_wise_df.orderBy(col("Deaths").asc()).select("Country/Region", "Deaths").filter(col("Deaths").isNotNull()).limit(10)
lowest_deaths.show()


+--------------+------+
|Country/Region|Deaths|
+--------------+------+
|      Dominica|     0|
|      Mongolia|     0|
|       Eritrea|     0|
|        Bhutan|     0|
|      Cambodia|     0|
|          Fiji|     0|
|     Greenland|     0|
|       Grenada|     0|
|      Holy See|     0|
|          Laos|     0|
+--------------+------+



In [30]:
#Q:5 Countries with the Highest Number of COVID-19 Cases

lowest_deaths = country_wise_df.orderBy(col("Deaths").desc()).select("Country/Region", "Deaths").filter(col("Deaths").isNotNull()).limit(10)
lowest_deaths.show()


+--------------+------+
|Country/Region|Deaths|
+--------------+------+
|            US|148011|
|        Brazil| 87618|
|United Kingdom| 45844|
|        Mexico| 44022|
|         Italy| 35112|
|         India| 33408|
|        France| 30212|
|         Spain| 28432|
|          Peru| 18418|
|          Iran| 15912|
+--------------+------+



In [32]:
# Q6: Global Recovery Rate
global_recovery_rate = worldometer_df.agg(
    (sum("TotalRecovered") / sum("TotalCases") * 100).alias("Global_Recovery_Rate_Percentage")
)

print("Global Recovery Rate (%):")
global_recovery_rate.show()


Global Recovery Rate (%):
+-------------------------------+
|Global_Recovery_Rate_Percentage|
+-------------------------------+
|               62.9666987077059|
+-------------------------------+



In [34]:
# Q2: COVID-19 Trends By Continent
covid_trends_continent = worldometer_df.groupBy("Continent").agg(
    sum("TotalCases").alias("Total_Cases"),
    sum("TotalDeaths").alias("Total_Deaths"),
    sum("TotalRecovered").alias("Total_Recovered")
).orderBy(col("Total_Cases").desc())

print("COVID-19 Trends by Continent:")
covid_trends_continent.show()


COVID-19 Trends by Continent:
+-----------------+-----------+------------+---------------+
|        Continent|Total_Cases|Total_Deaths|Total_Recovered|
+-----------------+-----------+------------+---------------+
|    North America|    5919209|      229855|        3151678|
|             Asia|    4689794|      100627|        3508170|
|    South America|    4543273|      154885|        3116150|
|           Europe|    2982576|      205232|        1587302|
|           Africa|    1011867|       22114|         693620|
|Australia/Oceania|      21735|         281|          12620|
|             NULL|        712|          13|            651|
+-----------------+-----------+------------+---------------+



In [36]:
# Q8: Continent That Has The Lowest Cases
lowest_cases_continent = worldometer_df.groupBy("Continent").agg(
    sum("TotalCases").alias("Total_Cases")
).orderBy(col("Total_Cases").asc()).limit(1)

print("Continent with the Lowest Total Cases:")
lowest_cases_continent.show()


Continent with the Lowest Total Cases:
+---------+-----------+
|Continent|Total_Cases|
+---------+-----------+
|     NULL|        712|
+---------+-----------+



In [38]:
# Q9: Recovery Rate by Continent
recovery_rate_continent = worldometer_df.groupBy("Continent").agg(
    round((sum("TotalRecovered") / sum("TotalCases")) * 100, 2).alias("Recovery_Rate_Percentage")
).orderBy(col("Recovery_Rate_Percentage").desc())

print("Recovery Rate by Continent (%):")
recovery_rate_continent.show()


Recovery Rate by Continent (%):
+-----------------+------------------------+
|        Continent|Recovery_Rate_Percentage|
+-----------------+------------------------+
|             NULL|                   91.43|
|             Asia|                    74.8|
|    South America|                   68.59|
|           Africa|                   68.55|
|Australia/Oceania|                   58.06|
|    North America|                   53.24|
|           Europe|                   53.22|
+-----------------+------------------------+



In [80]:
#Q10: To find out the death percentage locally and globally
df_global = spark.sql("""
SELECT 
    SUM(`Deaths`) * 100.0 / NULLIF(SUM(`Confirmed`), 0) AS global_death_percentage
FROM day_wise;

""")
df_global.show()


df_local = spark.sql("""
SELECT 
    `Country/Region`, 
    SUM(`Deaths`) * 100.0 / NULLIF(SUM(`Confirmed`), 0) AS country_death_percentage
FROM full_grouped
GROUP BY `Country/Region`
ORDER BY country_death_percentage DESC; 
""")
df_local.show()


+-----------------------+
|global_death_percentage|
+-----------------------+
|       5.23650680018023|
+-----------------------+

+-------------------+------------------------+
|     Country/Region|country_death_percentage|
+-------------------+------------------------+
|              Yemen|       26.35754688895505|
|            Belgium|       15.34248054008237|
|     United Kingdom|       14.94574274147640|
|             France|       14.37242296729525|
|              Italy|       13.86314039426595|
|            Hungary|       12.88413540039420|
|        Netherlands|       12.04280741462629|
|             Mexico|       11.56331889532873|
|              Spain|       11.06781863772301|
|            Bahamas|        9.94214876033058|
|             Sweden|        9.02671540831182|
|               Chad|        8.59932114719895|
|             Belize|        8.42185128983308|
|             Canada|        7.47675078135095|
|            Ecuador|        7.40874845249413|
|Antigua and Barbuda|  

In [100]:
#Q11: To find out the infected population percentage locally and globally

global_df = spark.sql("""
SELECT 
    SUM(TotalCases) * 100.0 / NULLIF(SUM(Population), 0) AS global_infection_percentage
FROM worldometer;
""")
global_df.show()


local_df = spark.sql("""
SELECT 
    `Country/Region`, 
    SUM(`TotalCases`) * 100.0 / NULLIF(MAX(`Population`), 0) AS country_infection_percentage
FROM worldometer
GROUP BY `Country/Region`
ORDER BY country_infection_percentage DESC;
""")
local_df.show()

+---------------------------+
|global_infection_percentage|
+---------------------------+
|           0.30300173069884|
+---------------------------+

+--------------+----------------------------+
|Country/Region|country_infection_percentage|
+--------------+----------------------------+
|         Qatar|              3.992157575045|
| French Guiana|              2.714564857959|
|       Bahrain|              2.513023907975|
|    San Marino|              2.059638163710|
|         Chile|              1.916481022828|
|        Panama|              1.652703989233|
|        Kuwait|              1.637844316754|
|          Oman|              1.576904396373|
|           USA|              1.519386296052|
|  Vatican City|              1.498127340824|
|          Peru|              1.379345165644|
|        Brazil|              1.371610412513|
|       Armenia|              1.343506721582|
|       Andorra|              1.221563705065|
|    Luxembourg|              1.128156541490|
|       Mayotte|     

In [108]:
#Q12: To find out the countries with the highest infection rates
high_infection_df = spark.sql("""
SELECT 
    `Country/Region`, 
    MAX(TotalCases) AS highest_infection_count, 
    MAX(TotalCases) * 100.0 / NULLIF(MAX(Population), 0) AS infection_rate
FROM worldometer
GROUP BY `Country/Region`
ORDER BY infection_rate DESC
LIMIT 10;
""")
high_infection_df.show()

+--------------+-----------------------+--------------+
|Country/Region|highest_infection_count|infection_rate|
+--------------+-----------------------+--------------+
|         Qatar|                 112092|3.992157575045|
| French Guiana|                   8127|2.714564857959|
|       Bahrain|                  42889|2.513023907975|
|    San Marino|                    699|2.059638163710|
|         Chile|                 366671|1.916481022828|
|        Panama|                  71418|1.652703989233|
|        Kuwait|                  70045|1.637844316754|
|          Oman|                  80713|1.576904396373|
|           USA|                5032179|1.519386296052|
|  Vatican City|                     12|1.498127340824|
+--------------+-----------------------+--------------+



In [118]:
#Q13: To find out the countries and continents with the highest death counts
country_deaths_df = spark.sql("""
SELECT 
    `Country/Region`, 
    SUM(Deaths) AS TotalDeaths
FROM full_grouped
GROUP BY `Country/Region`
ORDER BY TotalDeaths DESC
LIMIT 10;
""")
country_deaths_df.show()


continents_deaths_df = spark.sql("""
SELECT 
    Continent, 
    SUM(TotalDeaths) AS TotalDeaths
FROM worldometer
GROUP BY Continent
ORDER BY TotalDeaths DESC;
""")
continents_deaths_df.show()


+--------------+-----------+
|Country/Region|TotalDeaths|
+--------------+-----------+
|            US|   11011411|
|United Kingdom|    3997775|
|        Brazil|    3938034|
|         Italy|    3707717|
|        France|    3048524|
|         Spain|    3033030|
|        Mexico|    1728277|
|         India|    1111831|
|          Iran|    1024136|
|       Belgium|     963679|
+--------------+-----------+

+-----------------+-----------+
|        Continent|TotalDeaths|
+-----------------+-----------+
|    North America|     229855|
|           Europe|     205232|
|    South America|     154885|
|             Asia|     100627|
|           Africa|      22114|
|Australia/Oceania|        281|
|             NULL|         13|
+-----------------+-----------+



In [140]:
#Q14: Average number of deaths by day (Continents and Countries)
avg_deaths_global_df = spark.sql("""
SELECT 
    AVG(`New deaths`) AS avg_daily_deaths
FROM day_wise;
""")
avg_deaths_global_df.show()


avg_deaths_by_countries_df = spark.sql("""
SELECT 
    `Country/Region`, 
    AVG(`New deaths`) AS avg_daily_deaths
FROM full_grouped
GROUP BY `Country/Region`
ORDER BY avg_daily_deaths DESC;
""")
avg_deaths_by_countries_df.show()



avg_deaths_by_Continents_df = spark.sql("""
SELECT 
    Continent, 
    AVG(`TotalDeaths`) AS avg_daily_deaths
FROM worldometer
GROUP BY Continent
ORDER BY avg_daily_deaths DESC;
""")
avg_deaths_by_Continents_df.show()


+-----------------+
| avg_daily_deaths|
+-----------------+
|3478.824468085106|
+-----------------+

+--------------+------------------+
|Country/Region|  avg_daily_deaths|
+--------------+------------------+
|            US| 787.2925531914893|
|        Brazil| 466.0531914893617|
|United Kingdom|243.85106382978722|
|        Mexico| 234.1595744680851|
|         Italy| 186.7659574468085|
|         India|177.70212765957447|
|        France|160.70212765957447|
|         Spain| 151.2340425531915|
|          Peru| 97.96808510638297|
|          Iran| 84.63829787234043|
|        Russia| 70.92553191489361|
|       Belgium|52.244680851063826|
|         Chile| 48.86702127659574|
|       Germany|48.537234042553195|
|        Canada| 47.57446808510638|
|      Colombia| 46.68617021276596|
|  South Africa|37.590425531914896|
|   Netherlands|32.765957446808514|
|      Pakistan|31.074468085106382|
|        Sweden|30.319148936170212|
+--------------+------------------+
only showing top 20 rows

+--------

In [146]:
#Q15:  Average of cases divided by the number of population of each country (TOP 10)
top_10_df = spark.sql("""
SELECT 
    `Country/Region`, 
    MAX(TotalCases) * 1.0 / NULLIF(MAX(Population), 0) AS case_population_ratio
FROM worldometer
GROUP BY `Country/Region`
ORDER BY case_population_ratio DESC
LIMIT 10;
""")
top_10_df.show()

+--------------+---------------------+
|Country/Region|case_population_ratio|
+--------------+---------------------+
|         Qatar|       0.039921575750|
| French Guiana|       0.027145648580|
|       Bahrain|       0.025130239080|
|    San Marino|       0.020596381637|
|         Chile|       0.019164810228|
|        Panama|       0.016527039892|
|        Kuwait|       0.016378443168|
|          Oman|       0.015769043964|
|           USA|       0.015193862961|
|  Vatican City|       0.014981273408|
+--------------+---------------------+

