In [None]:
import mlflow
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd
from sparknlp.training import CoNLL
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Spark NLP annotators need an active Spark session with the Spark NLP jars loaded.
# Reuse a session the platform already provides (e.g. Databricks); otherwise start one
# through Spark NLP so `LemmatizerModel.pretrained()` and the later cells can use `spark`.
spark = SparkSession.getActiveSession() or sparknlp.start()

# raw `text` column -> Spark NLP `document` annotations
documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

# `document` -> `token` annotations
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# `token` -> lemmas. When this pipeline is loaded back through `mlflow.pyfunc`, MLflow's
# Spark flavor returns the column named `prediction`, so the last stage must write there.
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["token"]) \
    .setOutputCol("prediction")

pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    lemmatizer,
])

In [None]:
# Conda environment passed to `mlflow.spark.log_model`, recorded with the model so the
# serving side can recreate it.
# 'pip' is listed as a conda dependency because packages are installed through the pip
# sub-section; without it conda warns and may use a pip from outside the environment.
# NOTE(review): the spark-nlp PyPI package provides the Python API; the Spark NLP JVM jar
# must also be available to whichever Spark session loads the model — confirm per target.
conda_env = {
    'channels': ['conda-forge'],
    'dependencies': [
        'python=3.8.8',
        'pip',
        {
            'pip': [
                'pyspark==3.1.1',
                'mlflow==1.21.0',
                'spark-nlp==3.3.2',
            ],
        },
    ],
    'name': 'mlflow-env',
}

In [None]:
# `mlflow.spark.log_model` saves fitted Spark ML models (transformers), not estimators
# such as `Pipeline`, so fit the pipeline first. These stages don't need real training
# data (the lemmatizer is pretrained), so a one-row placeholder frame carrying the `text`
# input column is enough — the usual Spark NLP pattern for pretrained pipelines.
empty_df = spark.createDataFrame([[""]]).toDF("text")
p_model = pipeline.fit(empty_df)
mlflow.spark.log_model(p_model, "lemmatizer", conda_env=conda_env)

In [None]:
# Resolve the logged model: prefer the run that `log_model` left active in this kernel,
# and fall back to the run ID from the original experiment when no run is active.
active_run = mlflow.active_run()
run_id = active_run.info.run_id if active_run else 'a8cf070528564792bbf66d82211db0a0'
logged_model = f'runs:/{run_id}/lemmatizer'

# Sample input: the pipeline's DocumentAssembler reads a single `text` column.
df = spark.createDataFrame([["The cats were running in the gardens."]]).toDF("text")

# Load model as a Spark UDF.
# NOTE(review): `spark_udf` defaults to result_type="double", while the lemmatizer
# produces Spark NLP annotation structs; the UDF also loads the Spark model inside the
# Python workers, where the Spark NLP jar must be available. Confirm both, or use
# `mlflow.pyfunc.load_model` for driver-side prediction instead.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model)

# Predict on a Spark DataFrame; show a few rows instead of collecting the whole result.
columns = list(df.columns)
df.withColumn('predictions', loaded_model(*columns)).show(5, truncate=False)

In [None]:
# Resolve the logged model: prefer the run that `log_model` left active in this kernel,
# and fall back to the run ID from the original experiment when no run is active.
active_run = mlflow.active_run()
run_id = active_run.info.run_id if active_run else 'a8cf070528564792bbf66d82211db0a0'
logged_model = f'runs:/{run_id}/lemmatizer'

# Load the logged pipeline through the generic pyfunc interface.
loaded_model = mlflow.pyfunc.load_model(model_uri=logged_model)

# Sample input: the pipeline's DocumentAssembler reads a single `text` column.
df_1_spark = spark.createDataFrame([["The cats were running in the gardens."]]).toDF("text")

# pyfunc's `predict` takes a pandas DataFrame (not an RDD). MLflow's Spark flavor turns it
# back into a Spark DataFrame, runs the pipeline and returns the `prediction` values,
# presumably one entry per input row.
res_pandas = loaded_model.predict(df_1_spark.toPandas())
res_spark = res_pandas  # alias for cells that read the result as `res_spark`

In [None]:
import pyspark.sql.functions as f
import pyspark.sql.types as T

# Spark schema describing one Spark NLP annotation row. It is used to rebuild a Spark
# DataFrame from the annotation rows returned by the pyfunc model. Fields are listed in
# schema order, and every field is declared non-nullable.
annotationType = T.StructType([
    T.StructField(field_name, field_type, False)
    for field_name, field_type in (
        ('annotatorType', T.StringType()),
        ('begin', T.IntegerType()),
        ('end', T.IntegerType()),
        ('result', T.StringType()),
        ('metadata', T.MapType(T.StringType(), T.StringType())),
        ('embeddings', T.ArrayType(T.FloatType())),
    )
])

In [None]:
# Rebuild a Spark DataFrame from the annotations predicted for the first input row.
# `res_spark` is the result of `loaded_model.predict(...)` in the pyfunc prediction cell.
assert len(res_spark) > 0, "the model returned no predictions"
spark_res = spark.createDataFrame(res_spark[0], schema=annotationType)
spark_res.show(truncate=False)