In [0]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd

from pyspark.ml import *
from pyspark.ml.classification import *

from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.sql.types import DoubleType

from pyspark.sql.functions import rand 
from sklearn.metrics import classification_report
from time import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

In [0]:
# Report which Spark runtime this notebook is attached to.
print(f"Running Spark Version {sc.version}")

In [0]:
# Resolve Spark SQL column names case-insensitively for the rest of the session.
spark.conf.set("spark.sql.caseSensitive", "false")

In [0]:
# Directory holding the review JSON files; change it here instead of in every path.
DATA_DIR = "dbfs:/FileStore/shared_uploads/ckchancc@connect.ust.hk"


def _read_reviews(file_name):
    """Load one review JSON file from DATA_DIR as a Spark DataFrame."""
    return spark.read.format("json").load("{}/{}".format(DATA_DIR, file_name))


df_Toys = _read_reviews("Toys_and_Games_5.json")
df_Sports = _read_reviews("Sports_and_Outdoors_5.json")
df_Video = _read_reviews("Video_Games_5.json")


In [0]:
import functools


def unionAll(dfs, columns=None):
    """Union DataFrames row-wise, matching columns by name.

    ``union`` matches columns by position, so every DataFrame after the first is
    re-ordered to the first one's column order before it is appended.

    Args:
        dfs: non-empty iterable of DataFrames.
        columns: optional list of column names. When given, every DataFrame is
            first reduced to exactly these columns, so columns that are not
            needed downstream do not take part in the union.

    Returns:
        A single DataFrame containing the rows of all inputs.
    """
    if columns is not None:
        dfs = [df.select(list(columns)) for df in dfs]
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

In [0]:
# Stack the three review categories into one DataFrame.
df = unionAll((df_Video, df_Sports, df_Toys))

In [0]:
# Preview the combined review DataFrame.
df.show()

In [0]:
# Inspect the schema of the combined review DataFrame.
df.printSchema()

In [0]:
# Keep only the columns used downstream (`overall` renamed to `label`) and drop rows with nulls.
df1 = (
    df.select("reviewerID", "reviewText", "asin", df["overall"].alias("label"))
      .dropna()
)
df1.show()

In [0]:
# Number of rows remaining after dropping rows with nulls.
df1.count()

In [0]:
# Drop reviews whose rating is exactly 3, then fix the column order.
df2 = df1.where(df1['label'] != 3).select('reviewerID', 'asin', 'reviewText', 'label')

In [0]:
from pyspark.sql import functions as F
def assign_postive_negative_label(record):
    """Map a numeric rating to a binary label.

    Returns:
        0 for ratings <= 2.0, 1 for ratings >= 4.0, and None for anything else
        (ratings strictly between 2 and 4, or a missing rating).
    """
    if record is None:
        # A null rating would otherwise raise TypeError on the comparison below.
        return None
    if record <= 2.0:
        return 0
    if record >= 4.0:
        return 1
    return None

# Declare the UDF's return type explicitly. Without it, F.udf defaults to StringType,
# which turns `label` into the strings "0"/"1" that then have to be cast back to numbers.
from pyspark.sql.types import IntegerType
assign_label_udf = F.udf(assign_postive_negative_label, IntegerType())
df3 = df2.withColumn("label", assign_label_udf("label"))

In [0]:
# Preview df3: `label` now holds the output of assign_label_udf.
df3.show()

In [0]:
from pyspark.sql.types import DoubleType
# Store `label` as a double so the classifiers below get a numeric label column.
df3 = df3.withColumn("label", df3.label.cast(DoubleType()))

In [0]:
# Confirm that `label` is now a double column.
df3.printSchema()

In [0]:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.sql.functions import expr
import string

In [0]:
# Contraction -> expansion map. In this notebook only its keys are used: they are removed
# from the custom stop-word list built in the next cell.
# NOTE(review): the RegexTokenizer below uses pattern "\W", which also splits on apostrophes,
# so tokens such as "don't" presumably never reach StopWordsRemover and this exclusion may
# have no effect -- confirm.
contractions = { 
"ain't": "am not",
"aren't": "are not",
"can't": "cannot",
"can't've": "cannot have",
"'cause": "because",
"could've": "could have",
"couldn't": "could not",
"couldn't've": "could not have",
"didn't": "did not",
"doesn't": "does not",
"don't": "do not",
"hadn't": "had not",
"hadn't've": "had not have",
"hasn't": "has not",
"haven't": "have not",
"he'd": "he would",
"he'd've": "he would have",
"he'll": "he will",
"he's": "he is",
"how'd": "how did",
"how'll": "how will",
"how's": "how is",
"i'd": "i would",
"i'll": "i will",
"i'm": "i am",
"i've": "i have",
"isn't": "is not",
"it'd": "it would",
"it'll": "it will",
"it's": "it is",
"let's": "let us",
"ma'am": "madam",
"mayn't": "may not",
"might've": "might have",
"mightn't": "might not",
"must've": "must have",
"mustn't": "must not",
"needn't": "need not",
"oughtn't": "ought not",
"shan't": "shall not",
"sha'n't": "shall not",
"she'd": "she would",
"she'll": "she will",
"she's": "she is",
"should've": "should have",
"shouldn't": "should not",
"that'd": "that would",
"that's": "that is",
"there'd": "there had",
"there's": "there is",
"they'd": "they would",
"they'll": "they will",
"they're": "they are",
"they've": "they have",
"wasn't": "was not",
"we'd": "we would",
"we'll": "we will",
"we're": "we are",
"we've": "we have",
"weren't": "were not",
"what'll": "what will",
"what're": "what are",
"what's": "what is",
"what've": "what have",
"where'd": "where did",
"where's": "where is",
"who'll": "who will",
"who's": "who is",
"won't": "will not",
"wouldn't": "would not",
"you'd": "you would",
"you'll": "you will",
"you're": "you are"
}

In [0]:
# Custom stop-word list: Spark's default stop words plus ASCII punctuation, minus the
# words in `notremove` (presumably kept because negations carry sentiment) and minus
# every key of `contractions`.
punctuation = list(string.punctuation)
sw = StopWordsRemover().getStopWords()
notremove = ["not","none","nothing","nowhere","never","cannot",
             "cant","couldnt","except","hasnt","neither","no","nobody","nor","without"]
list_keys = list(contractions.keys())
mylist = [word for word in sw + punctuation
          if word not in notremove and word not in list_keys]

In [0]:
# convert the distinct labels in the input dataset to index values
# Fit a StringIndexer that maps each distinct `label` value to an index in "indexedLabel".
# With the default stringOrderType ("frequencyDesc"), the most frequent label gets index 0.0,
# so indexedLabel is not guaranteed to equal label.
labelIndexer = (
    StringIndexer(inputCol="label", outputCol="indexedLabel")
    .fit(df3)
)

In [0]:
# Tokenizer: with RegexTokenizer's default gaps=True, `pattern` is the separator, so r"\W"
# splits the review text on every non-word character (whitespace, punctuation, apostrophes).
# A raw string avoids the invalid "\W" escape sequence warning in a normal string literal.
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern=r"\W")
df_tokenized = tokenizer.transform(df3)
df_tokenized.show()

In [0]:
# Strip the custom stop words (`mylist`) out of each token array.
remover = StopWordsRemover(stopWords=mylist, inputCol='words', outputCol='cleaned_words')
df_removed = remover.transform(df_tokenized)

In [0]:
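# Reference only (not executed): the label-indexing and feature-extraction steps, collected in one place.
# labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df3)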
# tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\W")
# df_tokenized = tokenizer.transform(df3)
# remover = StopWordsRemover(inputCol='words', outputCol='cleaned_words', stopWords=mylist)
# df_removed = remover.transform(df_tokenized)
# # Convert to TF words vector
# hashingTF = HashingTF(inputCol="cleaned_words", outputCol="rawFeatures")
# df_TF = hashingTF.transform(df_removed)
# # Convert to TF*IDF words vector
# idf = IDF(inputCol="rawFeatures", outputCol="features")
# idfModel = idf.fit(df_TF)
# df_idf = idfModel.transform(df_TF)

In [0]:
# Feature-extraction stages: hashed term frequencies, then TF-IDF re-weighting.
hashingTF = HashingTF(inputCol="cleaned_words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Run the stages once over the whole dataset to inspect a few feature vectors.
df_TF = hashingTF.transform(df_removed)
idfModel = idf.fit(df_TF)
df_idf = idfModel.transform(df_TF)

for features_label in df_idf.select("features", "label").take(3):
    print(features_label)

In [0]:
# Roughly 80/20 train/test split; the seed makes it repeatable.
train, test = df3.randomSplit([0.8, 0.2], seed=0)

# Keep both splits in memory: every cross-validation run below reuses them.
train, test = train.cache(), test.cache()
print(f'Sample number in the train set : {train.count()}')
print(f'Sample number in the test set : {test.count()}')
train.groupby('label').count().show()

In [0]:
#NaiveBayes
def grid_search(p1, p2):
    """4-fold cross-validate a TF-IDF + Naive Bayes pipeline for one parameter pair.

    Args:
        p1: value for ``hashingTF.numFeatures``.
        p2: value for ``NaiveBayes.smoothing``.

    Returns:
        The mean cross-validation accuracy of this single parameter combination.
    """
    nb = NaiveBayes()
    # Use the unfitted `idf` estimator so IDF weights are refit on each training fold
    # with the numFeatures being tuned. The pre-fitted `idfModel` was fitted once on the
    # full dataset with HashingTF's initial numFeatures, so its weights do not line up
    # with other hash sizes, and it has also seen the test rows.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, nb])

    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(nb.smoothing, [p2])
                 .build())
    # NaiveBayes trains and predicts against its default labelCol "label", so accuracy
    # must be scored against "label". "indexedLabel" follows StringIndexer's own ordering.
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    cvModel = crossval.fit(train)
    # The grid holds exactly one combination, so avgMetrics has a single entry.
    average_score = cvModel.avgMetrics[0]
    print('average cross-validation accuracy = {}'.format(average_score))
    return average_score

In [0]:
from itertools import product

# Manual search over (numFeatures, smoothing), timing each cross-validation run.
score = 0.0
params = None  # stays None if no combination scores above 0.0 (avoids NameError below)
for p1, p2 in product([35000, 40000, 55000], [0.8, 0.9, 1.0]):
    t0 = time()
    print('(numFeatures,smoothing)=({},{})'.format(p1, p2))
    average_score = grid_search(p1, p2)
    tt = time() - t0
    print("Classifier trained in {} seconds".format(round(tt, 3)))
    if average_score > score:
        print('################ Best score ######################')
        params = (p1, p2)
        score = average_score
print('Best score is {} at params ={}'.format(score, params))

In [0]:
def _labels_and_predictions(prediction):
    """Collect (true labels, predicted labels) as int lists from a prediction DataFrame."""
    y_true, y_pred = [], []
    for label, predicted in prediction.select("label", "prediction").collect():
        if label is None or predicted is None:
            # A row missing either value cannot be scored; skip it explicitly.
            continue
        y_true.append(int(label))
        y_pred.append(int(predicted))
    return y_true, y_pred


def Data_modeling(train, test, pipeline, paramGrid):
    """Cross-validate `pipeline` on `train`, then evaluate the best model on `test`.

    Prints the mean cross-validation accuracy of the first parameter combination,
    test-set accuracy and F1, and an sklearn classification report.

    Args:
        train: Spark DataFrame used for 4-fold cross-validation.
        test: Spark DataFrame used for the held-out evaluation.
        pipeline: unfitted Pipeline whose output has "label" and "prediction" columns.
        paramGrid: list of param maps (e.g. from ``ParamGridBuilder.build()``).
    """
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    # Run cross-validation and keep the best set of parameters.
    cvModel = crossval.fit(train)

    # Predict on the held-out test data with the best model.
    prediction = cvModel.transform(test)
    average_score = cvModel.avgMetrics
    print('average cross-validation accuracy = {}'.format(average_score[0]))

    # Test-set accuracy (same evaluator configuration as used for cross-validation).
    accuracy_score = evaluator.evaluate(prediction)
    print('Accuracy in the test data = {}'.format(accuracy_score))

    # Test-set F1 ("f1" in MulticlassClassificationEvaluator is the label-weighted F1).
    f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    f1_score = f1_evaluator.evaluate(prediction)
    print('F1 score in the test data = {}'.format(f1_score))

    # Per-class report. `label` is the ground truth, `prediction` the model output.
    # The previous version swapped the two, which mirrored precision and recall.
    y_true, y_pred = _labels_and_predictions(prediction)
    target_names = ['0', '1']
    print(classification_report(y_true, y_pred, target_names=target_names))
    return

In [0]:
# trained by a Naïve Bayes
nb = NaiveBayes()
# Build a pipeline. The unfitted `idf` estimator (not the pre-fitted `idfModel`) is used so
# IDF weights are learned on each training fold with the chosen numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, nb])
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [55000])
             .addGrid(nb.smoothing, [1.0])
             .build())
# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(train, test, pipeline, paramGrid)


In [0]:
#LogisticRegression
def grid_search(p1, p2, p3, p4):
    """4-fold cross-validate a TF-IDF + logistic regression pipeline for one parameter set.

    Args:
        p1: value for ``hashingTF.numFeatures``.
        p2: value for ``LogisticRegression.regParam``.
        p3: value for ``LogisticRegression.elasticNetParam``.
        p4: value for ``LogisticRegression.maxIter``.

    Returns:
        The mean cross-validation accuracy of this single parameter combination.
    """
    lr = LogisticRegression()
    # Unfitted `idf` so IDF weights are refit per fold with the tuned numFeatures; the
    # pre-fitted `idfModel` was fitted with HashingTF's initial size and on the test rows too.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, lr])

    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(lr.regParam, [p2])
                 .addGrid(lr.elasticNetParam, [p3])
                 .addGrid(lr.maxIter, [p4])
                 .build())
    # LogisticRegression uses its default labelCol "label", so score against "label"
    # (not "indexedLabel", whose 0/1 follows StringIndexer's frequency ordering).
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    cvModel = crossval.fit(train)
    # Single-combination grid: avgMetrics has exactly one entry.
    average_score = cvModel.avgMetrics[0]
    print('average cross-validation accuracy = {}'.format(average_score))
    return average_score

In [0]:
from itertools import product

# Manual search over (numFeatures, regParam, elasticNetParam, maxIter), timing each run.
# itertools.product iterates in the same order as the equivalent nested loops.
score = 0.0
params = None  # stays None if no combination scores above 0.0 (avoids NameError below)
for p1, p2, p3, p4 in product([45000, 50000, 55000],
                              [0.09, 0.10, 0.11],
                              [0.09, 0.10, 0.11],
                              [9, 10, 11]):
    t0 = time()
    print('(numFeatures,regParam,elasticNetParam,maxIter)=({},{},{},{})'.format(p1, p2, p3, p4))
    average_score = grid_search(p1, p2, p3, p4)
    tt = time() - t0
    print("Classifier trained in {} seconds".format(round(tt, 3)))
    if average_score > score:
        print('################ Best score ######################')
        params = (p1, p2, p3, p4)
        score = average_score
print('Best score is {} at params ={}'.format(score, params))

In [0]:
# trained by a logistic regression
lr = LogisticRegression()
# Build a pipeline. The unfitted `idf` estimator (not the pre-fitted `idfModel`) is used so
# IDF weights are learned on each training fold with the chosen numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, lr])

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [55000])
             .addGrid(lr.regParam, [0.11])
             .addGrid(lr.elasticNetParam, [0.11])
             .addGrid(lr.maxIter, [11])
             .build())
# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(train, test, pipeline, paramGrid)


In [0]:
#Decision Tree
def grid_search(p1, p2, p3):
    """4-fold cross-validate a TF-IDF + decision tree pipeline for one parameter set.

    Args:
        p1: value for ``hashingTF.numFeatures``.
        p2: value for ``DecisionTreeClassifier.maxDepth``.
        p3: value for ``DecisionTreeClassifier.minInstancesPerNode``.

    Returns:
        The mean cross-validation accuracy of this single parameter combination.
    """
    dt = DecisionTreeClassifier(labelCol="label", impurity="entropy")
    # Unfitted `idf` so IDF weights are refit per fold with the tuned numFeatures; the
    # pre-fitted `idfModel` was fitted with HashingTF's initial size and on the test rows too.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, dt])

    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(dt.maxDepth, [p2])
                 .addGrid(dt.minInstancesPerNode, [p3])
                 .build())
    # Use the exact column name the classifier trains on ("label", not "Label"), so this
    # does not depend on spark.sql.caseSensitive being false.
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    cvModel = crossval.fit(train)
    # Single-combination grid: avgMetrics has exactly one entry.
    average_score = cvModel.avgMetrics[0]
    print('average cross-validation accuracy = {}'.format(average_score))
    return average_score

In [0]:
from itertools import product

# Manual search over (numFeatures, maxDepth, minInstancesPerNode), timing each run.
# Ranges tried in an earlier search: numFeatures [65000, 70000, 75000],
# maxDepth [23, 24, 25, 26, 27], minInstancesPerNode [3, 4, 5].
score = 0.0
params = None  # stays None if no combination scores above 0.0 (avoids NameError below)
for p1, p2, p3 in product([65000, 70000, 75000], [12, 13, 14], [3, 4]):
    t0 = time()
    print('(numFeatures,maxDepth,minInstancesPerNode)=({},{},{})'.format(p1, p2, p3))
    average_score = grid_search(p1, p2, p3)
    tt = time() - t0
    print("Classifier trained in {} seconds".format(round(tt, 3)))
    if average_score > score:
        print('################ Best score ######################')
        params = (p1, p2, p3)
        score = average_score
print('Best score is {} at params ={}'.format(score, params))

In [0]:
# trained by a Decision Tree
dt = DecisionTreeClassifier(labelCol="label", impurity="entropy")
# Build a pipeline with the same stages as in the tuning runs. The unfitted `idf` estimator
# (not the pre-fitted `idfModel`) is used so IDF weights are learned per training fold.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, dt])
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [65000])
             .addGrid(dt.maxDepth, [12])
             .addGrid(dt.minInstancesPerNode, [4])
             .build())
# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(train, test, pipeline, paramGrid)

In [0]:
#Random Forest
def grid_search(p1, p2, p3, p4):
    """4-fold cross-validate a TF-IDF + random forest pipeline for one parameter set.

    Args:
        p1: value for ``hashingTF.numFeatures``.
        p2: value for ``RandomForestClassifier.numTrees``.
        p3: value for ``RandomForestClassifier.maxDepth``.
        p4: value for ``RandomForestClassifier.minInstancesPerNode``.

    Returns:
        The mean cross-validation accuracy of this single parameter combination.
    """
    rf = RandomForestClassifier(labelCol="label", impurity="entropy", seed=5043)
    # Unfitted `idf` so IDF weights are refit per fold with the tuned numFeatures; the
    # pre-fitted `idfModel` was fitted with HashingTF's initial size and on the test rows too.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, rf])

    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(rf.numTrees, [p2])
                 .addGrid(rf.maxDepth, [p3])
                 .addGrid(rf.minInstancesPerNode, [p4])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    cvModel = crossval.fit(train)
    # Single-combination grid: avgMetrics has exactly one entry.
    average_score = cvModel.avgMetrics[0]
    print('average cross-validation accuracy = {}'.format(average_score))
    return average_score

In [0]:
from itertools import product

# Manual search over (numFeatures, numTrees, maxDepth, minInstancesPerNode), timing each run.
# Ranges tried in an earlier search: numFeatures [45000, 50000, 55000],
# numTrees [30, 31, 32], maxDepth [28, 29, 30].
score = 0.0
params = None  # stays None if no combination scores above 0.0 (avoids NameError below)
for p1, p2, p3, p4 in product([45000, 50000, 55000], [12, 13, 14], [12, 13, 14], [1, 2]):
    t0 = time()
    print('(numFeatures,numTrees,maxDepth,minInstancesPerNode)=({},{},{},{})'.format(p1, p2, p3, p4))
    average_score = grid_search(p1, p2, p3, p4)
    tt = time() - t0
    print("Classifier trained in {} seconds".format(round(tt, 3)))
    if average_score > score:
        print('################ Best score ######################')
        params = (p1, p2, p3, p4)
        score = average_score
print('Best score is {} at params ={}'.format(score, params))

In [0]:
rf = RandomForestClassifier(labelCol="label", impurity="entropy", seed=5043)
# Build a pipeline. The unfitted `idf` estimator (not the pre-fitted `idfModel`) is used so
# IDF weights are learned on each training fold with the chosen numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, rf])

# Create ParamGrid for Cross Validation
# NOTE(review): numTrees=10 lies outside the range searched above ([12, 13, 14]) -- confirm intended.
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [45000])
             .addGrid(rf.numTrees, [10])
             .addGrid(rf.maxDepth, [12])
             .addGrid(rf.minInstancesPerNode, [1])
             .build())
# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(train, test, pipeline, paramGrid)

In [0]:
#Gradient Boosted Tree
def grid_search(p1, p2, p3, p4):
    """4-fold cross-validate a TF-IDF + gradient-boosted tree pipeline for one parameter set.

    Args:
        p1: value for ``hashingTF.numFeatures``.
        p2: value for ``GBTClassifier.maxIter``.
        p3: value for ``GBTClassifier.maxDepth``.
        p4: value for ``GBTClassifier.minInstancesPerNode``.

    Returns:
        The mean cross-validation accuracy of this single parameter combination.
    """
    gbt = GBTClassifier(labelCol="label")
    # Unfitted `idf` so IDF weights are refit per fold with the tuned numFeatures; the
    # pre-fitted `idfModel` was fitted with HashingTF's initial size and on the test rows too.
    pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, gbt])

    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(gbt.maxIter, [p2])
                 .addGrid(gbt.maxDepth, [p3])
                 .addGrid(gbt.minInstancesPerNode, [p4])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    cvModel = crossval.fit(train)
    # Single-combination grid: avgMetrics has exactly one entry.
    average_score = cvModel.avgMetrics[0]
    print('average cross-validation accuracy = {}'.format(average_score))
    return average_score


In [0]:
from itertools import product

# Manual search over (numFeatures, maxIter, maxDepth, minInstancesPerNode), timing each run.
# Ranges tried in an earlier search: numFeatures [60000], maxIter [25, 26, 27],
# maxDepth [18, 19, 20].
score = 0.0
params = None  # stays None if no combination scores above 0.0 (avoids NameError below)
for p1, p2, p3, p4 in product([60000], [15, 16, 17], [12, 13, 14], [2]):
    t0 = time()
    print('(numFeatures,maxIter,maxDepth,minInstancesPerNode)=({},{},{},{})'.format(p1, p2, p3, p4))
    average_score = grid_search(p1, p2, p3, p4)
    tt = time() - t0
    print("Classifier trained in {} seconds".format(round(tt, 3)))
    if average_score > score:
        print('################ Best score ######################')
        params = (p1, p2, p3, p4)
        score = average_score
print('Best score is {} at params ={}'.format(score, params))

In [0]:
# trained by a Gradient Boosted Tree
gbt = GBTClassifier(labelCol="label")
# Build a pipeline. The unfitted `idf` estimator (not the pre-fitted `idfModel`) is used so
# IDF weights are learned on each training fold with the chosen numFeatures.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, remover, hashingTF, idf, gbt])
# Create ParamGrid for Cross Validation
# NOTE(review): the search above only tried minInstancesPerNode=2; confirm that 1 is intended here.
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [60000])
             .addGrid(gbt.maxIter, [15])
             .addGrid(gbt.maxDepth, [12])
             .addGrid(gbt.minInstancesPerNode, [1])
             .build())
# Execute 4-fold cross validation for hyperparameter tuning, model prediction and model evaluation.
Data_modeling(train, test, pipeline, paramGrid)

In [0]:
#Summary
import matplotlib.pyplot as plt
import numpy as np  # `np` is used below but is not imported anywhere else in the notebook

classifier_names = ['Naive_Bayes', 'Logistic_Regression', 'Random_Forest', 'Decision_Tree', 'Gradient_Boosted_Tree']
# Hard-coded results, presumably copied from the runs above; assumed to be index-aligned
# with `classifier_names` -- TODO confirm the order.
# Named `training_time` (not `time`) so it does not shadow `time()` from the `time`
# module, which the tuning cells call.
training_time = [102.438, 154.102, 7652.926, 3442.122, 99132.029]
accuracy = [0.867845420350271, 0.907842824055586, 0.90115483, 0.912495907105265, 0.918]

# One bar per classifier; bars (centre-aligned by default) and tick labels share these positions.
positions = np.arange(len(classifier_names))

fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(18, 5), facecolor='white')

ax[0].barh(positions, training_time)
# Ticks must sit on the bar centres; the old 0.5..4.5 ticks put each label between two bars.
ax[0].set_yticks(positions)
ax[0].set_yticklabels(classifier_names)
ax[0].grid(color='b', linestyle='--', linewidth=1)
ax[0].set_title('Model training time')
ax[0].set_xlabel('Time (sec)')
ax[0].set_xscale('log')

ax[1].barh(positions, accuracy)
# Derive the x-range from the data. The old fixed [0.75, 0.85] window lay entirely below
# every accuracy value, so all bars were clipped to the same length.
margin = 0.02
ax[1].set_xlim([min(accuracy) - margin, min(1.0, max(accuracy) + margin)])
ax[1].set_title('Model evaluation on the test set')
ax[1].set_xlabel('Accuracy')
ax[1].set_yticks(positions)
ax[1].set_yticklabels([])
ax[1].grid(color='b', linestyle='--', linewidth=1)
display(fig)