####  Spark Environment

In [1]:
import os
import sys

In [2]:
os.environ["SPARK_HOME"]="/usr/hdp/current/spark2-client/"
os.environ["PYLIB"]=os.environ["SPARK_HOME"]+"/python/lib"

In [3]:
sys.path.insert(0,os.environ["PYLIB"]+"/py4j-0.10.4-src.zip")
sys.path.insert(0,os.environ["PYLIB"]+"/pyspark.zip")

#### Load Required Libraries

In [4]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql import functions as F

In [5]:
sc = SparkContext()
spark = SparkSession(sc)


#### Load  IMDB Reviews dataset 

In [7]:
imdb_reviews_df = spark.read.parquet('file:///home/mahidharv/Spark_TM/imdb_reviews_preprocessed.parquet')

In [8]:
imdb_reviews_df.show(5)

+---------+--------------------+-----+
|       id|              review|score|
+---------+--------------------+-----+
|pos_10006|In this "critical...|  1.0|
|pos_10013|Like one of the p...|  1.0|
|pos_10022|Aro Tolbukhin bur...|  1.0|
|pos_10033|The movie Titanic...|  1.0|
| pos_1003|Another Aussie ma...|  1.0|
+---------+--------------------+-----+
only showing top 5 rows



#### check the datatypes

In [9]:
imdb_reviews_df.dtypes

[('id', 'string'), ('review', 'string'), ('score', 'double')]

#### Check for the Score counts

In [10]:
imdb_reviews_df.groupby("score").agg(F.count(col="score").alias("score-Counts")).show()

+-----+------------+
|score|score-Counts|
+-----+------------+
|  0.0|       12500|
|  1.0|       12500|
+-----+------------+



In [11]:
imdb_reviews_df.where(imdb_reviews_df.score == 1).show(10)

+---------+--------------------+-----+
|       id|              review|score|
+---------+--------------------+-----+
|pos_10006|In this "critical...|  1.0|
|pos_10013|Like one of the p...|  1.0|
|pos_10022|Aro Tolbukhin bur...|  1.0|
|pos_10033|The movie Titanic...|  1.0|
| pos_1003|Another Aussie ma...|  1.0|
| pos_1004|After a brief pro...|  1.0|
|pos_10053|I must admit, whe...|  1.0|
|pos_10062|Wow. What a wonde...|  1.0|
|pos_10074|quote by Nicolas ...|  1.0|
|pos_10083|The fact that thi...|  1.0|
+---------+--------------------+-----+
only showing top 10 rows



In [12]:
imdb_reviews_df.select("review").first()

Row(review=u'In this "critically acclaimed psychological thriller based on true events, Gabriel (Robin Williams), a celebrated writer and late-night talk show host, becomes captivated by the harrowing story of a young listener and his adoptive mother (Toni Collette). When troubling questions arise about this boy\'s (story), however, Gabriel finds himself drawn into a widening mystery that hides a deadly secret\x85" according to film\'s official synopsis.<br /><br />You really should STOP reading these comments, and watch the film NOW...<br /><br />The "How did he lose his leg?" ending, with Ms. Collette planning her new life, should be chopped off, and sent to "deleted scenes" land. It\'s overkill. The true nature of her physical and mental ailments should be obvious, by the time Mr. Williams returns to New York. Possibly, her blindness could be in question - but a revelation could have be made certain in either the "highway" or "video tape" scenes. The film would benefit from a re-edi

#### Tokenize the data

In [13]:
from pyspark.ml.feature import RegexTokenizer

In [14]:
tokenizer = RegexTokenizer(gaps=False,inputCol="review",outputCol="words",pattern="\\p{L}+")

#### Use NLTK to define the list of Stopwords

In [15]:
from nltk.corpus import stopwords
stop_words = list(set(stopwords.words('english')))


#### Split the dataset into Train,Test and Validation 

In [16]:
training_df, validation_df, testing_df = imdb_reviews_df.randomSplit([0.7, 0.3, 0.1], seed=0)

In [17]:
from pyspark.ml.feature import StopWordsRemover

In [18]:
stopwordfilter = StopWordsRemover(inputCol="words",outputCol="filtered",stopWords=stop_words)

In [19]:
from pyspark.ml.feature import CountVectorizer

In [20]:
cvt = CountVectorizer(inputCol="filtered",minDF=5,outputCol="tf")

In [21]:
from pyspark.ml.feature import IDF

In [22]:
idf = IDF(inputCol="tf",outputCol="tfidf",minDocFreq=2)

In [23]:
training_df.groupby("score").agg(F.count("score").alias("class_count")).show()

+-----+-----------+
|score|class_count|
+-----+-----------+
|  0.0|       7945|
|  1.0|       8010|
+-----+-----------+



#### Build the Pipeline

In [24]:
from pyspark.ml import Pipeline

In [25]:
preprocess = Pipeline(stages=[tokenizer,stopwordfilter,cvt,idf])

#### Build the models

In [26]:
from pyspark.ml.classification import LogisticRegression

In [27]:
lr = LogisticRegression(maxIter=10, labelCol="score", featuresCol="tfidf")

In [28]:
lr_Pipeline = Pipeline(stages=[tokenizer,stopwordfilter,cvt,idf]+[lr]) 

lr_Pipeline_model = lr_Pipeline.fit(training_df)

In [29]:
print("Coefficients: " + str(lr_Pipeline_model.stages[-1].coefficients))
print("Intercept: " + str(lr_Pipeline_model.stages[-1].intercept))

Coefficients: [-0.040057640418887715,-0.006331520139974227,0.04870018290921189,-0.08218674701614329,-0.0648171994636084,0.21171963841233793,-0.25025774432707343,0.02826724227216306,-0.18275680917197898,0.10572312833202754,0.25518339456754,0.027684480522432893,0.3093387454124342,-0.06840212409623406,-0.43220839215959456,-0.010362055118389337,0.19229621352858936,0.0694668784625281,0.5627593158540559,0.045824311532642076,0.03528922674936519,0.032649323564006905,-0.1166054836651214,-0.23523247845708206,0.02928149141308151,0.1614152872127543,-0.006808200156804726,0.14897402559845146,-0.011247765242950743,0.014647254102395407,-0.02882877968798002,0.12107591085360937,0.0950620253736878,0.0570004967459221,-0.18564837769593928,0.35366227681380885,-0.16714366536032463,-0.01565674575045481,-0.019327909135011152,0.22783933143805074,0.0030895152949739313,0.010635397440730068,0.06412028116253561,0.05541955168007086,-0.13694702652490257,0.0031972401443148227,0.2131619129645009,-0.007370199089841381,-

In [30]:
train_predictions_lr = lr_Pipeline_model.transform(training_df)
test_predictions_lr = lr_Pipeline_model.transform(testing_df)

In [31]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="score", predictionCol="prediction", metricName="accuracy")

predictionAndLabels_train_lr = train_predictions_lr.select("prediction", "score")
train_accuracy_lr = evaluator.evaluate(predictionAndLabels_train_lr)

print("Train accuracy  = " + str(train_accuracy_lr))

predictionAndLabels_test_lr = test_predictions_lr.select("prediction", "score")
test_accuracy_lr = evaluator.evaluate(predictionAndLabels_test_lr)

print("Test accuracy = " + str(test_accuracy_lr))

Train accuracy  = 0.999937323723
Test accuracy = 0.871372888696


In [32]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1]) \
    .addGrid(lr.elasticNetParam, [0.5])\
    .build()
    
lr_crossval = CrossValidator(estimator=lr_Pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             numFolds=2)  

In [33]:
lr_crossval_Model = lr_crossval.fit(training_df)

In [34]:
train_predictions_lrcv = lr_crossval_Model.transform(training_df)
test_predictions_lrcv = lr_crossval_Model.transform(testing_df)

In [35]:
predictionAndLabels_train_lrcv = train_predictions_lrcv.select("prediction", "score")
train_accuracycv = evaluator.evaluate(predictionAndLabels_train_lrcv)
print("Train set accuracy  = " + str(train_accuracycv))

predictionAndLabels_test_lrcv = test_predictions_lrcv.select("prediction", "score")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = " + str(test_accuracycv))

Train set accuracy  = 0.728549044187
Test set accuracy = 0.727154612386


#### Grid Search for the Model best Parameters

In [36]:

grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0., 0.01, 0.02]).\
    addGrid(lr.elasticNetParam, [0., 0.2, 0.4]).\
    build()

In [37]:
grid

[{Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
  Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='regParam', doc='regularization parameter (>= 0).'): 0.0},
 {Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2,
  Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='regParam', doc='regularization parameter (>= 0).'): 0.0},
 {Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.4,
  Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='regParam

In [42]:
lr_crossval = CrossValidator(estimator=lr_Pipeline,
                             estimatorParamMaps=grid,
                             evaluator=evaluator,
                             numFolds=2) 

In [43]:
lr_crossval_model = lr_crossval.fit(training_df)

<bound method PipelineModel.explainParams of PipelineModel_46b8b6b6754b266971cf>

In [55]:
lr_crossval

CrossValidator_4ecc8c91543440fb117b

In [44]:
train_predictions_lrcv = lr_crossval_Model.transform(training_df)
test_predictions_lrcv = lr_crossval_Model.transform(testing_df)

In [45]:
predictionAndLabels_train_lrcv = train_predictions_lrcv.select("prediction", "score")
train_accuracycv = evaluator.evaluate(predictionAndLabels_train_lrcv)
print("Train set accuracy  = " + str(train_accuracycv))

predictionAndLabels_test_lrcv = test_predictions_lrcv.select("prediction", "score")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = " + str(test_accuracycv))

Train set accuracy  = 0.728549044187
Test set accuracy = 0.727154612386


In [46]:
all_models = []
for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = lr_Pipeline.fit(training_df, grid[j])
    all_models.append(model)

Fitting model 1
Fitting model 2
Fitting model 3
Fitting model 4
Fitting model 5
Fitting model 6
Fitting model 7
Fitting model 8
Fitting model 9


In [47]:
accuracies = [m.\
    transform(validation_df).\
    select(F.avg(F.expr('float(score = prediction)')).alias('accuracy')).\
    first().\
    accuracy for m in all_models]

In [48]:
import numpy as np

In [49]:
best_model_idx = np.argmax(accuracies)

In [50]:
best_model = all_models[best_model_idx]

#### Best Model Parameters

In [51]:
grid[best_model_idx]

{Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2,
 Param(parent=u'LogisticRegression_4fce964402a7ba71b51f', name='regParam', doc='regularization parameter (>= 0).'): 0.01}

In [52]:
accuracies[best_model_idx]

0.8894002375296912

In [53]:
bestModel_predictions_lrcv = best_model.transform(testing_df)

In [54]:
predictionAndLabels_test_lrcv = bestModel_predictions_lrcv.select("prediction", "score")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = " + str(test_accuracycv))

Test set accuracy = 0.890428757038
