Run this notebook on a cluster that uses the Databricks Runtime for Machine Learning (ML Runtime).

In [0]:
# Columns of the labeled table: the class label to learn and the raw note text.
LABEL_COL = "unsupervised_prediction"
TEXT_COL = "pilot_notes"

# Unity Catalog namespace (catalog + schema) holding every object used below.
catalog = "main"
schema = "default"

# Fully qualified `catalog.schema.object` names:
#   labeled_table - input rows with text and label
#   model_name    - name under which the trained pipeline is registered
#   target_table  - output table receiving the scored rows
labeled_table = ".".join([catalog, schema, "labeled_pilot_notes"])
model_name = ".".join([catalog, schema, "pilot_notes_model"])
target_table = ".".join([catalog, schema, "pilot_notes_supervised_classification"])

In [0]:
# Read the labeled notes and discard any row missing either the text or the label,
# since such rows can neither be tokenized nor used as a training target.
labeled_df = (
    spark.read.table(labeled_table)
    .na.drop(subset=[TEXT_COL, LABEL_COL])
)

# 80/20 train/test split with a fixed seed.
# NOTE(review): the seed only makes the split repeatable if the table contents and
# their physical layout are unchanged between runs - confirm if exact repeatability matters.
train_df, test_df = labeled_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, IndexToString
from pyspark.ml.classification import LogisticRegression

# Stage 1 - label: map the string label to a numeric "label" column.
# The indexer is fitted on the training split up front because its learned label
# list is needed further down to build the IndexToString stage.
# NOTE(review): this fitted indexer is also a pipeline stage, so the pipeline needs
# LABEL_COL in its input, and with handleInvalid="skip" rows whose label was not
# seen in train_df are dropped when the pipeline transforms data - confirm intended.
label_indexer = StringIndexer(
    inputCol=LABEL_COL,
    outputCol="label",
    handleInvalid="skip",
)
label_indexer_model = label_indexer.fit(train_df)

# Stage 2 - text: split on runs of non-word characters, keep tokens of length >= 2,
# then drop stop words.
regex_tok = RegexTokenizer(
    inputCol=TEXT_COL,
    outputCol="tokens",
    pattern="\\W+",
    minTokenLength=2,
)
stop_rem = StopWordsRemover(
    inputCol="tokens",
    outputCol="tokens_no_sw",
)

# Stage 3 - features: term counts (terms must occur in at least 2 documents),
# re-weighted by inverse document frequency.
cv = CountVectorizer(
    inputCol="tokens_no_sw",
    outputCol="tf",
    vocabSize=100_000,
    minDF=2,
)
idf = IDF(
    inputCol="tf",
    outputCol="features",
)

# Stage 4 - classifier. Any Spark ML classifier reading "features"/"label" fits here.
clf = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=50,
    regParam=0.0,
    elasticNetParam=0.0,
)

# Stage 5 - convenience: translate the numeric prediction back to the original
# label string using the label list learned by the indexer.
label_to_str = IndexToString(
    inputCol="prediction",
    outputCol="predicted_label",
    labels=label_indexer_model.labels,
)

# The already-fitted indexer model is used as a stage (not the unfitted estimator).
pipeline = Pipeline(
    stages=[
        label_indexer_model,
        regex_tok,
        stop_rem,
        cv,
        idf,
        clf,
        label_to_str,
    ]
)

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Hyperparameter grid for cross-validation.
#
# elasticNetParam only controls how regParam is divided between the L1 and L2
# penalties, so when regParam == 0.0 every elasticNetParam value trains the same
# model. Crossing regParam=0.0 with three elasticNetParam values would therefore
# refit identical pipelines; the unregularized case is listed once per vocabSize.
VOCAB_SIZES = [50_000, 100_000]

unregularized_grid = (
    ParamGridBuilder()
    .addGrid(cv.vocabSize, VOCAB_SIZES)
    .addGrid(clf.regParam, [0.0])
    .addGrid(clf.elasticNetParam, [0.0])
    .build()
)

regularized_grid = (
    ParamGridBuilder()
    .addGrid(cv.vocabSize, VOCAB_SIZES)
    .addGrid(clf.regParam, [0.01, 0.1])
    .addGrid(clf.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 2 unregularized + 12 regularized = 14 distinct candidates (previously 18 with duplicates).
param_grid = unregularized_grid + regularized_grid

# Accuracy on the indexed label drives model selection; the same evaluator is
# reused later to score the held-out test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

# 3-fold cross-validation over the whole pipeline, fitting up to 4 candidates
# concurrently; the seed fixes the fold assignment.
cv_estimator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4,
    seed=42,
)

In [0]:
import mlflow
import mlflow.spark
from mlflow.models.signature import infer_signature


# model_name is a three-level Unity Catalog identifier (catalog.schema.model).
# Point the MLflow client at the Unity Catalog registry explicitly so registration
# does not depend on the workspace/runtime default registry.
mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run(run_name="sparkml_pipeline_cv"):

    # Fit CV; this returns a CrossValidatorModel whose .bestModel is a PipelineModel
    cv_model = cv_estimator.fit(train_df)
    best_pipeline_model = cv_model.bestModel

    # Evaluate the selected pipeline on the held-out test split
    test_pred = best_pipeline_model.transform(test_df)
    test_acc = evaluator.evaluate(test_pred)

    # Useful extra metric: F1 on the same test predictions
    f1_eval = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="f1"
    )
    test_f1 = f1_eval.evaluate(test_pred)

    mlflow.log_metric("test_accuracy", test_acc)
    mlflow.log_metric("test_f1", test_f1)

    # Infer the model signature from a small sample of the input/output schema.
    # Only the "prediction" column is selected before collecting, so the
    # intermediate token/vector columns are never converted to pandas.
    sample_sdf = train_df.limit(5)
    sample_input = sample_sdf.toPandas()
    sample_output = (
        best_pipeline_model.transform(sample_sdf)
        .select("prediction")
        .toPandas()
    )
    signature = infer_signature(sample_input, sample_output)

    # Log the entire PipelineModel as one artifact and register it in Unity Catalog
    registered_model = mlflow.spark.log_model(
        spark_model=best_pipeline_model,
        artifact_path="model",
        registered_model_name=model_name,
        signature=signature,
        input_example=sample_input
    )

In [0]:
# Load back exactly the version that the training run registered, rather than
# reusing the in-memory model, so the registered artifact itself is what gets scored.
model_version = registered_model.registered_model_version
loaded_model = mlflow.spark.load_model(
    "models:/{}/{}".format(model_name, model_version)
)

# Score every labeled row and replace the contents of the target table with the result.
predictions = loaded_model.transform(labeled_df)
(
    predictions.write
    .mode("overwrite")
    .saveAsTable(target_table)
)

# Show the persisted table (read back from storage, not the in-memory DataFrame).
spark.read.table(target_table).display()