In [3]:
# Pin PySpark to exactly 2.4.4; as the output shows, this replaces any other
# installed version (here 2.4.5). Restart the kernel afterwards, as pip notes.
%pip install pyspark==2.4.4

Collecting pyspark==2.4.4
  Using cached pyspark-2.4.4.tar.gz (215.7 MB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130388 sha256=1f1493681c9ba312d4e509b5886a6c80c8b411089d147d71b55fdcaf9ecb206f
  Stored in directory: /Users/briancai/Library/Caches/pip/wheels/11/48/19/c3b6b66e4575c164407a83bc065179904ddc33c9d6500846f0
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 2.4.5
    Uninstalling pyspark-2.4.5:
      Successfully uninstalled pyspark-2.4.5
Successfully installed pyspark-2.4.4
Note: you may need to restart the kernel to use updated packages.


In [4]:
# Pin Spark NLP to 2.6.3 (provides DocumentAssembler, BertSentenceEmbeddings
# and SentimentDLApproach used below). Restart the kernel after installing.
%pip install spark-nlp==2.6.3

Collecting spark-nlp==2.6.3
  Using cached spark_nlp-2.6.3-py2.py3-none-any.whl (129 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.6.3
Note: you may need to restart the kernel to use updated packages.


In [23]:
import json
import os

import sparknlp
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as sf
from pyspark.sql.types import DoubleType
from sparknlp.annotator import (
    BertSentenceEmbeddings,
    SentimentDLApproach,
)
from sparknlp.base import DocumentAssembler

In [2]:
# Create the Spark session through Spark NLP's helper; `spark` is used by the
# data-loading cell to read the review JSON.
spark = sparknlp.start()

In [4]:
# Yelp academic review dump. Set the YELP_REVIEWS_PATH environment variable to
# run elsewhere; the default is the original author's local copy.
DATA_PATH = os.environ.get(
    "YELP_REVIEWS_PATH",
    "/Users/briancai/Desktop/Datasets/yelp_dataset/yelp_academic_dataset_review.json",
)
N_SAMPLES = 1000  # reviews loaded for this run; keep in sync with "n" in results_dict
POSITIVE_STARS_ABOVE = 3  # stars > 3 -> label 1.0; everything else (incl. 3 stars) -> 0.0
TRAIN_TEST_WEIGHTS = [0.8, 0.2]
SPLIT_SEED = 421  # fixed so the train/test split is reproducible

reviews_raw = spark.read.json(DATA_PATH).limit(N_SAMPLES)

# Binary sentiment target as a double column, keeping only model inputs.
data = (
    reviews_raw
    .withColumn(
        "label",
        sf.when(sf.col("stars") > POSITIVE_STARS_ABOVE, 1.0).otherwise(0.0),
    )
    .select("text", "label")
)

train, test = data.randomSplit(TRAIN_TEST_WEIGHTS, seed=SPLIT_SEED)

In [12]:
# Local directory of a downloaded bert_base_uncased model (the directory name
# suggests Spark NLP 2.6.0 built for Spark 2.4 — confirm).
# NOTE(review): no visible cell reads this variable; the pipeline cell calls
# BertSentenceEmbeddings.pretrained("bert_base_uncased") instead, which
# downloads ~392 MB. Either load from this path or remove it.
bert_model_path = "/Users/briancai/Drive/NU/Q4/Text/sparknlp_performance/models/bert_base_uncased_en_2.6.0_2.4_1598340514223"

In [33]:
# Stage 1: wrap the raw review text into Spark NLP "document" annotations.
document = DocumentAssembler().setInputCol("text").setOutputCol("document")

# Stage 2: one BERT embedding per document.
# NOTE(review): Spark NLP's sentence-level BERT models are usually published
# under "sent_*" names (e.g. "sent_bert_base_uncased"); confirm that
# "bert_base_uncased" really resolves to a BertSentenceEmbeddings model.
bert_sent = BertSentenceEmbeddings.pretrained("bert_base_uncased")
bert_sent = bert_sent.setInputCols(["document"]).setOutputCol("sentence_embeddings")

# Stage 3: SentimentDL classifier trained on the `label` column, writing its
# prediction into the `class` annotation column.
sentimentdl = SentimentDLApproach()
sentimentdl = (
    sentimentdl
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
    .setLabelColumn("label")
    .setMaxEpochs(5)
    .setEnableOutputLogs(True)
    .setThreshold(0.5)
)

pipeline = Pipeline(stages=[document, bert_sent, sentimentdl])

bert_base_uncased download started this may take some time.
Approximate size to download 392.5 MB
[OK!]


KeyboardInterrupt: 

In [17]:
# Fit all pipeline stages (document assembly, BERT embeddings, SentimentDL
# training) on the training split.
# NOTE(review): this cell's recorded execution count (In [17]) is lower than the
# pipeline-definition cell's (In [33]), whose last run ended in KeyboardInterrupt,
# so the fitted model may come from an earlier version of `pipeline`. Re-run the
# notebook top-to-bottom on a fresh kernel before trusting the results.
pipelineModel = pipeline.fit(train)

In [24]:
# Score the held-out split with the fitted pipeline.
scored_test = pipelineModel.transform(test)

# `class.result` holds the predicted label strings for each row; take the first
# and cast it to double so it is comparable with `label`.
# (presumably the strings are "1.0"/"0.0" — a non-numeric label would cast to null)
first_predicted_label = sf.col("class.result").getItem(0).cast(DoubleType())

preds = scored_test.withColumn("prediction", first_predicted_label).select(
    "label", "prediction"
)

In [25]:
# Display the schema of the scored frame (label and prediction, both doubles).
preds

DataFrame[label: double, prediction: double]

In [26]:
# MulticlassMetrics expects an RDD of (prediction, label) pairs, in that order.
# `preds` is laid out as (label, prediction), so select the columns explicitly.
# F1 is symmetric in precision/recall, so the score below was unaffected by the
# previous swap, but precision(), recall() and confusionMatrix() were transposed.
prediction_and_labels = preds.select("prediction", "label").rdd
metrics = MulticlassMetrics(prediction_and_labels)

# F1 for the positive class (label 1.0) on the held-out split.
f1_score = metrics.fMeasure(1.0)

In [28]:
# Positive-class (label 1.0) F1 on the held-out test split.
f1_score

0.7612903225806451

In [29]:
def save_dict_as_json(dictionary, path):
    """Write ``dictionary`` to ``path`` as JSON, overwriting any existing file.

    Args:
        dictionary: A JSON-serializable mapping (e.g. run metadata and scores).
        path: Destination file path. Its parent directory must already exist.

    Returns:
        None.

    Raises:
        TypeError: If ``dictionary`` contains values ``json`` cannot serialize.
        OSError: If ``path`` cannot be opened for writing.
    """
    # Requires the module-level `import json` in the imports cell.
    with open(path, "w", encoding="utf-8") as outfile:
        json.dump(dictionary, outfile)

In [30]:
# Summary of this run, in the order it will be written to JSON. `n` is the
# number of reviews loaded before the train/test split (the 1000-row limit
# applied when loading the data).
results_dict = dict(
    [
        ("model type", "SparkNLP"),
        ("n", 1000),
        ("f1 score", f1_score),
    ]
)

In [31]:
# Inspect the run summary before it is written to disk.
results_dict

{'model type': 'SparkNLP', 'n': 1000, 'f1 score': 0.7612903225806451}

In [32]:
# Directory collecting per-run result files. Create it if missing so that
# open(..., "w") inside save_dict_as_json does not raise FileNotFoundError.
output_dir = "/Users/briancai/Drive/NU/Q4/Text/sparknlp_performance/output"
os.makedirs(output_dir, exist_ok=True)

save_path = os.path.join(output_dir, "spark_nlp_1")
save_dict_as_json(results_dict, save_path)