## Preprocessing data with PySpark Dataframes

In this section, we will learn following concepts:
- Documentation
- Reading the DataSet (.csv)
- Reading dataset with column header
- Checking the data type of the columns (DF schema)
- Selecting columns and Indexing
- Adding new columns to the dataframe
- Drop existing columns fromt he dataframe
- Renaming columns in the dataframe

Step-01: Documentation

- [pyspark.sql.DataFrameReader.csv](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html)
- [pyspark.sql.DataFrameReader.json](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html)
- [pyspark.sql.DataFrameReader.load](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html)
- [pyspark.sql.DataFrameReader.option](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html)
- [pyspark.sql.DataFrameReader.parquet](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html)
- [pyspark.sql.DataFrameWriter.csv](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.csv.html)



In [4]:
# import pyspark package
import pyspark
from pyspark.sql import SparkSession

In [5]:
# Create a new Spark session
spark=SparkSession.builder.appName("DataframePartOne").getOrCreate()

In [9]:
# Read a Dataset | Create a Dataframe from a csv dataset
csv_df = spark.read.csv('./notebooks/datasets/country_wise_latest.csv')

In [10]:
csv_df.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|                _c0|      _c1|   _c2|      _c3|   _c4|      _c5|       _c6|          _c7|                 _c8|
+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|          WHO Region|
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           63|              Europe|
|            Algeria|    27973|  1163|    18837|  7973|      616|         8|          749|              Africa|
|            Andorra|      907|    52|      803|    52|       10|         0|            0|              Europe|
|             Angola|      950|    41|      242|   667|       18|         1|            0|              

In [6]:
# Read a Dataset | Create a Dataframe from a csv dataset with column header
csv_df = spark.read.option('header','true').csv('./notebooks/datasets/country_wise_latest.csv')

In [14]:
csv_df.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           63|              Europe|
|            Algeria|    27973|  1163|    18837|  7973|      616|         8|          749|              Africa|
|            Andorra|      907|    52|      803|    52|       10|         0|            0|              Europe|
|             Angola|      950|    41|      242|   667|       18|         1|            0|              Africa|
|Antigua and Barbuda|       86|     3|       65|    18|        4|         0|            5|            Am

In [16]:
# Another way of reading a csv dataset with a column header
csv_df = spark.read.format("csv").option('header','true').load('./notebooks/datasets/country_wise_latest.csv')

In [17]:
# Display top rows "n" of the dataframe
csv_df.show(2)

+--------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|          WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|   Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|Eastern Mediterra...|
|       Albania|     4880|   144|     2745|  1991|      117|         6|           63|              Europe|
+--------------+---------+------+---------+------+---------+----------+-------------+--------------------+
only showing top 2 rows



In [18]:
# Print the schema (datatypes of cols) of the dataframe
csv_df.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- WHO Region: string (nullable = true)



### Adding column in Dataframe

In [11]:
csv_df.withColumn('Total Deaths',csv_df['Deaths']+csv_df['New deaths']).show()

+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|          WHO Region|Total Deaths|
+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|Eastern Mediterra...|      1279.0|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           63|              Europe|       150.0|
|            Algeria|    27973|  1163|    18837|  7973|      616|         8|          749|              Africa|      1171.0|
|            Andorra|      907|    52|      803|    52|       10|         0|            0|              Europe|        52.0|
|             Angola|      950|    41|      242|   667|       18|         1|            0|              Africa|        42.0|


In [9]:
csv_df.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- WHO Region: string (nullable = true)



### Drop a Column (Delete) from Dataframe

In [13]:
csv_df.drop('Confirmed','Deaths').show()

+-------------------+---------+------+---------+----------+-------------+--------------------+
|     Country/Region|Recovered|Active|New cases|New deaths|New recovered|          WHO Region|
+-------------------+---------+------+---------+----------+-------------+--------------------+
|        Afghanistan|    25198|  9796|      106|        10|           18|Eastern Mediterra...|
|            Albania|     2745|  1991|      117|         6|           63|              Europe|
|            Algeria|    18837|  7973|      616|         8|          749|              Africa|
|            Andorra|      803|    52|       10|         0|            0|              Europe|
|             Angola|      242|   667|       18|         1|            0|              Africa|
|Antigua and Barbuda|       65|    18|        4|         0|            5|            Americas|
|          Argentina|    72575| 91782|     4890|       120|         2057|            Americas|
|            Armenia|    26665| 10014|       73|  

### Drop a row based on Null values

In [16]:
# Delete all the rows which has a null value in any column
csv_df.na.drop().show()

+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           63|              Europe|
|            Algeria|    27973|  1163|    18837|  7973|      616|         8|          749|              Africa|
|            Andorra|      907|    52|      803|    52|       10|         0|            0|              Europe|
|             Angola|      950|    41|      242|   667|       18|         1|            0|              Africa|
|Antigua and Barbuda|       86|     3|       65|    18|        4|         0|            5|            Am

In [19]:
# "How" can have two values: any OR all
# Delete a row only if all the column values are null
csv_df.na.drop(how='all')

DataFrame[Country/Region: string, Confirmed: string, Deaths: string, Recovered: string, Active: string, New cases: string, New deaths: string, New recovered: string, WHO Region: string]

In [None]:
# Delete a row even if atleast one column values has null value
csv_df.na.drop(how='any')

In [21]:
# Delete a row using threshhold | Delete the row only if you have atleast "3 non-nulls" in the row
csv_df.na.drop(how='any', thresh=3)

DataFrame[Country/Region: string, Confirmed: string, Deaths: string, Recovered: string, Active: string, New cases: string, New deaths: string, New recovered: string, WHO Region: string]

In [31]:
# Delete a row using "subset" | Delete the row based on the null values present in a particular column(s)
csv_df.na.drop(how="any", subset=["Deaths"])

DataFrame[Country/Region: string, Confirmed: string, Deaths: string, Recovered: string, Active: string, New cases: string, New deaths: string, New recovered: string, WHO Region: string]

### Filling the null (or missing) values

In [33]:
# Replace all the null values in the dataframe with some value | Use "fill()"
csv_df.na.fill(10,'Deaths')

DataFrame[Country/Region: string, Confirmed: string, Deaths: string, Recovered: string, Active: string, New cases: string, New deaths: string, New recovered: string, WHO Region: string]

### GlobalTempView

In [None]:
csv_df.createGlobalTempView("sampleViewBin")

In [None]:
# Change the cell type from Python to SQL and run it
select * from sampleViewBin