In [6]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [7]:
import mlflow.spark

In [8]:
import os
import warnings
import sys
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

In [9]:
import logging
import json
import shutil
import datetime
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
import cml.data_v1 as cmldata

In [10]:
import os

import cml.data_v1 as cmldata

# Sample in-code customization of spark configurations.
# Per the PySpark docs, SparkContext.setSystemProperty must be called before the
# SparkContext is instantiated, i.e. before get_spark_session() below.
SparkContext.setSystemProperty('spark.executor.cores', '2')
SparkContext.setSystemProperty('spark.executor.memory', '2g')

# CML data connection to use. Set the CONNECTION_NAME environment variable to point the
# notebook at a different environment without editing it; defaults to the original name.
CONNECTION_NAME = os.environ.get("CONNECTION_NAME", "go01-aw-dl")
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()

+--------------------+
|           namespace|
+--------------------+
|         01_car_data|
|           01_car_dw|
|              adb101|
|            airlines|
|        airlines_csv|
|    airlines_iceberg|
|airlines_iceberg_...|
|      airlines_mjain|
|          airquality|
|                ajvp|
|          atlas_demo|
|            bankdemo|
|          bca_jps_l0|
|     bri_ranger_demo|
|            cdc_data|
|cde_demo_pauldefusco|
|   cde_demo_pdefusco|
|cde_demo_pdefusco...|
|        cde_workshop|
|cde_workshop_paul...|
+--------------------+
only showing top 20 rows



In [11]:
import os
import numpy as np
import pandas as pd
from datetime import datetime
from pyspark.sql.types import LongType, IntegerType, StringType, FloatType
from pyspark.sql import functions as F
import dbldatagen as dg
import dbldatagen.distributions as dist
from dbldatagen import FakerTextFactory, DataGenerator, fakerText

class LabeledTextGen:
    """Builds a synthetic, labelled text dataset as a Spark DataFrame (dbldatagen + Faker)."""

    def __init__(self, spark):
        # Spark session the generator builds its DataFrame with.
        self.spark = spark

    def dataGen(self, shuffle_partitions_requested = 8, partitions_requested = 8, data_rows = 10000):
        """Generate the dataset.

        Args:
            shuffle_partitions_requested: value written to ``spark.sql.shuffle.partitions``
                on the session. This is a session-wide side effect.
            partitions_requested: number of partitions passed to the DataGenerator.
            data_rows: number of rows to generate.

        Returns:
            DataFrame whose ``id`` column is cast to string and ``label`` column ("0"/"1")
            is cast to float, with both moved after the remaining columns (``text``).
        """
        # Session-wide: applies to every later shuffle in this Spark session.
        self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

        us_faker = FakerTextFactory(locale=['en_US'])

        spec = DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
        # NOTE(review): the displayed output shows ids starting at 0, so this spec on the
        # seed column may not take effect — verify against dbldatagen's seed-column rules.
        spec = spec.withColumnSpec("id", minValue=1, maxValue=data_rows, step=1)
        spec = spec.withColumn("text", text=us_faker("address"))
        spec = spec.withColumn("label", "string", values=["0", "1"], random=True)
        generated = spec.build()

        # Cast id -> string and label -> float in one projection, placing both after the
        # untouched columns (same final column order as drop + rename would give).
        untouched = [name for name in generated.columns if name not in ("id", "label")]
        return generated.select(
            *untouched,
            F.col("id").cast(StringType()).alias("id"),
            F.col("label").cast(FloatType()).alias("label"),
        )

In [12]:
# Named text_gen rather than `dg`: `dg` is the alias of the dbldatagen module imported
# above, and rebinding it would silently break any later dbldatagen usage.
text_gen = LabeledTextGen(spark)

# NOTE(review): the captured outputs suggest the Faker text is regenerated on each Spark
# action (the rows shown by training_df.show() differ from those later written to the
# table, while labels match), so treat training_df as non-deterministic.
training_df = text_gen.dataGen()

In [13]:
training_df.show(n=20, truncate=True)  # first 20 rows, long cell values truncated

23/12/08 00:16:35 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+---+-----+
|                text| id|label|
+--------------------+---+-----+
|USNV Braun\nFPO A...|  0|  1.0|
|030 Dustin Mills\...|  1|  1.0|
|5279 Conner Lake\...|  2|  1.0|
|7791 Hill Lodge S...|  3|  1.0|
|459 Angel Crest\n...|  4|  1.0|
|1950 Alvarado Loa...|  5|  0.0|
|88271 Donna Estat...|  6|  0.0|
|9500 Daniels Forg...|  7|  1.0|
|744 Lambert Manor...|  8|  1.0|
|PSC 8813, Box 737...|  9|  0.0|
|01940 Sally Glen ...| 10|  1.0|
|22563 Figueroa Ru...| 11|  1.0|
|8599 Lorraine Str...| 12|  1.0|
|79333 Amber Throu...| 13|  0.0|
|PSC 8733, Box 788...| 14|  0.0|
|PSC 9661, Box 911...| 15|  1.0|
|91372 Johnson Ram...| 16|  1.0|
|070 Munoz Isle\nP...| 17|  1.0|
|56386 Jeffrey Fla...| 18|  1.0|
|625 Odonnell Junc...| 19|  0.0|
+--------------------+---+-----+
only showing top 20 rows



                                                                                

In [14]:
def exp1(df):
    """Experiment 1: snapshot ``df`` into Iceberg and train a baseline model on that snapshot.

    Writes ``df`` to ``spark_catalog.default.training`` (create-or-replace), finds the newest
    Iceberg snapshot, trains a Tokenizer -> HashingTF -> LogisticRegression pipeline on the
    table as of that snapshot, and logs params, Iceberg lineage tags and the model to the
    ``sparkml-experiment`` MLflow experiment.

    Args:
        df: Spark DataFrame with (at least) ``text`` and ``label`` columns.

    Returns:
        pandas DataFrame of the active runs in ``sparkml-experiment`` (``mlflow.search_runs``).
    """
    from mlflow.entities import ViewType

    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 1

    df.writeTo("spark_catalog.default.training").using("iceberg").createOrReplace()
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    ### SHOW TABLE HISTORY AND SNAPSHOTS
    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    snapshots = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots")
    snapshots.show(20, False)

    # The newest commit is the snapshot just written. Order explicitly instead of relying on
    # the row order of tail(), and read the metadata table once rather than once per field.
    latest = snapshots.orderBy(F.col("committed_at").desc()).first()
    snapshot_id = latest["snapshot_id"]

    # Fit on the table as of that snapshot so the logged model matches exactly the data the
    # iceberg_snapshot_id tag points to; re-evaluating `df` need not reproduce the same rows.
    train_df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")

    tags = {
      "iceberg_snapshot_id": snapshot_id,
      "iceberg_snapshot_committed_at": latest["committed_at"].strftime('%m/%d/%Y'),
      "iceberg_parent_id": latest["parent_id"],
      "row_count": train_df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit (including on error); no end_run() needed.
    with mlflow.start_run():

        maxIter = 8
        regParam = 0.01

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(train_df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)

        mlflow.set_tags(tags)
        mlflow.spark.log_model(model, artifact_path="artifacts")

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs([experiment_id], run_view_type=ViewType.ACTIVE_ONLY)

    return runs_df

In [15]:
def exp2(df):
    """Experiment 2: grow the Iceberg table with an append and retrain on the new snapshot.

    Appends a copy of the current rows of ``spark_catalog.default.training`` to itself (doubling
    its row count and committing a new snapshot), then trains the same pipeline with different
    hyper-parameters on the table as of that new snapshot and logs it to ``sparkml-experiment``.

    Args:
        df: not used — the data comes from the Iceberg table. Kept so existing calls such as
            ``exp2(training_df)`` keep working.

    Returns:
        pandas DataFrame of the active runs in ``sparkml-experiment`` (``mlflow.search_runs``).
    """
    from mlflow.entities import ViewType

    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 2

    ### ICEBERG INSERT DATA - APPEND FROM DATAFRAME

    # PRE-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    # Append the table's current rows to itself; this commits a new snapshot.
    current_df = spark.sql("SELECT * FROM spark_catalog.default.training")
    current_df.writeTo("spark_catalog.default.training").append()

    # POST-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    snapshots = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots")
    snapshots.show(20, False)

    # The newest commit is the append above. Order explicitly instead of relying on the
    # row order of tail(), and read the metadata table once rather than once per field.
    latest = snapshots.orderBy(F.col("committed_at").desc()).first()
    snapshot_id = latest["snapshot_id"]

    # Fit on exactly the snapshot the tags describe (the appended data), not on a
    # separately generated DataFrame.
    train_df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")

    tags = {
      "iceberg_snapshot_id": snapshot_id,
      "iceberg_snapshot_committed_at": latest["committed_at"].strftime('%m/%d/%Y'),
      "iceberg_parent_id": latest["parent_id"],
      "row_count": train_df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit (including on error); no end_run() needed.
    with mlflow.start_run():

        maxIter = 10
        regParam = 0.002

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(train_df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)

        mlflow.set_tags(tags)
        mlflow.spark.log_model(model, artifact_path="artifacts")

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs([experiment_id], run_view_type=ViewType.ACTIVE_ONLY)

    return runs_df

In [16]:
def exp3(df, snapshot_id):
    """Experiment 3: time-travel to an earlier Iceberg snapshot and retrain on it.

    Reads ``spark_catalog.default.training`` as of ``snapshot_id``, trains the pipeline with a
    third set of hyper-parameters on that data, and logs params, lineage tags and the model
    to ``sparkml-experiment``.

    Args:
        df: not used — the data is read from the table at ``snapshot_id``. Kept so existing
            calls such as ``exp3(training_df, snapshot_id)`` keep working.
        snapshot_id: Iceberg snapshot id (str or int) of ``spark_catalog.default.training``.

    Returns:
        pandas DataFrame of the active runs in ``sparkml-experiment`` (``mlflow.search_runs``).

    Raises:
        ValueError: if ``snapshot_id`` is not an integer or is not a snapshot of the table.
    """
    from mlflow.entities import ViewType

    # Log to the same experiment that is searched below, independent of whichever
    # experiment happens to be active in the session.
    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 3

    # Look up the snapshot's metadata with a typed filter instead of formatting the id
    # into a SQL string.
    snapshot_meta = (
        spark.read.format("iceberg")
        .load("spark_catalog.default.training.snapshots")
        .filter(F.col("snapshot_id") == int(snapshot_id))
        .select("committed_at", "parent_id")
        .first()
    )
    if snapshot_meta is None:
        raise ValueError(
            "snapshot {} not found in spark_catalog.default.training".format(snapshot_id)
        )

    # Time-travel read: this is the data the model is trained on and the tags describe.
    train_df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")

    tags = {
      "iceberg_snapshot_id": snapshot_id,
      "iceberg_snapshot_committed_at": snapshot_meta["committed_at"].strftime('%m/%d/%Y'),
      "iceberg_parent_id": str(snapshot_meta["parent_id"]),
      "row_count": train_df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit (including on error); no end_run() needed.
    with mlflow.start_run():

        maxIter = 7
        regParam = 0.005

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(train_df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)

        mlflow.set_tags(tags)
        mlflow.spark.log_model(model, artifact_path="artifacts")

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs([experiment_id], run_view_type=ViewType.ACTIVE_ONLY)

    return runs_df

In [17]:
exp1_runs = exp1(training_df)
exp1_runs  # display the MLflow runs table

2023/12/08 00:17:05 INFO mlflow.tracking.fluent: Experiment with name 'sparkml-experiment' does not exist. Creating a new experiment.
23/12/08 00:17:05 WARN HiveMetaStoreClient: Failed to connect to the MetaStore Server...
                                                                                

+--------------------+---+-----+
|                text| id|label|
+--------------------+---+-----+
|8737 Mcdonald Cap...|  0|  1.0|
|Unit 2512 Box 897...|  1|  1.0|
|05321 Brittney Mi...|  2|  1.0|
|665 Moon Divide\n...|  3|  1.0|
|3152 Martin Rue S...|  4|  1.0|
|3043 Brooks Forge...|  5|  0.0|
|023 William Villa...|  6|  0.0|
|6312 Brooks Glen\...|  7|  1.0|
|560 Kathy Shoals\...|  8|  1.0|
|0746 Michael Isla...|  9|  0.0|
|204 Kenneth Locks...| 10|  1.0|
|7977 Edwards Unde...| 11|  1.0|
|PSC 1308, Box 000...| 12|  1.0|
|07864 Raymond Cur...| 13|  0.0|
|03439 Richards Pr...| 14|  0.0|
|67753 Chang Lodge...| 15|  1.0|
|70193 Alexis Squa...| 16|  1.0|
|12979 Savage Plai...| 17|  1.0|
|8207 Kimberly Str...| 18|  1.0|
|340 Ryan Courts S...| 19|  0.0|
+--------------------+---+-----+
only showing top 20 rows

+-----------------------+-------------------+---------+-------------------+
|made_current_at        |snapshot_id        |parent_id|is_current_ancestor|
+-----------------------+-----

23/12/08 00:17:27 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/08 00:17:27 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/12/08 00:17:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/12/08 00:17:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.type,tags.mlflow.source.git.commit,tags.engineID,tags.row_count,tags.iceberg_snapshot_id,tags.iceberg_parent_id,tags.iceberg_snapshot_committed_at,tags.mlflow.log-model.history,tags.mlflow.source.name,tags.mlflow.user
0,z5e3-co8l-apzo-yoy0,aouu-xae6-xcqh-rjd8,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/aouu-xae6-xcqh-rjd8/z5...,2023-12-08 00:17:18.712412160+00:00,2023-12-08 00:17:44.403000064+00:00,8,0.01,LOCAL,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,nqdqb2o23morowno,10000,5135867504820325584,,12/08/2023,"[{""run_id"": ""z5e3-co8l-apzo-yoy0"", ""artifact_p...",/usr/local/lib/python3.9/site-packages/ipykern...,pauldefusco


In [18]:
exp2_runs = exp2(training_df)
exp2_runs  # display the MLflow runs table

                                                                                

+--------------------+---+-----+
|                text| id|label|
+--------------------+---+-----+
|8737 Mcdonald Cap...|  0|  1.0|
|Unit 2512 Box 897...|  1|  1.0|
|05321 Brittney Mi...|  2|  1.0|
|665 Moon Divide\n...|  3|  1.0|
|3152 Martin Rue S...|  4|  1.0|
|3043 Brooks Forge...|  5|  0.0|
|023 William Villa...|  6|  0.0|
|6312 Brooks Glen\...|  7|  1.0|
|560 Kathy Shoals\...|  8|  1.0|
|0746 Michael Isla...|  9|  0.0|
|204 Kenneth Locks...| 10|  1.0|
|7977 Edwards Unde...| 11|  1.0|
|PSC 1308, Box 000...| 12|  1.0|
|07864 Raymond Cur...| 13|  0.0|
|03439 Richards Pr...| 14|  0.0|
|67753 Chang Lodge...| 15|  1.0|
|70193 Alexis Squa...| 16|  1.0|
|12979 Savage Plai...| 17|  1.0|
|8207 Kimberly Str...| 18|  1.0|
|340 Ryan Courts S...| 19|  0.0|
+--------------------+---+-----+
only showing top 20 rows



                                                                                

+--------------------+---+-----+
|                text| id|label|
+--------------------+---+-----+
|8737 Mcdonald Cap...|  0|  1.0|
|Unit 2512 Box 897...|  1|  1.0|
|05321 Brittney Mi...|  2|  1.0|
|665 Moon Divide\n...|  3|  1.0|
|3152 Martin Rue S...|  4|  1.0|
|3043 Brooks Forge...|  5|  0.0|
|023 William Villa...|  6|  0.0|
|6312 Brooks Glen\...|  7|  1.0|
|560 Kathy Shoals\...|  8|  1.0|
|0746 Michael Isla...|  9|  0.0|
|204 Kenneth Locks...| 10|  1.0|
|7977 Edwards Unde...| 11|  1.0|
|PSC 1308, Box 000...| 12|  1.0|
|07864 Raymond Cur...| 13|  0.0|
|03439 Richards Pr...| 14|  0.0|
|67753 Chang Lodge...| 15|  1.0|
|70193 Alexis Squa...| 16|  1.0|
|12979 Savage Plai...| 17|  1.0|
|8207 Kimberly Str...| 18|  1.0|
|340 Ryan Courts S...| 19|  0.0|
+--------------------+---+-----+
only showing top 20 rows

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+---------

                                                                                

+-----------------------+-------------------+-------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                                         |summary                                                                                                                                                                                                   



Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.type,tags.mlflow.source.git.commit,tags.engineID,tags.row_count,tags.iceberg_snapshot_id,tags.iceberg_parent_id,tags.iceberg_snapshot_committed_at,tags.mlflow.log-model.history,tags.mlflow.source.name,tags.mlflow.user
0,z5e3-co8l-apzo-yoy0,aouu-xae6-xcqh-rjd8,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/aouu-xae6-xcqh-rjd8/z5...,2023-12-08 00:17:18.712412160+00:00,2023-12-08 00:17:44.403000064+00:00,8,0.01,LOCAL,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,nqdqb2o23morowno,10000,5135867504820325584,,12/08/2023,"[{""run_id"": ""z5e3-co8l-apzo-yoy0"", ""artifact_p...",/usr/local/lib/python3.9/site-packages/ipykern...,pauldefusco
1,mqoe-hb9g-gkhm-zqva,aouu-xae6-xcqh-rjd8,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/aouu-xae6-xcqh-rjd8/mq...,2023-12-08 00:18:01.342988032+00:00,2023-12-08 00:18:27.620999936+00:00,10,0.002,LOCAL,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,nqdqb2o23morowno,20000,2509129823758036615,5.135867504820325e+18,12/08/2023,"[{""run_id"": ""mqoe-hb9g-gkhm-zqva"", ""artifact_p...",/usr/local/lib/python3.9/site-packages/ipykern...,pauldefusco


In [19]:
# Time-travel target: the snapshot written by the first experiment (exp1). exp2's append
# committed one snapshot on top of it, so — assuming exp2 was the last write to the table —
# it is the parent of the table's newest snapshot. Deriving it instead of pasting an id from
# the Experiments page keeps this cell valid under Restart & Run All, because snapshot ids
# change on every run. To pin a specific snapshot, assign its id string to snapshot_id instead.
newest_snapshot = (
    spark.read.format("iceberg")
    .load("spark_catalog.default.training.snapshots")
    .orderBy(F.col("committed_at").desc())
    .first()
)
snapshot_id = str(newest_snapshot["parent_id"])
exp3(training_df, snapshot_id)

23/12/08 00:24:52 WARN HiveMetaStoreClient: Failed to connect to the MetaStore Server...


Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.maxIter,params.regParam,tags.mlflow.source.type,tags.mlflow.source.git.commit,tags.engineID,tags.row_count,tags.iceberg_snapshot_id,tags.iceberg_parent_id,tags.iceberg_snapshot_committed_at,tags.mlflow.log-model.history,tags.mlflow.source.name,tags.mlflow.user
0,z5e3-co8l-apzo-yoy0,aouu-xae6-xcqh-rjd8,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/aouu-xae6-xcqh-rjd8/z5...,2023-12-08 00:17:18.712412160+00:00,2023-12-08 00:17:44.403000064+00:00,8,0.01,LOCAL,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,nqdqb2o23morowno,10000,5135867504820325584,,12/08/2023,"[{""run_id"": ""z5e3-co8l-apzo-yoy0"", ""artifact_p...",/usr/local/lib/python3.9/site-packages/ipykern...,pauldefusco
1,mqoe-hb9g-gkhm-zqva,aouu-xae6-xcqh-rjd8,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/aouu-xae6-xcqh-rjd8/mq...,2023-12-08 00:18:01.342988032+00:00,2023-12-08 00:18:27.620999936+00:00,10,0.002,LOCAL,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,nqdqb2o23morowno,20000,2509129823758036615,5.135867504820325e+18,12/08/2023,"[{""run_id"": ""mqoe-hb9g-gkhm-zqva"", ""artifact_p...",/usr/local/lib/python3.9/site-packages/ipykern...,pauldefusco
2,lr2d-xibs-etiw-0tg9,aouu-xae6-xcqh-rjd8,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/aouu-xae6-xcqh-rjd8/lr...,2023-12-08 00:25:08.208004864+00:00,2023-12-08 00:25:35.976999936+00:00,7,0.005,LOCAL,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,nqdqb2o23morowno,10000,5135867504820325584,,12/08/2023,"[{""run_id"": ""lr2d-xibs-etiw-0tg9"", ""artifact_p...",/usr/local/lib/python3.9/site-packages/ipykern...,pauldefusco
