In [1]:
df1 = spark.read.csv("/home/hadoop/Desktop/doc_class.dat", sep='|', header=True)

In [2]:
df1.count()

334500

In [3]:
df1.show(1)

+--------+----------+--------+--------------------+--------------------+
|myapp_id|typenameid|typename|          myapp_word|      myapp_word_all|
+--------+----------+--------+--------------------+--------------------+
| 1376533|         2|  action|game, android, world|game, android, wo...|
+--------+----------+--------+--------------------+--------------------+
only showing top 1 row



In [4]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vector

In [5]:
tokenizer = Tokenizer(inputCol='myapp_word_all', outputCol='words')
hashingTF = HashingTF(inputCol='words', outputCol='features')

In [6]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [7]:
training = df1.withColumnRenamed('typenameid','label').withColumn('label', col('label').cast(IntegerType()))

In [16]:
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [21]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures,[10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()

In [35]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=2)

In [36]:
cvModel = cv.fit(training)

In [76]:
test = training.limit(40)

In [31]:
p = cvModel.transform(test)

In [33]:
p.show()

+--------+-----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|myapp_id|label|typename|          myapp_word|      myapp_word_all|               words|            features|       rawPrediction|         probability|prediction|
+--------+-----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| 1376533|    2|  action|game, android, world|game, android, wo...|[game,, android,,...|(10,[0,1,2,3,4,5,...|[-7.5729925252911...|[1.49808390669311...|       2.0|
| 1376542|    2|  action|                game|game, app, enjoy,...|[game,, app,, enj...|(10,[0,1,2,3,4,5,...|[-7.5729925252911...|[1.49808390669311...|       2.0|
| 1376603|    2|  action|run, tap, collect...|run, tap, collect...|[run,, tap,, coll...|(10,[0,1,2,3,4,5,...|[-7.5729925252911...|[1.49808390669311...|       2.0|
| 1376792|    2|  acti

In [52]:
cvModel.bestModel.stages[2].numFeatures

10

In [51]:
cvModel.bestModel.stages[2].numClasses

49

In [72]:
cvModel.bestModel.stages[2].summary

<pyspark.ml.classification.LogisticRegressionTrainingSummary at 0x7f38b5dbcdd8>

In [56]:
cvModel.bestModel.stages[2].getParam('regParam')

Param(parent='LogisticRegression_436d80994f403bb078ad', name='regParam', doc='regularization parameter (>= 0)')

In [75]:
cvModel.getEvaluator().evaluate(cvModel.transform(test))

1.0

In [8]:
from pyspark.ml.linalg import Vectors
scoreAndLabels = map(lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1]),[(0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)])
dataset = spark.createDataFrame(scoreAndLabels, ["raw", "label"])

In [10]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw")
evaluator.evaluate(dataset)

0.7083333333333334

In [11]:
dataset.show()

+--------------------+-----+
|                 raw|label|
+--------------------+-----+
|           [0.9,0.1]|  0.0|
|           [0.9,0.1]|  1.0|
|           [0.6,0.4]|  0.0|
|           [0.4,0.6]|  0.0|
|           [0.4,0.6]|  1.0|
|           [0.4,0.6]|  1.0|
|[0.19999999999999...|  1.0|
+--------------------+-----+



In [12]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder
lr = LogisticRegression()
output = ParamGridBuilder() \
        .baseOn({lr.labelCol: 'l'}) \
        .baseOn([lr.predictionCol, 'p']) \
        .addGrid(lr.regParam, [1.0, 2.0]) \
        .addGrid(lr.maxIter, [1, 5]) \
        .build()
expected = [
        {lr.regParam: 1.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'},
        {lr.regParam: 2.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'},
        {lr.regParam: 1.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'},
        {lr.regParam: 2.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}]
len(output) == len(expected)

True

In [13]:
output

[{Param(parent='LogisticRegression_446d98d24feabc468dc1', name='labelCol', doc='label column name.'): 'l',
  Param(parent='LogisticRegression_446d98d24feabc468dc1', name='predictionCol', doc='prediction column name.'): 'p',
  Param(parent='LogisticRegression_446d98d24feabc468dc1', name='regParam', doc='regularization parameter (>= 0).'): 1.0,
  Param(parent='LogisticRegression_446d98d24feabc468dc1', name='maxIter', doc='max number of iterations (>= 0).'): 1},
 {Param(parent='LogisticRegression_446d98d24feabc468dc1', name='labelCol', doc='label column name.'): 'l',
  Param(parent='LogisticRegression_446d98d24feabc468dc1', name='predictionCol', doc='prediction column name.'): 'p',
  Param(parent='LogisticRegression_446d98d24feabc468dc1', name='regParam', doc='regularization parameter (>= 0).'): 1.0,
  Param(parent='LogisticRegression_446d98d24feabc468dc1', name='maxIter', doc='max number of iterations (>= 0).'): 5},
 {Param(parent='LogisticRegression_446d98d24feabc468dc1', name='labelCol

In [14]:
all([m in expected for m in output])

True

In [15]:
for m in output:
    print(m)

{Param(parent='LogisticRegression_446d98d24feabc468dc1', name='labelCol', doc='label column name.'): 'l', Param(parent='LogisticRegression_446d98d24feabc468dc1', name='predictionCol', doc='prediction column name.'): 'p', Param(parent='LogisticRegression_446d98d24feabc468dc1', name='regParam', doc='regularization parameter (>= 0).'): 1.0, Param(parent='LogisticRegression_446d98d24feabc468dc1', name='maxIter', doc='max number of iterations (>= 0).'): 1}
{Param(parent='LogisticRegression_446d98d24feabc468dc1', name='labelCol', doc='label column name.'): 'l', Param(parent='LogisticRegression_446d98d24feabc468dc1', name='predictionCol', doc='prediction column name.'): 'p', Param(parent='LogisticRegression_446d98d24feabc468dc1', name='regParam', doc='regularization parameter (>= 0).'): 1.0, Param(parent='LogisticRegression_446d98d24feabc468dc1', name='maxIter', doc='max number of iterations (>= 0).'): 5}
{Param(parent='LogisticRegression_446d98d24feabc468dc1', name='labelCol', doc='label col

In [18]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
dataset = spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
        parallelism=2)
cvModel = cv.fit(dataset)
cvModel.avgMetrics[0]


0.5

In [19]:
evaluator.evaluate(cvModel.transform(dataset))

0.8333333333333333