In [10]:
import mlflow.spark

In [11]:
import os
import warnings
import sys
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

In [12]:
import logging
import json
import shutil
import datetime
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
import cml.data_v1 as cmldata

In [13]:
# Name of the CML data connection to open; edit this to match your workspace.
CONNECTION_NAME = "go01-aw-dl"

# Look up the connection through cml.data_v1 and obtain the Spark session
# that every later cell refers to as `spark`.
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

Spark Application Id:spark-c9ab2527cc834e77b7fbcd33ade19c00


In [14]:
# Toy labelled corpus: the two rows whose text contains "spark" carry label 1.0,
# the other two carry label 0.0.
training_df = spark.createDataFrame(
    data=[
        ("0", "a b c d e spark", 1.0),
        ("1", "b d", 0.0),
        ("2", "spark f g h", 1.0),
        ("3", "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)

In [15]:
def exp1(df):
    """Write ``df`` to an Iceberg table, train a text classifier on it and log the run to MLflow.

    Steps:
      1. Create or replace the Iceberg table ``spark_catalog.default.training`` from ``df``
         and print the table, its history and its snapshots.
      2. Read the most recently committed snapshot and turn its id, commit date and
         parent id into MLflow tags, together with the row count of ``df``.
      3. Fit a Tokenizer -> HashingTF -> LogisticRegression pipeline on ``df`` inside an
         MLflow run, logging the hyper-parameters, the fitted model and the tags.

    Relies on the notebook-level ``spark`` session.

    Args:
        df: Spark DataFrame with a ``text`` column (tokenizer input) and a ``label``
            column (LogisticRegression's default label column).

    Returns:
        pandas DataFrame returned by ``mlflow.search_runs`` for the experiment
        ``sparkml-experiment-new``.
    """
    experiment_name = "sparkml-experiment-new"
    table_name = "spark_catalog.default.training"

    mlflow.set_experiment(experiment_name)

    ##EXPERIMENT 1

    df.writeTo(table_name).using("iceberg").createOrReplace()
    spark.sql(f"SELECT * FROM {table_name}").show()

    ### SHOW TABLE HISTORY AND SNAPSHOTS
    spark.read.format("iceberg").load(f"{table_name}.history").show(20, False)
    snapshots_df = spark.read.format("iceberg").load(f"{table_name}.snapshots")
    snapshots_df.show(20, False)

    # Take all three snapshot attributes from ONE row so they are guaranteed to
    # describe the same snapshot, and order explicitly by commit time instead of
    # relying on whatever order the metadata table happens to return.
    latest_snapshot = (
        snapshots_df
        .select("snapshot_id", "committed_at", "parent_id")
        .orderBy("committed_at")
        .tail(1)[0]
    )

    tags = {
        "iceberg_snapshot_id": latest_snapshot["snapshot_id"],
        "iceberg_snapshot_committed_at": latest_snapshot["committed_at"].strftime('%m/%d/%Y'),
        "iceberg_parent_id": latest_snapshot["parent_id"],
        # Count the rows of the DataFrame that was actually passed in, not the
        # notebook-global training_df.
        "row_count": df.count(),
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit, so no explicit mlflow.end_run()
    # is needed afterwards.
    with mlflow.start_run():

        maxIter = 8
        regParam = 0.01

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        # Train on the function argument so the model matches the data that was
        # written to the Iceberg table and described by the tags.
        model = pipeline.fit(df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)

        # dfs_tmpdir is the staging directory the Spark model is written to before
        # MLflow copies it into the run's artifact location.
        mlflow.spark.log_model(model, artifact_path="pipe_artifacts", dfs_tmpdir="/home/cdsw/models-tests")

        mlflow.set_tags(tags)

    experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
    # run_view_type=1 corresponds to MLflow's ViewType.ACTIVE_ONLY.
    runs_df = mlflow.search_runs([experiment_id], run_view_type=1)

    return runs_df

In [16]:
#!hdfs dfs -ls hdfs://ns1

In [17]:
# Run experiment 1 on the toy training set; the returned runs DataFrame is the
# last expression of the cell and is therefore rendered as its output.
exp1(training_df)

                                                                                

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+

+-----------------------+-------------------+---------+-------------------+
|made_current_at        |snapshot_id        |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2024-09-27 22:05:04.216|6283834449287593171|null     |true               |
+-----------------------+-------------------+---------+-------------------+

+-----------------------+-------------------+-------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------

 - pandas (current: 2.1.4, required: pandas<2)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.iceberg_parent_id,tags.mlflow.log-model.history,tags.row_count,tags.mlflow.user,tags.engineID,tags.mlflow.source.type,tags.iceberg_snapshot_id,tags.iceberg_snapshot_committed_at,tags.mlflow.source.name,tags.mlflow.source.git.commit
0,ks3e-rcak-poz8-uwku,20y8-rgzd-3tpf-i665,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/20y8-rgzd-3tpf-i665/ks...,2024-09-27 21:54:47.358342144+00:00,2024-09-27 21:55:19.035000064+00:00,8,0.01,,"[{""run_id"": ""ks3e-rcak-poz8-uwku"", ""artifact_p...",4,pauldefusco,qrgk2h3tnwrrm71d,LOCAL,8782431286665847158,09/27/2024,/usr/local/lib/python3.10/site-packages/ipyker...,06b7bed0031ad636f6b3ade4189ea15b164906f3
1,s21y-bleu-ym1a-2b1w,20y8-rgzd-3tpf-i665,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/20y8-rgzd-3tpf-i665/s2...,2024-09-27 22:05:12.874052864+00:00,2024-09-27 22:05:35.179000064+00:00,8,0.01,,"[{""run_id"": ""s21y-bleu-ym1a-2b1w"", ""artifact_p...",4,pauldefusco,qrgk2h3tnwrrm71d,LOCAL,6283834449287593171,09/27/2024,/usr/local/lib/python3.10/site-packages/ipyker...,06b7bed0031ad636f6b3ade4189ea15b164906f3


In [11]:
# Show the artifact root URI of the current MLflow run.
# NOTE(review): when no run is active the fluent API appears to start one
# implicitly, which would leave a run open until mlflow.end_run() is called — confirm.
mlflow.get_artifact_uri()

'/home/cdsw/.experiments/43o0-djev-9a4c-t44t/onjl-k0ff-tcf6-x5nm/artifacts'

In [30]:
# Display the scored rows (tokenized words, hashed features, rawPrediction,
# probability and prediction columns).
# NOTE(review): `predictions` is not assigned in any cell visible in this notebook,
# so this cell fails on a fresh kernel (Restart & Run All) — confirm where it is built.
predictions.show()

[Stage 98:>                                                         (0 + 1) / 1]

+---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
| id|            text|               words|            features|       rawPrediction|         probability|prediction|
+---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0| a b c d e spark|[a, b, c, d, e, s...|(262144,[74920,89...|[-4.0215432342565...|[0.01760962323702...|       1.0|
|  1|             b d|              [b, d]|(262144,[89530,14...|[3.72362151547898...|[0.97642293742550...|       0.0|
|  2|     spark f g h|    [spark, f, g, h]|(262144,[36803,17...|[-4.1749669283492...|[0.01514286794945...|       1.0|
|  3|hadoop mapreduce| [hadoop, mapreduce]|(262144,[132966,1...|[4.67558415487772...|[0.99076598259580...|       0.0|
+---+----------------+--------------------+--------------------+--------------------+--------------------+----------+



                                                                                

In [None]:
from pyspark.ml import PipelineModel 
# Path where the pipeline model was saved (placeholder value; replace before use).
path = "path_to_saved_pipeline_model" 
# Template for reloading the fitted pipeline. The two statements below were pasted
# onto a single comment line and are intentionally left commented out because
# `path` is a placeholder and `test_data` is not defined in the visible cells.
#
# Load the fitted pipeline model:
#   loaded_pipeline_model = PipelineModel.load(path)
# Now you can use the model to make predictions:
#   predictions = loaded_pipeline_model.transform(test_data)

In [13]:
# Unlabelled copy of the toy corpus (same ids and texts as the training rows,
# without the label column) used for scoring.
test_df = spark.createDataFrame(
    data=[
        ("0", "a b c d e spark"),
        ("1", "b d"),
        ("2", "spark f g h"),
        ("3", "hadoop mapreduce"),
    ],
    schema=["id", "text"],
)

In [14]:
import mlflow
from pyspark.sql.functions import struct

# Artifact location of a previously logged model.
# NOTE(review): this is an absolute path tied to one specific run; update it to
# the run whose model should be scored.
logged_model = '/home/cdsw/.experiments/to9f-oq8b-qch6-u3ex/3oxt-tjek-73lg-danq/artifacts/artifacts'

# Load model as a Spark UDF.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model)

# Predict on a Spark DataFrame.
# The original cell referenced an undefined `column_names`. Wrapping the input
# columns in a struct forwards them to the model under their real names
# ("id", "text") instead of positional ordinals, which the pipeline's
# Tokenizer (inputCol="text") needs.
test_df.withColumn('predictions', loaded_model(struct(*test_df.columns))).collect()

 - pandas (current: 2.1.4, required: pandas<2)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


ModuleNotFoundError: No module named 'flask'

In [19]:
# Run experiment 2 on the toy training set.
# NOTE(review): `exp2` is not defined in the visible part of this notebook —
# confirm its defining cell exists and runs before this one.
exp2(training_df)

                                                                                

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



                                                                                

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2023-03-29 01:09:41.31 |5150219139107687331|null               |true               |
|2023-03-29 01:11:42.278|8171776268264279750|5150219139107687331|true               |
+-----------------------+-------------------+-------------------+-------------------+

+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------

                                                                                

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.git.commit,tags.mlflow.source.name,tags.mlflow.user,tags.mlflow.source.type,tags.row_count,tags.iceberg_parent_id,tags.iceberg_snapshot_id,tags.iceberg_snapshot_committed_at
0,kfnc-iesm-r79x-nqcp,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_RUNNING,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/kf...,2023-03-29 01:04:44.571115008+00:00,NaT,10,0.001,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
1,vaza-u8gd-dbkd-k4ow,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/va...,2023-03-29 01:06:53.296070144+00:00,2023-03-29 01:06:56.807000064+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
2,55oy-jgoj-sxwp-tp7b,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/55...,2023-03-29 01:09:43.688317184+00:00,2023-03-29 01:09:46.864000+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,,5.150219139107687e+18,03/29/2023
3,bmxd-ywal-739v-3ikq,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/bm...,2023-03-29 01:11:43.908041984+00:00,2023-03-29 01:11:48.220000+00:00,10,0.002,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,8.0,5.150219139107687e+18,8.171776268264278e+18,03/29/2023


In [22]:
# Retrieve snapshot_id from the Experiments page or from the runs DataFrame above.
# Use the snapshot ID recorded by the first experiment.
# NOTE(review): `exp3` is not defined in the visible part of this notebook, and the
# hard-coded id below is only valid for the table state that produced it — confirm
# both before re-running.
snapshot_id = "5150219139107687331"
exp3(training_df, snapshot_id)

                                                                                

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.git.commit,tags.mlflow.source.name,tags.mlflow.user,tags.mlflow.source.type,tags.row_count,tags.iceberg_parent_id,tags.iceberg_snapshot_id,tags.iceberg_snapshot_committed_at
0,kfnc-iesm-r79x-nqcp,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_RUNNING,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/kf...,2023-03-29 01:04:44.571115008+00:00,NaT,10,0.001,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
1,vaza-u8gd-dbkd-k4ow,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/va...,2023-03-29 01:06:53.296070144+00:00,2023-03-29 01:06:56.807000064+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
2,55oy-jgoj-sxwp-tp7b,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/55...,2023-03-29 01:09:43.688317184+00:00,2023-03-29 01:09:46.864000+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,,5.150219139107687e+18,03/29/2023
3,bmxd-ywal-739v-3ikq,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/bm...,2023-03-29 01:11:43.908041984+00:00,2023-03-29 01:11:48.220000+00:00,10,0.002,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,8.0,5.150219139107687e+18,8.171776268264278e+18,03/29/2023
4,tndc-jcty-7qn9-jxsu,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/tn...,2023-03-29 01:14:32.279436032+00:00,2023-03-29 01:14:35.438000128+00:00,7,0.005,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,5.150219139107687e+18,8.171776268264278e+18,03/29/2023
5,4yk0-cwcg-9hcn-c7k9,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/4y...,2023-03-29 01:15:29.313084928+00:00,2023-03-29 01:15:32.180000+00:00,7,0.005,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,,5.150219139107687e+18,03/29/2023
