# Build an ETL pipeline with PySpark

PySpark is an interface for Apache Spark in Python. It allows you to write Spark applications using Python APIs, and also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core.

See the [PySpark documentation](https://spark.apache.org/docs/latest/api/python/) for the full API reference.

The objective of this task is to build an ETL pipeline using PySpark. In this notebook, we will:

- Set up a PySpark development environment
- Read data from flat files into a DataFrame
- Perform common DataFrame operations
- Use Spark SQL to query the dataset
- Persist the DataFrame to a PostgreSQL database

[Reference](https://blog.devgenius.io/setup-pyspark-locally-build-your-first-etl-pipeline-with-pyspark-91c3060c6133)

### Install PySpark

```python
# Uncomment to install PySpark into the notebook's environment
#!pip install pyspark
```

### Import libraries

```python
import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
```

### Set Java home

```python
# Set JAVA_HOME before Spark starts its JVM; the raw string keeps the Windows backslashes literal
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-18.0.2.1"
```

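Spark launches a JVM when the session is created, so a wrong `JAVA_HOME` typically only shows up later as a "Java gateway process exited" error. A small pre-flight check, assuming the standard JDK layout with the launcher in `bin/java` (or `bin/java.exe` on Windows); `java_executable` is a hypothetical helper, not part of PySpark.

```python
from pathlib import Path


def java_executable(java_home: str) -> Path:
    """Return the java launcher inside java_home, raising if it cannot be found."""
    bin_dir = Path(java_home) / "bin"
    for name in ("java", "java.exe"):  # Unix-style and Windows launcher names
        candidate = bin_dir / name
        if candidate.is_file():
            return candidate
    raise FileNotFoundError(f"no java launcher found in {bin_dir}")
```

Calling `java_executable(os.environ["JAVA_HOME"])` right after setting the variable fails fast with a clear message instead of a JVM start-up error.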
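### Initialize the Spark session

`setMaster("local")` runs Spark with a single worker thread on this machine; `local[*]` would use all available cores.

```python
# Local Spark configuration; the extra classpath folder holds additional JARs
# (such as the PostgreSQL JDBC driver used in the last step)
conf = SparkConf() \
    .setAppName("Example") \
    .setMaster("local") \
    .set("spark.driver.extraClassPath", "C:/pyspark/*")
```

```python
# Create (or reuse) a SparkSession from the configuration above
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
```

```python
# Display the session details in the notebook
spark
```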

### Read CSV File

```python
# Read the CSV file (with a header row) into a DataFrame and show the first 20 rows
df = spark.read.options(delimiter=",", header=True).csv(r"C:\Users\Priscilla Ng\Documents\Data Science Projects\Covid\Datasets\covid-data.csv")
df.show()
```


```python
df.printSchema()
```

```
root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: string (nullable = true)
 |-- new_cases: string (nullable = true)
 |-- new_cases_smoothed: string (nullable = true)
 |-- total_deaths: string (nullable = true)
 |-- new_deaths: string (nullable = true)
 |-- new_deaths_smoothed: string (nullable = true)
 |-- total_cases_per_million: string (nullable = true)
 |-- new_cases_per_million: string (nullable = true)
 |-- new_cases_smoothed_per_million: string (nullable = true)
 |-- total_deaths_per_million: string (nullable = true)
 |-- new_deaths_per_million: string (nullable = true)
 |-- new_deaths_smoothed_per_million: string (nullable = true)
 |-- reproduction_rate: string (nullable = true)
 |-- icu_patients: string (nullable = true)
 |-- icu_patients_per_million: string (nullable = true)
 |-- hosp_patients: string (nullable = true)
 |-- hosp_patients_per_million
```

### Common DataFrame operations

```python
# Keep only the rows for Singapore; show() returns None, so keep the DataFrame and display it separately
singapore = df.filter(df.location == "Singapore")
singapore.show(truncate=False)
```


```python
# Create new dataframe with specific columns
df1 = df.select('continent', 'location', 'date', 'total_cases', 'new_cases', 'total_deaths', 'new_deaths')
df1.show()
```

```
+---------+-----------+----------+-----------+---------+------------+----------+
|continent|   location|      date|total_cases|new_cases|total_deaths|new_deaths|
+---------+-----------+----------+-----------+---------+------------+----------+
|     Asia|Afghanistan|2020-02-24|        5.0|      5.0|        null|      null|
|     Asia|Afghanistan|2020-02-25|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-02-26|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-02-27|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-02-28|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-02-29|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-03-01|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-03-02|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-03-03|        5.0|      0.0|        null|      null|
|     Asia|Afghanistan|2020-
```

```python
# Group by continent and count the rows in each group
continent_df = df.groupBy('continent').count()
continent_df.show()
```

```
+-------------+-----+
|    continent|count|
+-------------+-----+
|       Europe|48158|
|       Africa|51463|
|         null|12803|
|North America|34238|
|South America|12365|
|      Oceania|15859|
|         Asia|47959|
+-------------+-----+
```


### Use Spark SQL to query data

```python
# Register df as a temporary view so it can be queried with Spark SQL
df.createOrReplaceTempView("covid")
output = spark.sql("SELECT * FROM covid")
output.show()
```


```python
output = spark.sql('SELECT continent, location, date, total_cases, new_cases FROM covid WHERE continent IS NOT NULL')
output.show()
```

```
+---------+-----------+----------+-----------+---------+
|continent|   location|      date|total_cases|new_cases|
+---------+-----------+----------+-----------+---------+
|     Asia|Afghanistan|2020-02-24|        5.0|      5.0|
|     Asia|Afghanistan|2020-02-25|        5.0|      0.0|
|     Asia|Afghanistan|2020-02-26|        5.0|      0.0|
|     Asia|Afghanistan|2020-02-27|        5.0|      0.0|
|     Asia|Afghanistan|2020-02-28|        5.0|      0.0|
|     Asia|Afghanistan|2020-02-29|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-01|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-02|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-03|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-04|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-05|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-06|        5.0|      0.0|
|     Asia|Afghanistan|2020-03-07|        8.0|      3.0|
|     Asia|Afghanistan|2020-03-08|        8.0|      0.0|
|     Asia|Afghanistan|2020-03-
```

```python
output = spark.sql('SELECT continent, SUM(new_cases) as total FROM covid GROUP BY continent ORDER BY continent DESC')
output.show()
```

```
+-------------+------------+
|    continent|       total|
+-------------+------------+
|South America| 6.3976295E7|
|      Oceania| 1.2496728E7|
|North America| 1.1466352E8|
|       Europe|2.31427018E8|
|         Asia|1.85327103E8|
|       Africa| 1.2346896E7|
|         null|2.03122876E9|
+-------------+------------+
```



### Persist data to a PostgreSQL database

```python
# Target table 'pyspark_covid_data' in the PostgreSQL database 'Covid'
dest_tbl = 'public."pyspark_covid_data"'
database = 'Covid'
password = "password"  # replace with your PostgreSQL password
user = "user"  # replace with your PostgreSQL user name
```

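Hard-coding the password in the notebook makes it easy to leak. One alternative, sketched under the assumption of two hypothetical environment variables `PG_USER` and `PG_PASSWORD`, is to read them at runtime and collect the connection settings in one place; `postgres_jdbc_settings` is a hypothetical helper, and its dictionary fits the `properties` argument of `DataFrameWriter.jdbc(url, table, mode, properties)`.

```python
import os
from typing import Dict, Tuple


def postgres_jdbc_settings(database: str, host: str = "localhost", port: int = 5432) -> Tuple[str, Dict[str, str]]:
    """Build a PostgreSQL JDBC URL and connection properties from environment variables."""
    user = os.environ.get("PG_USER")          # hypothetical variable name
    password = os.environ.get("PG_PASSWORD")  # hypothetical variable name
    if not user or not password:
        raise RuntimeError("set PG_USER and PG_PASSWORD before writing to PostgreSQL")
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}
    return url, properties
```

With those variables set, the write becomes `url, props = postgres_jdbc_settings("Covid")` followed by `df.write.jdbc(url, dest_tbl, mode="overwrite", properties=props)`.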
```python
# Write df to PostgreSQL over JDBC, overwriting the table if it already exists
df.write.mode("overwrite") \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://localhost:5432/{database}") \
    .option("dbtable", dest_tbl) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .save()
```