If you don't have the dataset, run the code below to download it.

In [None]:
import gdown

In [None]:
# Fetch the dataset from Google Drive by its file id. No output path is passed,
# so gdown decides where and under which name the file is saved
# (presumably the current working directory — TODO confirm).
gdown.download("https://drive.google.com/uc?id=1mW974SwZsSMH-nr89c2Pe9PPHhT1ifDr")

In [1]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    """Thread wrapper that drives a streaming context off the main thread.

    The wrapped object only needs ``start()``, ``awaitTermination()`` and
    ``stop(stopSparkContext=..., stopGraceFully=...)``.
    """

    def __init__(self, ssc):
        """Remember the streaming context ``ssc`` that this thread will drive."""
        super().__init__()
        self.ssc = ssc

    def run(self):
        """Thread body: start the context, then block until it terminates."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Print a notice and stop the streaming context, leaving the SparkContext alive."""
        notice = '----- Stopping... this may take a few seconds -----'
        print(notice)
        # `stopGraceFully` (sic) is the keyword spelling PySpark uses here; do not "fix" it.
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
# Display the SparkContext. `sc` is not created anywhere in this notebook, so it is
# presumably injected by the kernel / PySpark shell — TODO confirm.
sc

In [3]:
# Display the SparkSession. `spark` is not created anywhere in this notebook, so it is
# presumably injected by the kernel / PySpark shell — TODO confirm.
spark

In [4]:
#import necessary packages
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit, lower
from pyspark.sql.types import StringType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, SQLTransformer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Load in data

In [5]:
import os

# Location (glob) of the review JSON files. The original notebook hard-coded an
# absolute path on one user's machine; set the REVIEWS_PATH environment variable
# to point elsewhere. The default is that original path, so existing setups
# keep working unchanged.
reviews_path = os.environ.get(
    "REVIEWS_PATH", '/Users/christianbutcher/Desktop/spark/reviews/*'
)

# Read every file matching the glob into one DataFrame.
df = spark.read.json(reviews_path)

# Preview the first 10 rows (long strings are truncated by show()).
df.show(10)

                                                                                

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2156300|    1|136759198|The Demo was grea...|
|2372320|    0|136758922|First of, this ga...|
|1498040|    1|136761203|Пример того, как ...|
|1811990|    1|136761840|I have beaten the...|
|1811990|    1|136761635|It really is very...|
|1782810|    1|136633021|Great for its cur...|
|1649740|    1|136629798|THROW YOUR MONEY ...|
|1649740|    1|136629381|I forgot I backed...|
|1649740|    1|136628148|Firstly, if you'r...|
|1649740|    1|136627883|[h1] HUNT THE NIG...|
+-------+-----+---------+--------------------+
only showing top 10 rows



Clean the data set:

In [6]:
# Cleaning, expressed as one chained transformation:
#   1. keep a single row per review_id,
#   2. drop rows whose review_text is the empty string,
#   3. lower-case the review text.
df = (
    df.dropDuplicates(['review_id'])
      .filter(col('review_text') != '')
      .withColumn("review_text", lower(col("review_text")))
)

Create a balanced data set:

In [7]:
n = 500   # target number of reviews per label
seed = 1  # seed passed to sampleBy so the sample is repeatable

# Per-label sampling fraction = n / (rows with that label).
# The fraction is clamped to 1.0: sampleBy rejects fractions above 1, which
# would otherwise happen as soon as a label has fewer than n rows.
label_counts = df.groupBy("label").count().collect()
fractions = {row["label"]: min(1.0, n / row["count"]) for row in label_counts}

# Stratified sample by label. The resulting class sizes are only approximately n
# (the recorded run produced 499 and 507 rows).
df_bal = df.stat.sampleBy("label", fractions, seed)
df_bal.groupBy("label").count().show()



+-----+-----+
|label|count|
+-----+-----+
|    0|  499|
|    1|  507|
+-----+-----+



                                                                                

Split data into training and test sets:

In [8]:
# 70/30 train/test split of the balanced sample, with a fixed seed.
trainingData, testData = df_bal.randomSplit([0.7, 0.3], seed=100)

# Report how many rows ended up in each partition.
print(f"Training Dataset Count: {trainingData.count()}")
print(f"Test Dataset Count: {testData.count()}")

                                                                                

Training Dataset Count: 708




Test Dataset Count: 298


                                                                                

Initialise pipeline stages:

In [9]:
# Tokenizer: split review_text on non-word characters (regex \W) into "words".
# NOTE(review): \W is presumably ASCII-only in the underlying regex engine, so
# non-Latin reviews may yield no tokens — confirm if those reviews matter.
regexTokenizer = RegexTokenizer(
    inputCol="review_text",
    outputCol="words",
    pattern="\\W",
)

# Stop-word filter: reads the tokenizer's output column and removes the
# default English stop words, writing the result to "filtered".
stops = StopWordsRemover.loadDefaultStopWords('english')
stopwordsRemover = StopWordsRemover(
    inputCol=regexTokenizer.getOutputCol(),
    outputCol="filtered",
    stopWords=stops,
)

# Hashed term frequencies. numFeatures here is only a starting value; the
# cross-validation grid defined later in the notebook overrides it.
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=10000,
)

# Inverse document frequency weighting; minDocFreq=5 removes sparse terms.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=5,
)

# Classifier. regParam and elasticNetParam are likewise overridden by the grid.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0.2,
)

Put everything together in the pipeline:

In [10]:
# Chain the stages in execution order; each stage reads the previous stage's output column.
pipeline = Pipeline(
    stages=[
        regexTokenizer,    # review_text -> words
        stopwordsRemover,  # words       -> filtered
        hashingTF,         # filtered    -> rawFeatures
        idf,               # rawFeatures -> features
        lr,                # features    -> classifier output
    ]
)

In [11]:
# Hyper-parameter grid searched by cross-validation:
# 3 hash sizes x 2 regularisation strengths x 4 elastic-net mixes = 24 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0, 0.2, 0.5, 0.8])
    .build()
)

In [12]:
# 3-fold cross-validation of the whole pipeline over paramGrid, scored with
# BinaryClassificationEvaluator using its default settings.
# A fixed seed is supplied so the fold assignment, and therefore the selected
# model, is repeatable across kernel restarts; the value matches the seed
# used for the train/test split.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3,
                          seed=100)

In [13]:
# Run cross-validation on the training split and keep the fitted result,
# which holds the best-scoring parameter combination from paramGrid.
# The recorded log timestamps suggest this took roughly five minutes.
model = crossval.fit(trainingData)

                                                                                

23/05/18 10:41:38 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/18 10:41:38 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/05/18 10:46:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/18 10:46:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS




In [26]:
# Pull out the winning fitted pipeline and list its stages.
best = model.bestModel
print(best.stages)

[RegexTokenizer_67ec11fab942, StopWordsRemover_fcfcb43619a5, HashingTF_f6f2e8ae7c56, IDFModel: uid=IDF_32bdbcb2897f, numDocs=708, numFeatures=1000, LogisticRegressionModel: uid=LogisticRegression_f9c14558cf36, numClasses=2, numFeatures=1000]


In [24]:
# Parameters of stage index 2 of the best pipeline — the HashingTF stage —
# to see which numFeatures value was selected.
best.stages[2].extractParamMap()

{Param(parent='HashingTF_f6f2e8ae7c56', name='binary', doc='If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False.'): False,
 Param(parent='HashingTF_f6f2e8ae7c56', name='numFeatures', doc='Number of features. Should be greater than 0.'): 1000,
 Param(parent='HashingTF_f6f2e8ae7c56', name='outputCol', doc='output column name.'): 'rawFeatures',
 Param(parent='HashingTF_f6f2e8ae7c56', name='inputCol', doc='input column name.'): 'filtered'}

In [27]:
# Parameters of stage index 4 of the best pipeline — the fitted logistic
# regression — to see the selected regParam / elasticNetParam.
best.stages[4].extractParamMap()

{Param(parent='LogisticRegression_f9c14558cf36', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_f9c14558cf36', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_f9c14558cf36', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_f9c14558cf36', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_f9c14558cf36', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_f9c14558cf36', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_f9c14558cf36', name='maxBlockSizeInMB', doc='maximum memory in MB for s

Obtain predictions for the test data:

In [28]:
# Apply the cross-validated model to the held-out test split.
prediction = model.transform(testData)

In [29]:
# List the columns of the prediction frame: the input columns plus those added by the pipeline stages.
prediction.columns

['app_id',
 'label',
 'review_id',
 'review_text',
 'words',
 'filtered',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [30]:
# Preview 10 test rows: review text, true label, class probabilities and predicted class.
prediction.select('review_text','label','probability','prediction').show(10)



+--------------------+-----+--------------------+----------+
|         review_text|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|hai so i dont wit...|    1|[0.29622636001772...|       1.0|
|fun game, a worth...|    1|[0.39636672883789...|       1.0|
|cute, simple and ...|    1|[0.25049880010621...|       1.0|
|wow, where did th...|    1|[0.34961248026696...|       1.0|
|this game is just...|    0|[0.68285556407288...|       0.0|
|refunded in less ...|    0|[0.97194186498040...|       0.0|
|tried it for an h...|    0|[0.74694255342842...|       0.0|
|battleblock theat...|    1|[0.23425697935099...|       1.0|
|it gets stale ver...|    0|[0.86507549452499...|       0.0|
|       oh absolutely|    1|[0.44576065370890...|       1.0|
+--------------------+-----+--------------------+----------+
only showing top 10 rows



                                                                                

Evaluate the model predictions:

In [31]:
# Score the test predictions. The metric is spelled out rather than left
# implicit: areaUnderROC is the evaluator's default.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
evaluator.evaluate(prediction)

0.7652802893309227

In [32]:
# Accuracy = share of test rows whose predicted class equals the true label.
# Computed with the already-imported MulticlassClassificationEvaluator rather
# than dividing two separate count() results, each of which launched its own
# Spark job.
accuracy = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
).evaluate(prediction)
accuracy

                                                                                

0.7315436241610739