# Spark NLP

In [None]:
from pyspark.sql.functions import size, split, when, lit, col, sum_distinct, count
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import string
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

In [None]:
# Look up the 'spark.jars.packages' setting on the active Spark session's
# configuration. Left as a bare trailing expression so the notebook displays it.
spark_conf = spark.sparkContext.getConf()
spark_conf.get("spark.jars.packages")

In [None]:
MODEL_NAME = 'finetuned-distilbert'

# Location the Spark NLP copy of the classifier is written to and read back
# from; built once so the save and load calls cannot drift apart.
SPARK_NLP_MODEL_PATH = f"./{MODEL_NAME}_spark_nlp"

# Load the exported model found under <MODEL_NAME>/saved_model/1 and configure
# it as an annotator that reads the "document" and "token" columns and writes
# its predictions to "class".
sequenceClassifier = (
    DistilBertForSequenceClassification.loadSavedModel(
        f"{MODEL_NAME}/saved_model/1",
        spark,
    )
    .setInputCols(["document", "token"])
    .setOutputCol("class")
    .setCaseSensitive(True)
    .setMaxSentenceLength(128)
)

# Write the annotator to SPARK_NLP_MODEL_PATH, replacing any earlier save
# at that path.
sequenceClassifier.write().overwrite().save(SPARK_NLP_MODEL_PATH)

# Read the saved annotator back and set its input/output columns again; this
# reloaded instance is the one used as the final pipeline stage.
sequenceClassifier_loaded = (
    DistilBertForSequenceClassification.load(SPARK_NLP_MODEL_PATH)
    .setInputCols(["document", "token"])
    .setOutputCol("class")
)

In [None]:
# Annotators are defined in the same order in which they run in the pipeline,
# so each stage's input column is produced by a stage defined above it.

# Raw "text" column -> "document" annotation.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# "document" -> "token".
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# "token" -> "lemma", using the pretrained 'lemma_antbnc' model.
lemma = (
    LemmatizerModel.pretrained('lemma_antbnc')
    .setInputCols(["token"])
    .setOutputCol("lemma")
)

# "lemma" -> "normalized".
# No cleanup patterns are set here, so the Normalizer's default applies, which
# keeps letters only. To strip just punctuation and keep alphanumeric
# characters, add: .setCleanupPatterns([r"[^\w\d\s]"])
normalizer = (
    Normalizer()
    .setInputCols(["lemma"])
    .setOutputCol("normalized")
)

# "normalized" -> "cleanTokens", matching stop words case-insensitively.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(["normalized"])
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)

# "document" + "cleanTokens" -> "embeddings", using the library's default
# pretrained word-embeddings model. `pretrained` is called on the class
# directly; no throwaway instance is needed to reach it.
glove_embeddings = (
    WordEmbeddingsModel.pretrained()
    .setInputCols(["document", "cleanTokens"])
    .setOutputCol("embeddings")
    .setCaseSensitive(False)
)

# "document" + "embeddings" -> "sentence_embeddings", pooled with "AVERAGE".
embeddingsSentence = (
    SentenceEmbeddings()
    .setInputCols(["document", "embeddings"])
    .setOutputCol("sentence_embeddings")
    .setPoolingStrategy("AVERAGE")
)

# NOTE(review): no stage in this pipeline reads "sentence_embeddings", and
# sequenceClassifier_loaded is configured with "document"/"token" inputs.
# Confirm whether the lemma -> sentence-embeddings stages are still needed.
clf_pipeline = Pipeline(
    stages=[
        document_assembler,
        tokenizer,
        lemma,
        normalizer,
        stopwords_cleaner,
        glove_embeddings,
        embeddingsSentence,
        sequenceClassifier_loaded,
    ]
)

# MLflow serving

In [None]:
import mlflow

# Package versions passed to MLflow as the logged model's pip requirements.
pip_requirements = ["spark-nlp==4.2.4", "mlflow==2.0.1", "pyspark==3.3.0"]

# Log the pipeline under the artifact path "sentiment_analysis". The call is
# left as a bare trailing expression so the notebook displays what it returns.
# NOTE(review): clf_pipeline is constructed as an unfitted Pipeline; confirm
# that mlflow.spark.log_model accepts it as-is or that it is fitted first.
mlflow.spark.log_model(
    clf_pipeline,
    "sentiment_analysis",
    pip_requirements=pip_requirements,
)

Out[25]: <mlflow.models.model.ModelInfo at 0x7f26c35462b0>