# DS-610 Week 6 Homework: Machine Learning on Apache Spark
In this homework, we will build a spam classifier on data stored in the cloud.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer
from platform import python_version
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

### Loading Data
Reminder: It is highly recommended that you try this homework on Saint Peter's Databricks. Depending on whether you are running in the cloud or locally, adjust the data source below accordingly.

In [0]:
#Data source
data_source = "/FileStore/shared_uploads/dlee5@saintpeters.edu/ds610/SMSSpamCollection"
# Load data and rename column. DO NOT MODIFY
df = spark.read.option("header", "false") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .csv(data_source) \
    .withColumnRenamed("_c0", "label_string") \
    .withColumnRenamed("_c1", "sms")
df.limit(10).show()

+------------+--------------------+
|label_string|                 sms|
+------------+--------------------+
|         ham|Go until jurong p...|
|         ham|Ok lar... Joking ...|
|        spam|Free entry in 2 a...|
|         ham|U dun say so earl...|
|         ham|Nah I don't think...|
|        spam|FreeMsg Hey there...|
|         ham|Even my brother i...|
|         ham|As per your reque...|
|        spam|WINNER!! As a val...|
|        spam|Had your mobile 1...|
+------------+--------------------+



In [0]:
df.printSchema()

root
 |-- label_string: string (nullable = true)
 |-- sms: string (nullable = true)



## Part 1: Pipelines
First, we declare `pipeline_stages`, a list that will hold every stage needed to transform the raw dataset into the format required for model training.

In [0]:

pipeline_stages = []

### Part 1a
We note that the `label_string` column takes values in the label set `{ ham, spam }` (`ham` means not spam). Your task is to create a pipeline stage that takes the input column `label_string` and writes a new column `label` using the mapping `{ham -> 0, spam -> 1}`. This step is necessary because the classifier must be trained on a numeric label.

You may find the following useful:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html

In [0]:
# Convert the "label_string" column to 0/1 using StringIndexer.
indexer = StringIndexer(inputCol="label_string", outputCol="label")
# Add to the pipeline stage. DO NOT MODIFY.
pipeline_stages.append(indexer)
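By default, `StringIndexer` uses `stringOrderType="frequencyDesc"`, so the most frequent string receives index 0.0. Because `ham` is the more common class in this dataset, the default setting already yields `{ham -> 0, spam -> 1}`; `stringOrderType="alphabetAsc"` would give the same mapping, since "ham" sorts before "spam". The plain-Python sketch below uses a hypothetical `string_index` helper (not Spark code) to mimic the default ordering; breaking frequency ties alphabetically is an assumption made here for determinism.

```python
from collections import Counter


def string_index(labels):
    """Mimic StringIndexer's default frequencyDesc ordering (illustrative only).

    The most frequent string gets index 0.0, the next 1.0, and so on.
    Ties in frequency are broken alphabetically (assumption for determinism).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


mapping = string_index(["ham", "ham", "spam", "ham", "spam"])
print(mapping)  # {'ham': 0.0, 'spam': 1.0}
```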

### Part 1b
Now we take a look at the `sms` column which represents the raw SMS message. We have to turn this into a more useful form. First we have to *tokenize* the message. For example:
```
The quick brown fox jumps over the lazy dog -> [ "the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog" ]
```
You may find the following useful:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html

In [0]:
# Tokenize the "sms" column into the list of words under the column name "words" (inputCol="sms" and outputCol="words")
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html
tokenizer = Tokenizer(inputCol='sms', outputCol='words')

# Add to the pipeline stage. DO NOT MODIFY.
pipeline_stages.append(tokenizer)
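`Tokenizer` lowercases the text and then splits it on whitespace. As far as we know, the split is on each single whitespace character, so a message that starts with a space yields a leading empty token; this is consistent with the `''` token visible in the Part 3 output. The sketch below is a plain-Python approximation (the `spark_like_tokenize` name is hypothetical), assuming Java `String.split` semantics, which keep leading empty strings but drop trailing ones. If the empty tokens are unwanted, `RegexTokenizer` with its default pattern `\\s+` is an alternative.

```python
import re


def spark_like_tokenize(text):
    """Approximate pyspark.ml.feature.Tokenizer: lowercase, split on each whitespace char."""
    tokens = re.split(r"\s", text.lower())
    # Java's String.split (assumed here) drops trailing empty strings but keeps leading ones.
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


print(spark_like_tokenize("The quick brown fox jumps over the lazy dog"))
print(spark_like_tokenize(" said kiss, kiss,"))  # ['', 'said', 'kiss,', 'kiss,']
```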

### Part 1c
Finally, we turn the list of words into a count-vector feature using `CountVectorizer`. For example:
```
The quick brown fox jumps over the lazy dog -> [ "the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog" ] -> (1000, [8, 10, 23, 34, 58, 100, 110, 112], [2, 1, 1, 1, 1, 1, 1, 1])
```
where the first number is the size of the vocabulary, the second list gives the vocabulary index of each distinct word in the sentence, and the third list gives how many times each of those words occurs. Because "the" appears twice, the nine tokens produce eight index/count pairs.

Your task is to create a count vector feature out of the `words` column under the column name `features`, using `(inputCol="words", outputCol="features")`.

You may find this useful:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.CountVectorizer.html

In [0]:
# Make a count vector feature out of the words column under the column name "features" (inputCol="words", outputCol="features")
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.CountVectorizer.html
# minDF=2.0 keeps only terms that appear in at least 2 documents (SMS messages).
cv = CountVectorizer(inputCol='words', outputCol='features', minDF=2.0)

# Add to the pipeline stage. DO NOT MODIFY.
pipeline_stages.append(cv)
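`CountVectorizer` is an estimator: during `fit` it builds a vocabulary ordered by term frequency across the corpus, keeping only terms whose document frequency reaches `minDF`; during `transform` it maps each word list to a sparse `(size, indices, counts)` vector. The plain-Python sketch below (hypothetical `fit_vocab` and `to_sparse` helpers, not Spark's implementation) reproduces that idea on two toy documents. Breaking frequency ties alphabetically is an assumption made only to keep the output deterministic.

```python
from collections import Counter


def fit_vocab(docs, min_df=1):
    """Build a term -> index vocabulary, most frequent term first.

    min_df: minimum number of documents a term must appear in (like minDF >= 1).
    Ties in term frequency are broken alphabetically (assumption for determinism).
    """
    term_freq = Counter()
    doc_freq = Counter()
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_freq[t], t))
    return {t: i for i, t in enumerate(kept)}


def to_sparse(tokens, vocab):
    """Return (size, indices, counts) for one document; out-of-vocabulary terms are dropped."""
    counts = Counter(t for t in tokens if t in vocab)
    pairs = sorted((vocab[t], float(c)) for t, c in counts.items())
    return len(vocab), [i for i, _ in pairs], [c for _, c in pairs]


docs = [
    ["the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"],
    ["the", "dog", "barks"],
]
vocab = fit_vocab(docs)
print(to_sparse(docs[0], vocab))
# (9, [0, 1, 3, 4, 5, 6, 7, 8], [2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0])
print(fit_vocab(docs, min_df=2))  # {'the': 0, 'dog': 1}
```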

### Interlude
If your Part 1 is correctly implemented, the following code should transform the data for model training. Note that model training uses only two of the columns in the transformed data: `label` and `features`.

In [0]:
# DO NOT MODIFY.
pipeline = Pipeline(stages=pipeline_stages)
data=pipeline.fit(df).transform(df)
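`pipeline.fit(df)` walks through the stages in order: an estimator stage (here `StringIndexer` and `CountVectorizer`) is first fitted on the data as it looks at that point, and every fitted stage (or plain transformer, such as `Tokenizer`) then transforms the data before it is passed on. The sketch below illustrates that control flow in plain Python; `Lowercase`, `VocabEstimator`, `VocabModel`, `fit_pipeline`, and `transform_pipeline` are hypothetical stand-ins, not Spark's API.

```python
class Lowercase:
    """A transformer: no fit step, only transform."""

    def transform(self, rows):
        return [r.lower() for r in rows]


class VocabEstimator:
    """An estimator: fit() learns state from the data and returns a fitted model."""

    def fit(self, rows):
        return VocabModel(sorted(set(rows)))


class VocabModel:
    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        return [self.vocab.index(r) for r in rows]


def fit_pipeline(stages, rows):
    """Fit stages in order, feeding each stage the output of the previous one."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):  # estimator: learn from the current data first
            stage = stage.fit(rows)
        rows = stage.transform(rows)  # every fitted stage acts as a transformer
        fitted.append(stage)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply the fitted stages in order, like PipelineModel.transform."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows


model = fit_pipeline([Lowercase(), VocabEstimator()], ["Spam", "ham", "HAM"])
print(transform_pipeline(model, ["ham", "SPAM"]))  # [0, 1]
```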

In [0]:
data.show()

+------------+--------------------+-----+--------------------+--------------------+
|label_string|                 sms|label|               words|            features|
+------------+--------------------+-----+--------------------+--------------------+
|         ham|Go until jurong p...|  0.0|[go, until, juron...|(5461,[8,42,52,64...|
|         ham|Ok lar... Joking ...|  0.0|[ok, lar..., joki...|(5461,[5,75,411,5...|
|        spam|Free entry in 2 a...|  1.0|[free, entry, in,...|(5461,[0,3,8,20,5...|
|         ham|U dun say so earl...|  0.0|[u, dun, say, so,...|(5461,[5,22,60,14...|
|         ham|Nah I don't think...|  0.0|[nah, i, don't, t...|(5461,[0,1,66,87,...|
|        spam|FreeMsg Hey there...|  1.0|[freemsg, hey, th...|(5461,[0,2,6,10,1...|
|         ham|Even my brother i...|  0.0|[even, my, brothe...|(5461,[0,7,9,13,2...|
|         ham|As per your reque...|  0.0|[as, per, your, r...|(5461,[0,10,11,44...|
|        spam|WINNER!! As a val...|  1.0|[winner!!, as, a,...|(5461,[0,2,3,1

## Part 2: Model Training
Now we are ready to train our model. There are two parts to this exercise.

### Part 2a
Let us first split `data` into `train` and `test` sets using an 80/20 ratio. You may find the following useful:
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.randomSplit.html

In [0]:
# Divide into train and test.
train, test = data.randomSplit([0.8, 0.2], seed=42)
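`randomSplit` normalizes the weights if they do not sum to 1 and assigns each row independently at random, so the resulting 80/20 split is approximate rather than exact; a fixed `seed` makes it reproducible for the same input data and partitioning. The plain-Python sketch below (a hypothetical `random_split` helper, not Spark's implementation) shows the per-row assignment idea.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split (illustration of randomSplit's behavior)."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total  # normalize the weights into cumulative bounds in [0, 1]
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches u values above a bound slightly below 1.0 due to rounding.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


tr, te = random_split(list(range(10000)), [0.8, 0.2], seed=42)
print(len(tr), len(te))  # roughly 8000 and 2000, not exactly
```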

### Interlude
The machine learning model we will be working with is called logistic regression. For your reference, the sample code for training is shown below.

In [0]:
# Sample logistic regression code.
lr = LogisticRegression()
lrModel = lr.fit(train)
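For each row, a fitted binary logistic regression model computes a margin m = w·x + b and reports `rawPrediction = [-m, m]`, `probability = [1 - σ(m), σ(m)]`, and `prediction = 1.0` when σ(m) exceeds the threshold (0.5 by default). The plain-Python sketch below (a hypothetical `lr_predict` helper using dicts as sparse vectors) reproduces those three columns; it is not Spark code.

```python
import math


def lr_predict(weights, features, intercept=0.0, threshold=0.5):
    """Score one sparse row like a binary logistic regression model.

    weights and features are {index: value} dicts standing in for sparse vectors.
    """
    margin = intercept + sum(v * weights.get(i, 0.0) for i, v in features.items())
    raw_prediction = [-margin, margin]
    # Numerically stable sigmoid for large positive or negative margins.
    if margin >= 0:
        p1 = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        p1 = e / (1.0 + e)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


raw, prob, pred = lr_predict({0: 0.5, 3: -1.0}, {0: 4.0, 3: 1.0})
print(raw, prob, pred)  # margin is 1.0, so the row is predicted as spam (1.0)
```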

### Part 2b
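In machine learning, we typically perform what is called hyperparameter tuning over a combination of parameters. For logistic regression with elastic net regularization, we can tune the strength of the $L_1$ and $L_2$ penalties. Spark expresses these through two parameters: `regParam` ($\lambda$, the overall regularization strength, default 0.0) and `elasticNetParam` ($\alpha$, the mix between the two penalties, where $\alpha = 0$ is a pure $L_2$ penalty and $\alpha = 1$ a pure $L_1$ penalty). For details on elastic net regularization, please see: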
https://en.wikipedia.org/wiki/Elastic_net_regularization

The Wikipedia entry covers only the elastic net formulation of linear regression. For logistic regression, the cost function is different: the $L_1$ and $L_2$ penalties are added to the cross-entropy (log) loss of the data. For more details, you may want to consult a machine learning textbook.
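As a sketch of the objective, assuming the form given in the Spark MLlib documentation and ignoring Spark's internal feature standardization, elastic net logistic regression (without an intercept, matching `fitIntercept=False` below) minimizes

$$
\min_{w}\; \frac{1}{n}\sum_{i=1}^{n} \ell(w; x_i, y_i) \;+\; \lambda\left(\alpha \lVert w \rVert_1 + \frac{1-\alpha}{2}\lVert w \rVert_2^2\right),
$$

$$
\ell(w; x, y) = -\Big[\, y \log \sigma(w^\top x) + (1-y)\log\big(1-\sigma(w^\top x)\big) \Big],
\qquad \sigma(z) = \frac{1}{1+e^{-z}},
$$

with labels $y \in \{0, 1\}$, $\lambda$ = `regParam`, and $\alpha$ = `elasticNetParam`. Note that when $\lambda = 0$ the penalty term vanishes for every $\alpha$.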

Your task here is to finish the implementation that runs cross-validation over the set of elastic net regularization parameters below. Most of the skeleton code is provided for you, including the parameter grid that is passed to the `CrossValidator`.

For more details: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html

In [0]:
# Your code for Part 2b goes here.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

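# Create ParamGrid and Evaluator for Cross Validation.
# regParam (overall penalty strength) defaults to 0.0, in which case elasticNetParam
# has no effect, so regParam is tuned alongside it.
paramGrid = (ParamGridBuilder()
             .baseOn({lr.maxIter: 100})
             .baseOn({lr.fitIntercept: False})
             .addGrid(lr.regParam, [0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0, 0.25, 0.5, 0.75, 1])
             .build())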
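# Score with the continuous rawPrediction column; the hard 0/1 "prediction" column
# discards the ranking information that area under ROC measures.
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")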

# Run cross-validation. Named crossval so it does not shadow the CountVectorizer stage `cv`.
crossval = CrossValidator(estimator=lr, evaluator=cvEvaluator, estimatorParamMaps=paramGrid, numFolds=3, seed=42)
cvModel = crossval.fit(train)
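Conceptually, `CrossValidator` with `numFolds=3` splits the training data into 3 folds; for each parameter map it trains on 2 folds and evaluates on the held-out fold, averages the 3 scores (exposed as `avgMetrics`), picks the best parameter map, and refits on the full training set. The plain-Python sketch below illustrates that loop with a hypothetical `k_fold_cv` helper and a toy "shrunk mean" model; it is not Spark's implementation, and its fold assignment (shuffle, then round-robin) is an assumption.

```python
import random
from statistics import mean


def k_fold_cv(rows, param_grid, fit, evaluate, k=3, seed=42):
    """Return (best_params, avg_metrics, best_model); a larger metric is better."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    folds = [shuffled[i::k] for i in range(k)]
    avg_metrics = []
    for params in param_grid:
        scores = []
        for i in range(k):
            held_out = folds[i]
            training = [r for j, f in enumerate(folds) if j != i for r in f]
            model = fit(training, params)
            scores.append(evaluate(model, held_out))
        avg_metrics.append(mean(scores))
    best = max(range(len(param_grid)), key=lambda i: avg_metrics[i])
    best_model = fit(rows, param_grid[best])  # refit on all rows with the best params
    return param_grid[best], avg_metrics, best_model


# Toy model: predict the training mean of y, shrunk toward 0 by a "shrink" hyperparameter.
def fit_shrunk_mean(training, params):
    return mean(y for _, y in training) / (1.0 + params["shrink"])


def neg_mse(model, held_out):
    return -mean((y - model) ** 2 for _, y in held_out)


rows = [(i, 5.0 + (i % 3)) for i in range(30)]
grid = [{"shrink": 0.0}, {"shrink": 1.0}, {"shrink": 10.0}]
best, metrics, model = k_fold_cv(rows, grid, fit_shrunk_mean, neg_mse)
print(best, model)  # {'shrink': 0.0} 6.0
```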

## Part 3: Model Evaluation
Evaluate the best model trained in Part 2 on the `test` set from Part 2a. To do so, call `transform` on the `test` set with `cvModel` from the previous part; `cvModel` applies the best model found during cross-validation.

For example:
https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression

Search for `# Make predictions` in this page for an example.
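`BinaryClassificationEvaluator` computes area under ROC from whichever score column is named by `rawPredictionCol` (default `rawPrediction`). AUC can be read as the probability that a randomly chosen positive row scores higher than a randomly chosen negative row, with ties counting one half. The plain-Python sketch below (a hypothetical `auc` helper using that equivalent pairwise definition, not Spark's code) shows why scoring with the hard 0/1 `prediction` column instead of a continuous score can lower AUC: thresholding creates ties and discards ranking information.

```python
def auc(scores, labels):
    """Pairwise AUC: fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [0, 0, 1, 1]
scores = [0.1, 0.6, 0.4, 0.9]  # continuous scores, e.g. P(spam)
hard = [1.0 if s > 0.5 else 0.0 for s in scores]  # thresholded 0/1 predictions
print(auc(scores, labels), auc(hard, labels))  # 0.75 0.5
```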

In [0]:
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)       
print(cvPredictions.take(10))

# Evaluate bestModel found from Cross Validation. DO NOT MODIFY.
print ("Test Area Under ROC: ", cvEvaluator.evaluate(cvPredictions))

[Row(label_string='ham', sms=' &lt;DECIMAL&gt; m but its not a common car here so its better to buy from china or asia. Or if i find it less expensive. I.ll holla', label=0.0, words=['', '&lt;decimal&gt;', 'm', 'but', 'its', 'not', 'a', 'common', 'car', 'here', 'so', 'its', 'better', 'to', 'buy', 'from', 'china', 'or', 'asia.', 'or', 'if', 'i', 'find', 'it', 'less', 'expensive.', 'i.ll', 'holla'], features=SparseVector(5461, {0: 1.0, 1: 1.0, 3: 1.0, 19: 1.0, 21: 1.0, 22: 1.0, 23: 1.0, 24: 2.0, 25: 1.0, 29: 1.0, 40: 1.0, 59: 2.0, 145: 1.0, 161: 1.0, 199: 1.0, 308: 1.0, 330: 1.0, 396: 1.0, 467: 1.0, 1123: 1.0, 1143: 1.0, 1641: 1.0, 2805: 1.0, 3117: 1.0, 4443: 1.0}), rawPrediction=DenseVector([121.5116, -121.5116]), probability=DenseVector([1.0, 0.0]), prediction=0.0), Row(label_string='ham', sms=" said kiss, kiss, i can't do the sound effects! He is a gorgeous man isn't he! Kind of person who needs a smile to brighten his day! ", label=0.0, words=['', 'said', 'kiss,', 'kiss,', 'i', "can'

### Conclusion
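We are done! To see the parameters of the best model selected by the grid search, we can run the command below. To compare every parameter set that was explored, pair `cvModel.getEstimatorParamMaps()` with `cvModel.avgMetrics`, which holds the average cross-validation metric for each set.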

In [0]:

cvModel.bestModel.extractParamMap()

{Param(parent='LogisticRegression_3c0c7995a540', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_3c0c7995a540', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_3c0c7995a540', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_3c0c7995a540', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_3c0c7995a540', name='fitIntercept', doc='whether to fit an intercept term.'): False,
 Param(parent='LogisticRegression_3c0c7995a540', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_3c0c7995a540', name='maxBlockSizeInMB', doc='maximum memory in MB for 