# Exploring PySpark: A Hands-On Project with Spark SQL and DataFrames for COVID-19 Analysis

The primary dataset is the PatientInfo CSV file, which holds per-patient records of COVID-19 cases. The labs use PySpark's distributed computing, Spark SQL queries, and the DataFrame API to process and analyze it efficiently.

Throughout these labs, participants use Spark SQL for expressive querying and DataFrames for structured data manipulation. The goal is to build the skills needed to navigate, analyze, and draw insights from real-world datasets.

By the end of the project, participants will have practiced PySpark, Spark SQL, and DataFrame operations on a real COVID-19 dataset.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import pyspark and check its version

In [1]:
# Download Java Virtual Machine (JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [4]:
!ls

PatientInfo.csv			 spark-3.1.1-bin-hadoop3.2.tgz.2
sample_data			 spark-3.1.1-bin-hadoop3.2.tgz.3
spark-3.1.1-bin-hadoop3.2	 spark-3.1.1-bin-hadoop3.2.tgz.4
spark-3.1.1-bin-hadoop3.2.tgz	 spark-3.1.1-bin-hadoop3.2.tgz.5
spark-3.1.1-bin-hadoop3.2.tgz.1  spark-3.1.1-bin-hadoop3.2.tgz.6


### Import and create SparkSession

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [6]:
sc = spark.sparkContext

### Load the PatientInfo.csv file and show the first 5 rows

In [54]:
dfPatient = spark.read.csv('PatientInfo.csv', header=True, inferSchema= True)

In [55]:
dfPatient.cache();

In [56]:
dfPatient.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Display the schema of the dataset

In [57]:
dfPatient.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [58]:
dfPDescription = dfPatient.describe()
dfPDescription.show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|          5162|         1587|           66|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|            

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [59]:
# survived
dfSurvived = dfPatient.filter(dfPatient["state"] == "released")
# did not survive
dfNotSurvived = dfPatient.filter((dfPatient["state"] == "isolated") | (dfPatient["state"] == "deceased"))

# print count of both
print (f"Number of survived people : {dfSurvived.count()}")
print (f"Number of unsurvived people : {dfNotSurvived.count()}")

Number of survived people : 2929
Number of unsurvived people : 2236


### Display the number of null values in each column

In [60]:
dfPCols = dfPatient.columns

for i in dfPCols:
  print(f"Nulls in {i} = {dfPatient.where(dfPatient[i].isNull()).count()}")

Nulls in patient_id = 0
Nulls in sex = 1122
Nulls in age = 1380
Nulls in country = 0
Nulls in province = 0
Nulls in city = 94
Nulls in infection_case = 919
Nulls in infected_by = 3819
Nulls in contact_number = 4374
Nulls in symptom_onset_date = 4475
Nulls in confirmed_date = 3
Nulls in released_date = 3578
Nulls in deceased_date = 5099
Nulls in state = 0


## Data preprocessing

### Fill the nulls in the deceased_date with the released_date.

In [61]:
from pyspark.sql.functions import coalesce, col

In [62]:
dfPatientProcessed1 = dfPatient.withColumn("deceased_date", coalesce(dfPatient["deceased_date"], dfPatient["released_date"]))
dfPatientProcessed1.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Add a column named no_days, which is the difference between the deceased_date and the confirmed_date, then show the top 5 rows. Print the schema.


In [63]:
from pyspark.sql.types import DateType, IntegerType, BooleanType

In [64]:
# Cast both date columns (deceased_date and confirmed_date) to DateType.
# The second cast starts from dfPatientProcessed2; starting again from
# dfPatientProcessed1 would discard the deceased_date cast.

dfPatientProcessed2 = dfPatientProcessed1.withColumn("deceased_date", dfPatientProcessed1["deceased_date"].cast(DateType()))
dfPatientProcessed2 = dfPatientProcessed2.withColumn("confirmed_date", dfPatientProcessed2["confirmed_date"].cast(DateType()))


In [65]:
dfPatientProcessed2.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



In [66]:
from pyspark.sql.functions import datediff

In [67]:
# Add the no_days column

dfPatientProcessed3 = dfPatientProcessed2.withColumn("no_days", datediff(col("deceased_date"), col("confirmed_date")))

In [68]:
dfPatientProcessed3.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

### Add an is_male column: true if the patient is male, otherwise false

In [69]:
dfPatientProcessed4 = dfPatientProcessed3.dropna(subset=["sex"])

dfPatientProcessed4.groupBy("sex").count().show()

+------+-----+
|   sex|count|
+------+-----+
|female| 2218|
|  male| 1825|
+------+-----+



In [70]:
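# patient_table is the temporary view over the original dfPatient; it is
# (re)registered here so that this cell runs on its own.
dfPatient.createOrReplaceTempView("patient_table")

dfPatientProcessed4 = spark.sql("""
  SELECT *,
      CASE WHEN sex = 'male' THEN true
      ELSE false
      END AS is_male
  FROM patient_table
""")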

dfPatientProcessed4.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|   true|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|   true|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

### Add an is_dead column: true if the patient's state is not released, otherwise false

- Use a <b>UDF</b> to perform this task.
- However, a UDF is recommended only when no built-in function can do the required operation.
- A UDF is slower than the built-in functions.

In [72]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

In [73]:
def is_dead_func(text):
    # True for every state other than 'released' (isolated or deceased)
    return text != 'released'


deadUDF = udf(is_dead_func, BooleanType())
df_6 = dfPatientProcessed4.withColumn('is_dead', deadUDF(dfPatientProcessed4['state']))

In [74]:
dfPatientProcessed4.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|   true|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|   true|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

### Change the age bins from 0s, 10s, 20s, etc. to 0, 10, 20

In [75]:
dfPatientProcessed4.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  0s|   66|
| 10s|  178|
|null| 1380|
| 40s|  518|
| 80s|  170|
| 70s|  232|
| 90s|   49|
| 20s|  899|
| 50s|  667|
| 60s|  482|
|100s|    1|
| 30s|  523|
+----+-----+



In [76]:
from pyspark.sql.functions import when, split

In [77]:
dfPatientProcessed5 = dfPatientProcessed4.withColumn("age",
        when(dfPatientProcessed4["age"].isNotNull(), split(dfPatientProcessed4["age"],"s")[0]).otherwise(dfPatientProcessed4["age"])
)

In [78]:
dfPatientProcessed5.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|is_male|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male| 50|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|   true|
|1000000002|  male| 30|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|   true|
|1000000003|  male| 50|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

### Cast age and no_days to Double

In [29]:
from pyspark.sql.types import DoubleType

In [30]:
dfPatientProcessed5 = dfPatientProcessed5.withColumn("age", dfPatientProcessed5["age"].cast(DoubleType()))
dfPatientProcessed5 = dfPatientProcessed5.withColumn("no_days", dfPatientProcessed5["no_days"].cast(DoubleType()))

dfPatientProcessed5.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = false)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [31]:
dfPatientProcessed6 = dfPatientProcessed5.drop("patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case")

### Recount the number of nulls now

In [32]:
dfPCols = dfPatientProcessed6.columns

for i in dfPCols:
  print(f"Nulls in {i} = {dfPatientProcessed6.where(dfPatientProcessed6[i].isNull()).count()}")

Nulls in age = 1380
Nulls in province = 0
Nulls in is_male = 0


## Now do the same using SQL SELECT statements

### From the original Patient DataFrame, create a temporary view (table).

In [33]:
dfPatient.createOrReplaceTempView("patient_table")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [34]:
dfPatientSQLProcessed1 = spark.sql("""
  SELECT * FROM patient_table
""")

dfPatientSQLProcessed1.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows

In [35]:
dfPatientSQLProcessed2 = spark.sql("""
  SELECT * FROM patient_table LIMIT 5
""")

dfPatientSQLProcessed2.show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Select the count of males and females in the dataset

In [36]:
dfPatientSQLProcessed3 = spark.sql("""
  SELECT count(sex) FROM patient_table WHERE sex IN ("male", "female")
""")

dfPatientSQLProcessed3.show()

+----------+
|count(sex)|
+----------+
|      4043|
+----------+



### How many people survived, and how many didn't?

In [38]:
dfPatientSQLProcessed4 = spark.sql("""
  SELECT state, count(state)
  FROM patient_table WHERE state IN ("released", "isolated")
  GROUP BY state
""")

dfPatientSQLProcessed4.show()

+--------+------------+
|   state|count(state)|
+--------+------------+
|isolated|        2158|
|released|        2929|
+--------+------------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [79]:
dfPatientSQLProcessed5 = spark.sql("""
  SELECT sex, substring(age, 1, length(age)-1) AS age, province, state
  FROM patient_table
""")

dfPatientSQLProcessed5.show()

+------+---+--------+--------+
|   sex|age|province|   state|
+------+---+--------+--------+
|  male| 50|   Seoul|released|
|  male| 30|   Seoul|released|
|  male| 50|   Seoul|released|
|  male| 20|   Seoul|released|
|female| 20|   Seoul|released|
|female| 50|   Seoul|released|
|  male| 20|   Seoul|released|
|  male| 20|   Seoul|released|
|  male| 30|   Seoul|released|
|female| 60|   Seoul|released|
|female| 50|   Seoul|released|
|  male| 20|   Seoul|released|
|  male| 80|   Seoul|deceased|
|female| 60|   Seoul|released|
|  male| 70|   Seoul|released|
|  male| 70|   Seoul|released|
|  male| 70|   Seoul|released|
|  male| 20|   Seoul|released|
|female| 70|   Seoul|released|
|female| 70|   Seoul|released|
+------+---+--------+--------+
only showing top 20 rows

