In [9]:
import glob
import os
import sys

SPARK_HOME = "/usr/hdp/current/spark2-client"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["SPARK_HOME"] = SPARK_HOME

# Despite its name, this is the directory that holds the pyspark/py4j zip archives.
PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")

# The py4j version changes with every Spark release, so locate the bundled archive
# instead of hard-coding its file name. There is normally exactly one; if none is
# found, fall back to the previously pinned version.
_py4j_candidates = sorted(glob.glob(os.path.join(PYSPARK_HOME, "py4j-*-src.zip")))
PY4J_ZIP = (
    _py4j_candidates[-1]
    if _py4j_candidates
    else os.path.join(PYSPARK_HOME, "py4j-0.10.9.3-src.zip")
)

# Prepend both archives so `import pyspark` resolves to the cluster's Spark build.
sys.path.insert(0, PY4J_ZIP)
sys.path.insert(0, os.path.join(PYSPARK_HOME, "pyspark.zip"))

In [10]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()

# Build (or reuse) this notebook's session with the default configuration.
session_builder = SparkSession.builder.config(conf=conf).appName("Spark ML Intro")
spark = session_builder.getOrCreate()

In [7]:
# Show the active SparkSession.
spark

In [8]:
from IPython.display import Image

### Pipeline

+ **DataFrame**: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.


+ **Transformer**: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.


+ **Estimator**: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.


+ **Pipeline**: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.


+ **Parameter**: All Transformers and Estimators now share a common API for specifying parameters.

### Pipeline as an Estimator (`Pipeline.fit()`)

![](https://spark.apache.org/docs/latest/img/ml-Pipeline.png)

### PipelineModel as a Transformer (`PipelineModel.transform()`)

![](https://spark.apache.org/docs/latest/img/ml-PipelineModel.png)

In [5]:
from pyspark.ml.linalg import Vectors

In [11]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Tiny hand-made training set: (label, dense feature vector) rows.
toy_rows = [
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5])),
]
train = spark.createDataFrame(toy_rows, schema=["label", "features"])

In [None]:
# Display the toy training set.
train.show()

In [None]:
# LogisticRegression is configured here and trained later with fit().
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [None]:
from pyspark.ml import Estimator, Transformer

In [None]:
# Check that the (unfitted) LogisticRegression is an Estimator.
isinstance(lr, Estimator)

In [None]:
type(lr)

In [None]:
# List the parameters this estimator exposes.
lr.params

In [None]:
# regParam was set to 0.01 in the constructor above.
lr.getOrDefault("regParam")

In [None]:
train.rdd.getNumPartitions()

In [None]:
# Fitting the Estimator on the DataFrame produces a model.
model = lr.fit(train)

In [None]:
type(model)

In [None]:
# The fitted model is a Transformer.
isinstance(model, Transformer)

In [None]:
# Learned weights of the fitted logistic regression.
model.coefficients

In [None]:
model.interceptVector

In [None]:
# transform() returns the input DataFrame with prediction columns appended.
predict = model.transform(train)

In [None]:
predict.printSchema()

In [None]:
predict.show(10, truncate=False, vertical=True)

In [None]:
# Threshold parameter of the fitted model.
model.getOrDefault("threshold")

### Toxic Comment Classification Challenge

In [12]:
from pyspark.sql.types import *

In [13]:
# Raw CSV layout: id, free text, then six integer label columns.
label_fields = [
    StructField(label_name, IntegerType())
    for label_name in ("toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate")
]
schema = StructType(
    [StructField("id", StringType()), StructField("comment_text", StringType())] + label_fields
)

In [14]:
# First attempt: plain CSV read. Comments can contain line breaks, so this naive
# read is revisited in the cells below.
dataset = spark.read.csv("/datasets/toxic/train.csv", schema=schema, header=True)

In [None]:
dataset.show(2, vertical=True)

### Comments contain line breaks, so one CSV record can span several lines. Reading such records needs the `multiLine` option, available since Spark 2.2.0: https://issues.apache.org/jira/browse/SPARK-19610

In [None]:
# Peek at the raw lines of the same file Spark reads above. A shell `head` on the
# relative path `train.csv` would inspect a local file in the notebook's working
# directory, not necessarily /datasets/toxic/train.csv.
spark.read.text("/datasets/toxic/train.csv").show(10, truncate=False)

In [None]:
# multiLine=True lets a quoted field (comment_text) span several physical lines.
dataset = spark.read.csv("/datasets/toxic/train.csv", schema=schema, header=True, multiLine=True)

In [None]:
# Check whether the id column now parses cleanly.
dataset.select("id").show(10)

### You also need to set the `escape` parameter: quotes inside quoted fields are escaped by doubling them (`""`)!

In [11]:
# escape='"' treats a doubled quote ("") inside a quoted field as a literal quote.
dataset = spark.read.csv("/datasets/toxic/train.csv", schema=schema, header=True, multiLine=True, escape='"')

In [None]:
dataset.select("id").show(10)

In [12]:
# Two rows, full (untruncated) text, one field per line.
dataset.show(n=2, truncate=False, vertical=True)

[Stage 0:>                                                          (0 + 1) / 1]

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id            | 0000997932d777bf                                                                                                                                                                                                                                                          
 comment_text  | Explanation\nWhy the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27 
 toxic         | 0                                                                                                                                  

                                                                                

In [None]:
# Partition count produced by the multi-line CSV read.
dataset.rdd.getNumPartitions()

In [None]:
dataset.count()

In [None]:
# Spread the data over 4 partitions and keep it in memory for the steps below.
dataset = dataset.repartition(4).cache()

In [None]:
# cache() is lazy: this action materializes the cached data.
dataset.count()

### Let's define a binary target (toxic/non-toxic)

In [14]:
from pyspark.sql import functions as f

In [15]:
# A comment is non-toxic (0) only if all six labels are 0; anything else,
# including a null label, falls through to otherwise(1).
is_clean = dataset.toxic == 0
for label_name in ("severe_toxic", "obscene", "threat", "insult", "identity_hate"):
    is_clean = is_clean & (dataset[label_name] == 0)
target = f.when(is_clean, 0).otherwise(1)

In [16]:
# Attach the binary label as a new column.
dataset = dataset.withColumn("target", target)

In [17]:
# Spot-check the label next to the ids.
dataset.select("id", "target").show(10)

[Stage 1:>                                                          (0 + 1) / 1]

+----------------+------+
|              id|target|
+----------------+------+
|6fdb7b6734f8bf40|     0|
|26e1b63617df36b1|     0|
|85e4f353ca4b2bde|     0|
|9d2196265213dce8|     0|
|fb7a63a8e287b2d1|     0|
|fd42fd6a1ea341c4|     0|
|54f9e59924682c6e|     0|
|01c0ae884d69319b|     0|
|f7fec98d6aac8ce3|     0|
|25553d990b245467|     1|
+----------------+------+
only showing top 10 rows



                                                                                

In [18]:
# Class counts as {target_value: number_of_rows}.
targets = {
    row["target"]: row["count"]
    for row in dataset.groupBy("target").count().collect()
}

                                                                                

In [19]:
# Share of toxic comments (target == 1) in the whole dataset.
n_toxic, n_clean = targets[1], targets[0]
n_toxic / (n_clean + n_toxic)

0.10167887648758234

In [20]:
# The per-label columns are folded into `target`; drop them and cache the slimmer frame.
label_columns = ("toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate")
dataset = dataset.drop(*label_columns).cache()

In [21]:
# The repr lists the remaining columns and their types.
dataset

DataFrame[id: string, comment_text: string, target: int]

In [22]:
# Two rows, untruncated, one field per line.
dataset.show(n=2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id           | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                                                                       
 comment_text | "\n\n""Katara""\nI've removed the section entirely. I don't care if you like to pretend that Katara and Zuko are meant for each other. It's still not case, and there has been no indication whatsoever. Thus, there's little point to actually have the section and exempt Toph and Sokka beyond the in

In [None]:
# Save the labelled dataset; the pipeline section reloads it from this path.
dataset.write.mode("overwrite").parquet("/user/pklemenkov/toxic_dataset")

### Let's fit the simplest logistic regression on binary bag-of-words features

In [23]:
from pyspark.ml.feature import *

### Split comments into words

In [24]:
# Split comment_text into a "words" array column. As the output below shows, tokens
# are lower-cased and punctuation stays attached to words.
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")

In [25]:
dataset2 = tokenizer.transform(dataset)

In [26]:
# show(n=2, truncate=False, vertical=True)
dataset2.select("id", "words").show(2, False, True)

[Stage 11:>                                                         (0 + 1) / 1]

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id    | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                                                                                                                               
 words | [", , ""katara"", i've, removed, the, section, entirely., i, don't, care, if, you, like, to, pretend, that, katara, and, zuko, are, meant, for, each, other., it's, still, not, case,, and, there, has, been,

                                                                                

### Convert texts into binary vectors using the hashing trick

In [27]:
# Hash each token list into a fixed 100-slot vector; binary=True records presence, not counts.
hasher = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol="word_vector",
    numFeatures=100,
    binary=True,
)
dataset2 = hasher.transform(dataset2)

In [28]:
dataset2.select("id", "word_vector").show(n=2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id          | 6fdb7b6734f8bf40                                                                                                                                                                                                                                                                              
 word_vector | (100,[0,3,4,5,10,12,14,16,17,19,24,30,31,34,35,37,38,41,42,44,47,50,56,60,63,66,68,69,72,74,76,81,85,88,89,90,91,95,96,99],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) 
-RECORD 1-------------------------------------------------------------------------------------

### Now let's split into train and test. The classes are imbalanced, so let's use stratified sampling

In [29]:
# Sample 80% of each class separately (fixed seed) so train keeps the class ratio.
sampling_fractions = {0: 0.8, 1: 0.8}
train = dataset2.sampleBy("target", fractions=sampling_fractions, seed=5757)

In [30]:
train_targets = {row["target"]: row["count"] for row in train.groupby("target").count().collect()}

                                                                                

In [31]:
# Toxic share in the training split (should be close to the full-dataset share).
n_toxic, n_clean = train_targets[1], train_targets[0]
n_toxic / (n_clean + n_toxic)

0.10123065331365963

In [32]:
# Test set = every row whose id is not in train (assumes ids are unique — confirm).
test = dataset2.join(train, on="id", how="leftanti")

In [33]:
test_targets = dict(test.groupby("target").count().collect())

                                                                                

In [34]:
# Toxic share in the test split.
n_toxic, n_clean = test_targets[1], test_targets[0]
n_toxic / (n_clean + n_toxic)

0.10345470941260611

In [35]:
# Partition count of the training split.
train.rdd.getNumPartitions()

4

In [36]:
# The anti-join may produce a different partition count than train.
test.rdd.getNumPartitions()

                                                                                

2

In [37]:
# Text columns are no longer needed; keep id, target and word_vector in memory.
train = train.drop("comment_text", "words").cache()

In [38]:
# `test` comes out of the anti-join with only 2 partitions (see the output above).
# coalesce() can only reduce the partition count, so coalesce(4) was a no-op;
# repartition(4) actually spreads the test set over 4 partitions, matching `train`.
test = test.drop("comment_text", "words").repartition(4).cache()

### Let's fit logistic regression

In [39]:
from pyspark.ml.classification import LogisticRegression

In [40]:
# Logistic regression on the hashed word vectors, predicting the binary target.
lr = LogisticRegression(featuresCol=hasher.getOutputCol(), labelCol="target", maxIter=15)

In [41]:
lr_model = lr.fit(train)

                                                                                

In [42]:
# The repr shows the number of classes and input features.
lr_model

LogisticRegressionModel: uid=LogisticRegression_8c4a027b7505, numClasses=2, numFeatures=100

In [43]:
# Score the held-out test set.
predictions = lr_model.transform(test)

In [44]:
predictions.printSchema()

root
 |-- id: string (nullable = true)
 |-- target: integer (nullable = false)
 |-- word_vector: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [45]:
scored_columns = ["id", "target", "prediction", "probability", "rawPrediction"]
predictions.select(*scored_columns).show(n=10, truncate=False, vertical=True)

[Stage 77:>                                                         (0 + 1) / 1]

-RECORD 0--------------------------------------------------
 id            | 00f33e1915a72826                          
 target        | 0                                         
 prediction    | 0.0                                       
 probability   | [0.8510524349407096,0.1489475650592904]   
 rawPrediction | [1.7428794106956955,-1.7428794106956955]  
-RECORD 1--------------------------------------------------
 id            | 01f29c98adb73328                          
 target        | 0                                         
 prediction    | 0.0                                       
 probability   | [0.8929767624654421,0.10702323753455789]  
 rawPrediction | [2.1215145745593365,-2.1215145745593365]  
-RECORD 2--------------------------------------------------
 id            | 028aa892076cf0d1                          
 target        | 0                                         
 prediction    | 0.0                                       
 probability   | [0.5696021494297644,0.4

                                                                                

In [46]:
# Count rows where the int-cast prediction equals the label.
true_predictions = (
    predictions
    .select("target", f.col("prediction").cast("int"))
    .filter(f.col("target") == f.col("prediction"))
    .count()
)
true_predictions

                                                                                

28880

In [47]:
print(f"Accuracy is {true_predictions / predictions.count()}")

Accuracy is 0.8980378743120122


### Not bad! Or...

In [48]:
# Toxic comments that were predicted as toxic (true positives).
true_positives = (
    predictions.select("target", f.col("prediction").cast("int"))
    .filter((f.col("target") == 1) & (f.col("prediction") == f.col("target")))
    .count()
)
true_positives

178

In [49]:
# Columns available for the metrics below.
predictions.printSchema()

root
 |-- id: string (nullable = true)
 |-- target: integer (nullable = false)
 |-- word_vector: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [50]:
# Only two small int columns are collected to the driver for scikit-learn.
label_and_prediction = predictions.select("target", f.col("prediction").cast("int"))
predictions_pd = label_and_prediction.toPandas()

In [51]:
# First rows of the collected label/prediction frame.
predictions_pd.head()

Unnamed: 0,target,prediction
0,0,0
1,0,0
2,0,0
3,0,0
4,0,0


In [52]:
# Decision threshold of the estimator (not set explicitly in this notebook).
lr.getOrDefault("threshold")

0.5

In [53]:
from sklearn.metrics import classification_report, precision_score

In [54]:
# Per-class precision/recall exposes the minority-class problem hidden by accuracy.
report = classification_report(
    y_true=predictions_pd.target,
    y_pred=predictions_pd.prediction,
)
print(report)

              precision    recall  f1-score   support

           0       0.90      1.00      0.95     28832
           1       0.58      0.05      0.10      3327

    accuracy                           0.90     32159
   macro avg       0.74      0.52      0.52     32159
weighted avg       0.87      0.90      0.86     32159



In [None]:
# Size of the training split, to compare with the test support above.
train.count()

### What if we want more sophisticated metrics?

In [55]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [56]:
# Threshold-independent metric: area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol="target", metricName='areaUnderROC')

In [57]:
evaluator.evaluate(predictions)

                                                                                

0.768172436897586

In [58]:
# Switch to a metric this evaluator does not support; the next cell shows the resulting error.
evaluator.setParams(metricName="precision")

BinaryClassificationEvaluator_25f7596c37d9

In [59]:
# BinaryClassificationEvaluator rejects metricName="precision", so this call is
# expected to fail. Catch exactly that error and display it, so that
# "Restart & Run All" does not stop here.
from pyspark.sql.utils import IllegalArgumentException

try:
    evaluator.evaluate(predictions)
except IllegalArgumentException as err:
    print(f"{type(err).__name__}: {err}")

IllegalArgumentException: BinaryClassificationEvaluator_25f7596c37d9 parameter metricName given invalid value precision.

### `spark.ml.evaluation.BinaryClassificationEvaluator` supports only ROC AUC and PR AUC. What if we want more?

In [60]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [61]:
# The multiclass evaluator also works for a binary label and supports more metrics.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="target", metricName="accuracy")

In [62]:
evaluator.evaluate(predictions)

0.8980378743120122

In [63]:
# Precision averaged over classes, weighted by class frequency.
evaluator = evaluator.setMetricName("weightedPrecision")

In [64]:
evaluator.evaluate(predictions)

0.8676956687517091

In [65]:
# Recall averaged over classes, weighted by class frequency.
evaluator = evaluator.setMetricName("weightedRecall")

In [66]:
evaluator.evaluate(predictions)

0.8980378743120122

### Let's define a pipeline!

In [68]:
# Reload the labelled dataset saved earlier (id, comment_text, target).
dataset = spark.read.parquet("/user/pklemenkov/toxic_dataset")

                                                                                

In [69]:
# Schema as read back from parquet.
dataset

DataFrame[id: string, comment_text: string, target: int]

In [70]:
# Partition count after reading from parquet.
dataset.rdd.getNumPartitions()

4

In [71]:
from pyspark.ml import Pipeline

In [73]:
# Chain the feature steps and the classifier; fit()/transform() run them in order.
pipeline_stages = [tokenizer, hasher, lr]
pipeline = Pipeline(stages=pipeline_stages)

In [74]:
# A fixed seed (the value used for the first split) makes this split reproducible
# across kernel restarts; without `seed`, sampleBy draws a random one on every run.
train = dataset.sampleBy("target", fractions={0: 0.8, 1: 0.8}, seed=5757).cache()

In [75]:
# Test = rows whose id is not in train.
test = dataset.join(train, on="id", how="leftanti").cache()

In [76]:
# Fits every stage in order and returns a PipelineModel.
pipeline_model = pipeline.fit(train)

                                                                                

In [77]:
# The fitted pipeline.
pipeline_model

PipelineModel_1a8c7a5e0755

In [78]:
predictions = pipeline_model.transform(test)

In [79]:
# Back to ROC AUC: `evaluator` was switched to multiclass metrics above.
evaluator = BinaryClassificationEvaluator(labelCol="target", metricName='areaUnderROC')

In [80]:
evaluator.evaluate(predictions)

                                                                                

0.7643669340887645

### Okay, maybe something more sophisticated?

In [81]:
from pyspark.ml.classification import GBTClassifier

In [82]:
# Gradient-boosted trees: 20 shallow (depth-3) trees on the hashed features.
gbt = GBTClassifier(
    featuresCol=hasher.getOutputCol(),
    labelCol="target",
    maxIter=20,
    maxDepth=3,
)

In [83]:
# Same features as before, GBT instead of logistic regression.
pipeline = Pipeline(stages=[tokenizer, hasher, gbt])

In [84]:
# Fit the GBT pipeline on the same training split.
pipeline_model = pipeline.fit(train)

                                                                                

In [85]:
predictions = pipeline_model.transform(test)

In [86]:
# ROC AUC of the GBT pipeline.
evaluator.evaluate(predictions)

                                                                                

0.7249354943681089

### Let's add more degrees of freedom

In [87]:
# Fitted stages of the pipeline model.
pipeline_model.stages

[Tokenizer_316380ea9fe2,
 HashingTF_92d4a1ba5902,
 GBTClassificationModel: uid = GBTClassifier_fbe468aaef5f, numTrees=20, numClasses=2, numFeatures=100]

In [88]:
# Extra params passed to fit() override hasher.numFeatures for this fit only
# (they are applied to a copy of the pipeline).
pipeline_model = pipeline.fit(train, params={hasher.numFeatures: 1000})

                                                                                

In [89]:
# The fitted HashingTF stage uses numFeatures=1000.
pipeline_model.stages[1].extractParamMap()

{Param(parent='HashingTF_92d4a1ba5902', name='binary', doc='If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False.'): True,
 Param(parent='HashingTF_92d4a1ba5902', name='numFeatures', doc='Number of features. Should be greater than 0.'): 1000,
 Param(parent='HashingTF_92d4a1ba5902', name='outputCol', doc='output column name.'): 'word_vector',
 Param(parent='HashingTF_92d4a1ba5902', name='inputCol', doc='input column name.'): 'words'}

In [90]:
predictions = pipeline_model.transform(test)

In [91]:
# ROC AUC with 1000 hash buckets.
evaluator.evaluate(predictions)

                                                                                

0.785057742922844

### Let's remove stopwords

In [93]:
# Spark's built-in English stop-word list.
stop_words = StopWordsRemover.loadDefaultStopWords("english")

In [94]:
stop_words

['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 'her',
 'hers',
 'herself',
 'it',
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under',
 'again',
 'further',
 'then',
 'once',
 'here',
 'there',
 'when',
 'where',
 'why',
 'how',
 'all',
 'any',
 'both',
 'each',
 'few',
 'more',
 'most',
 'other',
 'some',
 'such',
 'no',
 'nor',
 '

In [95]:
# Drop the English stop words loaded above from each token array.
swr = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="words_filtered",
    stopWords=stop_words,
)

In [96]:
# 1000 hash buckets, now fed by the stop-word-filtered tokens.
hasher = HashingTF(
    inputCol=swr.getOutputCol(),
    outputCol="word_vector",
    numFeatures=1000,
    binary=True,
)

In [97]:
# tokenize -> remove stop words -> hash -> logistic regression
pipeline = Pipeline(stages=[tokenizer, swr, hasher, lr])

In [98]:
# Fit the stop-word pipeline.
pipeline_model = pipeline.fit(train)

22/04/11 17:46:59 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/04/11 17:46:59 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/04/11 17:47:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/11 17:47:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [99]:
# Fitted stages, including the StopWordsRemover.
pipeline_model.stages

[Tokenizer_316380ea9fe2,
 StopWordsRemover_e2242cff1968,
 HashingTF_ba50066741f6,
 LogisticRegressionModel: uid=LogisticRegression_8c4a027b7505, numClasses=2, numFeatures=1000]

In [100]:
predictions = pipeline_model.transform(test)

In [101]:
# ROC AUC with stop words removed.
evaluator.evaluate(predictions)

                                                                                

0.8497071893174107

### Need moar features!

In [102]:
import pyspark.sql.functions as f

In [103]:
# Columns available for new features.
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- target: integer (nullable = true)



In [104]:
# Extra numeric feature: number of characters in the comment.
dataset = dataset.withColumn("comment_length", f.length("comment_text"))

In [105]:
# Spot-check the new comment_length column.
dataset.show()

+----------------+--------------------+------+--------------+
|              id|        comment_text|target|comment_length|
+----------------+--------------------+------+--------------+
|6fdb7b6734f8bf40|"\n\n""Katara""\n...|     0|           323|
|26e1b63617df36b1|"\n\n charlie wil...|     0|           242|
|85e4f353ca4b2bde|Arthur Rose Eldre...|     0|           581|
|9d2196265213dce8|Re: Help translat...|     0|           110|
|fb7a63a8e287b2d1|Sources for Gambi...|     0|           568|
|fd42fd6a1ea341c4|Just let me hear ...|     0|           130|
|54f9e59924682c6e|Flamboyant (gay) ...|     0|           197|
|01c0ae884d69319b|Thank you \n\nI j...|     0|            36|
|f7fec98d6aac8ce3|Removed the notab...|     0|            32|
|25553d990b245467|I AM HAVING MY PE...|     1|            22|
|0f5c123acb6ba9be|Hello. Why do you...|     0|           114|
|901d7783bca3d63f|Use one that does...|     1|            52|
|7b65f1b25caab0cc|Movie removal fro...|     0|           241|
|09201fc

In [106]:
# Re-split so both sides carry the new comment_length column. A fixed seed makes the
# split reproducible across kernel restarts; without `seed`, sampleBy draws a random one.
train = dataset.sampleBy("target", fractions={0: 0.8, 1: 0.8}, seed=5757).cache()
# Test = rows whose id is not in train.
test = dataset.join(train, on="id", how="leftanti").cache()

### All features must be assembled into one column. `VectorAssembler` to the rescue!

In [107]:
# Concatenate the hashed words and the length into the single "features" vector.
assembler = VectorAssembler(
    inputCols=[hasher.getOutputCol(), "comment_length"],
    outputCol="features",
)

In [108]:
# featuresCol is left at its default, which has to match the assembler's outputCol ("features").
lr = LogisticRegression(labelCol="target", maxIter=15)

In [109]:
# tokenize -> remove stop words -> hash -> assemble with length -> logistic regression
pipeline = Pipeline(stages=[tokenizer, swr, hasher, assembler, lr])

In [110]:
# Fit the pipeline with the extra length feature.
pipeline_model = pipeline.fit(train)

                                                                                

In [111]:
# The model now has one more input feature than the hasher produces.
pipeline_model.stages

[Tokenizer_316380ea9fe2,
 StopWordsRemover_e2242cff1968,
 HashingTF_ba50066741f6,
 VectorAssembler_bec6f89e8b02,
 LogisticRegressionModel: uid=LogisticRegression_9558ab98318d, numClasses=2, numFeatures=1001]

In [112]:
predictions = pipeline_model.transform(test)

In [113]:
# ROC AUC with the length feature added.
evaluator.evaluate(predictions)

                                                                                

0.8524172089655803

In [None]:
# Coefficient of the last assembled input, comment_length.
pipeline_model.stages[-1].coefficients[-1]

### OK, so how do you do it right?
https://www.kaggle.com/c/jigsaw-toxic-comment-classification-challenge/discussion/52557

### Very funny. Anyway, can we do better?

In [114]:
# Learn an explicit vocabulary (instead of hashing); binary=True keeps presence only.
count_vectorizer = CountVectorizer(
    inputCol=swr.getOutputCol(),
    outputCol="word_vector",
    binary=True,
)

In [115]:
# Same assembly as before, fed by the CountVectorizer output.
assembler = VectorAssembler(
    inputCols=[count_vectorizer.getOutputCol(), "comment_length"],
    outputCol="features",
)

In [116]:
cv_stages = [tokenizer, swr, count_vectorizer, assembler, lr]
pipeline = Pipeline(stages=cv_stages)

In [117]:
# Fit the CountVectorizer pipeline.
pipeline_model = pipeline.fit(train)

22/04/11 17:55:18 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:24 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:29 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:30 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:32 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:33 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:35 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:37 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:38 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:40 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:41 WARN DAGScheduler: Broadcasting large task binary with size 18.6 MiB
22/04/11 17:55:43 WARN DAGScheduler: Broadc

In [119]:
predictions = pipeline_model.transform(test)

In [120]:
# ROC AUC with a learned vocabulary.
evaluator.evaluate(predictions)

22/04/11 17:56:04 WARN DAGScheduler: Broadcasting large task binary with size 20.6 MiB
                                                                                

0.9303699031864843

22/04/11 18:06:06 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1159_1 !
22/04/11 18:06:06 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_1223_1 !
22/04/11 18:06:06 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_328_3 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_29_2 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_98_1 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_10_2 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_29_0 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_266_2 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_29_1 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_328_1 !
22/04/11 18:06:07 WARN BlockManagerMasterEndpoint: No more repl

![](http://29.media.tumblr.com/tumblr_lltzgnHi5F1qzib3wo1_400.jpg)

### Hyperparameter tuning

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# 2 x 2 grid: vocabulary size x regularization strength.
paramGrid = (
    ParamGridBuilder()
    .addGrid(count_vectorizer.vocabSize, [100, 500])
    .addGrid(lr.regParam, [0.01, 0.05])
    .build()
)

In [None]:
# One param map per grid combination.
paramGrid

In [None]:
# 3-fold cross-validation over paramGrid, training up to 4 models in parallel.
# Fold assignment is random; an explicit seed keeps it reproducible. Otherwise the
# default seed applies, which is not guaranteed to be stable across kernel restarts.
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                          evaluator=evaluator, numFolds=3, parallelism=4, seed=5757)

In [None]:
cv_model = crossval.fit(train)

In [None]:
# Mean cross-validated metric, one entry per param map in paramGrid.
cv_model.avgMetrics

In [None]:
cv_model.bestModel

In [None]:
# Score the held-out test set with the selected model.
predictions = cv_model.transform(test)

In [None]:
evaluator.evaluate(predictions)

In [None]:
# Shut down the session and release its cluster resources.
spark.stop()