# **Partitioning Covid19 cases data using SparkSQL + AWS S3**

**Steps**
1. Import the Covid19 dataset from AWS S3 (data source: ourworldindata.org)
2. Check for missing values
3. Fill missing values with 0 and calculate cumulative total cases and total deaths as new columns
4. Partition the DataFrame by year of cases
5. Save the output as CSV files

# Install required libraries



In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 64 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=c3304539af53bbff6c7ea9d0d278761a5441b9885aab83e34d316bd837236b8d
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [3]:
pip install boto3

Collecting boto3
  Downloading boto3-1.18.22-py3-none-any.whl (131 kB)
[K     |████████████████████████████████| 131 kB 4.9 MB/s 
[?25hCollecting botocore<1.22.0,>=1.21.22
  Downloading botocore-1.21.22-py3-none-any.whl (7.8 MB)
[K     |████████████████████████████████| 7.8 MB 46.0 MB/s 
[?25hCollecting s3transfer<0.6.0,>=0.5.0
  Downloading s3transfer-0.5.0-py3-none-any.whl (79 kB)
[K     |████████████████████████████████| 79 kB 8.0 MB/s 
[?25hCollecting jmespath<1.0.0,>=0.7.1
  Downloading jmespath-0.10.0-py2.py3-none-any.whl (24 kB)
Collecting urllib3<1.27,>=1.25.4
  Downloading urllib3-1.26.6-py2.py3-none-any.whl (138 kB)
[K     |████████████████████████████████| 138 kB 74.8 MB/s 
Installing collected packages: urllib3, jmespath, botocore, s3transfer, boto3
  Attempting uninstall: urllib3
    Found existing installation: urllib3 1.24.3
    Uninstalling urllib3-1.24.3:
      Successfully uninstalled urllib3-1.24.3
[31mERROR: pip's dependency resolver does not currently take 

# Import Data from AWS S3 and Create Spark Session

In [5]:
import os

import boto3

# Credentials are taken from the standard AWS environment variables rather than
# from notebook globals: nothing in this notebook defines AWS_ACCESS_KEY /
# AWS_SECRET_KEY, so the cell failed on a fresh kernel, and secrets should never
# be stored in a notebook. When a variable is unset, None is passed and boto3
# falls back to its default credential lookup.
s3 = boto3.client(
    's3',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
)

# Copy the dataset object from the bucket to a local file of the same name.
s3.download_file(
    Bucket='mj-dataset',
    Key='owid-covid-data.csv',
    Filename='owid-covid-data.csv',
)

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    count,
    isnan,
    isnull,
    when,
)

# Reuse the active Spark session if there is one; otherwise build a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [7]:
# Load the downloaded CSV: the first row supplies the column names and the
# column types are inferred from the data.
data_raw = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('owid-covid-data.csv')
)

# Handle missing values and calculate cumulative cases and deaths

In [8]:
# Expose the raw frame to Spark SQL under the table name `covid19`.
data_raw.createOrReplaceTempView('covid19')

# Count NULLs in the four columns the later query relies on
# (date, location, new_cases, new_deaths) and print the one-row result.
null_counts = spark.sql("""
select SUM(IF(ISNULL(date), 1, 0)) as null_date, SUM(IF(ISNULL(location), 1, 0)) as null_country , SUM(IF(ISNULL(new_cases), 1,0)) as null_new_cases, SUM(IF(ISNULL(new_deaths), 1, 0)) as null_new_deaths from covid19
""")
null_counts.show()


+---------+------------+--------------+---------------+
|null_date|null_country|null_new_cases|null_new_deaths|
+---------+------------+--------------+---------------+
|        0|           0|          4348|          14532|
+---------+------------+--------------+---------------+



In [21]:
# Build the working frame from the `covid19` view:
#   * inner query: keep date/location, derive year and month from the date, and
#     replace NULL daily counts with 0;
#   * outer query: add per-country running totals of cases and deaths, ordered by date.
# The window is already partitioned by country, so ordering by date alone is
# sufficient. The query needs no line-continuation backslashes because SQL
# ignores the newlines inside the string.
# NOTE(review): the alias `cumalative_deaths` is misspelled ("cumulative"). It is
# kept unchanged here because it becomes a column header in the saved CSV output;
# rename it only together with any consumer of that file.
query = """
SELECT
    *,
    SUM(new_cases)  OVER (PARTITION BY country ORDER BY date) AS cumulative_cases,
    SUM(new_deaths) OVER (PARTITION BY country ORDER BY date) AS cumalative_deaths
FROM (
    SELECT
        date,
        YEAR(date)              AS year,
        MONTH(date)             AS month,
        location                AS country,
        COALESCE(new_cases, 0)  AS new_cases,
        COALESCE(new_deaths, 0) AS new_deaths
    FROM covid19
) AS daily
"""
df_covid = spark.sql(query)

# Preview the first rows, then report the total row count as the cell result.
df_covid.show()
df_covid.count()

+----------+----+-----+-------+---------+----------+----------------+-----------------+
|      date|year|month|country|new_cases|new_deaths|cumulative_cases|cumalative_deaths|
+----------+----+-----+-------+---------+----------+----------------+-----------------+
|2020-03-19|2020|    3|   Chad|      1.0|       0.0|             1.0|              0.0|
|2020-03-20|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-21|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-22|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-23|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-24|2020|    3|   Chad|      2.0|       0.0|             3.0|              0.0|
|2020-03-25|2020|    3|   Chad|      0.0|       0.0|             3.0|              0.0|
|2020-03-26|2020|    3|   Chad|      0.0|       0.0|             3.0|              0.0|
|2020-03-27|2020|    3|   Chad| 

106356

# Partitioning data by year

In [19]:
# Partition the DataFrame by year.
# Target one partition per distinct year; max(..., 1) keeps repartition() valid
# (it requires a positive count) even if the frame is empty.
num_partition = max(df_covid.select('year').distinct().count(), 1)

# The repartitioned frame gets its own name instead of overwriting df_covid, so
# this cell can be re-run in any order without changing what df_covid refers to.
# NOTE(review): repartition() hash-partitions on `year`, so distinct years are
# not guaranteed to land in distinct partitions; if strictly one output per year
# is required, consider DataFrameWriter.partitionBy('year') (this changes the
# output directory layout).
df_covid_by_year = df_covid.repartition(num_partition, 'year')

print(df_covid_by_year.rdd.getNumPartitions())

# Save output as CSV files (one part file per partition) under ./output,
# replacing any previous run.
df_covid_by_year.write.csv('output', mode='overwrite', header=True)


2


In [22]:
# Sanity check: load the CSV files written to ./output (header row gives the
# column names), preview the first rows, and report the total row count.
df_output = (
    spark.read
    .option('header', True)
    .csv('output')
)

df_output.show()
df_output.count()

+----------+----+-----+-------+---------+----------+----------------+-----------------+
|      date|year|month|country|new_cases|new_deaths|cumulative_cases|cumalative_deaths|
+----------+----+-----+-------+---------+----------+----------------+-----------------+
|2020-03-19|2020|    3|   Chad|      1.0|       0.0|             1.0|              0.0|
|2020-03-20|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-21|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-22|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-23|2020|    3|   Chad|      0.0|       0.0|             1.0|              0.0|
|2020-03-24|2020|    3|   Chad|      2.0|       0.0|             3.0|              0.0|
|2020-03-25|2020|    3|   Chad|      0.0|       0.0|             3.0|              0.0|
|2020-03-26|2020|    3|   Chad|      0.0|       0.0|             3.0|              0.0|
|2020-03-27|2020|    3|   Chad| 

106356