## Automatically pick up the last model training from MLFlow instead of hardcoding the run id

In [1]:
import mlflow

last_parent_run = set()
exp = mlflow.get_experiment_by_name("Default")

df = mlflow.search_runs([exp.experiment_id], order_by=["Created DESC"])
last_run_id = df.loc[0,'run_id']

print(last_run_id)

0546a833afee4ee18369ff8554444861


## Load back the model from MLFlow

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

logged_model = f'runs:/{last_run_id}/model'
# logged_model = 'runs:/502530abbc2a4ae3b043462eaa4b8828/model'

# Load model as a Spark UDF. Override result_type if the model does not return double values.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='string')

## Create a Spark DataFrame with some rows of text

In [3]:
from pyspark.sql.functions import struct, col

df = spark.createDataFrame([
    ['ca', 'I love Yosemite', 1, 10], 
    ['wi', 'This was sandbagged', 2, 20], 
], schema=['state', 'text', 'cat', 'id'])

df.show()
display(df)

+-----+-------------------+---+---+
|state|               text|cat| id|
+-----+-------------------+---+---+
|   ca|    I love Yosemite|  1| 10|
|   wi|This was sandbagged|  2| 20|
+-----+-------------------+---+---+



DataFrame[state: string, text: string, cat: bigint, id: bigint]

## Now apply the Model on the DataFrame

In [4]:
df2 = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns))))
df2.show()

+-----+-------------------+---+---+-----------+
|state|               text|cat| id|predictions|
+-----+-------------------+---+---+-----------+
|   ca|    I love Yosemite|  1| 10|   POSITIVE|
|   wi|This was sandbagged|  2| 20|   NEGATIVE|
+-----+-------------------+---+---+-----------+



## Now register the model as a Spark UDF

In [5]:
spark.udf.register("predict_sentiment", loaded_model)

spark.sql("SHOW FUNCTIONS LIKE '*predict_sentiment*'").show()

+-----------------+
|         function|
+-----------------+
|predict_sentiment|
+-----------------+



## Use the same DataFrame example data as a table

In [6]:
df.createOrReplaceTempView("reviews")

spark.sql("SELECT * FROM reviews").show()

+-----+-------------------+---+---+
|state|               text|cat| id|
+-----+-------------------+---+---+
|   ca|    I love Yosemite|  1| 10|
|   wi|This was sandbagged|  2| 20|
+-----+-------------------+---+---+



## The crux, now we run the SQL query with the model function and get the predictions!

In [7]:
spark.sql("SELECT text, predict_sentiment(text) AS prediction FROM reviews").show()

+-------------------+----------+
|               text|prediction|
+-------------------+----------+
|    I love Yosemite|  POSITIVE|
|This was sandbagged|  NEGATIVE|
+-------------------+----------+



## Bonus: Run the model on a Panda DataFrames

In [8]:
import pandas as pd

# Load model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(logged_model)

# From Spark DataFrame 
data = df.toPandas()
display(data)

print('Spark DataFrame: %s\n' % loaded_model.predict(data))

# From Panda DataFrame
d = { 'text': ['Hello, my dog is cute', 'I hated this broken idea']}
pdf = pd.DataFrame(data=d)

# Predict on a single Pandas DataFrame row
print('Pandas DataFrame row: %s\n' % loaded_model.predict(pdf[1:2]))

# Predict on a Pandas DataFrame.
pdf['sentiment'] = pdf.apply(loaded_model.predict, axis=1)
print('Pandas DataFrame:')
display(pdf)

Unnamed: 0,state,text,cat,id
0,ca,I love Yosemite,1,10
1,wi,This was sandbagged,2,20


Spark DataFrame:           0
0  POSITIVE

Pandas DataFrame row:           0
0  NEGATIVE

Pandas DataFrame:


Unnamed: 0,text,sentiment
0,"Hello, my dog is cute",0 0 POSITIVE
1,I hated this broken idea,0 0 NEGATIVE


## Call via a SQL cell

In [9]:
%load_ext sparksql_magic

In [10]:
%%sparksql
SELECT * FROM reviews

0,1,2,3
state,text,cat,id
ca,I love Yosemite,1,10
wi,This was sandbagged,2,20


In [11]:
%%sparksql
SELECT text, predict_sentiment(text) AS prediction FROM reviews

0,1
text,prediction
I love Yosemite,POSITIVE
This was sandbagged,NEGATIVE


## Bonus 2: just directly apply a Transformer pipeline

In [12]:
from transformers import pipeline

classifier = pipeline("sentiment-analysis")
classifier("ML and SQL are cool!")

[{'label': 'POSITIVE', 'score': 0.9997429251670837}]

In [13]:
def classify(s):
  return classifier(s)[0]['label']

spark.udf.register("classify", classify)

<function __main__.classify(s)>

In [14]:
%%sparksql
SELECT text, classify(text) AS prediction FROM reviews

0,1
text,prediction
I love Yosemite,POSITIVE
This was sandbagged,NEGATIVE
