# PySpark Processing Pipeline 

This notebook explains the major steps in a data processing pipeline, so most steps come with some code to check their results (mostly `printSchema()` and `show()`). For an end-to-end pipeline, please use the `pipeline_template` notebook.

### Loading Data

Change the CSV path to point to your own file, which should be stored in an HDFS cluster.

`printSchema()` displays the column names and types.

In [1]:
%spark2.pyspark

# path to data
hdfs_path = '/tmp/data/'
data_file = 'heart_disease.csv'

# read the CSV with a header row and let Spark infer the column types
data = spark.read.options(header='True', inferSchema='True', delimiter=',').csv(hdfs_path + data_file)
data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



Check some rows with `show()`.


In [3]:
%spark2.pyspark
data.show()

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|  M|          NAP|      120|        339|        0| 

#### Change all integer columns to double

We need to cast all integer columns to double; otherwise, later stages can fail during modeling (in Spark 2.x, for example, `Imputer` accepts only float and double input columns).

Modify the code below to include all integer columns in `integer_cols` (as strings). `printSchema()` then verifies that every numeric column is double. In my example, `PatientID` is not cast since we will drop it anyway.

In [5]:
%spark2.pyspark

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

integer_cols = ['Age', 'RestingBP', 'Cholesterol', 'FastingBS', 'MaxHR', 'HeartDisease']

for c in integer_cols:
    data = data.withColumn(c, col(c).cast(DoubleType()))

data.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: double (nullable = true)
 |-- Cholesterol: double (nullable = true)
 |-- FastingBS: double (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: double (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: double (nullable = true)
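
Listing the integer columns by hand is easy to get wrong on wider tables. One possible shortcut, sketched here in plain Python, is to filter the `(name, type)` pairs that `data.dtypes` returns, where Spark reports integer columns as `'int'`. The `dtypes` list below is typed in by hand from the first `printSchema()` output so that the snippet runs without a cluster; in the notebook you would read `data.dtypes` (before casting) instead.

```python
# (name, type) pairs as `data.dtypes` would report them before casting;
# hand-copied from the schema printed after loading (an assumption of this sketch)
dtypes = [
    ('Age', 'int'), ('Sex', 'string'), ('ChestPainType', 'string'),
    ('RestingBP', 'int'), ('Cholesterol', 'int'), ('FastingBS', 'int'),
    ('RestingECG', 'string'), ('MaxHR', 'int'), ('ExerciseAngina', 'string'),
    ('Oldpeak', 'double'), ('ST_Slope', 'string'), ('HeartDisease', 'int'),
]

# keep only the columns whose type string is 'int'
integer_cols = [name for name, dtype in dtypes if dtype == 'int']
print(integer_cols)
```

Columns that Spark infers as long appear as `'bigint'` in `dtypes`, so the filter would need to include that type string as well if your data has such columns.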



#### Drop unnecessary columns

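Drop all unnecessary columns in the paragraph below by including them in the `drop_cols` list, then verify with the result of `printSchema()`. Names in `drop_cols` that do not exist in the data are ignored, which is why the schema below is unchanged for this dataset.

In general, ID columns and name columns (first name, last name, middle name, etc.) should be dropped.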

In [7]:
%spark2.pyspark

drop_cols = ['PatientID']
data_main = data.drop(*drop_cols)
data_main.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: double (nullable = true)
 |-- Cholesterol: double (nullable = true)
 |-- FastingBS: double (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: double (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: double (nullable = true)



### Train Test Splitting

Change the 0.7/0.3 ratio as needed. We then use `count()` to verify the sizes of the two sets.

In [9]:
%spark2.pyspark

data_train, data_test = data_main.randomSplit([0.7, 0.3])

data_train.count(), data_test.count()

(651, 267)
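
The counts above are not an exact 70/30 split of the 918 rows (651 is about 70.9%), because `randomSplit` assigns rows at random and treats the weights as probabilities rather than exact quotas. The plain-Python sketch below mimics that idea with one random draw per row. It is an illustration only, not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by one uniform draw (hypothetical helper)."""
    total = float(sum(weights))
    # cumulative upper bounds of each split's share, e.g. [0.7, 1.0]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        # first split whose upper bound exceeds the draw; fall back to the last
        idx = next((i for i, b in enumerate(bounds) if x < b), len(weights) - 1)
        splits[idx].append(row)
    return splits

rows = list(range(918))  # 918 = 651 + 267, the row counts reported above
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 70/30
```

In the notebook itself, `randomSplit` accepts a `seed` argument, for example `data_main.randomSplit([0.7, 0.3], seed=42)`, if you want the same split on repeated runs.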


### Processing Pipeline

Modify `string_cols`, `numeric_cols`, and `target` to include the correct columns in each list. The pipeline below will:
- Index all string (categorical) columns, then one-hot encode them. Missing values are handled by `handleInvalid='keep'`.
- Impute all numeric columns, then standardize them.
- Assemble all processed columns into a single vector column, `features`.

In [11]:
%spark2.pyspark
string_cols = ['ChestPainType', 'RestingECG', 'ExerciseAngina', 'ST_Slope']
numeric_cols = ['Age','RestingBP','Cholesterol','FastingBS','MaxHR','Oldpeak']
target = 'HeartDisease'


from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

###one hot encode the categorical columns
encoders = []
for c in string_cols:
    encoders.append(StringIndexer(inputCol=c, outputCol=c+'Index', handleInvalid='keep'))
    encoders.append(OneHotEncoder(inputCol=c+'Index', outputCol=c+'Codes'))

###impute the numeric columns
imputer = Imputer(inputCols = numeric_cols, outputCols = [c+'Imp' for c in numeric_cols], strategy = 'median')

###standardization
num_assembler = VectorAssembler(inputCols=[c+'Imp' for c in numeric_cols], outputCol='imputed')
scaler = StandardScaler(inputCol = 'imputed', outputCol = 'scaled')

###combine results
assembler = VectorAssembler(inputCols=[c+'Codes' for c in string_cols]+['scaled'], outputCol='features')



###build pipeline
pipeline = Pipeline(stages = encoders + [imputer, num_assembler, scaler, assembler])

###train pipeline
pipeline_trained = pipeline.fit(data_train)
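
To make the stages more concrete, here is a plain-Python sketch of what they compute on a few toy values. It is not Spark code and it glosses over details. It assumes that the indexer orders labels by descending frequency (ties broken alphabetically), that the encoder drops the last category (its default), so the extra "invalid" bucket created by `handleInvalid='keep'` becomes an all-zero vector, and that `StandardScaler` keeps its defaults (divide by the sample standard deviation, no centering). Spark's `Imputer` computes its median approximately, so results on real data can differ slightly. The helper names and toy values are made up for illustration.

```python
from collections import Counter
from statistics import median, stdev

def fit_indexer(values):
    """Order labels by descending frequency, ties alphabetically (assumed)."""
    counts = Counter(v for v in values if v is not None)
    return sorted(counts, key=lambda label: (-counts[label], label))

def one_hot(value, labels):
    """One slot per known label; unseen or missing values encode as all zeros."""
    vec = [0.0] * len(labels)
    if value in labels:
        vec[labels.index(value)] = 1.0
    return vec

def impute_and_scale(values):
    """Fill missing entries with the median, then divide by the sample std."""
    fill = median(v for v in values if v is not None)
    imputed = [fill if v is None else v for v in values]
    std = stdev(imputed)
    return [v / std for v in imputed]

labels = fit_indexer(['ATA', 'NAP', 'ATA', 'ASY'])  # toy training values
print(labels)
print(one_hot('NAP', labels))
print(one_hot('TA', labels))                        # label not seen during fit
scaled = impute_and_scale([140.0, None, 130.0, 150.0])
print(scaled)
```

The real pipeline applies the same kind of transformation to every column in `string_cols` and `numeric_cols`, then concatenates the results into `features`.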

#### Transform the training data with the pipeline

In [13]:
%spark2.pyspark

train_prc = pipeline_trained.transform(data_train).select(target,'features')
train_prc.show()

+------------+--------------------+
|HeartDisease|            features|
+------------+--------------------+
|         0.0|(18,[2,6,7,10,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[2,6,7,10,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[3,5,7,10,12,...|
|         0.0|(18,[2,5,7,10,12,...|
|         1.0|(18,[0,4,8,9,12,1...|
|         0.0|(18,[2,4,7,10,12,...|
|         1.0|(18,[0,4,7,9,12,1...|
|         0.0|(18,[2,4,7,10,12,...|
|         1.0|(18,[3,4,7,10,12,...|
|         1.0|(18,[0,4,8,9,12,1...|
|         0.0|(18,[2,4,7,10,12,...|
|         1.0|(18,[0,4,7,10,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[2,5,7,10,12,...|
|         1.0|(18,[3,4,7,9,12,1...|
|         0.0|(18,[0,4,7,10,12,...|
|         0.0|(18,[3,5,7,10,12,...|
|         1.0|(18,[0,4,8,9,12,1...|
+------------+--------------------+
only showing top 20 rows



#### Transform the testing data with the pipeline

In [15]:
%spark2.pyspark

test_prc = pipeline_trained.transform(data_test).select(target,'features')
test_prc.show()

+------------+--------------------+
|HeartDisease|            features|
+------------+--------------------+
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[1,4,7,10,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[3,6,7,10,12,...|
|         0.0|(18,[0,4,7,10,12,...|
|         1.0|(18,[0,4,8,9,12,1...|
|         1.0|(18,[0,6,8,10,12,...|
|         0.0|(18,[2,6,7,10,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[1,4,7,9,12,1...|
|         1.0|(18,[0,4,8,9,12,1...|
|         0.0|(18,[2,5,7,10,12,...|
|         0.0|(18,[1,4,7,11,12,...|
|         1.0|(18,[0,4,7,10,12,...|
|         1.0|(18,[0,4,8,9,12,1...|
|         1.0|(18,[0,4,7,9,12,1...|
|         1.0|(18,[0,4,8,11,12,...|
|         0.0|(18,[2,4,7,10,12,...|
|         0.0|(18,[1,4,7,10,12,...|
+------------+--------------------+
only showing top 20 rows



## Modeling

Train a simple model and check its performance. This part is just for demonstration.

In [17]:
%spark2.pyspark

from pyspark.ml.classification import LogisticRegression

logistic_model = LogisticRegression(featuresCol='features', labelCol=target)

logistic_trained = logistic_model.fit(train_prc)

In [18]:
%spark2.pyspark
train_predicted = logistic_trained.transform(train_prc)
train_predicted.crosstab(target, 'prediction').show()

+-----------------------+---+---+
|HeartDisease_prediction|0.0|1.0|
+-----------------------+---+---+
|                    1.0| 42|321|
|                    0.0|241| 47|
+-----------------------+---+---+



In [19]:
%spark2.pyspark
test_predicted = logistic_trained.transform(test_prc)
test_predicted.crosstab(target, 'prediction').show()

+-----------------------+---+---+
|HeartDisease_prediction|0.0|1.0|
+-----------------------+---+---+
|                    1.0| 20|125|
|                    0.0|100| 22|
+-----------------------+---+---+
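
Each crosstab is a confusion matrix: rows are the actual `HeartDisease` labels and columns are the predictions. As a quick check, the sketch below turns the counts printed above into accuracy, precision, and recall in plain Python. `summarize` is a hypothetical helper, and treating 1.0 as the positive class is an assumption.

```python
def summarize(tp, fn, tn, fp):
    """Accuracy, precision, and recall from confusion-matrix counts."""
    total = tp + fn + tn + fp
    return {
        'accuracy': (tp + tn) / total,
        'precision': tp / (tp + fp),
        'recall': tp / (tp + fn),
    }

# counts read from the two crosstab tables above (label 1.0 = positive)
train_metrics = summarize(tp=321, fn=42, tn=241, fp=47)
test_metrics = summarize(tp=125, fn=20, tn=100, fp=22)

for name, metrics in [('train', train_metrics), ('test', test_metrics)]:
    print(name, {k: round(v, 3) for k, v in metrics.items()})
```

On these counts, accuracy comes out at roughly 0.86 on the training set and 0.84 on the test set, so the model does not appear to overfit badly in this run.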

