In [0]:
pip install sentencepiece 

Python interpreter will be restarted.
Collecting sentencepiece
  Downloading sentencepiece-0.1.97-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
Installing collected packages: sentencepiece
Successfully installed sentencepiece-0.1.97
Python interpreter will be restarted.


In [0]:
import mlflow
import pandas as pd
import time
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Registered MLflow model to pull from the model registry.
model_name = "m2m100_translation_transformer"

# Load the uploaded English/Portuguese sample table and keep only non-null English phrases.
# Nulls are dropped *before* ids are assigned. Sizing the frame from the non-null count and
# then assigning the full column by index would silently drop trailing rows and turn nulls
# into placeholder strings (e.g. "None") that would be sent for translation.
deltaDF = spark.read.table('kenjohnson_demo.default.eng_por').toPandas()
english_phrases = deltaDF['English'].dropna().astype(str).reset_index(drop=True)
count = len(english_phrases)
df_source = pd.DataFrame({'id': range(count), 'content': english_phrases})



In [0]:
# Set to a small integer (e.g. 64) to translate only the first N phrases while debugging;
# leave as None to translate the whole table.
DEBUG_SAMPLE_SIZE = None
if DEBUG_SAMPLE_SIZE:
    df_source = df_source.head(DEBUG_SAMPLE_SIZE)

In [0]:
# Move the pandas input into Spark. Repartition it to the context's default parallelism
# so the translation work is spread across the cluster, and cache it because it is reused
# by the inference job and by later partition-count checks.
df_source_spark = (
    spark.createDataFrame(df_source)
    .repartition(spark.sparkContext.defaultParallelism)
    .cache()
)

In [0]:
# Translation settings sent to the registered model with every batch.
SRC_LANG = 'en'
TARGET_LANG = 'pt'
BATCH_SIZE = 32


# inferencing function we'll distribute via applyInPandas
def translation_predictions_function(df):
    """Translate one group of rows with the Production-stage model from the registry.

    Keep the single-argument signature: ``applyInPandas`` decides whether to pass the
    group key by counting the function's parameters.

    Args:
        df: pandas DataFrame for one group (expected columns: ``id``, ``content``).

    Returns:
        The model's ``predict`` output, which is expected to match the return schema
        passed to ``applyInPandas``.
    """
    translation_loaded = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")
    # mlflow.pyfunc.PythonModel enforces a one-argument predict, so the frame and the
    # language/batch settings are packed together into a two-element list.
    param_dict = {'src_lang': SRC_LANG, 'target_lang': TARGET_LANG, 'batch_size': BATCH_SIZE}
    return translation_loaded.predict([df, param_dict])
    
# applyInPandas needs the schema of the frame the UDF returns: the original id and
# content plus the model's translation, each declared nullable.
schema = StructType([
    StructField("id", LongType(), nullable=True),
    StructField("content", StringType(), nullable=True),
    StructField("translation", StringType(), nullable=True),
])

inferencingStartTime = time.time()
# Actual translation inference on the Spark dataframe: one pandas group per Spark
# partition, so each group's UDF call loads the model once.
df_source_translation = (
    df_source_spark
    .groupBy(F.spark_partition_id().alias("_pid"))
    .applyInPandas(translation_predictions_function, schema)
    .cache()
)
# Spark is lazy; the "noop" sink forces the full translation to run (filling the cache)
# without writing the result anywhere.
df_source_translation.write.mode("overwrite").format("noop").save()
# Time the job here, before display() and before any pause between cells, so the
# figure covers only the translation work.
inferencingJobSeconds = time.time() - inferencingStartTime
print(f"Translation job finished in {inferencingJobSeconds:.1f} seconds.")

# viewing the results dataframe in a Databricks notebook
display(df_source_translation)

id,content,translation
14084,What's my prize?,Qual é o meu prêmio?
15332,I don't eat pork.,Não consumo porco.
15339,I don't hate you.,Eu não te odeio.
15288,I chose the wine.,Eu escolhi o vinho.
15555,I know the rules.,Eu conheço as regras.
13653,Tom was yelling.,Tom estava gritando.
15824,I still like you.,Eu ainda gosto de você.
15425,I found this one.,Eu encontrei esse.
13350,Tom hugged Mary.,Tom abraçou a Maria.
14749,Do your homework.,Faça o seu trabalho doméstico.


In [0]:
# The end timestamp is read when this cell runs, so the elapsed time also includes the
# display() call made after the translation job and any pause before this cell was run.
inferencingEndTime = time.time()
totalInferencingTime = inferencingEndTime - inferencingStartTime
dataRowCount = len(df_source)

# getNumPartitions() counts Spark partitions, not cluster nodes, so label it that way.
print(f"{dataRowCount} Source phrases were translated. Inferencing phase took {totalInferencingTime} seconds on {df_source_spark.rdd.getNumPartitions()} partitions.")

168903 Source phrases were translated. Inferencing phase took 562.1647253036499 seconds on 64 total nodes.


In [0]:
# Sanity check: how many partitions the cached translation input was split into.
input_partition_count = df_source_spark.rdd.getNumPartitions()
print(input_partition_count)

64
