In [1]:
# Point this kernel at the Java 11 runtime and at the local Spark install.
import os

SPARK_HOME = "/opt/spark"
os.environ.update({
    'JAVA_HOME': '/home/team1/.jdk/jdk-11.0.19+7',
    "SPARK_HOME": SPARK_HOME,
})

# Let findspark locate the PySpark that lives under SPARK_HOME.
import findspark
findspark.init(SPARK_HOME)

# Import the sparknlp library and the PretrainedPipeline class
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import udf, col, lower, regexp_replace, when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.types import DoubleType, BooleanType
import pyspark.sql.functions as f
from sparknlp.base import *
from sparknlp.annotator import *

# Start the Spark session through Spark NLP so the memory settings are applied
# when the JVM is launched.
#
# Previously sparknlp.start() was called with no arguments and a second
# SparkSession.builder...getOrCreate() followed it. getOrCreate() then handed
# back the session Spark NLP had already created, so the driver / off-heap
# sizes requested on the builder never took effect.
#
# sparknlp.start() runs Spark in local mode on all cores by default, matching
# the previous master("local[*]") request.
# NOTE(review): `params` needs a Spark NLP release whose start() accepts it;
# confirm against the installed version.
spark = sparknlp.start(
    memory="100g",  # forwarded as spark.driver.memory
    params={
        "spark.app.name": "SentimentAnalysisPretrained",
        "spark.executor.memory": "100g",
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": "100g",
    },
)

In [2]:
# Load the review data.
# Only two columns are used: review/score (expected to be an integer from
# 1 to 5) and review/text (the free-text review).
print('READ DATASET...')
data = spark.read.csv('part2_900k.csv', inferSchema=True, header=True, multiLine=True, quote='"', escape='"')

# Strip everything except ASCII letters and whitespace, then lower-case.
data = data.select('review/score', (lower(regexp_replace('review/text', "[^a-zA-Z\\s]", "")).alias('review/text')))

# Drop rows where the score or the text is null.
# NOTE(review): a review that contained no letters is an empty string at this
# point, not null, so it is kept - confirm that is intended.
data = data.dropna()

# Binarize the label: scores 1-3 -> 0, scores 4-5 -> 1.
# A single mapping replace does this in one pass instead of five.
data = data.replace({1: 0, 2: 0, 3: 0, 4: 1, 5: 1}, subset=["review/score"])

READ DATASET...


In [3]:
# Randomly hold out roughly 10% of the rows for testing; the seed is fixed so
# the split is repeatable.
splits = data.randomSplit([0.9, 0.1], seed=42)
train_df, test_df = splits

In [4]:
# Wrap the cleaned review text in a Spark NLP "document" annotation.
document_assembler = (
    DocumentAssembler()
    .setInputCol("review/text")
    .setOutputCol("document")
)

# Pretrained Universal Sentence Encoder: document -> sentence embeddings.
use = (
    UniversalSentenceEncoder.pretrained('tfhub_use', lang="en")
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Pretrained sentiment classifier that consumes the embeddings above.
# pretrained() is called on the class itself (as for the encoder) rather than
# on a throwaway SentimentDLModel() instance.
classifier = (
    SentimentDLModel.pretrained('sentimentdl_use_imdb')
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("sentiment")
)

nlp_pipeline = Pipeline(stages=[document_assembler, use, classifier])

# Two of the stages are pretrained and the assembler is a plain transformer,
# so nothing is learned here: fitting on a one-row placeholder frame just
# produces the PipelineModel.
placeholder_df = spark.createDataFrame([['']]).toDF("review/text")
l_model = LightPipeline(nlp_pipeline.fit(placeholder_df))

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
sentimentdl_use_imdb download started this may take some time.
Approximate size to download 12 MB
[OK!]


In [5]:
# Persist the fitted pipeline to disk.
# write().overwrite() replaces a directory left by an earlier run; a plain
# save() raises when the target path already exists, which breaks re-running
# the notebook.
l_model.pipeline_model.write().overwrite().save('pipelines/pretrained_sentiment_on_imdb')

In [5]:
# Apply the pretrained pipeline to the held-out split. This appends the
# document, sentence_embeddings and sentiment columns to the frame.
predictions = l_model.transform(test_df)

In [6]:
# Inspect the columns the pipeline produced.
predictions.printSchema()

root
 |-- review/score: integer (nullable = true)
 |-- review/text: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- sentence_embeddings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: str

In [7]:
# Keep only the true label and the classifier output; the text, document and
# embedding columns are not needed for scoring.
predictions = predictions.select('review/score', 'sentiment')

In [8]:
# Peek at a few rows with the sentiment annotation shown in full.
predictions.show(3, truncate=False)

+------------+--------------------------------------------------------------------------------------+
|review/score|sentiment                                                                             |
+------------+--------------------------------------------------------------------------------------+
|0           |[[category, 0, 228, neg, [sentence -> 0, pos -> 2.1157134E-6, neg -> 0.99999785], []]]|
|0           |[[category, 0, 554, pos, [sentence -> 0, pos -> 0.99998105, neg -> 1.8939772E-5], []]]|
|0           |[[category, 0, 2551, pos, [sentence -> 0, pos -> 0.9999907, neg -> 9.277911E-6], []]] |
+------------+--------------------------------------------------------------------------------------+
only showing top 3 rows



In [6]:
# sentiment.result is the array of predicted label strings taken from the
# sentiment annotations. explode() emits one output row per array element, so
# a row whose array is empty would be dropped and a row with several
# annotations would be repeated.
result = predictions.withColumn('prediction', f.explode('sentiment.result')) 

In [10]:
# List the distinct label strings the classifier emitted; collect() brings
# them back to the driver.
result.select('prediction').distinct().collect()

[Row(prediction='pos'), Row(prediction='neg'), Row(prediction='neutral')]

In [7]:
# Bring the classifier's string labels onto the same 0/1 encoding as the
# review score: 'pos' -> 1, while 'neg' and 'neutral' both count as 0.
label_codes = {'neg': '0', 'neutral': '0', 'pos': '1'}
result = (
    result
    .replace(label_codes, subset=['prediction'])
    # the evaluator needs a numeric prediction column
    .withColumn('prediction', col('prediction').cast('double'))
)
result.printSchema()

root
 |-- review/score: integer (nullable = true)
 |-- review/text: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- sentence_embeddings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: str

In [12]:
# Spot-check the numeric prediction next to the true label.
result.show(3)

+------------+--------------------+----------+
|review/score|           sentiment|prediction|
+------------+--------------------+----------+
|           0|[{category, 0, 22...|       0.0|
|           0|[{category, 0, 55...|       1.0|
|           0|[{category, 0, 25...|       1.0|
+------------+--------------------+----------+
only showing top 3 rows



In [8]:
print("EVALUATION...")
# Compare the binarized review score (label) with the binarized model output.
evaluator = MulticlassClassificationEvaluator(
    labelCol="review/score",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(result)
print(f"The accuracy of the model is {accuracy:0.2f}")

EVALUATION...
The accuracy of the model is 0.83


In [9]:
# Re-use the evaluator from the accuracy cell, switched to the 'f1' metric.
# For MulticlassClassificationEvaluator this is the class-weighted F1.
# The setter returns the evaluator itself, so the calls can be chained.
f1 = evaluator.setMetricName('f1').evaluate(result)
print(f"The f1 of the model is {f1:0.2f}")

The f1 of the model is 0.82
