In [1]:
!pip install pandas matplotlib seaborn



In [2]:
!pip install pyspark



In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, lit
spark = SparkSession.builder.appName("Covid-19 Data Preprocessing").getOrCreate()
df = spark.read.csv('D:/Pyspark_hackathon/dataset.csv',header = True, inferSchema=True)
print("Initial Schema")
# df.printSchema()
# df.show(5)
# Iterate over schema and print column names and types
for field in df.schema.fields:
    print(f"Column Name: {field.name}, Data Type: {field.dataType}")


Initial Schema
Column Name: country, Data Type: StringType()
Column Name: date, Data Type: DateType()
Column Name: total_cases, Data Type: IntegerType()
Column Name: new_cases, Data Type: IntegerType()
Column Name: new_cases_smoothed, Data Type: DoubleType()
Column Name: total_cases_per_million, Data Type: DoubleType()
Column Name: new_cases_per_million, Data Type: DoubleType()
Column Name: new_cases_smoothed_per_million, Data Type: DoubleType()
Column Name: total_deaths, Data Type: IntegerType()
Column Name: new_deaths, Data Type: IntegerType()
Column Name: new_deaths_smoothed, Data Type: DoubleType()
Column Name: total_deaths_per_million, Data Type: DoubleType()
Column Name: new_deaths_per_million, Data Type: DoubleType()
Column Name: new_deaths_smoothed_per_million, Data Type: DoubleType()
Column Name: excess_mortality, Data Type: DoubleType()
Column Name: excess_mortality_cumulative, Data Type: DoubleType()
Column Name: excess_mortality_cumulative_absolute, Data Type: DoubleType()


In [14]:
# df.show(5)
df.count()

453386

In [4]:
from pyspark.sql import functions as F
null_counts  = df.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns])
null_counts.show()

+-------+----+-----------+---------+------------------+-----------------------+---------------------+------------------------------+------------+----------+-------------------+------------------------+----------------------+-------------------------------+----------------+---------------------------+------------------------------------+---------------------------------------+-------------+-------------------------+----------------------+----------------------------------+------------+------------------------+---------------------+---------------------------------+----------------+-----------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+--

In [None]:
# dataset_cleaned= df.dropna()
# dataset_cleaned.count()

0

In [None]:
# dataset_cleaned.show(5)

+-------+----+-----------+---------+------------------+-----------------------+---------------------+------------------------------+------------+----------+-------------------+------------------------+----------------------+-------------------------------+----------------+---------------------------+------------------------------------+---------------------------------------+-------------+-------------------------+----------------------+----------------------------------+------------+------------------------+---------------------+---------------------------------+----------------+-----------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+--

In [16]:
dataset_cleaned = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))


In [21]:
dataset_cleaned = dataset_cleaned.filter(col("population") > 0)
dataset_cleaned = dataset_cleaned.filter((col("new_cases") >= 0) & (col("new_deaths") >= 0))
# Standardize column names by converting them to lowercase
dataset_cleaned = dataset_cleaned.select(*[col(c).alias(c.lower()) for c in dataset_cleaned.columns])
dataset_cleaned.show(4500)

+-----------+----------+-----------+---------+------------------+-----------------------+---------------------+------------------------------+------------+----------+-------------------+------------------------+----------------------+-------------------------------+----------------+---------------------------+------------------------------------+---------------------------------------+-------------+-------------------------+----------------------+----------------------------------+------------+------------------------+---------------------+---------------------------------+----------------+-----------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------

In [35]:
from pyspark.sql.functions import col, year, month,dayofmonth,sum, avg, count,max,min
country_agg = dataset_cleaned.groupBy("country").agg(
    sum("total_cases").alias("total_cases_sum"),
    sum('new_cases').alias("total_new_cases"),
    sum("total_deaths").alias("total_deaths_sum"),
    sum("new_deaths").alias("new_deaths_sum"),
    sum("total_tests").alias("total_tests_sum"),
    sum("new_tests").alias("new_tests_sum"),
    sum("total_vaccinations").alias("total_vaccinations_sum"),
    avg("population").alias("avg_population")
)

# showing the aggregated data by country
print("Aggregated data by Countries: ")
country_agg.show(500)

Aggregated data by Countries: 
+--------------------+---------------+---------------+----------------+--------------+---------------+-------------+----------------------+--------------+
|             country|total_cases_sum|total_new_cases|total_deaths_sum|new_deaths_sum|total_tests_sum|new_tests_sum|total_vaccinations_sum|avg_population|
+--------------------+---------------+---------------+----------------+--------------+---------------+-------------+----------------------+--------------+
|                Chad|        9833469|           7702|          279746|           194|        4571170|         NULL|          1.49221681E8|   1.8455319E7|
|            Anguilla|        3927404|           3904|           11863|            12|          83038|         NULL|             1314943.0|       14202.0|
|            Djibouti|       21258149|          15690|          257474|           189|        6718280|         1233|           1.4506808E7|     1137100.0|
|             Comoros|       10594936| 

In [36]:
# aggregating data based on date(daily aggregation)
date_agg = dataset_cleaned.groupBy('date').agg(
    sum("new_cases").alias("daily_new_cases"),
    sum('new_deaths').alias("daily_new_deaths"),
    sum("new_tests").alias("daily_new_tests"),
    sum("new_vaccinations").alias("daily_new_vaccinations")
)
# data_agg = date_agg.orderBy(col('date').asc())

print("Daily aggregated Data:")
date_agg.show(500)

Daily aggregated Data:
+----------+---------------+----------------+---------------+----------------------+
|      date|daily_new_cases|daily_new_deaths|daily_new_tests|daily_new_vaccinations|
+----------+---------------+----------------+---------------+----------------------+
|2020-08-24|              0|               0|        2888928|                  NULL|
|2021-01-27|              0|               0|        7145385|        1.8206840158E7|
|2021-06-22|              0|               0|        7705536|        1.8435903037E8|
|2021-08-27|              0|               0|        9837438|        1.9413328062E8|
|2021-10-11|              0|               0|        7269359|         7.497258114E7|
|2021-11-13|              0|               0|        7318470|  1.2561146849000001E8|
|2021-12-18|              0|               0|       10483646|  1.5435654679000002E8|
|2022-03-28|              0|               0|        5622292|   7.899482741999999E7|
|2022-07-31|       30519904|           761

In [37]:
# Aggregating the dataset with Month
month_agg = dataset_cleaned.withColumn("year",year("date")).withColumn('month', month('date')).groupBy('year','month').agg(
    sum("new_cases").alias("monthly_new_cases"),
    sum("new_deaths").alias("monthly_new_deaths"),
    sum("new_tests").alias("monthly_new_tests"),
    sum("new_vaccinations").alias("monthly_new_vaccinations"),
    avg("population_density").alias("avg_population_density")
)
print(" Monthly Aggreagted Dataset:")
month_agg.show(500)

 Monthly Aggreagted Dataset:
+----+-----+-----------------+------------------+-----------------+------------------------+----------------------+
|year|month|monthly_new_cases|monthly_new_deaths|monthly_new_tests|monthly_new_vaccinations|avg_population_density|
+----+-----+-----------------+------------------+-----------------+------------------------+----------------------+
|2022|   10|         67234274|            240311|             NULL|    5.2705296550440013E8|    304.86202017219597|
|2024|    7|           952935|             14002|             NULL|       276723.7073106001|    307.93992009617136|
|2020|    6|         15852831|            582757|         57037555|                    NULL|       304.73264871282|
|2021|    8|         91155689|           1393926|        258352340|         4.62015043442E9|     304.8620201721959|
|2021|    6|         43778574|           1045067|        235486138|        4.649047580089E9|    304.86202017219586|
|2021|    5|         97179059|           18

In [38]:
# Aggregating the dataset on Year
year_agg = dataset_cleaned.withColumn('year',year('date')).groupBy('year').agg(
    sum("new_cases").alias("yearly_new_cases"),
    sum("new_deaths").alias("yearly_new_deaths"),
    sum("new_tests").alias("yearly_new_tests"),
    sum("new_vaccinations").alias("yearly_new_vaccinations"),
    max("stringency_index").alias("max_stringency_index")
)
print(" Yearly Aggregated Dataset:")
year_agg.show(500)

 Yearly Aggregated Dataset:
+----+----------------+-----------------+----------------+-----------------------+--------------------+
|year|yearly_new_cases|yearly_new_deaths|yearly_new_tests|yearly_new_vaccinations|max_stringency_index|
+----+----------------+-----------------+----------------+-----------------------+--------------------+
|2023|       283439705|          1309542|            NULL|   1.6043724577620006E9|                NULL|
|2022|      1821188081|          5285670|      1212921492|   1.481232311847182E10|                96.3|
|2020|       335975105|          7965902|       939654038|    3.998657007946424E7|               100.0|
|2024|        12740056|           204987|            NULL|      8095476.751771462|                NULL|
|2021|       839031523|         14724578|      2907920773|   3.626536383647301E10|               97.22|
+----+----------------+-----------------+----------------+-----------------------+--------------------+

