<a href="https://colab.research.google.com/github/karema9/pyspark-analysis/blob/master/covid_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Exploratory Data Analysis with PySpark**

Setting up the PySpark environment

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys

import findspark
# Kept ahead of `import pyspark`: findspark.init() is expected to make the pyspark
# package importable. NOTE(review): pyspark is also pip-installed in this cell, so
# this step may be redundant — confirm.
findspark.init()
# The return value is not assigned and this is not the cell's last expression,
# so the result of find() is discarded.
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

from typing import List

# Build the Spark session used throughout the notebook (reuses one if it already exists).
spark = (
    SparkSession.builder
    .appName("Spark Program")
    .getOrCreate()
)




Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,016 kB]
Get:5 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease [18.1 kB]
Get:6 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [1,242 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:10 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy/main Sources [2,244 kB]
Hit:11 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:13 http://arc

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame, SparkSession

In [3]:
# check if the installation was successful: leaving the bare `spark` name as the
# cell's last expression makes the notebook display the session object
spark

# 2. **Reading Data**

In [4]:
import requests

uri = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
# Bound the wait on a stalled connection, and fail loudly on an HTTP error instead of
# writing an error page to disk and letting Spark parse it as CSV.
res = requests.get(uri, timeout=60)
res.raise_for_status()
data = res.content

# write the downloaded bytes to a local csv file; the `with` block closes the
# handle even if the write raises
with open("covid-data.csv", "wb") as data_file:
    data_file.write(data)

# read the data using spark (first row is the header, column types are inferred)
df = spark.read.csv("covid-data.csv", header=True, inferSchema=True)

# 3. DataFrames

In [6]:
# display the schema of the dataframe: one line per column with its type and nullability
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

In [11]:
# Pass the `date` column through to_date(), keep the name `date`, and preview
# the first rows of the resulting single-column frame.
(
    df
    .select(F.to_date(df["date"]).alias("date"))
    .show()
)

+----------+
|      date|
+----------+
|2020-01-03|
|2020-01-04|
|2020-01-05|
|2020-01-06|
|2020-01-07|
|2020-01-08|
|2020-01-09|
|2020-01-10|
|2020-01-11|
|2020-01-12|
|2020-01-13|
|2020-01-14|
|2020-01-15|
|2020-01-16|
|2020-01-17|
|2020-01-18|
|2020-01-19|
|2020-01-20|
|2020-01-21|
|2020-01-22|
+----------+
only showing top 20 rows



In [None]:
# don't run this part during a normal top-to-bottom run: it repeats the imports and
# session creation already done in the setup cell (original note: "this is for new cases").
# NOTE(review): getOrCreate() presumably hands back the already-running session, in
# which case the "local" app name would not take effect — confirm before relying on it.
import pyspark
from pyspark.sql import SparkSession, DataFrame

import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("local").getOrCreate()

In [13]:
# Summary stats (count / mean / stddev / min / max) for the two cumulative columns.
# Restricting describe() to these columns matches the saved output below and avoids
# rendering a summary of every column of this very wide frame.
df.describe("total_cases", "total_deaths").show()

+-------+-------------------+-----------------+
|summary|        total_cases|     total_deaths|
+-------+-------------------+-----------------+
|  count|             320628|           299086|
|   mean| 6862418.5074447645|86954.36829206315|
| stddev|4.155250615307297E7|443968.2558095905|
|    min|                1.0|              1.0|
|    max|       7.72165753E8|        6981250.0|
+-------+-------------------+-----------------+



In [12]:
# Keep only the rows for one location and list them with the largest
# `new_cases` values first.
(
    df
    .where(F.col("location") == "Afghanistan")
    .sort(F.col("new_cases").desc())
    .show()
)

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-------------------------------

In [None]:
# Sum `new_cases` per location and rank the locations by that sum, largest first.
# Note: `location` also holds aggregate rows (e.g. "World", "High income"), so
# those appear in the ranking alongside individual countries.
(
    df
    .groupBy("location")
    .agg(F.sum("new_cases"))
    .sort(F.col("sum(new_cases)").desc())
    .show(truncate=False)
)

+-------------------+--------------+
|location           |sum(new_cases)|
+-------------------+--------------+
|World              |7.71901143E8  |
|High income        |4.26293014E8  |
|Asia               |3.00797149E8  |
|Europe             |2.50114367E8  |
|Upper middle income|2.44569228E8  |
|European Union     |1.84433642E8  |
|North America      |1.24493368E8  |
|United States      |1.03436829E8  |
|China              |9.9318742E7   |
|Lower middle income|9.7504121E7   |
|South America      |6.8847969E7   |
|India              |4.5002138E7   |
|France             |3.899749E7    |
|Germany            |3.8437756E7   |
|Brazil             |3.7722322E7   |
|South Korea        |3.4571873E7   |
|Japan              |3.3803572E7   |
|Italy              |2.6257548E7   |
|United Kingdom     |2.4804243E7   |
|Russia             |2.3124717E7   |
+-------------------+--------------+
only showing top 20 rows



## PySpark SQL


In [5]:
# register the dataframe as a temporary view named "covid" so it can be referenced by name in SQL
df.createOrReplaceTempView("covid")

In [6]:
# Select every column of the `covid` view through the SQL interface and print
# the schema of the resulting dataframe.
select_all_query = "SELECT * FROM covid"
df2 = spark.sql(select_all_query)
df2.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

In [10]:
# Per location, count the rows that have a non-null `total_cases` value
# (a row count, not a case total), listed alphabetically by location.
location_count_query = (
    "SELECT location, count(total_cases) AS total "
    "from covid GROUP BY location ORDER BY location"
)
grouped_df = spark.sql(location_count_query)
grouped_df.show()

+-------------------+-----+
|           location|total|
+-------------------+-----+
|        Afghanistan| 1366|
|             Africa| 1376|
|            Albania| 1352|
|            Algeria| 1366|
|     American Samoa|  796|
|            Andorra| 1360|
|             Angola| 1333|
|           Anguilla| 1336|
|Antigua and Barbuda| 1348|
|          Argentina| 1358|
|            Armenia| 1361|
|              Aruba| 1344|
|               Asia| 1419|
|          Australia| 1398|
|            Austria| 1364|
|         Azerbaijan| 1363|
|            Bahamas| 1346|
|            Bahrain| 1366|
|         Bangladesh| 1355|
|           Barbados| 1345|
+-------------------+-----+
only showing top 20 rows

