# Vivekn Sentiment

In [152]:
# Imports
import time
import sys
import os
sys.path.append('../../')

from sparknlp.annotator import *
from sparknlp.base import DocumentAssembler, Finisher

Python decorator to measure the execution time of methods:

In [153]:
import time

def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()

        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int((te - ts))
        else:
            print('%r  %2.2f s' % \
                  (method.__name__, (te - ts)))
        return result

    return timed

Class with the main functions to be used for the evaluation

In [154]:
from pyspark.sql import functions as F
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator


class ViveknSentiment:
    
    @staticmethod
    @timeit
    def loadData(file_path):
        """
        This method loads a csv file
        """
        data = spark. \
               read. \
               load(file_path,
                    format="com.databricks.spark.csv",
                    header="true") 
        return data
    
    @staticmethod
    @timeit
    def fitModel(pipeline, data):
        model = pipeline.fit(data)
        return model
    
    @staticmethod
    @timeit
    def exportModel(pipeline, data, object_type, dir_name):
        """
        This method exports a model to a directory
        :param pipeline: Pipeline or PipelineModel objects
        :param data: Data to fit a PipelineModel
        :param object_type: p for Pipeline, pm for PipelineModel
        :param dir_name: directory where the model is saved to
        """
        if object_type == "p":
            pipeline.write().overwrite().save(dir_name)
        elif object_type == "pm":
            pipeline.fit(data).write().overwrite().save(dir_name)
        print(f"Model exported")
    
    @staticmethod
    @timeit
    def loadModel(object_type, dir_name):
        """
        This method loads a model from a directory
        :param object_type: p for Pipeline, pm for PipelineModel
        :param dir_name: directory where the model is loaded from 
        """
        if object_type == "p":
            model = Pipeline.read().load(dir_name)
        elif object_type == "pm":
            model = PipelineModel.read().load(dir_name)
        print(f"Model loaded")
        return model
    
    @staticmethod
    @timeit
    def predict(model, data):
        prediction = model.transform(data)
        return prediction
    
    @staticmethod
    @timeit
    def getLabeledPrediction(labeled_data, predicted_data):
        """
        This method creates a dataframe with the required format
        for evaluation of a binary classifier
        :param labeled_data: dataset with ground truth data
        :param predicted_data: dataset with predictions
        """
        predictions = labeled_data.join(predicted_data, 
                                        labeled_data.id == predicted_data.id)
        predictions = predictions. \
                      withColumn("prediction", 
                                  F.when(F.col("finished_sentiment") == "result->positive", 1). \
                                         otherwise(0))
        predictions.show(5)
        # Evaluator for binary classification, expects input column label.
        predictions = predictions. \
                      select(F.col("sentiment").alias("label").cast("double"),  
                             F.col("prediction").cast("double"))
        return predictions
    
    @staticmethod
    @timeit
    def evaluatePrediction(predictions, metric="areaUnderROC"):
        """
        This method evaluates the model
        :param predictions: dataset in the output format of getLabeledPrediction
        :param metric: areaUnderROC or areaUnderPR
        """
        # Evaluator for binary classification, expects two input columns: rawPrediction and label.
        evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                                  metricName=metric)
        accuracy = evaluator.evaluate(predictions)
        return accuracy

### Pre-process Data

In [435]:
home_path = "file:///" + os.getcwd() + "/../../../../"

In [436]:
### Define the dataframe
document_assembler = DocumentAssembler() \
                    .setInputCol("text")

In [437]:
### Tokenizer
tokenizer = Tokenizer() \
            .setInputCols(["document"]) \
            .setOutputCol("token")

In [438]:
### Normalizer
normalizer = Normalizer() \
            .setInputCols(["token"]) \
            .setOutputCol("normal")

In [450]:
### Spell Checker
corpus = home_path + \
         "spark-nlp/src/test/resources/spell/sherlockholmes.txt" 
# "spark-nlp-models/src/main/resources/spell/wiki1_en.txt"                          
         
dictionary = home_path + \
             "spark-nlp-models/src/main/resources/spell/words.txt"
spell_checker = NorvigSweetingApproach() \
            .setInputCols(["normal"]) \
            .setOutputCol("spell") \
            .setDictionary(dictionary) \
            .setCorpus(corpus) \
            .setDoubleVariants(True) \
            .setCaseSensitive(True) \
            .setShortCircuit(True)

In [451]:
# Export Spell Checker
pipeline = Pipeline(stages=[spell_checker])
ViveknSentiment.exportModel(pipeline, "", "p", "./sc")

Model exported
'exportModel'  0.06 s


In [452]:
positive_text = home_path + \
                 "spark-nlp-models/src/main/resources/vivekn/training_positive"
negative_text = home_path + \
                 "spark-nlp-models/src/main/resources/vivekn/training_negative"
    
sentiment_detector = ViveknSentimentApproach() \
    .setInputCols(["spell", "document"]) \
    .setOutputCol("sentiment") \
    .setPruneCorpus(0) \
    .setPositiveSource(positive_text) \
    .setNegativeSource(negative_text)

In [453]:
finisher = Finisher() \
    .setInputCols(["sentiment"]) \
    .setIncludeKeys(True)

### Predict Sentiment

In [454]:
training_file = home_path + \
                "spark-nlp-models/src/main/resources/datasets/training_balanced"
train_data = ViveknSentiment.loadData(training_file)
train_data.cache()
train_data.show(10)
train_data.count()

'loadData'  0.07 s
+----+--------------------+---------+
|  id|                text|sentiment|
+----+--------------------+---------+
|3995|da vinci code was...|        0|
|3996|Then again, the D...|        0|
|3999|God, Yahoo Games ...|        0|
|4000|Da Vinci Code doe...|        0|
|4001|And better...-We ...|        0|
|4002|Last time, Da Vin...|        0|
|4003|And better...-We ...|        0|
|4004|And better..-We a...|        0|
|4006|If Jesus is fabri...|        0|
|4007|I think this bols...|        0|
+----+--------------------+---------+
only showing top 10 rows



4000

In [22]:
# Load spell checker model
# sc = ViveknSentiment.loadModel("p", "./sc")
# spell_checker = sc.getStages()[0]

Model loaded
'loadModel'  0.12 s


In [455]:
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    normalizer,
    spell_checker,
    sentiment_detector,
    finisher
])

model = ViveknSentiment.fitModel(pipeline, train_data)
train_predict = ViveknSentiment.predict(model, train_data)
train_predict.show(10)

'fitModel'  5.29 s
'predict'  0.06 s
+----+--------------------+------------------+
|  id|                text|finished_sentiment|
+----+--------------------+------------------+
|3995|da vinci code was...|  result->negative|
|3996|Then again, the D...|  result->positive|
|3999|God, Yahoo Games ...|  result->negative|
|4000|Da Vinci Code doe...|  result->negative|
|4001|And better...-We ...|  result->negative|
|4002|Last time, Da Vin...|  result->negative|
|4003|And better...-We ...|  result->negative|
|4004|And better..-We a...|  result->negative|
|4006|If Jesus is fabri...|  result->negative|
|4007|I think this bols...|  result->negative|
+----+--------------------+------------------+
only showing top 10 rows



In [380]:
# Find positive sentiments
start = time.time()
train_predict.where("finished_sentiment == 'result->positive'").show(5)
end = time.time()
print("Time elapsed in query: " + str(end - start))

+----+--------------------+------------------+
|  id|                text|finished_sentiment|
+----+--------------------+------------------+
|3996|Then again, the D...|  result->positive|
|4018|and also, The Da ...|  result->positive|
|4042|DA VINCI CODE-SUC...|  result->positive|
|4115|The Da vinci Code...|  result->positive|
|4149|Also, Da Vinci Co...|  result->positive|
+----+--------------------+------------------+
only showing top 5 rows

Time elapsed in query: 3.074977159500122


In [422]:
# Find negative sentiments
start = time.time()
train_predict.where("finished_sentiment == 'result->negative'").show(5)
end = time.time()
print("Time elapsed in query: " + str(end - start))

+----+--------------------+------------------+
|  id|                text|finished_sentiment|
+----+--------------------+------------------+
|3995|da vinci code was...|  result->negative|
|3999|God, Yahoo Games ...|  result->negative|
|4000|Da Vinci Code doe...|  result->negative|
|4001|And better...-We ...|  result->negative|
|4002|Last time, Da Vin...|  result->negative|
+----+--------------------+------------------+
only showing top 5 rows

Time elapsed in query: 5.151686668395996


In [31]:
train_predict.printSchema()  # print data types

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- finished_sentiment: string (nullable = true)



## Training Accuracy

Measures the training accuracy of the model. 

In [14]:
predictions = ViveknSentiment.getLabeledPrediction(train_data, train_predict)
# predictions.show(5)

+----+--------------------+---------+----+--------------------+------------------+----------+
|  id|                text|sentiment|  id|                text|finished_sentiment|prediction|
+----+--------------------+---------+----+--------------------+------------------+----------+
|3995|da vinci code was...|        0|3995|da vinci code was...|  result->negative|         0|
|3996|Then again, the D...|        0|3996|Then again, the D...|  result->positive|         1|
|3999|God, Yahoo Games ...|        0|3999|God, Yahoo Games ...|  result->negative|         0|
|4000|Da Vinci Code doe...|        0|4000|Da Vinci Code doe...|  result->negative|         0|
|4001|And better...-We ...|        0|4001|And better...-We ...|  result->negative|         0|
+----+--------------------+---------+----+--------------------+------------------+----------+
only showing top 5 rows

'getLabeledPrediction'  91.71 s


In [15]:
roc = ViveknSentiment.evaluatePrediction(predictions)
print("Area Under ROC: {:0.4f}".format(roc))

'evaluatePrediction'  97.20 s
Area Under ROC: 0.9575


In [16]:
pr = ViveknSentiment.evaluatePrediction(predictions, metric="areaUnderPR")
print("Area Under PR: {:0.4f}".format(pr))

'evaluatePrediction'  95.18 s
Area Under PR: 0.9613


In [456]:
# Export model
ViveknSentiment.exportModel(pipeline, train_data, "pm", "./ms")

Model exported
'exportModel'  4.28 s


## Testing Accuracy

In [457]:
# Load model
model = ViveknSentiment.loadModel("pm", "./ms")

Model loaded
'loadModel'  3.82 s


In [458]:
# Load testing data
test_file = home_path + \
            "spark-nlp-models/src/main/resources/datasets/testing"
test_data = ViveknSentiment.loadData(test_file)
test_data.cache()
test_data.show(10)
test_data.count()

'loadData'  0.07 s
+---+--------------------+---------+
| id|                text|sentiment|
+---+--------------------+---------+
|  0|The Da Vinci Code...|        1|
|  1|this was the firs...|        1|
|  2|i liked the Da Vi...|        1|
| 13|The Da Vinci Code...|        1|
| 26|I really like The...|        1|
| 27|Da Vinci Code is ...|        1|
| 31|And then we went ...|        1|
| 34|Well I did enjoy ...|        1|
| 41|And I was quite p...|        1|
| 44|The Da Vinci Code...|        1|
+---+--------------------+---------+
only showing top 10 rows



1408

In [459]:
# Predict test data
test_predict = ViveknSentiment.predict(model, test_data)

'predict'  0.07 s


In [460]:
predictions = ViveknSentiment.getLabeledPrediction(test_data, test_predict)
# predictions.show(5)

+---+--------------------+---------+---+--------------------+------------------+----------+
| id|                text|sentiment| id|                text|finished_sentiment|prediction|
+---+--------------------+---------+---+--------------------+------------------+----------+
|  0|The Da Vinci Code...|        1|  0|The Da Vinci Code...|  result->positive|         1|
|  1|this was the firs...|        1|  1|this was the firs...|  result->positive|         1|
|  2|i liked the Da Vi...|        1|  2|i liked the Da Vi...|  result->positive|         1|
| 13|The Da Vinci Code...|        1| 13|The Da Vinci Code...|  result->positive|         1|
| 26|I really like The...|        1| 26|I really like The...|  result->positive|         1|
+---+--------------------+---------+---+--------------------+------------------+----------+
only showing top 5 rows

'getLabeledPrediction'  198.37 s


In [461]:
roc = ViveknSentiment.evaluatePrediction(predictions)
print("Area Under ROC: {:0.4f}".format(roc))

'evaluatePrediction'  195.32 s
Area Under ROC: 0.8263


In [462]:
pr = ViveknSentiment.evaluatePrediction(predictions, metric="areaUnderPR")
print("Area Under PR: {:0.4f}".format(pr))

'evaluatePrediction'  200.07 s
Area Under PR: 0.8977
