
# Module 5: Pipeline and Grid Search

## Predicting Grant Applications: Building a Pipeline

### Lesson Objectives

* After completing this lesson, you should be able to:
  - Understand the role of pipelines in spark.ml
  - Use a pipeline to fit a model and make predictions
  - Evaluate the results
  
### Key Concepts

* Transformer
  - an algorithm which transforms one DataFrame into another
* Estimator
  - an algorithm which can be fit on a DataFrame to produce a Transformer
* Parameter
  - Transformers and Estimators share a common API for specifying parameters
* Pipeline
  - chains multiple Transformers and Estimators together to specify a machine learning workflow
* Evaluator
  - measures the performance of a fitted model or pipeline against some metric(s)
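
The Estimator/Transformer relationship described above can be sketched on toy data. This is a minimal illustration, not part of the grant workflow: the `toy` DataFrame and the `toyIndexer` names are hypothetical, and `master("local[*]")` is only an assumption for running outside the notebook (an existing session is reused).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}

// Reuses the notebook's session if one already exists
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical toy data, not the grant dataset
val toy = Seq("A", "B", "A", "C", "A", "B").toDF("band")

// Estimator: configured through the shared parameter API, has not seen any data yet
val toyIndexer: StringIndexer = new StringIndexer().
  setInputCol("band").
  setOutputCol("band_index")

// Every Transformer and Estimator can describe its own parameters
println(toyIndexer.explainParams())

// fit() learns the label-to-index mapping and returns a Transformer
val toyIndexerModel: StringIndexerModel = toyIndexer.fit(toy)

// transform() returns a new DataFrame with an extra column; `toy` itself is unchanged
val toyIndexed = toyIndexerModel.transform(toy)
toyIndexed.show()
```

The type annotations are there only to make the distinction visible: `fit()` turns a `StringIndexer` (Estimator) into a `StringIndexerModel` (Transformer).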
  
### Pipelines in spark.ml

* Inspired by the scikit-learn project
* Components:
  - Transformers
  - Estimators
* Properties of components:
  - Transformer.transform() and Estimator.fit() are stateless
  - Each instance of Transformer/Estimator has a unique ID
* A Pipeline is a sequence of PipelineStages to be run in a specific order
  - input DataFrame is transformed as it passes through each stage
  - Transformer stages: `transform()` method is called on the DF
  - Estimator stages: `fit()` method is called to produce a Transformer
    - this Transformer becomes part of the Pipeline Model
    - `transform()` method is called on the DF
* Type checking is done at runtime, using the DF's schema, before the Pipeline actually runs
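
The stage-by-stage behaviour listed above can be sketched with a two-stage toy pipeline. The data and the names `bandIndexer`, `sizeAssembler`, and `toyPipeline` are hypothetical. `transformSchema` is used here as one way to see the schema check without running any stage; a mistyped column name would typically fail at that step.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical toy data
val toy = Seq(("A", 1.0), ("B", 2.0), ("A", 3.0)).toDF("band", "size")

// Estimator stage: must be fit before it can transform
val bandIndexer = new StringIndexer().
  setInputCol("band").
  setOutputCol("band_index")

// Transformer stage: needs no fitting
val sizeAssembler = new VectorAssembler().
  setInputCols(Array("band_index", "size")).
  setOutputCol("features")

val toyPipeline = new Pipeline().setStages(Array(bandIndexer, sizeAssembler))

// Schema-only pass: derives the output columns without touching the data
val outputSchema = toyPipeline.transformSchema(toy.schema)
println(outputSchema.fieldNames.mkString(", "))

// fit() runs the stages in order and returns a PipelineModel made only of Transformers
val fittedToy: PipelineModel = toyPipeline.fit(toy)
fittedToy.stages.foreach(stage => println(s"${stage.uid}: ${stage.getClass.getSimpleName}"))

fittedToy.transform(toy).show()
```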

### Create the Pipeline

#### Load the Grant Data

In [1]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._

// The file is tab-delimited despite its .csv extension.
// "csv" is the built-in reader available with SparkSession.
val data = spark.read.
  format("csv").
  option("delimiter", "\t").
  option("header", "true").
  option("inferSchema", "true").
  load("/resources/data/grantsPeople.csv")

data.show()

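+--------------------+----------+---------------+---------+--------------+---------+--------------------+-------------+----------------+-------------+--------+----------+--------+-----------------------------------+--------------------------+----------------------------+----+----+----+----+------------+------------+--------------------+-------------------+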
|Grant_Application_ID| RFCD_Code|RFCD_Percentage| SEO_Code|SEO_Percentage|Person_ID|                Role|Year_of_Birth|Country_of_Birth|Home_Language| Dept_No|Faculty_No|With_PHD|No_of_Years_in_Uni_at_Time_of_Grant|Number_of_Successful_Grant|Number_of_Unsuccessful_Grant|  A2|   A|   B|   C|Grant_Status|Sponsor_Code| Contract_Value_Band|Grant_Category_Code|
+--------------------+----------+---------------+---------+--------------+---------+--------------------+-------------+----------------+-------------+--------+----------+--------+-----------------------------------+--------------------------+----------------------------+----+----+----+----+------------+------------+--------------------+-------------------+
|                   1|RFCD280199|          100.0|SEO700299|         100.0|    40572|  CHIEF_INVESTIGATOR|         1965|     AsiaPacific|    OtherLang|Dept3073| Faculty31|    null|                        DurationLT0

spark = org.apache.spark.sql.SparkSession@5b04f1fb
data = [Grant_Application_ID: int, RFCD_Code: string ... 22 more fields]


[Grant_Application_ID: int, RFCD_Code: string ... 22 more fields]

#### Create Features

In [2]:
// Per-researcher features: PhD flag, chief-investigator flag, weighted paper score
val researchers = data.
  withColumn("phd", data("With_PHD").equalTo("Yes").cast("Int")).
  withColumn("CI", data("Role").equalTo("CHIEF_INVESTIGATOR").cast("Int")).
  withColumn("paperscore", data("A2") * 4 + data("A") * 3)

// One row per grant application, aggregated over its researchers
val grants = researchers.groupBy("Grant_Application_ID").agg(
  max("Grant_Status").as("Grant_Status"),
  max("Grant_Category_Code").as("Category_Code"),
  max("Contract_Value_Band").as("Value_Band"),
  // stays null when every phd flag in the group is null
  sum("phd").as("PHDs"),
  // highest paper score among chief investigators, 0 if none is available
  when(max(expr("paperscore * CI")).isNull, 0).
    otherwise(max(expr("paperscore * CI"))).as("paperscore"),
  count("*").as("teamsize"),
  // null sums are replaced with 0
  when(sum("Number_of_Successful_Grant").isNull, 0).
    otherwise(sum("Number_of_Successful_Grant")).as("successes"),
  when(sum("Number_of_Unsuccessful_Grant").isNull, 0).
    otherwise(sum("Number_of_Unsuccessful_Grant")).as("failures")
)

grants.show()

+--------------------+------------+-------------+--------------------+----+----------+--------+---------+--------+
|Grant_Application_ID|Grant_Status|Category_Code|          Value_Band|PHDs|paperscore|teamsize|successes|failures|
+--------------------+------------+-------------+--------------------+----+----------+--------+---------+--------+
|                 148|           0|  GrantCat30B|ContractValueBandUnk|null|         6|       1|        0|       1|
|                 463|           1|  GrantCat30C|ContractValueBandUnk|null|         0|       1|        1|       0|
|                 471|           0|  GrantCat30B|  ContractValueBandA|   1|       127|       2|        1|       5|
|                 496|           0|  GrantCat30B|  ContractValueBandA|null|         0|       1|        1|       3|
|                 833|           1|  GrantCat10A|  ContractValueBandF|null|         0|       1|        0|       0|
|                1088|           1|  GrantCat50A|  ContractValueBandA|   1|     

[Grant_Application_ID: int, Grant_Status: int ... 7 more fields]

researchers = [Grant_Application_ID: int, RFCD_Code: string ... 25 more fields]
grants = [Grant_Application_ID: int, Grant_Status: int ... 7 more fields]
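
The `null` values in the `PHDs` column above come from summing a flag that is itself `null`. A minimal sketch on hypothetical toy rows shows the effect, along with `coalesce` as a shorter equivalent of the `when(...).isNull ... otherwise(...)` pattern. The `people` data and the column names `PHDs_raw` and `PHDs_filled` are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit, sum}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical toy rows: (application id, With_PHD), where None is a missing value
val people = Seq[(Int, Option[String])](
  (1, Some("Yes")),
  (1, None),
  (2, None)
).toDF("id", "With_PHD")

// Comparing null with "Yes" yields null, so the flag is 1, 0, or null
val flagged = people.withColumn("phd", col("With_PHD").equalTo("Yes").cast("int"))

val perApp = flagged.groupBy("id").agg(
  // null when every input in the group is null
  sum("phd").as("PHDs_raw"),
  // falls back to 0 when the sum is null
  coalesce(sum("phd"), lit(0)).as("PHDs_filled")
).orderBy("id")

perApp.show()
```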


#### StringIndexer and VectorAssembler

In [3]:
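import org.apache.spark.ml.feature.StringIndexer

// Each indexer is fit on the full grants DataFrame here, so these vals are
// already-fitted StringIndexerModels (Transformers) when they enter the pipeline
val value_band_indexer = new StringIndexer().
  setInputCol("Value_Band").
  setOutputCol("Value_index").
  fit(grants)
  
val category_indexer = new StringIndexer().
  setInputCol("Category_Code").
  setOutputCol("Category_index").
  fit(grants)
  
// Grant_Status is indexed into the label column "status"
val label_indexer = new StringIndexer().
  setInputCol("Grant_Status").
  setOutputCol("status").
  fit(grants)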


import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler().
  setInputCols(Array(
    "Value_index",
    "Category_index",
    "PHDs",
    "paperscore",
    "teamsize",
    "successes",
    "failures"
  )).
  setOutputCol("assembled")

value_band_indexer = strIdx_70f09822672c
category_indexer = strIdx_f241fe372433
label_indexer = strIdx_05eb78903f1e
assembler = vecAssembler_666f53825f3f


vecAssembler_666f53825f3f
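
Because the indexers above are fit on all of `grants`, every category value already has an index when later rows are transformed. If an indexer is instead fit on a subset, a label it has not seen raises an error by default. The sketch below shows one alternative on hypothetical toy data, using the `handleInvalid` parameter with `"skip"` to drop such rows (recent Spark releases also accept `"keep"`). The `seen`, `heldOut`, and `skippingIndexer` names are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.StringIndexer

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical toy data: category "C" appears only in the held-out rows
val seen = Seq("A", "B", "A").toDF("Category_Code")
val heldOut = Seq("A", "C").toDF("Category_Code")

// Fit on the "seen" rows only; rows with unseen labels are dropped at transform time
val skippingIndexer = new StringIndexer().
  setInputCol("Category_Code").
  setOutputCol("Category_index").
  setHandleInvalid("skip").
  fit(seen)

val indexedHeldOut = skippingIndexer.transform(heldOut)
indexedHeldOut.show()
```

Dropping rows silently changes the evaluation set, so whether `"skip"` is acceptable depends on the use case.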

#### Random Forest Classifier and Pipeline

In [4]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.classification.RandomForestClassificationModel

val rf = new RandomForestClassifier().
  setFeaturesCol("assembled").
  setLabelCol("status").
  setSeed(42)

import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(
  value_band_indexer,
  category_indexer,
  label_indexer,
  assembler,
  rf
))

rf = rfc_32c925869922
pipeline = pipeline_84f8da424a24


pipeline_84f8da424a24

### Create an Evaluator

In [5]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val auc_eval = new BinaryClassificationEvaluator().
  setLabelCol("status").
  setRawPredictionCol("rawPrediction")
auc_eval.getMetricName

auc_eval = binEval_9c12f6297b3f


areaUnderROC
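
The evaluator defaults to `areaUnderROC`, as the output above shows. A minimal sketch on hypothetical, hand-written scores illustrates how it reads the label and raw prediction columns, and how `setMetricName` switches to `areaUnderPR`. The `scored` DataFrame and the `rocEval` and `prEval` names are assumptions, not part of the grant workflow.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Hypothetical scored rows: (label, raw prediction vector for classes 0 and 1)
val scored = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.1, 0.9)),
  (1.0, Vectors.dense(0.2, 0.8)),
  (0.0, Vectors.dense(0.8, 0.2)),
  (0.0, Vectors.dense(0.9, 0.1))
)).toDF("status", "rawPrediction")

// Default metric: area under the ROC curve
val rocEval = new BinaryClassificationEvaluator().
  setLabelCol("status").
  setRawPredictionCol("rawPrediction")

// Same columns, different metric: area under the precision-recall curve
val prEval = new BinaryClassificationEvaluator().
  setLabelCol("status").
  setRawPredictionCol("rawPrediction").
  setMetricName("areaUnderPR")

val toyRoc = rocEval.evaluate(scored)
val toyPr = prEval.evaluate(scored)
println(s"${rocEval.getMetricName} = $toyRoc, ${prEval.getMetricName} = $toyPr")
```

The toy scores rank every positive above every negative, which is the best case for both metrics.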

### Split into Training and Test

In [6]:
val tr = grants.filter("Grant_Application_ID < 6635")
val te = grants.filter("Grant_Application_ID >= 6635")
val training = tr.na.fill(0, Seq("PHDs"))
val test = te.na.fill(0, Seq("PHDs"))

tr = [Grant_Application_ID: int, Grant_Status: int ... 7 more fields]
te = [Grant_Application_ID: int, Grant_Status: int ... 7 more fields]
training = [Grant_Application_ID: int, Grant_Status: int ... 7 more fields]
test = [Grant_Application_ID: int, Grant_Status: int ... 7 more fields]


[Grant_Application_ID: int, Grant_Status: int ... 7 more fields]
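
The split above uses two complementary filters on the application ID and then fills missing `PHDs` values with 0. A small sketch on hypothetical rows makes both steps easy to check. The `toyGrants` data and the `cutoff` value are assumptions standing in for `grants` and 6635.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical toy rows: (application id, PHD count), where None is a missing value
val toyGrants = Seq[(Int, Option[Long])](
  (1, Some(2L)), (2, None), (3, Some(1L)), (4, None), (5, Some(0L))
).toDF("Grant_Application_ID", "PHDs")

// Hypothetical threshold, standing in for 6635
val cutoff = 4

// Complementary filters, then replace nulls in the PHDs column only
val toyTrain = toyGrants.filter(s"Grant_Application_ID < $cutoff").na.fill(0L, Seq("PHDs"))
val toyTest = toyGrants.filter(s"Grant_Application_ID >= $cutoff").na.fill(0L, Seq("PHDs"))

// Every row lands in exactly one split
println(s"train=${toyTrain.count()}, test=${toyTest.count()}, total=${toyGrants.count()}")
```

When a random partition is wanted instead of an ID threshold, `randomSplit` on the DataFrame is the usual alternative.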

### Run and Evaluate the Pipeline

In [7]:
val model = pipeline.fit(training)
val pipeline_results = model.transform(test)
auc_eval.evaluate(pipeline_results)

model = pipeline_84f8da424a24
pipeline_results = [Grant_Application_ID: int, Grant_Status: int ... 14 more fields]


0.9107593972299525
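
A fitted pipeline can also be inspected after evaluation. The sketch below trains a small forest on hypothetical toy data and reads the feature importances from the last stage of the `PipelineModel`, which is a `RandomForestClassificationModel`. All names prefixed with `toy` or `rf` and the data itself are assumptions, so the printed scores say nothing about the grant dataset.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical toy data: (label, successes, failures)
val toyRows = Seq(
  (1, 5.0, 0.0), (1, 4.0, 1.0), (1, 6.0, 0.0), (1, 3.0, 1.0),
  (0, 0.0, 4.0), (0, 1.0, 5.0), (0, 0.0, 3.0), (0, 1.0, 6.0)
).toDF("Grant_Status", "successes", "failures")

val toyLabelIndexer = new StringIndexer().
  setInputCol("Grant_Status").
  setOutputCol("status")

val rfAssembler = new VectorAssembler().
  setInputCols(Array("successes", "failures")).
  setOutputCol("assembled")

val toyRf = new RandomForestClassifier().
  setFeaturesCol("assembled").
  setLabelCol("status").
  setNumTrees(5).
  setSeed(42)

val toyFitted = new Pipeline().
  setStages(Array(toyLabelIndexer, rfAssembler, toyRf)).
  fit(toyRows)

// The last stage of the fitted PipelineModel is the trained forest
val toyForest = toyFitted.stages.last.asInstanceOf[RandomForestClassificationModel]

// Pair each assembler input column with its importance score
val importances = rfAssembler.getInputCols.zip(toyForest.featureImportances.toArray)
importances.foreach { case (name, score) => println(f"$name%-10s $score%.3f") }
```

Applied to the notebook's own objects, the same pattern would cast `model.stages.last` and zip it with `assembler.getInputCols`.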

### About the Authors

[Petro Verkhogliad](https://www.linkedin.com/in/vpetro) is a Consulting Manager at Lightbend. He holds a Master's degree in Computer Science with a specialization in Intelligent Systems. He is passionate about functional programming and applications of AI.