# Goal 
Build a complete pipeline that handles ML **preprocessing** for both categorical *and* continuous data.

In [2]:
# Import findspark and call init() before the pyspark imports in the next cell.
# NOTE(review): presumably this locates the local Spark installation and makes
# `pyspark` importable — confirm against the findspark documentation.
import findspark
findspark.init()

In [3]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as pyf
from pyspark.sql.types import *
import functools


from pyspark.ml.feature import RobustScaler
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.feature import Imputer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

In [4]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("churn_pred")
    .getOrCreate()
)

In [5]:
# Display the session object as the cell output to confirm it was created.
spark

In [None]:
# Toy DataFrame used to demonstrate chaining VectorAssemblers.
# Every row carries four values, so four column names are required; the
# fourth column must be called "a" because a later assembler reads ["z", "a"].
my_df = spark.createDataFrame(
    [(0, 22, 1, 11), (1, 5, 77, 12), (10, 2, 3, 21)],
    ["x", "y", "z", "a"],
)

In [None]:
# Assembler that packs columns "x" and "y" into one vector column.
va_x_y = VectorAssembler(
    inputCols=["x", "y"],
    outputCol='x_y_features',
)

In [None]:
# Apply the x/y assembler to the toy frame; the result keeps the original
# columns and adds the assembler's output column.
o1 = va_x_y.transform(my_df)
# Print the transformed frame to inspect the new vector column.
o1.show()

In [None]:
# Second assembler: packs columns "z" and "a" into their own vector column.
va_z_a = VectorAssembler(
    inputCols=["z", "a"],
    outputCol="z_a_features",
)

# Chain it onto the output of the first assembler and inspect the result.
o2 = va_z_a.transform(o1)
o2.show()

In [None]:
# Final assembler: merges the two intermediate vector columns into one.
va_final = VectorAssembler(
    inputCols=["x_y_features", "z_a_features"],
    outputCol="final",
)

# Apply it to the frame that already holds both intermediate vectors.
final_df = va_final.transform(o2)
final_df.show()

<br/>
<br/>
<br/>
<br/>
<br/>

### Now with a real dataset

In [32]:
# Load the iris CSV, treating the first row as a header and letting Spark
# infer the column types.
# NOTE(review): the path is an absolute, machine-specific location — consider
# moving it to a configurable data directory.
iris_df = (
    spark.read
    .format("csv")
    .load(
        '/Users/spurushe/Documents/data-science-world/input_data/iris.csv',
        header=True,
        inferSchema=True,
    )
)

In [37]:
# Preview the first 5 rows of the loaded dataset.
iris_df.show(5)

+---+-----------+----------+-----------+----------+-------+
| Id|SepalLength|SepalWidth|PetalLength|PetalWidth|Species|
+---+-----------+----------+-----------+----------+-------+
|  1|        5.1|       3.5|        1.4|       0.2| setosa|
|  2|        4.9|       3.0|        1.4|       0.2| setosa|
|  3|        4.7|       3.2|        1.3|       0.2| setosa|
|  4|        4.6|       3.1|        1.5|       0.2| setosa|
|  5|        5.0|       3.6|        1.4|       0.2| setosa|
+---+-----------+----------+-----------+----------+-------+
only showing top 5 rows



In [38]:
# Assemble the four numeric measurement columns into one vector column.
num_va = VectorAssembler(
    inputCols=[
        'SepalLength',
        'SepalWidth',
        'PetalLength',
        'PetalWidth',
    ],
    outputCol='num_features',
)

In [39]:
# Preview the assembled numeric vector alongside the original columns.
num_va.transform(iris_df).show()

+---+-----------+----------+-----------+----------+-------+-----------------+
| Id|SepalLength|SepalWidth|PetalLength|PetalWidth|Species|     num_features|
+---+-----------+----------+-----------+----------+-------+-----------------+
|  1|        5.1|       3.5|        1.4|       0.2| setosa|[5.1,3.5,1.4,0.2]|
|  2|        4.9|       3.0|        1.4|       0.2| setosa|[4.9,3.0,1.4,0.2]|
|  3|        4.7|       3.2|        1.3|       0.2| setosa|[4.7,3.2,1.3,0.2]|
|  4|        4.6|       3.1|        1.5|       0.2| setosa|[4.6,3.1,1.5,0.2]|
|  5|        5.0|       3.6|        1.4|       0.2| setosa|[5.0,3.6,1.4,0.2]|
|  6|        5.4|       3.9|        1.7|       0.4| setosa|[5.4,3.9,1.7,0.4]|
|  7|        4.6|       3.4|        1.4|       0.3| setosa|[4.6,3.4,1.4,0.3]|
|  8|        5.0|       3.4|        1.5|       0.2| setosa|[5.0,3.4,1.5,0.2]|
|  9|        4.4|       2.9|        1.4|       0.2| setosa|[4.4,2.9,1.4,0.2]|
| 10|        4.9|       3.1|        1.5|       0.1| setosa|[4.9,

In [60]:
# Scaler that reads the vector produced by num_va and writes the scaled
# vector to 'sca_features'.
rob_sc = RobustScaler(
    inputCol=num_va.getOutputCol(),
    outputCol='sca_features',
)

In [45]:
# Indexer that maps the string 'Species' column to a numeric index column.
str_indexer = StringIndexer(
    inputCol='Species',
    outputCol='species_index',
)

# Fit on the iris data and preview the first 10 indexed rows.
str_indexer.fit(iris_df).transform(iris_df).show(10)

+---+-----------+----------+-----------+----------+-------+-------------+
| Id|SepalLength|SepalWidth|PetalLength|PetalWidth|Species|species_index|
+---+-----------+----------+-----------+----------+-------+-------------+
|  1|        5.1|       3.5|        1.4|       0.2| setosa|          0.0|
|  2|        4.9|       3.0|        1.4|       0.2| setosa|          0.0|
|  3|        4.7|       3.2|        1.3|       0.2| setosa|          0.0|
|  4|        4.6|       3.1|        1.5|       0.2| setosa|          0.0|
|  5|        5.0|       3.6|        1.4|       0.2| setosa|          0.0|
|  6|        5.4|       3.9|        1.7|       0.4| setosa|          0.0|
|  7|        4.6|       3.4|        1.4|       0.3| setosa|          0.0|
|  8|        5.0|       3.4|        1.5|       0.2| setosa|          0.0|
|  9|        4.4|       2.9|        1.4|       0.2| setosa|          0.0|
| 10|        4.9|       3.1|        1.5|       0.1| setosa|          0.0|
+---+-----------+----------+----------

In [48]:
# One-hot encode the index column produced by str_indexer.
ohe = OneHotEncoder(
    inputCol=str_indexer.getOutputCol(),
    outputCol="ohe_features",
)

In [49]:
from pyspark.ml import Pipeline

In [56]:
# Putting it all together
# Assemble the final feature vector from the scaled numeric features and the
# one-hot encoded categorical features. The numeric input must be the
# RobustScaler output (rob_sc), not the raw num_va vector; otherwise the
# scaling stage in the pipeline has no effect on the final features.
num_cat_va = VectorAssembler(
    inputCols=[
        rob_sc.getOutputCol(),  # robust scaled numerical columns
        ohe.getOutputCol(),     # encoded categorical columns
    ],
    outputCol='final_features',
)

In [62]:
# Preprocessing pipeline; stages run in the order listed.
p = Pipeline(
    stages=[
        num_va,       # assemble numeric columns
        rob_sc,       # scale the assembled numeric vector
        str_indexer,  # index the string category column
        ohe,          # one-hot encode the category index
        num_cat_va,   # assemble the final feature vector
    ]
)

In [None]:
# The final pipeline stage; its output column holds the assembled features.
last_stage = p.getStages()[-1]

In [None]:
# Fit the pipeline on the iris data, transform that same data, and keep only
# the columns the ML algorithm needs: the feature vector (X) and the label (y).
input_features = (
    p.fit(iris_df)
    .transform(iris_df)
    .select([last_stage.getOutputCol(), "species_index"])  # X, y
)

**Next step:** plug `input_features` (the assembled feature vector plus the `species_index` label) into an ML algorithm.