In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, RegexTokenizer, StopWordsRemover, NGram, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.util import *
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Senti-Analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1607500156469_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
reviews = "s3://bd-project-data/review.json.bz2"
reviewsDF = spark.read.json(reviews)
reviewsDF.createOrReplaceTempView("reviews")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
reviewsDF.groupBy("stars").count().orderBy("stars").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------+
|stars|  count|
+-----+-------+
|  1.0|1283897|
|  2.0| 635072|
|  3.0| 842289|
|  4.0|1673404|
|  5.0|3586460|
+-----+-------+

In [4]:
reviewsDF = reviewsDF.sample(False, 0.2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
reviewsDF = spark.sql(
"""
  SELECT text, label, rowNumber FROM (
    SELECT
       CASE
              WHEN reviews.stars  = 1 THEN 1
              WHEN reviews.stars  = 2 THEN 1
              WHEN reviews.stars  = 3 THEN 2
              WHEN reviews.stars  = 4 THEN 2
              WHEN reviews.stars  = 5 THEN 2
       END AS label
      ,reviews.text AS text
      ,row_number() OVER (PARTITION BY stars ORDER BY rand()) AS rowNumber
    FROM reviews
  ) reviews
  WHERE rowNumber <= 70000
  """
)

reviewsDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)

reviewsDF.groupBy("label").count().orderBy("label").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------+
|label| count|
+-----+------+
|    1|140000|
|    2|210000|
+-----+------+

In [24]:
training, test = reviewsDF.randomSplit([0.8, 0.2], seed=12345)

training.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
test.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)

numTraining = training.count()
numTest = test.count()
print("numTraining = "+str(numTraining)+" numtest = "+str(numTest))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

numTraining = 280009 numtest = 69991

## Defining Parts of the Pipeline


In [25]:
regextokenizer = RegexTokenizer(inputCol="text", pattern="[a-zA-Z']+", outputCol="regex")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
remover = StopWordsRemover(inputCol="regex", outputCol="nostopwords")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
ngram2 = NGram(n=2, inputCol="nostopwords", outputCol="ngram2")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
ngram3 = NGram(n=3, inputCol="nostopwords", outputCol="ngram3")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
hashingTFremover = HashingTF(inputCol="nostopwords", outputCol="hashremover")
hashingTFngram2 = HashingTF(inputCol="ngram2", outputCol="hashngram2")
hashingTFngram3 = HashingTF(inputCol="ngram3", outputCol="hashngram3")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
assembler = VectorAssembler(inputCols=["hashremover","hashngram2","hashngram3"],outputCol="assembler")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
labelIndexer = StringIndexer(inputCol="label", outputCol="LabelIndex")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=['1','2'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
nb = NaiveBayes(smoothing=1.0, labelCol="LabelIndex", featuresCol="assembler", predictionCol="prediction", modelType="multinomial")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
pipeline = Pipeline(stages=[regextokenizer, remover, ngram2, ngram3, hashingTFremover, hashingTFngram2, hashingTFngram3, assembler, labelIndexer, nb, labelConverter])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTFremover.numFeatures, [1000, 10000]) \
    .addGrid(hashingTFngram2.numFeatures, [1000, 10000]) \
    .addGrid(hashingTFngram3.numFeatures, [1000, 10000]) \
    .build()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="LabelIndex", predictionCol="prediction", metricName="accuracy"),
                          numFolds=3)  # use 3+ folds in practice


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
cvModel = crossval.fit(training)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
predictions = cvModel.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
evaluator = MulticlassClassificationEvaluator(labelCol="LabelIndex", predictionCol="prediction",metricName="accuracy")

precision = evaluator.evaluate(predictions)
print(precision)
print("Test Error = " + str(1.0 - precision))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.6498264062522324
Test Error = 0.3501735937477676