In [None]:
!pip install pandas==1.0.3

In [None]:
import mlflow
import boto3
# MLflow tracking configuration: which server to report to, and under which experiment.
TRACKING_URI = "http://model-repository.stg.dreamplug.net/"
EXPERIMENT_NAME = "spark-test"

mlflow.set_tracking_uri(TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)

In [None]:
from pyspark.sql import SparkSession
import os
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
import pyspark
# Maven coordinates of every JVM dependency the session needs. They must all go
# into ONE package list: an explicit `--packages` flag in PYSPARK_SUBMIT_ARGS takes
# precedence over the `spark.jars.packages` conf, so splitting them between the two
# silently drops whatever was only set on the builder (previously mlflow-spark).
SPARK_PACKAGES = [
    "com.amazonaws:aws-java-sdk-pom:1.10.34",
    "org.apache.hadoop:hadoop-aws:2.7.2",
    "org.mlflow:mlflow-spark:1.23.1",
]

# PYSPARK_SUBMIT_ARGS only takes effect when the JVM is first launched (the first
# getOrCreate() in this kernel); changing it later requires a kernel restart.
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages {','.join(SPARK_PACKAGES)} pyspark-shell"

spark = (
    SparkSession.builder
    .master("local[*]")
    # Kept in sync with the submit args above so both sources agree.
    .config("spark.jars.packages", ",".join(SPARK_PACKAGES))
    .getOrCreate()
)


In [None]:
import shutil

# Toy training corpus of (id, text, label) rows; label is 1.0 for the documents
# that mention "spark" and 0.0 otherwise.
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)

# Pipeline: text -> "words" tokens -> hashed term-frequency "features" -> logistic regression.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)

MODEL_PATH = "spark-model"
# mlflow.spark.save_model refuses to write into an existing directory, so remove
# any copy left by a previous execution; otherwise re-running this cell fails.
shutil.rmtree(MODEL_PATH, ignore_errors=True)

with mlflow.start_run() as run:
    # NOTE(review): save_model only writes to the local MODEL_PATH directory; it does
    # not log anything to this run. Use mlflow.spark.log_model(...) if the model is
    # meant to be tracked on the MLflow server.
    mlflow.spark.save_model(model, MODEL_PATH)

In [None]:
# Load the saved pipeline back through MLflow's generic pyfunc interface and
# score unseen documents with it.
import pandas as pd

model = mlflow.pyfunc.load_model("spark-model")

# Unlabeled test documents as (id, text) tuples; none of these texts appear in
# the training set.
test = spark.createDataFrame(
    [
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    ["id", "text"],
)

# The pyfunc model is fed a pandas DataFrame, so convert the Spark frame first.
# Previously the code built `test` but scored a single *training* row instead.
test_pdf = test.toPandas()
prediction = model.predict(test_pdf)

# Show each document next to its prediction (rich display instead of print()).
test_pdf.assign(prediction=prediction)