## Data Wrangling with PySpark, for Beginners


https://towardsdatascience.com/data-wrangling-with-pyspark-for-beginners-3f2197c81511

Content Credit: Aashish Nair

#### About PySpark:
- Parallel processing 
- Lazy evaluation (sometimes called lazy computing): operations only run when a result is actually needed, unlike Pandas, which executes each operation immediately
- Slightly steeper learning curve than Pandas
- Best for big data

#### Sample Data:
https://www.kaggle.com/datasets/mathchi/diabetes-data-set

In [1]:
# Imports 

import pandas as pd
import numpy as np

In [2]:
# Create PySpark Session

from pyspark.sql import SparkSession

# create a spark session
ss = SparkSession.builder.appName('Test').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/20 13:28:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


> ### Load Data

In [3]:
# Load data with Pandas

df_pandas = pd.read_csv('diabetes.csv')
df_pandas.head()

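| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |
| 2 | 8 | 183 | 64 | 0 | 0 | 23.3 | 0.672 | 32 | 1 |
| 3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 |
| 4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 | 1 |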


#### PySpark Load Data 

In [4]:
df_pyspark = ss.read.csv('diabetes.csv', header=True, inferSchema=True)

In [5]:
# show() is an action: it triggers PySpark's lazy execution and prints the first 20 rows

df_pyspark.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


#### NOTES:
By default, `read.csv` treats the header line as an ordinary data row and reads every column as a string. To use the first line as column names and let PySpark infer numeric types instead, set `header=True` and `inferSchema=True`.

> ### Selecting column(s)

In [6]:
# Selecting columns with Pandas
df_pandas[['Age', 'BMI']].head()

| | Age | BMI |
|---|---|---|
| 0 | 50 | 33.6 |
| 1 | 31 | 26.6 |
| 2 | 32 | 23.3 |
| 3 | 21 | 28.1 |
| 4 | 33 | 43.1 |


In [7]:
df_pandas['Glucose'].head()


0    148
1     85
2    183
3     89
4    137
Name: Glucose, dtype: int64

In [8]:
df_pandas[['Glucose']].head()

| | Glucose |
|---|---|
| 0 | 148 |
| 1 | 85 |
| 2 | 183 |
| 3 | 89 |
| 4 | 137 |


### PySpark.select

In [9]:
# Selecting columns with PySpark
df_pyspark.select('Age', 'BMI')

DataFrame[Age: int, BMI: double]

In [10]:
df_pyspark.select('Age')

DataFrame[Age: int]

> ### Describe df

In [11]:
# Pandas

df_pandas.describe()

| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome |
|---|---|---|---|---|---|---|---|---|---|
| count | 768.0 | 768.0 | 768.0 | 768.0 | 768.0 | 768.0 | 768.0 | 768.0 | 768.0 |
| mean | 3.845052 | 120.894531 | 69.105469 | 20.536458 | 79.799479 | 31.992578 | 0.471876 | 33.240885 | 0.348958 |
| std | 3.369578 | 31.972618 | 19.355807 | 15.952218 | 115.244002 | 7.88416 | 0.331329 | 11.760232 | 0.476951 |
| min | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.078 | 21.0 | 0.0 |
| 25% | 1.0 | 99.0 | 62.0 | 0.0 | 0.0 | 27.3 | 0.24375 | 24.0 | 0.0 |
| 50% | 3.0 | 117.0 | 72.0 | 23.0 | 30.5 | 32.0 | 0.3725 | 29.0 | 0.0 |
| 75% | 6.0 | 140.25 | 80.0 | 32.0 | 127.25 | 36.6 | 0.62625 | 41.0 | 1.0 |
| max | 17.0 | 199.0 | 122.0 | 99.0 | 846.0 | 67.1 | 2.42 | 81.0 | 1.0 |


In [12]:
# PySpark

df_pyspark.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

### NOTES:
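- Pandas' `describe()` includes the 25th, 50th and 75th percentiles; PySpark's `describe()` does not (its `summary()` method does)
- PySpark's `describe()` also covers string (categorical) columns with count, min and max, whereas Pandas' `describe()` skips non-numeric columns unless `include='all'` is passed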
    
    

> ### Add Columns

In [13]:
# # Pandas: bin BMI into categories. Kept commented out so that BMI_classes
# # does not appear in the outputs below. np.select returns the label of the
# # first matching condition, so each condition only needs an upper bound.
# # Cut points follow the WHO ranges; 'obese' (BMI >= 30) is split into
# # class_i, class_ii and class_iii.

# col = 'BMI'
# conditions = [
#     df_pandas[col] < 18.5,  # underweight
#     df_pandas[col] < 25.0,  # normal
#     df_pandas[col] < 30.0,  # preobese
#     df_pandas[col] < 35.0,  # class_i
#     df_pandas[col] < 40.0,  # class_ii
# ]
# labels = ['underweight', 'normal', 'preobese', 'class_i', 'class_ii']
# df_pandas['BMI_classes'] = np.select(conditions, labels, default='class_iii')

In [14]:
df_pandas['Age_in_10_Years'] = df_pandas['Age'] + 10
df_pandas.head()

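| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome | Age_in_10_Years |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 60 |
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 | 41 |
| 2 | 8 | 183 | 64 | 0 | 0 | 23.3 | 0.672 | 32 | 1 | 42 |
| 3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 | 31 |
| 4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 | 1 | 43 |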


In [15]:
# PySpark

df_pyspark = df_pyspark.withColumn('age in 10 years', df_pyspark['Age'] + 10)
df_pyspark.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|age in 10 years|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|             60|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|             41|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|             42|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|             31|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|             43|
|          5|    116|           74|            0|      0|25.6|          

> ### Remove Columns 

In [16]:
# Pandas
df_pandas = df_pandas.drop('Age_in_10_Years', axis=1)
df_pandas.head(2)

| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |


In [17]:
# PySpark

df_pyspark = df_pyspark.drop('age in 10 years')


In [18]:
df_pyspark.limit(5).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+



> ### Remove Missing Values 

In [19]:
# Pandas
df_pandas = df_pandas.dropna()
df_pandas.head(2)

| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |


In [20]:
# PySpark
df_pyspark = df_pyspark.na.drop()
df_pyspark.limit(5).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+



> ### Filtering
Example: people with a pre-obese BMI (between 25.9 and 29.9, both exclusive)

In [21]:
# Pandas

df_pandas_preobese = df_pandas[(df_pandas['BMI'] > 25.9) & (df_pandas['BMI'] < 29.9)]
df_pandas_preobese.head()

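| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |
| 3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 |
| 12 | 10 | 139 | 80 | 0 | 0 | 27.1 | 1.441 | 57 | 0 |
| 17 | 7 | 107 | 74 | 0 | 0 | 29.6 | 0.254 | 31 | 1 |
| 23 | 9 | 119 | 80 | 35 | 0 | 29.0 | 0.263 | 29 | 1 |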


In [22]:
# PySpark Option I

df_pyspark_preobese = df_pyspark[(df_pyspark.BMI > 25.9) & (df_pyspark.BMI < 29.9)]
df_pyspark_preobese.limit(5).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|         10|    139|           80|            0|      0|27.1|                   1.441| 57|      0|
|          7|    107|           74|            0|      0|29.6|                   0.254| 31|      1|
|          9|    119|           80|           35|      0|29.0|                   0.263| 29|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+



In [23]:
# PySpark Option II

df_pyspark_preobese_ii = df_pyspark.filter((df_pyspark['BMI'] > 25.9) & (df_pyspark['BMI'] < 29.9))
df_pyspark_preobese_ii.limit(5).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|         10|    139|           80|            0|      0|27.1|                   1.441| 57|      0|
|          7|    107|           74|            0|      0|29.6|                   0.254| 31|      1|
|          9|    119|           80|           35|      0|29.0|                   0.263| 29|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+



> ### Aggregation

In [24]:
# Pandas
df_pandas.groupby('Outcome').mean()

| Outcome | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age |
|---|---|---|---|---|---|---|---|---|
| 0 | 3.298 | 109.98 | 68.184 | 19.664 | 68.792 | 30.3042 | 0.429734 | 31.19 |
| 1 | 4.865672 | 141.257463 | 70.824627 | 22.164179 | 100.335821 | 35.142537 | 0.5505 | 37.067164 |


In [25]:
# PySpark

df_pyspark.groupby('Outcome').mean().show(vertical = True)


-RECORD 0--------------------------------------------
 Outcome                       | 1                   
 avg(Pregnancies)              | 4.865671641791045   
 avg(Glucose)                  | 141.25746268656715  
 avg(BloodPressure)            | 70.82462686567165   
 avg(SkinThickness)            | 22.16417910447761   
 avg(Insulin)                  | 100.33582089552239  
 avg(BMI)                      | 35.14253731343278   
 avg(DiabetesPedigreeFunction) | 0.5505              
 avg(Age)                      | 37.06716417910448   
 avg(Outcome)                  | 1.0                 
-RECORD 1--------------------------------------------
 Outcome                       | 0                   
 avg(Pregnancies)              | 3.298               
 avg(Glucose)                  | 109.98              
 avg(BloodPressure)            | 68.184              
 avg(SkinThickness)            | 19.664              
 avg(Insulin)                  | 68.792              
 avg(BMI)                   

> ### Joins

In [26]:
# Pandas

df_pandas2 = pd.read_csv('diabetes.csv')
df_pandas_join = df_pandas.merge(df_pandas2, on='Pregnancies', how='inner')

# preview of data
df_pandas_join.head()

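| | Pregnancies | Glucose_x | BloodPressure_x | SkinThickness_x | Insulin_x | BMI_x | DiabetesPedigreeFunction_x | Age_x | Outcome_x | Glucose_y | BloodPressure_y | SkinThickness_y | Insulin_y | BMI_y | DiabetesPedigreeFunction_y | Age_y | Outcome_y |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
| 1 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 92 | 92 | 0 | 0 | 19.9 | 0.188 | 28 | 0 |
| 2 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 144 | 72 | 27 | 228 | 33.9 | 0.255 | 40 | 0 |
| 3 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 93 | 50 | 30 | 64 | 28.7 | 0.356 | 23 | 0 |
| 4 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 111 | 64 | 39 | 0 | 34.2 | 0.26 | 24 | 0 |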


In [27]:
# PySpark

# join datasets
df_pyspark2 = ss.read.csv('diabetes.csv', header=True, inferSchema=True)
df_pyspark_join = df_pyspark.join(df_pyspark2, on='Pregnancies', how='inner')

# preview of data
df_pyspark_join.show(5, vertical = True)

-RECORD 0-------------------------
 Pregnancies              | 6     
 Glucose                  | 148   
 BloodPressure            | 72    
 SkinThickness            | 35    
 Insulin                  | 0     
 BMI                      | 33.6  
 DiabetesPedigreeFunction | 0.627 
 Age                      | 50    
 Outcome                  | 1     
 Glucose                  | 190   
 BloodPressure            | 92    
 SkinThickness            | 0     
 Insulin                  | 0     
 BMI                      | 35.5  
 DiabetesPedigreeFunction | 0.278 
 Age                      | 66    
 Outcome                  | 1     
-RECORD 1-------------------------
 Pregnancies              | 6     
 Glucose                  | 148   
 BloodPressure            | 72    
 SkinThickness            | 35    
 Insulin                  | 0     
 BMI                      | 33.6  
 DiabetesPedigreeFunction | 0.627 
 Age                      | 50    
 Outcome                  | 1     
 Glucose            

> ### Converting the Joined PySpark DataFrame to a Pandas DataFrame

In [28]:
df_in_pandas = df_pyspark_join.toPandas()
df_in_pandas.head()

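| | Pregnancies | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome | Glucose | BloodPressure | SkinThickness | Insulin | BMI | DiabetesPedigreeFunction | Age | Outcome |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 190 | 92 | 0 | 0 | 35.5 | 0.278 | 66 | 1 |
| 1 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 162 | 62 | 0 | 0 | 24.3 | 0.178 | 50 | 1 |
| 2 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 80 | 80 | 36 | 0 | 39.8 | 0.177 | 28 | 0 |
| 3 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 125 | 78 | 31 | 0 | 27.6 | 0.565 | 49 | 1 |
| 4 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 | 195 | 70 | 0 | 0 | 30.9 | 0.328 | 31 | 1 |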


## Pros and Cons

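- PySpark DataFrames have no row index, so there is no equivalent of Pandas' `.loc`/`.iloc`
- Bugs can be hard to track down: because of lazy evaluation, an error often surfaces only when an action runs, and the traceback mixes Python and JVM (Java) frames
- The PySpark DataFrame API has no built-in plotting; convert the data (or a sample of it) to a Pandas DataFrame with `.toPandas()` first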
