In [2]:
import mlflow.spark

In [3]:
import os
import warnings
import sys
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

In [4]:
import logging
import json
import shutil
import datetime
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
import cml.data_v1 as cmldata

In [5]:
#Edit your connection name here:
# Name of the CML data connection used to obtain the Spark session below.
CONNECTION_NAME = "se-aw-mdl"

# Open the named CML data connection and get a Spark session from it; `spark`
# is the session every later cell (and exp1/exp2/exp3) uses.
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco


In [6]:
# Tiny labelled text corpus (id, text, label): the rows whose text contains
# "spark" carry label 1.0, the others 0.0.
training_df = spark.createDataFrame(
    data=[
        ("0", "a b c d e spark", 1.0),
        ("1", "b d", 0.0),
        ("2", "spark f g h", 1.0),
        ("3", "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)

In [15]:
def exp1(df, max_iter=8, reg_param=0.01):
    """Experiment 1: (re)create the Iceberg training table from ``df`` and train on it.

    Writes ``df`` to ``spark_catalog.default.training`` with ``createOrReplace``,
    shows the table and its Iceberg history/snapshots, then fits a
    Tokenizer -> HashingTF -> LogisticRegression pipeline on ``df`` inside an
    MLflow run tagged with the snapshot metadata of the write.

    Args:
        df: Spark DataFrame with the ``text`` and ``label`` columns the pipeline
            reads (the notebook passes ``training_df``).
        max_iter: LogisticRegression ``maxIter``; logged as param ``maxIter``.
        reg_param: LogisticRegression ``regParam``; logged as param ``regParam``.

    Returns:
        The ``mlflow.search_runs`` result for the "sparkml-experiment" experiment.
    """
    table = "spark_catalog.default.training"
    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 1
    df.writeTo(table).using("iceberg").createOrReplace()
    spark.table(table).show()

    ### SHOW TABLE HISTORY AND SNAPSHOTS
    spark.read.format("iceberg").load(table + ".history").show(20, False)
    snapshots = spark.read.format("iceberg").load(table + ".snapshots")
    snapshots.show(20, False)

    # The newest commit is the snapshot just written. Order explicitly instead of
    # relying on the metadata table's row order, and read it only once.
    latest = snapshots.orderBy("committed_at", ascending=False).first()

    tags = {
        "iceberg_snapshot_id": latest["snapshot_id"],
        "iceberg_snapshot_committed_at": latest["committed_at"].strftime('%m/%d/%Y'),
        "iceberg_parent_id": latest["parent_id"],
        # Count the data actually written and trained on (was the global training_df).
        "row_count": df.count(),
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit; no explicit mlflow.end_run() needed.
    with mlflow.start_run():
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=max_iter, regParam=reg_param)
        pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
        # Train on the caller's DataFrame (previously the `df` argument was ignored).
        # The fitted model is not logged; only params and tags are recorded.
        pipeline.fit(df)

        mlflow.log_param("maxIter", max_iter)
        mlflow.log_param("regParam", reg_param)
        mlflow.set_tags(tags)

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    # run_view_type=1 is MLflow's ViewType.ACTIVE_ONLY (deleted runs excluded).
    return mlflow.search_runs(experiment_id, run_view_type=1)

In [17]:
def exp2(df, max_iter=10, reg_param=0.002):
    """Experiment 2: append ``df`` to the Iceberg table, then train on the whole table.

    Shows the table before and after an Iceberg append, shows its history and
    snapshots, then fits a Tokenizer -> HashingTF -> LogisticRegression pipeline
    on the post-append table inside an MLflow run tagged with the newest
    snapshot's metadata.

    Args:
        df: Spark DataFrame to append; must match the table schema
            (the notebook passes ``training_df``).
        max_iter: LogisticRegression ``maxIter``; logged as param ``maxIter``.
        reg_param: LogisticRegression ``regParam``; logged as param ``regParam``.

    Returns:
        The ``mlflow.search_runs`` result for the "sparkml-experiment" experiment.
    """
    table = "spark_catalog.default.training"
    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 2

    ### ICEBERG INSERT DATA - APPEND FROM DATAFRAME

    # PRE-INSERT
    spark.table(table).show()

    # Append the caller's DataFrame. The original appended the table to itself
    # and discarded `df`; for the notebook's call exp2(training_df) the result is
    # the same, but this version also behaves sensibly when the cell is re-run.
    df.writeTo(table).append()
    training_data = spark.table(table)

    # POST-INSERT
    training_data.show()

    spark.read.format("iceberg").load(table + ".history").show(20, False)
    snapshots = spark.read.format("iceberg").load(table + ".snapshots")
    snapshots.show(20, False)

    # The newest commit is the append above; order explicitly and read once.
    latest = snapshots.orderBy("committed_at", ascending=False).first()

    tags = {
        "iceberg_snapshot_id": latest["snapshot_id"],
        "iceberg_snapshot_committed_at": latest["committed_at"].strftime('%m/%d/%Y'),
        "iceberg_parent_id": latest["parent_id"],
        "row_count": training_data.count(),
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit; no explicit mlflow.end_run() needed.
    with mlflow.start_run():
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=max_iter, regParam=reg_param)
        pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
        # Train on the post-append table that the tags describe. Previously the
        # 4-row global training_df was used while row_count reported the full table.
        pipeline.fit(training_data)

        mlflow.log_param("maxIter", max_iter)
        mlflow.log_param("regParam", reg_param)
        mlflow.set_tags(tags)

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    # run_view_type=1 is MLflow's ViewType.ACTIVE_ONLY (deleted runs excluded).
    return mlflow.search_runs(experiment_id, run_view_type=1)

In [18]:
def exp3(df, snapshot_id, max_iter=7, reg_param=0.005):
    """Experiment 3: time-travel to an earlier Iceberg snapshot and train on it.

    Reads ``spark_catalog.default.training`` as of ``snapshot_id``, looks up that
    snapshot's commit time and parent in the snapshots metadata table, and fits a
    Tokenizer -> HashingTF -> LogisticRegression pipeline on the snapshot's rows
    inside an MLflow run tagged with that metadata.

    Args:
        df: Ignored; kept so existing calls such as ``exp3(training_df, id)`` still
            work. The training data always comes from the snapshot.
        snapshot_id: Iceberg snapshot id (str or int) to read.
        max_iter: LogisticRegression ``maxIter``; logged as param ``maxIter``.
        reg_param: LogisticRegression ``regParam``; logged as param ``regParam``.

    Returns:
        The ``mlflow.search_runs`` result for the "sparkml-experiment" experiment.

    Raises:
        ValueError: if ``snapshot_id`` is not listed in the table's snapshots.
    """
    table = "spark_catalog.default.training"
    # Set explicitly so this run lands in the same experiment on a fresh kernel,
    # instead of relying on exp1/exp2 having been called first.
    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 3
    snapshot_data = spark.read.option("snapshot-id", snapshot_id).table(table)

    # Fetch commit time and parent in one query, using a Column predicate rather
    # than formatting the id into SQL text.
    snapshots = spark.read.format("iceberg").load(table + ".snapshots")
    matches = (
        snapshots
        .where(snapshots.snapshot_id == int(snapshot_id))
        .select("committed_at", "parent_id")
        .collect()
    )
    if not matches:
        raise ValueError("snapshot {} not found in {}".format(snapshot_id, table))
    committed_at, parent_id = matches[0]

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at.strftime('%m/%d/%Y'),
        "iceberg_parent_id": str(parent_id),
        # Count the snapshot's rows (was the global training_df).
        "row_count": snapshot_data.count(),
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit; no explicit mlflow.end_run() needed.
    with mlflow.start_run():
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=max_iter, regParam=reg_param)
        pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
        # Train on the time-travelled data the tags describe (was training_df).
        pipeline.fit(snapshot_data)

        mlflow.log_param("maxIter", max_iter)
        mlflow.log_param("regParam", reg_param)
        mlflow.set_tags(tags)

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    # run_view_type=1 is MLflow's ViewType.ACTIVE_ONLY (deleted runs excluded).
    return mlflow.search_runs(experiment_id, run_view_type=1)

In [16]:
# Experiment 1: recreate the Iceberg table from training_df and log an MLflow run;
# the runs DataFrame returned by exp1 is displayed as the cell output.
exp1(training_df)

                                                                                

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+

+----------------------+-------------------+---------+-------------------+
|made_current_at       |snapshot_id        |parent_id|is_current_ancestor|
+----------------------+-------------------+---------+-------------------+
|2023-03-29 01:09:41.31|5150219139107687331|null     |true               |
+----------------------+-------------------+---------+-------------------+

+-----------------------+-------------------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.git.commit,tags.mlflow.source.name,tags.mlflow.user,tags.mlflow.source.type
0,kfnc-iesm-r79x-nqcp,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_RUNNING,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/kf...,2023-03-29 01:04:44.571115008+00:00,NaT,10,0.001,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL
1,vaza-u8gd-dbkd-k4ow,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/va...,2023-03-29 01:06:53.296070144+00:00,2023-03-29 01:06:56.807000064+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL


In [19]:
# Experiment 2: append to the Iceberg table (new snapshot) and log another run;
# the runs DataFrame returned by exp2 is displayed as the cell output.
exp2(training_df)

                                                                                

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



                                                                                

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2023-03-29 01:09:41.31 |5150219139107687331|null               |true               |
|2023-03-29 01:11:42.278|8171776268264279750|5150219139107687331|true               |
+-----------------------+-------------------+-------------------+-------------------+

+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------

                                                                                

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.git.commit,tags.mlflow.source.name,tags.mlflow.user,tags.mlflow.source.type,tags.row_count,tags.iceberg_parent_id,tags.iceberg_snapshot_id,tags.iceberg_snapshot_committed_at
0,kfnc-iesm-r79x-nqcp,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_RUNNING,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/kf...,2023-03-29 01:04:44.571115008+00:00,NaT,10,0.001,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
1,vaza-u8gd-dbkd-k4ow,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/va...,2023-03-29 01:06:53.296070144+00:00,2023-03-29 01:06:56.807000064+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
2,55oy-jgoj-sxwp-tp7b,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/55...,2023-03-29 01:09:43.688317184+00:00,2023-03-29 01:09:46.864000+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,,5.150219139107687e+18,03/29/2023
3,bmxd-ywal-739v-3ikq,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/bm...,2023-03-29 01:11:43.908041984+00:00,2023-03-29 01:11:48.220000+00:00,10,0.002,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,8.0,5.150219139107687e+18,8.171776268264278e+18,03/29/2023


In [22]:
# Time-travel back to the snapshot written by Experiment 1. The id is derived
# rather than hardcoded because exp1's createOrReplace commits a new snapshot id
# on every run, so a pasted id breaks Restart & Run All. This takes the earliest
# snapshot that is an ancestor of the current table state; it assumes exp1's
# createOrReplace starts a new lineage (its parent_id was null in the history
# shown above). To pin a specific id instead, paste one from the Experiments page.
snapshot_history = spark.read.format("iceberg").load("spark_catalog.default.training.history")
snapshot_id = str(
    snapshot_history
    .where("is_current_ancestor")
    .orderBy("made_current_at")
    .first()["snapshot_id"]
)
exp3(training_df, snapshot_id)

                                                                                

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.git.commit,tags.mlflow.source.name,tags.mlflow.user,tags.mlflow.source.type,tags.row_count,tags.iceberg_parent_id,tags.iceberg_snapshot_id,tags.iceberg_snapshot_committed_at
0,kfnc-iesm-r79x-nqcp,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_RUNNING,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/kf...,2023-03-29 01:04:44.571115008+00:00,NaT,10,0.001,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
1,vaza-u8gd-dbkd-k4ow,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/va...,2023-03-29 01:06:53.296070144+00:00,2023-03-29 01:06:56.807000064+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,,,,
2,55oy-jgoj-sxwp-tp7b,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/55...,2023-03-29 01:09:43.688317184+00:00,2023-03-29 01:09:46.864000+00:00,8,0.01,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,,5.150219139107687e+18,03/29/2023
3,bmxd-ywal-739v-3ikq,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/bm...,2023-03-29 01:11:43.908041984+00:00,2023-03-29 01:11:48.220000+00:00,10,0.002,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,8.0,5.150219139107687e+18,8.171776268264278e+18,03/29/2023
4,tndc-jcty-7qn9-jxsu,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/tn...,2023-03-29 01:14:32.279436032+00:00,2023-03-29 01:14:35.438000128+00:00,7,0.005,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,5.150219139107687e+18,8.171776268264278e+18,03/29/2023
5,4yk0-cwcg-9hcn-c7k9,p1y7-kv7u-fkya-ca0e,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/p1y7-kv7u-fkya-ca0e/4y...,2023-03-29 01:15:29.313084928+00:00,2023-03-29 01:15:32.180000+00:00,7,0.005,1d14407660f321086ef27b91b0e06ca91cd34b18,/usr/local/lib/python3.7/site-packages/ipykern...,pauldefusco,LOCAL,4.0,,5.150219139107687e+18,03/29/2023
