In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
sc.install_pypi_package("boto3==1.19.2")
sc.install_pypi_package("pandas==1.0.5")
sc.install_pypi_package("scipy==1.4.1")
sc.install_pypi_package("matplotlib==3.2.1")
sc.install_pypi_package("seaborn==0.10.1")
sc.install_pypi_package("snownlp")
sc.install_pypi_package("jieba")
sc.install_pypi_package("cnsenti")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1685147028910_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting boto3==1.19.2
  Downloading https://files.pythonhosted.org/packages/59/60/163503f24cf09553d0bb6c37db9ff3254f6cda812cab00430602867d03f5/boto3-1.19.2-py3-none-any.whl (131kB)
Collecting botocore<1.23.0,>=1.22.2 (from boto3==1.19.2)
  Downloading https://files.pythonhosted.org/packages/6a/73/552b27e3a1b4f83630907c4958be78e9d4c906e73efd554ebd5e21cb1692/botocore-1.22.12-py3-none-any.whl (8.1MB)
Collecting s3transfer<0.6.0,>=0.5.0 (from boto3==1.19.2)
  Downloading https://files.pythonhosted.org/packages/7b/9c/f51775ebe7df5a7aa4e7c79ed671bde94e154bd968aca8d65bb24aba0c8c/s3transfer-0.5.2-py3-none-any.whl (79kB)
Collecting python-dateutil<3.0.0,>=2.1 (from botocore<1.23.0,>=1.22.2->boto3==1.19.2)
  Downloading https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl (247kB)
Collecting urllib3<1.27,>=1.25.4 (from botocore<1.23.0,>=1.22.2->boto3==1.19.2)
  Downloading https://files.pythonhoste

In [3]:
train = spark.read.json('s3://amazon-reviews-ml/json/train/dataset_zh_train.json')
test = spark.read.json('s3://amazon-reviews-ml/json/test/dataset_zh_test.json')
dev = spark.read.json('s3://amazon-reviews-ml/json/dev/dataset_zh_dev.json')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
comb = train.union(dev).persist()
comb = comb.union(test).persist()
# comb = test

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
print('Total Columns: %d' % len(comb.dtypes))
print('Total Rows: %d' % comb.count())
comb.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Columns: 8
Total Rows: 210000
root
 |-- language: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- stars: string (nullable = true)

In [6]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.pipeline import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql import types as T
from pyspark.sql.types import ArrayType, FloatType, DoubleType, IntegerType
import numpy as np
from snownlp import SnowNLP
import jieba
from cnsenti import Sentiment
senti = Sentiment()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
class ChineseTokenizer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, inputCol="review_body", outputCol="body"):
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df):
        def f(s):
            return list(jieba.cut(s))

        return df.withColumn(self.outputCol, udf(f, T.ArrayType(T.StringType()))(F.col(self.inputCol)))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
chineseTokenizer1 = ChineseTokenizer(inputCol="review_body", outputCol="body")
chineseTokenizer2 = ChineseTokenizer(inputCol="review_title", outputCol="title")
stopwords = StopWordsRemover.loadDefaultStopWords("english")
#stopwordsRemover1 = StopWordsRemover(inputCol = "body", 
                                    #outputCol = "filtered_body").setStopWords(stopwords)
label_stringIdx3 = StringIndexer(inputCol = "product_category", outputCol = "feature3")
label_stringIdx4 = StringIndexer(inputCol = "stars", outputCol = "label")
countVectors1 = CountVectorizer(inputCol="body", outputCol="feature1", minDF=5)
countVectors2 = CountVectorizer(inputCol="title", outputCol="feature2", minDF=5)
pipeline = Pipeline(stages=[chineseTokenizer1, chineseTokenizer2, 
                            label_stringIdx3,label_stringIdx4,
                            countVectors1, countVectors2])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def get_score(text):
    s = SnowNLP(text)
    result = senti.sentiment_count(text)
#     return [1.1,2.2,3.3]
    return [s.sentiments, float(result['pos']), float(result['neg'])]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
sentiment_analysis_udf = udf(get_score, ArrayType(DoubleType()))
comb = comb.withColumn("sentiment_scores", sentiment_analysis_udf(comb.review_body)).persist()
comb = comb.withColumn("feature4", comb.sentiment_scores.getItem(0)).persist()
comb = comb.withColumn("feature5", comb.sentiment_scores.getItem(1)).persist()
comb = comb.withColumn("feature6", comb.sentiment_scores.getItem(2)).persist()
comb = comb.drop("sentiment_scores").persist()
# comb = comb.withColumn("feature4", sentiment_analysis_udf(comb.review_body).cast('double')).persist()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
pipelineFit = pipeline.fit(comb)
transformed_data = pipelineFit.transform(comb)
# data = transformed_data.select("feature1","feature2","feature3","label").persist()
data = transformed_data.select("feature1","feature2","feature3", 'feature4', 'feature5', 'feature6',"label").persist()
# data.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
features = ["feature1","feature2","feature3","feature4","feature5","feature6"]
# features = ["feature1","feature2","feature3"]
assembler = VectorAssembler(inputCols = features, outputCol = 'features')
transformed_data = assembler.transform(data.na.drop())
train, test = transformed_data.randomSplit([0.7, 0.3], seed = 521)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
stars = (train.groupBy('label')
             .count()
             .sort('label', ascending=False)
        )
stars.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|label|count|
+-----+-----+
|  4.0|29500|
|  3.0|29412|
|  2.0|29317|
|  1.0|29312|
|  0.0|29532|
+-----+-----+

In [28]:
lr = LogisticRegression(standardization = True)
grid = ParamGridBuilder().addGrid(lr.regParam, np.arange(0.5,1,.1)) \
.addGrid(lr.elasticNetParam, [-0.5, 0.1, 1]).build()
evaluator = MulticlassClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
    parallelism=320, numFolds = 5)
cvModel = cv.fit(train)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
print(cvModel.bestModel.getRegParam())
print(cvModel.bestModel.getElasticNetParam())
evaluationSummary = cvModel.bestModel.transform(test)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(evaluationSummary)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.7000000000000001
0.0
0.4917401800541683

In [13]:
from pyspark.ml.classification import RandomForestClassifier

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
rf = RandomForestClassifier(numTrees=100, maxDepth=30, labelCol="label", seed=521,
    leafCol="leafId")
model = rf.fit(train)
predictions = model.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy = 0.44481590738753624
Test Error = 0.5551840926124638

In [50]:
from pyspark.ml.classification import NaiveBayes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
nb = NaiveBayes(modelType="multinomial",labelCol="label", featuresCol="features")
nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, np.arange(1, 40, 1))
               .build())
nbevaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                                predictionCol="prediction", 
                                                metricName="accuracy")
nbcv = CrossValidator(estimator = nb,
                      estimatorParamMaps = nbparamGrid,
                      evaluator = nbevaluator,
                      numFolds = 5)
nbmodel = nbcv.fit(train)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
nbpredictions = nbmodel.transform(test)
print('Accuracy:', nbevaluator.evaluate(nbpredictions))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.5020897230123794

In [53]:
data.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------+--------------------+--------+--------+-----+
|            feature1|            feature2|feature3|            feature4|feature5|feature6|label|
+--------------------+--------------------+--------+--------------------+--------+--------+-----+
|(31836,[0,1,2,3,5...|(10132,[34,47,501...|     0.0|1.003648108122146E-5|     2.0|     1.0|  0.0|
|(31836,[0,1,2,3,5...|(10132,[3,11,641,...|     0.0|0.001484328403515...|     0.0|     3.0|  0.0|
|(31836,[0,1,2,4,5...|(10132,[1,1173,83...|    15.0|1.616676119364779...|     2.0|     1.0|  0.0|
|(31836,[0,1,5,6,9...|(10132,[2126,2228...|     7.0| 0.06811525087289605|     0.0|     1.0|  0.0|
|(31836,[0,3,6,9,1...|(10132,[0,2,3,8,2...|     0.0|0.021909130004662658|     0.0|     0.0|  0.0|
+--------------------+--------------------+--------+--------------------+--------+--------+-----+
only showing top 5 rows