# Pipelines and Cross Validation

In this lecture we'll create a model using Spark's `Pipeline` and `CrossValidator` classes. `CrossValidator` is similar to `sklearn`'s `GridSearchCV`.

Use the cell below for imports.

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Loading the DataFrame

The CSV for our classification model is the [SMSSpamCollection](https://archive.ics.uci.edu/ml/datasets/sms+spam+collection) dataset from the UCI repository. After you've uploaded the CSV into Databricks, use the cell below to load it into a DataFrame.

Our goal is to predict whether a given text message is spam or ham.

NOTE: `SMSSpamCollection` is a tab-separated file, so we'll need to set the `delimiter` option.
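To see why the delimiter matters, here is a small illustration that uses Python's standard `csv` module instead of Spark, so it runs without a cluster. The sample line mirrors one of the messages in the predictions output later in this notebook. With a comma delimiter the message is cut at its comma; with a tab delimiter we get the two intended fields (`_c0` and `_c1`).

```python
import csv
import io

# One line in the SMSSpamCollection layout: label, a tab, then the raw message.
line = "ham\t<#> , that's all? Guess that's easy enough\n"


def parse(text, delimiter):
    """Parse CSV-style text with the given delimiter and return a list of rows."""
    return list(csv.reader(io.StringIO(text), delimiter=delimiter))


comma_rows = parse(line, ",")  # wrong delimiter: the message is split at its comma
tab_rows = parse(line, "\t")   # right delimiter: label and message stay separate

print(comma_rows)
print(tab_rows)
```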

In [0]:
df = spark.read.option("delimiter", '\t').csv('/FileStore/tables/SMSSpamCollection')
df.show(3)

# Creating the `label` column.

We'll need to use `StringIndexer` to encode our labels (`_c0`).

Recall from the previous lecture how we instantiate a `StringIndexer`:
```python
labelIndexer = StringIndexer(inputCol='...', outputCol='...')
```
  
The key difference for this lecture is **we won't transform our DataFrame**...yet. That will happen in the `Pipeline`.

In the cell below, create the indexer.

In [0]:
indexer = StringIndexer(inputCol='_c0', outputCol='label')
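By default, `StringIndexer` orders labels by frequency (`stringOrderType='frequencyDesc'`), so the most common label gets index `0.0`. Ham is the majority class in this dataset, which is why ham rows carry `label` 0.0 in the predictions later on. The function below is a hypothetical pure-Python sketch of that default ordering, not Spark's implementation; breaking ties alphabetically is an assumption of the sketch.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


mapping = frequency_desc_index(["ham", "spam", "ham", "ham", "spam", "ham"])
print(mapping)  # {'ham': 0.0, 'spam': 1.0}

encoded = [mapping[label] for label in ["spam", "ham"]]
print(encoded)  # [1.0, 0.0]
```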

# NLP transformers

The `CountVectorizer` in `sklearn` includes everything but the kitchen sink: it can tokenize, remove stop words, and count terms in a single class. In Spark, you have to use several classes to accomplish the same job. We'll create instances of the following stages:

- `Tokenizer`: Splits our SMS messages into tokens
- `StopWordsRemover`: Removes all stopwords from the tokens column (created by the `Tokenizer`)
- `CountVectorizer`: Creates our document-term matrix. The output from `StopWordsRemover` will be fed into the vectorizer.
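As a rough mental model of the first two stages, the sketch below imitates them in plain Python. It assumes Spark's `Tokenizer` lowercases the text and splits on each single whitespace character, which would explain the empty-string tokens visible in the predictions output further down (for example, where a message contains two spaces in a row). `STOP_WORDS` is a tiny hypothetical stand-in for `StopWordsRemover`'s much larger default English list; like the default remover, the sketch compares case-insensitively.

```python
import re

# Hypothetical, tiny stop word list; StopWordsRemover's default English list is far larger.
STOP_WORDS = {"a", "are", "in", "the", "you", "would"}


def tokenize(text):
    """Approximate Spark's Tokenizer: lowercase, then split on each whitespace character."""
    tokens = re.split(r"\s", text.lower())
    # Java's String.split drops trailing empty strings, so do the same here.
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop tokens that appear in the stop word set (case-insensitive)."""
    lowered = {word.lower() for word in stop_words}
    return [token for token in tokens if token.lower() not in lowered]


tokens = tokenize("... Are you in the pub?")
print(tokens)                     # ['...', 'are', 'you', 'in', 'the', 'pub?']
print(remove_stop_words(tokens))  # ['...', 'pub?']
print(tokenize("told  u"))        # ['told', '', 'u']
```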

In [0]:
# Tokenizer
tokenizer = Tokenizer(inputCol='_c1', outputCol='tokens')
tokenizer.transform(df).show(3)

In [0]:
# Get the output column of the tokenizer
tokenizer.getOutputCol()

In [0]:
# StopWordsRemover
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="tokensSansStopWords")

In [0]:
# CountVectorizer
vect = CountVectorizer(inputCol=remover.getOutputCol(), outputCol='DTM')
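`CountVectorizer` learns a vocabulary of at most `vocabSize` terms, keeping the terms with the highest frequency across the whole corpus, and then represents each document as a sparse vector of term counts (or 1.0 presence flags when `binary=True`). Both parameters are tuned in the grid later in this notebook. Below is a simplified pure-Python sketch of that behaviour; it ignores `minDF`/`minTF` and breaks frequency ties alphabetically, both assumptions rather than Spark's exact rules.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size):
    """Keep the vocab_size most frequent terms across all documents.

    Frequency ties are broken alphabetically (an assumption of this sketch).
    """
    totals = Counter(term for doc in docs for term in doc)
    ranked = sorted(totals, key=lambda term: (-totals[term], term))
    return {term: i for i, term in enumerate(ranked[:vocab_size])}


def vectorize(doc, vocab, binary=False):
    """Return a sparse vector as (size, {index: value}); unknown terms are ignored."""
    values = {}
    for term in doc:
        if term in vocab:
            idx = vocab[term]
            values[idx] = 1.0 if binary else values.get(idx, 0.0) + 1.0
    return len(vocab), values


docs = [["free", "prize", "free"], ["see", "you", "free"], ["prize", "now"]]
vocab = fit_vocabulary(docs, vocab_size=3)
print(vocab)                                   # {'free': 0, 'prize': 1, 'now': 2}
print(vectorize(docs[0], vocab))               # (3, {0: 2.0, 1: 1.0})
print(vectorize(docs[0], vocab, binary=True))  # (3, {0: 1.0, 1: 1.0})
```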

## Creating the features column

As always, we'll need to create our features column using the `VectorAssembler` object like so:
```python
assembler = VectorAssembler(inputCols=[vect.getOutputCol()], outputCol='features')
```
  
Unlike the previous lecture, **we won't transform our DataFrame**...yet. That will happen in the `Pipeline`.

In [0]:
[vect.getOutputCol()]

In [0]:
assembler = VectorAssembler(inputCols=[vect.getOutputCol()], outputCol='features')
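`VectorAssembler` concatenates its input columns, in order, into one vector. Because `inputCols` here holds only the `DTM` column, `features` ends up identical to `DTM`, as the predictions output shows. The hypothetical helper below sketches the concatenation for sparse vectors written as `(size, {index: value})` pairs, shifting each vector's indices past the vectors before it; it is an illustration, not Spark's code.

```python
def assemble(vectors):
    """Concatenate sparse vectors given as (size, {index: value}) pairs."""
    offset = 0
    merged = {}
    for size, values in vectors:
        for idx, val in values.items():
            merged[offset + idx] = val  # shift indices past the earlier vectors
        offset += size
    return offset, merged


dtm = (3000, {3: 1.0, 85: 1.0})
print(assemble([dtm]))                 # (3000, {3: 1.0, 85: 1.0})
print(assemble([dtm, (2, {1: 5.0})]))  # (3002, {3: 1.0, 85: 1.0, 3001: 5.0})
```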

## Train/Test Split

Split your DataFrame up into training and testing sets using the `.randomSplit()` method:

```python
train, test = df.randomSplit([.8, .2], 42)
``` 

The first argument (`[.8, .2]`) sets the approximate fraction of rows that go into `train` and `test`, respectively. The second argument (`42`) is a random seed.
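The split sizes are only approximate because each row is assigned to a split at random, and the weights are normalized if they don't sum to 1. The pure-Python sketch below mimics that idea with Python's `random` module. It is not Spark's sampler (Spark samples each partition separately), so the same seed will not select the same rows as `df.randomSplit`.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by comparing a uniform draw to cumulative weights."""
    total = float(sum(weights))
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total  # normalize so the weights sum to 1
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            if draw < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits


train_rows, test_rows = random_split(range(1000), [.8, .2], seed=42)
print(len(train_rows), len(test_rows))  # close to 800 and 200, but rarely exact
```

Passing `[4, 1]` instead of `[.8, .2]` gives the same split, since both normalize to the same proportions.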

In [0]:
train, test = df.randomSplit([.8, .2], seed=42)

## Model

Now we're ready to create our model. MLlib has several options to choose from. For this project, we'll use `LogisticRegression`. 

We'll instantiate our model like so:
```python
lr = LogisticRegression()
```

In [0]:
lr = LogisticRegression()

# Pipeline

Now we're ready to create our pipeline using the `Pipeline` object like so:
```python
pipeline = Pipeline(stages=[transformer1, transformer2, ..., model])
```

A `Pipeline` behaves like any other Spark estimator. Its `.fit()` method returns a **fitted model** (a `PipelineModel`), whose `.transform()` method runs every stage in turn and makes predictions.

**Note**: The order in which you place your objects is the order in which they'll run.
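Conceptually, `Pipeline.fit()` walks through the stages in order. A transformer (such as `Tokenizer`) is applied as-is, while an estimator (such as `StringIndexer`, `CountVectorizer`, or `LogisticRegression`) is first fitted on the data produced by the earlier stages and then replaced by the fitted model it returns. The toy classes below are hypothetical, written only to illustrate that flow in plain Python; they are not Spark's API.

```python
class Lowercase:
    """A toy transformer: it has transform() but nothing to learn."""

    def transform(self, rows):
        return [row.lower() for row in rows]


class VocabularyModel:
    """A toy fitted model: maps each row to its index in a learned vocabulary."""

    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        return [self.vocab.get(row, -1) for row in rows]


class Vocabulary:
    """A toy estimator: fit() learns state and returns a new model object."""

    def fit(self, rows):
        return VocabularyModel({word: i for i, word in enumerate(sorted(set(rows)))})


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):  # estimator: fit on the current data first
                stage = stage.fit(rows)
            rows = stage.transform(rows)  # pass transformed data to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


model = ToyPipeline([Lowercase(), Vocabulary()]).fit(["Ham", "spam", "HAM"])
print(model.transform(["SPAM", "ham", "other"]))  # [1, 0, -1]
```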

In [0]:
pipe = Pipeline(stages=[indexer, tokenizer, remover, vect, assembler, lr])

## Model evaluation

For our `CrossValidator` we'll need an instance of `MulticlassClassificationEvaluator`:
```python
evaluator = MulticlassClassificationEvaluator()
```

The default metric is F1 (`metricName='f1'`). To use a different metric, set the `metricName=` parameter:

```python
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
```

Create an evaluator in the cell below, using accuracy as the metric.

In [0]:
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
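For reference, accuracy is the fraction of rows whose `prediction` equals `label`. We assume here that the default `f1` metric is a weighted F1: each class's F1 score averaged with weights proportional to that class's share of the true labels. The sketch below computes both metrics from plain Python lists of labels and predictions.

```python
def accuracy(labels, preds):
    """Fraction of predictions that match the true label."""
    return sum(1 for y, p in zip(labels, preds) if y == p) / len(labels)


def weighted_f1(labels, preds):
    """Per-class F1 averaged with weights equal to each class's share of true labels."""
    total = 0.0
    for c in set(labels):
        tp = sum(1 for y, p in zip(labels, preds) if y == c and p == c)
        fp = sum(1 for y, p in zip(labels, preds) if y != c and p == c)
        fn = sum(1 for y, p in zip(labels, preds) if y == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * labels.count(c) / len(labels)
    return total


labels = [0.0, 0.0, 0.0, 1.0]  # three ham, one spam
preds = [0.0, 0.0, 1.0, 1.0]
print(accuracy(labels, preds))  # 0.75
print(weighted_f1(labels, preds))
```

Here the ham F1 is 0.8 (weight 3/4) and the spam F1 is 2/3 (weight 1/4), so the weighted F1 is 0.6 + 1/6, about 0.767.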

# Hyperparameter tuning

In `sklearn`'s `GridSearchCV` our parameter grid is a simple dictionary:
```python
params = {
  'foo': ['option1', 'option2', 'option3'],
  'bar': ['option1', 'option2', 'option3']
}
```

In Spark, we have to use `ParamGridBuilder` to accomplish the same task:
```python
paramGrid = ParamGridBuilder().addGrid(lr.elasticNetParam, [.5, 1.]).addGrid(vect.vocabSize, [3000, 4000]).build()
```

Each call to `.addGrid()` takes the parameter to tune and a list of candidate values. Call `.build()` once you're done adding parameters.

In [0]:
params = (ParamGridBuilder()
          .addGrid(lr.elasticNetParam, [0.5, 1.0])
          .addGrid(vect.vocabSize, [2000, 3000])
          .addGrid(vect.binary, [True, False])
          .build())
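`ParamGridBuilder` produces every combination of the candidate values, so the grid above contains 2 × 2 × 2 = 8 parameter combinations, each of which the cross-validator will evaluate. The plain-Python sketch below expands an `sklearn`-style dictionary the same way with `itertools.product`; the string keys stand in for Spark `Param` objects and are for illustration only.

```python
from itertools import product

grid = {
    "elasticNetParam": [0.5, 1.0],
    "vocabSize": [2000, 3000],
    "binary": [True, False],
}


def expand_grid(grid):
    """Return one dict per combination, similar to the list ParamGridBuilder.build() returns."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*grid.values())]


param_maps = expand_grid(grid)
print(len(param_maps))  # 8
print(param_maps[0])    # {'elasticNetParam': 0.5, 'vocabSize': 2000, 'binary': True}
```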

# Cross Validation

Now we're ready to create our `CrossValidator` (Spark's equivalent of `GridSearchCV`). It requires three arguments:
1. The estimator (in our case, the pipeline)
2. The param grid
3. The evaluator

```python
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid)
```

The number of folds is set with `numFolds=`, which defaults to 3. The cell below uses 2, which means fewer models to train.

In [0]:
cv = CrossValidator(estimator=pipe, estimatorParamMaps=params, evaluator=evaluator, numFolds=2)
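Under the hood, k-fold cross-validation trains each parameter combination k times, each time holding out a different fold for scoring, and averages the scores; `CrossValidatorModel.avgMetrics` holds one average per combination. The best combination is then refit on all of the training data. With our 8-combination grid and `numFolds=2`, that is 8 × 2 = 16 fits plus one final refit. The sketch below is a simplified pure-Python version under stated assumptions: rows are assigned to folds round-robin (Spark assigns them randomly), and `fit`/`score` are hypothetical stand-ins for fitting the pipeline and calling the evaluator.

```python
def cross_validate(rows, param_maps, fit, score, num_folds=3, larger_is_better=True):
    """Return (average metric per param map, index of best map, model refit on all rows)."""
    folds = [rows[i::num_folds] for i in range(num_folds)]  # round-robin fold assignment
    avg_metrics = []
    for params in param_maps:
        total = 0.0
        for k in range(num_folds):
            held_out = folds[k]
            training = [row for j, fold in enumerate(folds) if j != k for row in fold]
            model = fit(training, params)
            total += score(model, held_out)
        avg_metrics.append(total / num_folds)
    choose = max if larger_is_better else min
    best_index = avg_metrics.index(choose(avg_metrics))
    best_model = fit(rows, param_maps[best_index])  # refit the winner on all rows
    return avg_metrics, best_index, best_model


# Hypothetical toy problem: predict y = 1 when x > threshold, scored by accuracy.
data = [(x, int(x > 5)) for x in range(10)]


def fit(training, params):
    return params["threshold"]  # nothing to learn; the "model" is just the threshold


def score(threshold, held_out):
    return sum(int(x > threshold) == y for x, y in held_out) / len(held_out)


grid = [{"threshold": 3}, {"threshold": 5}, {"threshold": 7}]
avg, best, model = cross_validate(data, grid, fit, score, num_folds=2)
print(avg, best, model)
```

Picking the index of the largest average assumes a metric where larger is better, such as accuracy; for an error metric you would take the smallest instead.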

## Model fitting

When we fit our `CrossValidator` (`cv.fit(train)`), it returns a different object: a `CrossValidatorModel`. This is different from `sklearn`, which mutates the original instance. As a result, we'll need to save our newly fitted model as a variable:

```python
cvModel = cv.fit(train)
```

By convention, Spark's fitted classes have `Model` in their names (for example, `CrossValidatorModel`, `PipelineModel`, and `LogisticRegressionModel`).

In [0]:
cvModel = cv.fit(train)

## Making predictions

Now that we have a fitted model, we're ready to make some predictions (`cvModel.transform(test)`). This returns a new DataFrame containing the intermediate columns created by each pipeline stage, plus `rawPrediction`, `probability`, and `prediction` columns at the end.
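For binary logistic regression, the `rawPrediction` and `probability` columns in the `display(pred)` output are linked by the logistic (sigmoid) function: `rawPrediction` holds the margins `[-m, m]` for the two classes, and the probability of class 1 (spam) is `1 / (1 + exp(-m))`. The pure-Python check below recomputes the probabilities for the "<#> great loxahatchee xmas tree..." row from its `rawPrediction`; the result should be close to that row's `[0.9999999981315844, 1.868415560356516E-9]`. With the default 0.5 threshold, `prediction` is the class with the higher probability.

```python
import math


def probability_from_raw(raw):
    """Convert a binary LR rawPrediction [-m, m] into [P(class 0), P(class 1)]."""
    margin = raw[1]
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return [1.0 - p1, p1]


# rawPrediction copied from one row of the predictions output
raw = [20.098175057294885, -20.098175057294885]
prob = probability_from_raw(raw)
print(prob)

prediction = float(prob.index(max(prob)))
print(prediction)  # 0.0 (ham)
```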

In [0]:
pred = cvModel.transform(test)

In [0]:
display(pred)

```text
_c0,_c1,label,tokens,tokensSansStopWords,DTM,features,rawPrediction,probability,prediction
ham,<DECIMAL> m but its not a common car here so its better to buy from china or asia. Or if i find it less expensive. I.ll holla,0.0,"List(, <decimal>, m, but, its, not, a, common, car, here, so, its, better, to, buy, from, china, or, asia., or, if, i, find, it, less, expensive., i.ll, holla)","List(, <decimal>, m, common, car, better, buy, china, asia., find, less, expensive., i.ll, holla)","Map(vectorType -> sparse, length -> 3000, indices -> List(3, 85, 133, 235, 236, 275, 471, 1066, 1218, 1594), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(3, 85, 133, 235, 236, 275, 471, 1066, 1218, 1594), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(112.23047293325102, -112.23047293325102))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 1.8152017588819518E-49))",0.0
ham,"said kiss, kiss, i can't do the sound effects! He is a gorgeous man isn't he! Kind of person who needs a smile to brighten his day!",0.0,"List(, said, kiss,, kiss,, i, can't, do, the, sound, effects!, he, is, a, gorgeous, man, isn't, he!, kind, of, person, who, needs, a, smile, to, brighten, his, day!)","List(, said, kiss,, kiss,, sound, effects!, gorgeous, man, he!, kind, person, needs, smile, brighten, day!)","Map(vectorType -> sparse, length -> 3000, indices -> List(3, 121, 202, 227, 283, 536, 573, 1097, 2571), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(3, 121, 202, 227, 283, 536, 573, 1097, 2571), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(91.91987753906339, -91.91987753906339))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 1.2014464197063428E-40))",0.0
ham,what number do u live at? Is it 11?,0.0,"List(, what, number, do, u, live, at?, is, it, 11?)","List(, number, u, live, at?, 11?)","Map(vectorType -> sparse, length -> 3000, indices -> List(0, 3, 83, 204), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(0, 3, 83, 204), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(64.99522529420095, -64.99522529420095))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 5.928329099876836E-29))",0.0
ham,"""Response"" is one of d powerful weapon 2 occupy a place in others 'HEART'... So, always give response 2 who cares 4 U""... Gud night..swt dreams..take care",0.0,"List(""response"", is, one, of, d, powerful, weapon, 2, occupy, a, place, in, others, 'heart'..., so,, always, give, response, 2, who, cares, 4, u""..., gud, night..swt, dreams..take, care)","List(""response"", one, d, powerful, weapon, 2, occupy, place, others, 'heart'..., so,, always, give, response, 2, cares, 4, u""..., gud, night..swt, dreams..take, care)","Map(vectorType -> sparse, length -> 3000, indices -> List(2, 8, 23, 55, 60, 100, 129, 145, 182, 1474, 1610), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(2, 8, 23, 55, 60, 100, 129, 145, 182, 1474, 1610), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(112.10929359005648, -112.10929359005648))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 2.0490493761572604E-49))",0.0
ham,<#> great loxahatchee xmas tree burning update: you can totally see stars here,0.0,"List(<#>, , great, loxahatchee, xmas, tree, burning, update:, you, can, totally, see, stars, here)","List(<#>, , great, loxahatchee, xmas, tree, burning, update:, totally, see, stars)","Map(vectorType -> sparse, length -> 3000, indices -> List(3, 7, 31, 64, 491, 2097), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(3, 7, 31, 64, 491, 2097), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(20.098175057294885, -20.098175057294885))","Map(vectorType -> dense, length -> 2, values -> List(0.9999999981315844, 1.868415560356516E-9))",0.0
ham,"<#> , that's all? Guess that's easy enough",0.0,"List(<#>, ,, that's, all?, guess, that's, easy, enough)","List(<#>, ,, all?, guess, easy, enough)","Map(vectorType -> sparse, length -> 3000, indices -> List(7, 200, 252, 335, 710), values -> List(1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(7, 200, 252, 335, 710), values -> List(1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(49.538801713167246, -49.538801713167246))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 3.05894575686436E-22))",0.0
ham,"(No promises on when though, haven't even gotten dinner yet)",0.0,"List((no, promises, on, when, though,, haven't, even, gotten, dinner, yet))","List((no, promises, though,, even, gotten, dinner, yet))","Map(vectorType -> sparse, length -> 3000, indices -> List(101, 357, 2822, 2941), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(101, 357, 2822, 2941), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(21.042730190573018, -21.042730190573018))","Map(vectorType -> dense, length -> 2, values -> List(0.9999999992734618, 7.265380998349525E-10))",0.0
ham,"* Was a nice day and, impressively, i was sensible, went home early and now feel fine. Or am i just boring?! When's yours, i can't remember.",0.0,"List(*, was, a, nice, day, and,, impressively,, i, was, sensible,, went, home, early, and, now, feel, fine., or, am, i, just, boring?!, when's, yours,, i, can't, remember.)","List(*, nice, day, and,, impressively,, sensible,, went, home, early, feel, fine., boring?!, yours,, remember.)","Map(vectorType -> sparse, length -> 3000, indices -> List(45, 46, 119, 125, 165, 189, 349, 580), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(45, 46, 119, 125, 165, 189, 349, 580), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(94.23321318242192, -94.23321318242192))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 1.18859938951562E-41))",0.0
ham,", ow u dey.i paid 60,400thousad.i told u would call .",0.0,"List(,, ow, u, dey.i, paid, 60,400thousad.i, told, , u, would, call, .)","List(,, ow, u, dey.i, paid, 60,400thousad.i, told, , u, call, .)","Map(vectorType -> sparse, length -> 3000, indices -> List(0, 1, 3, 9, 113, 252), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(0, 1, 3, 9, 113, 252), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(75.83113220911456, -75.83113220911456))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 1.1666960435418256E-33))",0.0
ham,... Are you in the pub?,0.0,"List(..., are, you, in, the, pub?)","List(..., pub?)","Map(vectorType -> sparse, length -> 3000, indices -> List(20), values -> List(1.0))","Map(vectorType -> sparse, length -> 3000, indices -> List(20), values -> List(1.0))","Map(vectorType -> dense, length -> 2, values -> List(12.61092357330291, -12.61092357330291))","Map(vectorType -> dense, length -> 2, values -> List(0.9999966646298949, 3.335370105211786E-6))",0.0
```


## Evaluating the predictions

Use your evaluator to score the predictions.

In [0]:
evaluator.evaluate(pred)

In [0]:
df.groupBy("_c0").count().show()

In [0]:
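# Baseline accuracy: the score from always predicting the majority class (ham),
# using the counts from the cell above (4827 ham, 747 spam)
baseline_accuracy = 4827.0 / (4827.0 + 747.0)
baseline_accuracy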

In [0]:
i = cvModel.avgMetrics.index(max(cvModel.avgMetrics))
cvModel.getEstimatorParamMaps()[i]

In [0]:
cvModel.bestModel.stages[-1].extractParamMap()

In [0]:
# Getting the best parameters after grid search - https://stackoverflow.com/questions/46110563/pyspark-getting-the-best-models-parameters-after-a-gridsearch-is-blank

In [0]:
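best_mod = cvModel.bestModel  # PipelineModel refit on all of `train` with the best params
# extractParamMap() is keyed by Param objects; re-key it by parameter name.
# Only LogisticRegression params live here; vocabSize and binary belong to the CountVectorizer stage.
param_dict = best_mod.stages[-1].extractParamMap()
sane_dict = {param.name: value for param, value in param_dict.items()}

best_reg = sane_dict["regParam"]
best_elastic_net = sane_dict["elasticNetParam"]
best_max_iter = sane_dict["maxIter"]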

In [0]:
best_elastic_net

In [0]:
import numpy as np
print(cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])
