# Spark Machine Learning Pipeline

This coursework is about implementing and applying Spark Machine Learning Pipelines, and evaluating them with respect to preprocessing, parametrisation, and scaling.

## 1. Data set initial analysis and summary of pipeline task. (20%)

### 1.1. Summary of machine learning pipeline
Step 1.  
Step 2.  
Step 3.  
Step 4.  


### 1.2. Loading data to RDD and first preprocessing

In [1]:
# load dependencies
import numpy as np
import pandas as pd

type_dict={'ncodpers':np.int32,
           'ind_ahor_fin_ult1':np.uint8, 'ind_aval_fin_ult1':np.uint8, 
           'ind_cco_fin_ult1':np.uint8,'ind_cder_fin_ult1':np.uint8,
           'ind_cno_fin_ult1':np.uint8,'ind_ctju_fin_ult1':np.uint8,'ind_ctma_fin_ult1':np.uint8,
           'ind_ctop_fin_ult1':np.uint8,'ind_ctpp_fin_ult1':np.uint8,'ind_deco_fin_ult1':np.uint8,
           'ind_deme_fin_ult1':np.uint8,'ind_dela_fin_ult1':np.uint8,'ind_ecue_fin_ult1':np.uint8,
           'ind_fond_fin_ult1':np.uint8,'ind_hip_fin_ult1':np.uint8,'ind_plan_fin_ult1':np.uint8,
           'ind_pres_fin_ult1':np.uint8,'ind_reca_fin_ult1':np.uint8,'ind_tjcr_fin_ult1':np.uint8,
           'ind_valo_fin_ult1':np.uint8,'ind_viv_fin_ult1':np.uint8, 'ind_recibo_ult1':np.uint8 }

# load data from server into dataframe (only loading the top 1,000,000 for demonstration purpose)
df=pd.read_csv("/data/tempstore/santander-products/train_ver2.csv", nrows=1000000, dtype=type_dict)


  interactivity=interactivity, compiler=compiler, result=result)


### 1.3. Descriptive Statistics

In [2]:
df.describe()

Unnamed: 0,ncodpers,ind_nuevo,indrel,indrel_1mes,tipodom,cod_prov,ind_actividad_cliente,renta,ind_ahor_fin_ult1,ind_aval_fin_ult1,...,ind_hip_fin_ult1,ind_plan_fin_ult1,ind_pres_fin_ult1,ind_reca_fin_ult1,ind_tjcr_fin_ult1,ind_valo_fin_ult1,ind_viv_fin_ult1,ind_nomina_ult1,ind_nom_pens_ult1,ind_recibo_ult1
count,1000000.0,989218.0,989218.0,989218.0,989218.0,982266.0,989218.0,824817.0,1000000.0,1000000.0,...,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,994598.0,994598.0,1000000.0
mean,690596.7,0.000489,1.109074,1.000085,1.0,26.852131,0.564971,139646.2,0.000177,3.9e-05,...,0.009982,0.014553,0.004661,0.072581,0.066084,0.039378,0.006442,0.071629,0.079543,0.166275
std,404408.4,0.022114,3.267624,0.012954,0.0,12.422924,0.495761,238985.8,0.013303,0.006245,...,0.09941,0.119755,0.068112,0.259448,0.248429,0.194493,0.080003,0.257873,0.270584,0.372327
min,15889.0,0.0,1.0,1.0,1.0,1.0,0.0,1202.73,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,336411.0,0.0,1.0,1.0,1.0,18.0,0.0,71571.84,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,664476.0,0.0,1.0,1.0,1.0,28.0,1.0,106651.9,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,1074511.0,0.0,1.0,1.0,1.0,33.0,1.0,163432.5,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
max,1379131.0,1.0,99.0,3.0,1.0,52.0,1.0,28894400.0,1.0,1.0,...,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


### 1.4. Data Cleaning

In [3]:
# keep only unique id
unique_ids = pd.Series(df["ncodpers"].unique())
df = df[df.ncodpers.isin(unique_ids)]  
df.count() # number of instances

fecha_dato               1000000
ncodpers                 1000000
ind_empleado              989218
pais_residencia           989218
sexo                      989214
age                      1000000
fecha_alta                989218
ind_nuevo                 989218
antiguedad               1000000
indrel                    989218
ult_fec_cli_1t              1101
indrel_1mes               989218
tiprel_1mes               989218
indresi                   989218
indext                    989218
conyuemp                     178
canal_entrada             989139
indfall                   989218
tipodom                   989218
cod_prov                  982266
nomprov                   982266
ind_actividad_cliente     989218
renta                     824817
segmento                  989105
ind_ahor_fin_ult1        1000000
ind_aval_fin_ult1        1000000
ind_cco_fin_ult1         1000000
ind_cder_fin_ult1        1000000
ind_cno_fin_ult1         1000000
ind_ctju_fin_ult1        1000000
ind_ctma_f

In [4]:
# eliminate mostly empty columns and redundant variables
df.drop(["tipodom","cod_prov", "ult_fec_cli_1t","conyuemp"],axis=1,inplace=True)

In [5]:
# transform to numeric and set missing values to nan
df['age']=pd.to_numeric(df.age, errors='coerce')
df['ind_nuevo']=pd.to_numeric(df.ind_nuevo, errors='coerce')
df['antiguedad']=pd.to_numeric(df.antiguedad, errors='coerce')
df['indrel']=pd.to_numeric(df.indrel, errors='coerce')
df['renta']=pd.to_numeric(df.renta, errors='coerce')
df['indrel_1mes']=pd.to_numeric(df.indrel_1mes, errors='coerce')

In [6]:
# Remove age outliers and nan from age variable
df.loc[df.age < 18,"age"]  = df.loc[(df.age >= 18) & (df.age <= 30),"age"].mean(skipna=True) # replace outlier con mean
df.loc[df.age > 100,"age"] = df.loc[(df.age >= 30) & (df.age <= 100),"age"].mean(skipna=True) # replace outlier con mean
df["age"].fillna(df["age"].mean(),inplace=True) # replace nan with mean
df["age"] = df["age"].astype(int)

In [7]:
# transfor dates to datetime datatype
df["fecha_dato"] = pd.to_datetime(df["fecha_dato"],format="%Y-%m-%d")
df["fecha_alta"] = pd.to_datetime(df["fecha_alta"],format="%Y-%m-%d")
df["fecha_dato"].unique()

array(['2015-01-28T00:00:00.000000000', '2015-02-28T00:00:00.000000000'], dtype='datetime64[ns]')

In [8]:
# fill datetime missing values
dates=df.loc[:,"fecha_alta"].sort_values().reset_index()
median_date = int(np.median(dates.index.values))
df.loc[df.fecha_alta.isnull(),"fecha_alta"] = dates.loc[median_date,"fecha_alta"] 

In [9]:
# check all missing values
df.isnull().any()

fecha_dato               False
ncodpers                 False
ind_empleado              True
pais_residencia           True
sexo                      True
age                      False
fecha_alta               False
ind_nuevo                 True
antiguedad                True
indrel                    True
indrel_1mes               True
tiprel_1mes               True
indresi                   True
indext                    True
canal_entrada             True
indfall                   True
nomprov                   True
ind_actividad_cliente     True
renta                     True
segmento                  True
ind_ahor_fin_ult1        False
ind_aval_fin_ult1        False
ind_cco_fin_ult1         False
ind_cder_fin_ult1        False
ind_cno_fin_ult1         False
ind_ctju_fin_ult1        False
ind_ctma_fin_ult1        False
ind_ctop_fin_ult1        False
ind_ctpp_fin_ult1        False
ind_deco_fin_ult1        False
ind_deme_fin_ult1        False
ind_dela_fin_ult1        False
ind_ecue

In [10]:
# Replace missing values in target features with 0
# target features = boolean indicator as to whether or not that product was owned that month
df.loc[df.ind_nomina_ult1.isnull(), "ind_nomina_ult1"] = 0
df.loc[df.ind_nom_pens_ult1.isnull(), "ind_nom_pens_ult1"] = 0

In [11]:
# Replace other missing values
df.loc[df["ind_nuevo"].isnull(),"ind_nuevo"] = 1                   # new customers id '1'
df.loc[df.antiguedad.isnull(),"antiguedad"] = df.antiguedad.min()
df.loc[df.antiguedad <0, "antiguedad"] = 0                         # new customer antiguedad '0'
df.loc[df.indrel.isnull(),"indrel"] = 1 
df.loc[df.ind_actividad_cliente.isnull(),"ind_actividad_cliente"] = \
df["ind_actividad_cliente"].median()                   # fill in customer activity missing
df.loc[df.nomprov.isnull(),"nomprov"] = "UNKNOWN"      # known values for city of residence
df.loc[df.indfall.isnull(),"indfall"] = "N"            # missing deceased index set to N
df.loc[df.tiprel_1mes.isnull(),"tiprel_1mes"] = "A"    # customer status, if missing = active 
df.tiprel_1mes = df.tiprel_1mes.astype("category")     # customer status as categorical

In [12]:
# Customer type normalization as categorical variable 
map_dict = { 1.0:"1", "1.0":"1", "1":"1", "3.0":"3", "P":"P", 3.0:"3", 2.0:"2", "3":"3", "2.0":"2", "4.0":"4", "4":"4", "2":"2"}
df.indrel_1mes.fillna("P",inplace=True)
df.indrel_1mes = df.indrel_1mes.apply(lambda x: map_dict.get(x,x))
df.indrel_1mes = df.indrel_1mes.astype("category")

In [13]:
# remove rows with any nan value left
df = df.dropna(subset=['renta', 'segmento', 'canal_entrada', 'ind_empleado', 
                       'pais_residencia', 'indresi', 'indresi', 'sexo'], how='any')

In [14]:
# check all missing values are gone
df.isnull().any()

fecha_dato               False
ncodpers                 False
ind_empleado             False
pais_residencia          False
sexo                     False
age                      False
fecha_alta               False
ind_nuevo                False
antiguedad               False
indrel                   False
indrel_1mes              False
tiprel_1mes              False
indresi                  False
indext                   False
canal_entrada            False
indfall                  False
nomprov                  False
ind_actividad_cliente    False
renta                    False
segmento                 False
ind_ahor_fin_ult1        False
ind_aval_fin_ult1        False
ind_cco_fin_ult1         False
ind_cder_fin_ult1        False
ind_cno_fin_ult1         False
ind_ctju_fin_ult1        False
ind_ctma_fin_ult1        False
ind_ctop_fin_ult1        False
ind_ctpp_fin_ult1        False
ind_deco_fin_ult1        False
ind_deme_fin_ult1        False
ind_dela_fin_ult1        False
ind_ecue

In [15]:
df.count() # number of instances

fecha_dato               824742
ncodpers                 824742
ind_empleado             824742
pais_residencia          824742
sexo                     824742
age                      824742
fecha_alta               824742
ind_nuevo                824742
antiguedad               824742
indrel                   824742
indrel_1mes              824742
tiprel_1mes              824742
indresi                  824742
indext                   824742
canal_entrada            824742
indfall                  824742
nomprov                  824742
ind_actividad_cliente    824742
renta                    824742
segmento                 824742
ind_ahor_fin_ult1        824742
ind_aval_fin_ult1        824742
ind_cco_fin_ult1         824742
ind_cder_fin_ult1        824742
ind_cno_fin_ult1         824742
ind_ctju_fin_ult1        824742
ind_ctma_fin_ult1        824742
ind_ctop_fin_ult1        824742
ind_ctpp_fin_ult1        824742
ind_deco_fin_ult1        824742
ind_deme_fin_ult1        824742
ind_dela

In [16]:
df.dtypes

fecha_dato               datetime64[ns]
ncodpers                          int32
ind_empleado                     object
pais_residencia                  object
sexo                             object
age                               int64
fecha_alta               datetime64[ns]
ind_nuevo                       float64
antiguedad                      float64
indrel                          float64
indrel_1mes                    category
tiprel_1mes                    category
indresi                          object
indext                           object
canal_entrada                    object
indfall                          object
nomprov                          object
ind_actividad_cliente           float64
renta                           float64
segmento                         object
ind_ahor_fin_ult1                 uint8
ind_aval_fin_ult1                 uint8
ind_cco_fin_ult1                  uint8
ind_cder_fin_ult1                 uint8
ind_cno_fin_ult1                  uint8


## 2. Implementation of machine learning pipeline. (25%)
Implement a machine learning pipeline in Spark, including feature extractors, transformers, and/or selectors. Test that your pipeline it is correctly implemented and explain your choice of processing steps, learning algorithms, and parameter settings.

In [18]:
spark.stop()

In [None]:
type(df)

In [19]:
from pyspark.sql import SQLContext
sc = SparkContext()
sqlCtx = SQLContext(sc) #print(sc)
df_spark = sqlCtx.createDataFrame(df)

In [20]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [27]:
#df_spark.describe
df_spark.dtypes
#df_spark.take(2)
#df_spark.show()

[('fecha_dato', 'bigint'),
 ('ncodpers', 'bigint'),
 ('ind_empleado', 'string'),
 ('pais_residencia', 'string'),
 ('sexo', 'string'),
 ('age', 'bigint'),
 ('fecha_alta', 'bigint'),
 ('ind_nuevo', 'double'),
 ('antiguedad', 'double'),
 ('indrel', 'double'),
 ('indrel_1mes', 'string'),
 ('tiprel_1mes', 'string'),
 ('indresi', 'string'),
 ('indext', 'string'),
 ('canal_entrada', 'string'),
 ('indfall', 'string'),
 ('nomprov', 'string'),
 ('ind_actividad_cliente', 'double'),
 ('renta', 'double'),
 ('segmento', 'string'),
 ('ind_ahor_fin_ult1', 'bigint'),
 ('ind_aval_fin_ult1', 'bigint'),
 ('ind_cco_fin_ult1', 'bigint'),
 ('ind_cder_fin_ult1', 'bigint'),
 ('ind_cno_fin_ult1', 'bigint'),
 ('ind_ctju_fin_ult1', 'bigint'),
 ('ind_ctma_fin_ult1', 'bigint'),
 ('ind_ctop_fin_ult1', 'bigint'),
 ('ind_ctpp_fin_ult1', 'bigint'),
 ('ind_deco_fin_ult1', 'bigint'),
 ('ind_deme_fin_ult1', 'bigint'),
 ('ind_dela_fin_ult1', 'bigint'),
 ('ind_ecue_fin_ult1', 'bigint'),
 ('ind_fond_fin_ult1', 'bigint'),
 ('

In [47]:
df_spark = df_spark.select(df_spark.fecha_dato.cast("date"),
                                   df_spark.ncodpers.cast("float"),
                                   df_spark.ind_empleado.cast("string"),
                                   df_spark.pais_residencia.cast("string"),
                                   df_spark.sexo.cast("string"),
                                   df_spark.age.cast("float"),
                                   df_spark.fecha_alta.cast("date"),
                                   df_spark.ind_nuevo.cast("float"),
                                   df_spark.antiguedad.cast("float"),
                                   df_spark.indrel.cast("float"),
                                   df_spark.indrel_1mes.cast("float"),
                                   df_spark.tiprel_1mes.cast("string"),
                                   df_spark.indresi.cast("string"),
                                   df_spark.indext.cast("string"),
                                   df_spark.canal_entrada.cast("string"),
                                   df_spark.indfall.cast("string"),
                                   df_spark.nomprov.cast("string"),
                                   df_spark.ind_actividad_cliente.cast("float"),
                                   df_spark.renta.cast("float"),
                                   df_spark.segmento.cast("string"),
                                   df_spark.ind_ahor_fin_ult1.cast("float"),
                                   df_spark.ind_aval_fin_ult1.cast("float"),
                                   df_spark.ind_cco_fin_ult1.cast("float"),
                                   df_spark.ind_cder_fin_ult1.cast("float"),
                                   df_spark.ind_cno_fin_ult1.cast("float"),
                                   df_spark.ind_ctju_fin_ult1.cast("float"),
                                   df_spark.ind_ctma_fin_ult1.cast("float"),
                                   df_spark.ind_ctop_fin_ult1.cast("float"),
                                   df_spark.ind_ctpp_fin_ult1.cast("float"),
                                   df_spark.ind_deco_fin_ult1.cast("float"),
                                   df_spark.ind_deme_fin_ult1.cast("float"),
                                   df_spark.ind_dela_fin_ult1.cast("float"),
                                   df_spark.ind_ecue_fin_ult1.cast("float"),
                                   df_spark.ind_fond_fin_ult1.cast("float"),
                                   df_spark.ind_hip_fin_ult1.cast("float"),
                                   df_spark.ind_plan_fin_ult1.cast("float"),
                                   df_spark.ind_pres_fin_ult1.cast("float"),
                                   df_spark.ind_reca_fin_ult1.cast("float"),
                                   df_spark.ind_tjcr_fin_ult1.cast("float"),
                                   df_spark.ind_valo_fin_ult1.cast("float"),
                                   df_spark.ind_viv_fin_ult1.cast("float"),
                                   df_spark.ind_nomina_ult1.cast("float"),
                                   df_spark.ind_nom_pens_ult1.cast("float"),
                                   df_spark.ind_recibo_ult1.cast("float"))


In [46]:
df_spark.dtypes

[('fecha_dato', 'date'),
 ('ncodpers', 'float'),
 ('ind_empleado', 'string'),
 ('pais_residencia', 'string'),
 ('sexo', 'string'),
 ('age', 'float'),
 ('fecha_alta', 'date'),
 ('ind_nuevo', 'float'),
 ('antiguedad', 'float'),
 ('indrel', 'float'),
 ('indrel_1mes', 'float'),
 ('tiprel_1mes', 'string'),
 ('indresi', 'string'),
 ('indext', 'string'),
 ('canal_entrada', 'string'),
 ('indfall', 'string'),
 ('nomprov', 'string'),
 ('ind_actividad_cliente', 'float'),
 ('renta', 'float'),
 ('segmento', 'string'),
 ('ind_ahor_fin_ult1', 'float'),
 ('ind_aval_fin_ult1', 'float'),
 ('ind_cco_fin_ult1', 'float'),
 ('ind_cder_fin_ult1', 'float'),
 ('ind_cno_fin_ult1', 'float'),
 ('ind_ctju_fin_ult1', 'float'),
 ('ind_ctma_fin_ult1', 'float'),
 ('ind_ctop_fin_ult1', 'float'),
 ('ind_ctpp_fin_ult1', 'float'),
 ('ind_deco_fin_ult1', 'float'),
 ('ind_deme_fin_ult1', 'float'),
 ('ind_dela_fin_ult1', 'float'),
 ('ind_ecue_fin_ult1', 'float'),
 ('ind_fond_fin_ult1', 'float'),
 ('ind_hip_fin_ult1', 'float'

In [45]:
# Example implementation from apache official documentation:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

if 1 == 1:
    spark = SparkSession\
        .builder\
        .appName("RandomForestClassifierExample")\
        .getOrCreate()
        
    # Index labels, adding metadata to the label column.
    # Fit on whole dataset to include all labels in index.
    
    stages = []

    inputCol =  ["fecha_dato", "ncodpers", "ind_empleado", "pais_residencia","sexo",
            "age","fecha_alta","ind_nuevo","antiguedad", "indrel", "indrel_1mes",
            "tiprel_1mes", "indresi", "indext", "canal_entrada",
            "indfall", "nomprov", "ind_actividad_cliente", "renta",
            "segmento", "ind_ahor_fin_ult1", "ind_aval_fin_ult1",
            "ind_cco_fin_ult1", "ind_cder_fin_ult1", "ind_cno_fin_ult1",
            "ind_ctju_fin_ult1", "ind_ctma_fin_ult1", "ind_ctop_fin_ult1",
            "ind_ctpp_fin_ult1", "ind_deco_fin_ult1", "ind_deme_fin_ult1", 
            "ind_dela_fin_ult1", "ind_ecue_fin_ult1", "ind_fond_fin_ult1",
            "ind_hip_fin_ult1", "ind_plan_fin_ult1", "ind_pres_fin_ult1",
            "ind_reca_fin_ult1", "ind_tjcr_fin_ult1", "ind_valo_fin_ult1", 
            "ind_viv_fin_ult1", "ind_nomina_ult1", "ind_nom_pens_ult1","ind_recibo_ult1"]
    
    for _col in inputCol:
        stringIndexer = StringIndexer(inputCol=_col, outputCol=_col+"Index") # Category Indexing with StringIndexer
        encoder = OneHotEncoder(inputCol=_col+"Index", outputCol=_col+"classVec") # Use OneHotEncoder to convert categorical variables into binary SparseVectors
        stages += [stringIndexer, encoder]
        assembler = VectorAssembler(inputCols=_col+"classVec", outputCol=_col+"feature")
        stages += [assembler]  # Add stage to the pipeline

    rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
    stages += [rf]  # Add stage to the pipeline

    pipeline = Pipeline(stages=stages)



TypeError: Invalid param value given for param "inputCols". Could not convert fecha_datoclassVec to list of strings

In [26]:
## ORIGINAL CODE from internet

# code modified from Spark documentation at:
# https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forest-classifier
# and DataBricks at:
# https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

# imports dependencies for Random Forest pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoder, StringIndexer, VectorAssembler


# stages in the Pipeline
stages = []

# One-Hot Encoding
categoricalColumns = ["a", "b", "c", "d", "e", "f", "g", "j"]
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index") # Category Indexing with StringIndexer
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec") # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    stages += [stringIndexer, encoder]  # Add stages to the pipeline
    
# Convert labels into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "add here target column in csv file", outputCol = "labels")
stages += [label_stringIdx]  # Add stage to the pipeline

# Transform all features into a vector using VectorAssembler
numericCols = ["m", "n", "o", "p", "q", "r"]
assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]  # Add stage to the pipeline

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="labels", 
                            featuresCol="features", 
                            numTrees=100,                 #  Number of trees in the random forest
                            impurity='entropy',            # Criterion used for information gain calculation
                            featureSubsetStrategy="auto",
                            predictionCol="prediction")
                            #maxDepth=5, 
                            #maxBins=32, 
                            #minInstancesPerNode=1, 
                            #minInfoGain=0.0, 
                            #subsamplingRate=1.0)
stages += [rf]  # Add stage to the pipeline

# Machine Learning Pipeline
pipeline = Pipeline(stages=stages)


TypeError: unsupported operand type(s) for +: 'map' and 'list'

## 3. Evaluation and test of model. (20%)
Evaluate the performance of your pipeline using training and test set (don’t use CV but pyspark.ml.tuning.TrainValidationSplit).

### 3.1. Evaluate performance of machine learning pipeline on training data and test data.

In [None]:
# imports dependencies
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Split data into training set and testing set
[trainData, testData] = trainData.randomSplit([0.8, 0.2], seed = 100)

# Train model in pipeline
rfModel = pipeline.fit(trainData)

# Make predictions for training set and compute training set accuracy
predictions = rfModel.transform(trainData)
evaluator = MulticlassClassificationEvaluator(labelCol="labels", 
                                              predictionCol="prediction", 
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print(train_pipeline.stages[0])  # summary


# Run the feature transformations pipeline on the test data set
pipelineModel = prePro_pipeline.fit(testClients)  #  computes feature statistics
testData = pipelineModel.transform(testClients)  #  transforms the features

# Make predictions for test set and compute test error
test_predictions = rfModel.transform(testData)
test_accuracy = evaluator.evaluate(test_predictions)
print("Test Error = %g" % (1.0 - test_accuracy))


## 4. Model fine-tuning. (35%) 
Implement a parameter grid (using pyspark.ml.tuning.ParamGridBuilder[source]), varying at least one feature preprocessing step, one machine learning parameter, and the training set size. Document the training and test performance and the time taken for training and testing. Comment on your findings.

### 4.1. Training set size evaluation

In [None]:
print('Training set size evaluation')

# size of different training set to be evaluated, and split of training set
sizes = [0.5, 0.1, 0.05, 0.01, 0.001]
data = trainData.randomSplit(sizes, seed = 100)

print('\n=== training set of size 100%')
# Train model in pipeline
tempModel = pipeline.fit(trainData)
# Make predictions for training set and compute training set accuracy
tempPredictions = tempModel.transform(trainData)
tempAccuracy = evaluator.evaluate(tempPredictions)
print("Classification Error = %g" % (1.0 - tempAccuracy))

for x in data:
    print('\n=== training set of size reduced to %g' % x)
    # Train model in pipeline
    tempModel = pipeline.fit(data[x])
    # Make predictions for training set and compute training set accuracy
    tempPredictions = tempModel.transform(data[x])
    tempAccuracy = evaluator.evaluate(tempPredictions)
    print("Classification Error = %g" % (1.0 - tempAccuracy))
    

### 4.2. Machine Learning Model Hyperparameter search

In [None]:
# Define hyperparameters and their values to search and evaluate
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10,20,50,100,200,500,1000,5000]) \
    .addGrid(rf.minInstancesPerNode, [0,1,2,4,6,8,10]) \
    .addGrid(rf.maxDepth, [2,5,10,20,50]).build()

# Grid Search and Cross Validation
crossVal = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator)
print('starting Hyperparameter Grid Search with cross-validation')
rfCrosVal = crossVal.fit(trainData)
print('Grid Search has finished')

print(rfCrosVal.bestModel.rank)
paramMap = list(zip(rfCrosVal.getEstimatorParamMaps(),rfCrosVal.avgMetrics))
paramMax = max(paramMap, key=lambda x: x[1])
print(paramMax)

# Evaluate the model with test data
cvtest_predictions = rfCrosVal.transform(testData)
cvtest_accuracy = evaluator.evaluate(cvtest_predictions)
print("Test Error = %g" % (1.0 - cvtest_accuracy))


### 4.3. Evaluate model performance using a subset of variables (predictors)