In [None]:
#Import libraries
from pyspark.sql.functions import col, pandas_udf,udf,lit
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication
import azure.synapse.ml.predict as pcontext
import azure.synapse.ml.predict.utils._logger as synapse_predict_logger

In [None]:
#Set input data path
DATA_FILE = "abfss://<containername>@<storageaccountname>.dfs.core.windows.net/dailyjoininput.csv"
#Set model URI
#Set AML URI, if trained model is registered in AML
AML_MODEL_URI = "aml://churns_mlflow:1" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.

#Define model return type
RETURN_TYPES = "bigint" # for ex: int, float etc. PySpark data types are supported

#Define model runtime. This supports only mlflow
RUNTIME = "mlflow"

In [None]:
#AML workspace authentication using linked service
from notebookutils.mssparkutils import azureML
ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both

In [None]:
#Enable SynapseML predict
spark.conf.set("spark.synapse.ml.predict.enabled","true")

In [None]:
#Bind model within Spark session
model = pcontext.bind_model(
    return_types=RETURN_TYPES, 
    runtime=RUNTIME, 
    model_alias="<your model alias>", #This alias will be used in PREDICT call to refer  this   model
    model_uri=AML_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
    aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
    ).register()

In [None]:
from pyspark.sql.types import StructType, StructField, LongType, StringType

churn_schm = StructType([
        StructField("customer_id", StringType(), True),
        StructField("current_linkedin_activity", LongType(), True),
        StructField("email_domain", StringType(), True),
        StructField("linkedin_skill_code", LongType(), True),
        StructField("mentor_program_involvement", LongType(), True),
        StructField("negative_review_in_past_5_years", LongType(), True),
        StructField("recruiting_location_code", LongType(),True),
        StructField("recruiting_method_code", LongType(), True),
        StructField("weekly_consumption", LongType(), True),
        StructField("years_of_membership", LongType(), True),
        StructField("survey_attitude_towards_company", LongType(), True),
        StructField("survey_attitude_towards_product_features", LongType(), True),
        StructField("survey_attitude_towards_performance", LongType(), True),
        StructField("survey_attitude_towards_usability", LongType(), True),
        StructField("survey_attitude_towards_product_quality", LongType(), True),
        StructField("survey_attitude_towards_customer_service", LongType(), True)
    ])


In [None]:
#load test data
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(churn_schm) \
    .csv(DATA_FILE) 
df = df.select(df.columns[1:])
df.createOrReplaceTempView('data')
df.show(10)

In [None]:
%%sql

select * from data

In [None]:
# Call PREDICT

predictions = spark.sql(
                    """
                        SELECT PREDICT('<your model alias>', *) AS predict FROM data
                    """
                 ).show()