In [1]:
import os
import sys

SPARK_HOME = "/usr/hdp/current/spark2-client"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ["PYSPARK_PYTHON"]= PYSPARK_PYTHON
os.environ["SPARK_HOME"] = SPARK_HOME

PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")
sys.path.insert(0, os.path.join(PYSPARK_HOME, "py4j-0.10.7-src.zip"))
sys.path.insert(0, os.path.join(PYSPARK_HOME, "pyspark.zip"))

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.ui.port", 23830) # подставьте случайное пятизначное число

spark = SparkSession.builder.config(conf=conf).appName("Spark ML Intro").getOrCreate()

In [3]:
#spark = SparkSession.builder.getOrCreate()
#spark.sparkContext.setLogLevel('WARN')

In [4]:
spark

In [3]:
from pyspark.sql.types import *
from pyspark.ml.feature import *
import pyspark.sql.functions as f
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
path = "/datasets/amazon/all_reviews_5_core_train_small.json"
data = spark.read.json(path)

In [5]:
train, test = data.randomSplit([0.9, 0.1], seed=12345)

In [6]:
stop_words = StopWordsRemover.loadDefaultStopWords("english")
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="wordsReview", pattern="\\W")
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="reviewFiltered", stopWords=stop_words)
count_vectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="reviewVector", binary=True, vocabSize=3000)

In [7]:
lr = LogisticRegression(featuresCol="reviewVector", labelCol="overall",  maxIter=30)

In [7]:
evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName='rmse')

In [9]:
pipeline = Pipeline(stages=[
    tokenizer,   
    swr,    
    count_vectorizer,
    lr
])

In [10]:
pipeline_model = pipeline.fit(train)

In [11]:
predictions = pipeline_model.transform(test)
rmse = evaluator.evaluate(predictions)
rmse

1.0875679521051143

## Подбор параметров

In [25]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.01, 0.05]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(data)
#zip(cvModel.avgMetrics, paramGrid)
#predictions = cvModel.transform(test)

lr.regParam, [0, 0.1, 0.01]

In [26]:
cvModel.avgMetrics 

[1.0921764158184089, 1.0922413098323918, 1.0921702174855719]

In [32]:
lr = LogisticRegression(featuresCol="reviewVector", labelCol="overall", regParam=0)

In [33]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [5, 10, 15, 20, 25, 30])\
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(data)

In [34]:
cvModel.avgMetrics 

[1.092192733151485,
 1.0920426876340248,
 1.0919565062351881,
 1.092192733151485,
 1.0919772559163152,
 1.0921702348342683]

In [20]:
lr = LogisticRegression(featuresCol="reviewVector", labelCol="overall", regParam=0)
count_vectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="reviewVector", binary=True)

In [21]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [5, 10]) \
    .addGrid(count_vectorizer.vocabSize, [1000, 1500, 2000, 2500, 3000])\
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(data)

In [22]:
cvModel.avgMetrics 

[1.0969129463223743,
 1.0969129463223743,
 1.0969235314572523,
 1.0969129463223743,
 1.0969129463223743,
 1.0969310290759866,
 1.0969235314572523,
 1.0969235314572523,
 1.0969129463223743,
 1.0969310290759866]

In [38]:
paramGrid

[{Param(parent='LogisticRegression_42d3ae5db561952d5812', name='regParam', doc='regularization parameter (>= 0).'): 5,
  Param(parent='CountVectorizer_4b6e9efc02bb315e2802', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 1000},
 {Param(parent='LogisticRegression_42d3ae5db561952d5812', name='regParam', doc='regularization parameter (>= 0).'): 5,
  Param(parent='CountVectorizer_4b6e9efc02bb315e2802', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 1500},
 {Param(parent='LogisticRegression_42d3ae5db561952d5812', name='regParam', doc='regularization parameter (>= 0).'): 5,
  Param(parent='CountVectorizer_4b6e9efc02bb315e2802', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'): 2000},
 {Param(parent='LogisticRegression_42d3ae5db561952d5812', name='regParam', doc='regularization parameter (>= 0).'): 5,
  Param(parent='CountVectorizer_4b6e9efc02bb315e2802', name='vocabSize', doc='max size of the vocabulary. Default 1 << 18.'):

## Сравним по времени (5, 3000) и (10, 2500) и добавим еще один столбец

In [11]:
lr = LogisticRegression(featuresCol="reviewVector", labelCol="overall", maxIter=5, regParam=0)
count_vectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="reviewVector", binary=True, vocabSize=3000)

In [12]:
assembler = VectorAssembler(inputCols=[count_vectorizer.getOutputCol(), 'verified'], outputCol="features")

In [13]:
pipeline = Pipeline(stages=[
    tokenizer,   
    swr,    
    count_vectorizer,
    assembler,
    lr
])

In [14]:
%%time
pipeline_model = pipeline.fit(train)

CPU times: user 46.1 ms, sys: 23.8 ms, total: 69.9 ms
Wall time: 31.7 s


In [15]:
predictions = pipeline_model.transform(test)
rmse = evaluator.evaluate(predictions)
rmse

1.1595389754660674

In [16]:
lr = LogisticRegression(featuresCol="reviewVector", labelCol="overall", maxIter=10, regParam=0)
count_vectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="reviewVector", binary=True, vocabSize=2500)

In [17]:
pipeline = Pipeline(stages=[
    tokenizer,   
    swr,    
    count_vectorizer,
    assembler,
    lr
])

In [18]:
%%time
pipeline_model = pipeline.fit(train)

CPU times: user 57.4 ms, sys: 10.4 ms, total: 67.9 ms
Wall time: 16.9 s


In [19]:
predictions = pipeline_model.transform(test)
rmse = evaluator.evaluate(predictions)
rmse

1.0894365388203096

Выбираем maxIter=10, regParam=0, vocabSize=2500, 

In [21]:
spark.stop()