### Launch Spark Session via CML Data

In [15]:
import cml.data_v1 as cmldata

# Sample in-code customization of spark configurations
#from pyspark import SparkContext
#SparkContext.setSystemProperty('spark.executor.cores', '1')
#SparkContext.setSystemProperty('spark.executor.memory', '2g')

# Name of the CML data connection to open.
# NOTE(review): presumably specific to this workspace - confirm before reuse.
CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
# Spark session obtained from the connection; later cells use it as `spark`.
spark = conn.get_spark_session()

# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()


+--------------------+
|           namespace|
+--------------------+
|         01_car_data|
|           01_car_dw|
|              adb101|
|            airlines|
|        airlines_csv|
|    airlines_iceberg|
|airlines_iceberg_...|
|      airlines_mjain|
|          airquality|
|                ajvp|
|          atlas_demo|
|            bankdemo|
|          bca_jps_l0|
|     bri_ranger_demo|
|            cdc_data|
|cde_demo_pauldefusco|
|   cde_demo_pdefusco|
|cde_demo_pdefusco...|
|        cde_workshop|
|cde_workshop_paul...|
+--------------------+
only showing top 20 rows



In [6]:
import os
import numpy as np
import pandas as pd
from datetime import datetime
from pyspark.sql.types import LongType, IntegerType, StringType, FloatType
from pyspark.sql import functions as F
import dbldatagen as dg
import dbldatagen.distributions as dist
from dbldatagen import FakerTextFactory, DataGenerator, fakerText

class LabeledTextGen:
    """Generate a synthetic, labeled text dataset with dbldatagen and Faker.

    The DataFrame returned by ``dataGen`` has the columns ``text``, ``id``
    (cast to string) and ``label`` (cast to float), in that order.
    """

    def __init__(self, spark):
        # Spark session used both for configuration and for data generation.
        self.spark = spark

    @staticmethod
    def _recast(frame, column, spark_type, scratch):
        """Cast ``column`` to ``spark_type`` through the temporary column ``scratch``.

        The cast column is re-created under its original name and therefore
        ends up as the last column of the returned DataFrame.
        """
        with_scratch = frame.withColumn(scratch, F.col(column).cast(spark_type))
        return with_scratch.drop(column).withColumnRenamed(scratch, column)

    def dataGen(self, shuffle_partitions_requested = 8, partitions_requested = 8, data_rows = 10000):
        """Build the labeled dataset.

        Args:
            shuffle_partitions_requested: value written to the session's
                ``spark.sql.shuffle.partitions`` setting.
            partitions_requested: number of partitions handed to the DataGenerator.
            data_rows: number of rows to generate.

        Returns:
            Spark DataFrame with columns ``text``, ``id`` and ``label``.
        """
        # Faker provider factory restricted to the US locale.
        us_faker = FakerTextFactory(locale=['en_US'])

        # Session-level shuffle partitioning for the generation job.
        self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

        # Describe the columns one step at a time, then materialize the frame.
        spec = DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
        spec = spec.withColumnSpec("id", minValue=1, maxValue=data_rows, step=1)
        spec = spec.withColumn("text", text=us_faker("address"))
        spec = spec.withColumn("label", "string", values=["0", "1"], random=True)
        generated = spec.build()

        # id -> string first, label -> float second: this order fixes the final
        # column layout (text, id, label).
        generated = self._recast(generated, "id", StringType(), "idStr")
        generated = self._recast(generated, "label", FloatType(), "labelStr")

        return generated

In [7]:
# Use a dedicated name for the generator instance: `dg` is the alias of the
# dbldatagen module (`import dbldatagen as dg`) and must not be shadowed.
labeled_text_gen = LabeledTextGen(spark)

# Labeled training data (columns: text, id, label).
training_df = labeled_text_gen.dataGen()

### Create and Run Experiment

In [10]:
import logging
import json
import shutil
import datetime
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
import cml.data_v1 as cmldata

In [13]:
def experimentRun(df, maxIter=8, regParam=0.01, experiment_name="inference", dfs_tmpdir=None):
    """Train a text-classification pipeline and log it as an MLflow run.

    The pipeline is Tokenizer -> HashingTF -> LogisticRegression, fitted on
    ``df``. The hyperparameters and the fitted model are logged to the given
    MLflow experiment.

    Args:
        df: Spark DataFrame with a ``text`` column (tokenizer input) and the
            ``label`` column used by LogisticRegression by default.
        maxIter: maximum number of iterations for LogisticRegression.
        regParam: regularization parameter for LogisticRegression.
        experiment_name: MLflow experiment the run is recorded under.
        dfs_tmpdir: optional temporary directory forwarded to
            ``mlflow.spark.log_model``. ``None`` keeps MLflow's default.
            NOTE(review): on a cluster this should be a location that the
            driver and all executors can reach; the "RDD is empty" errors
            seen when loading the logged model look consistent with the model
            files having been written to a non-shared filesystem - confirm.

    Returns:
        pandas DataFrame listing the active runs of the experiment.
    """
    # Imported locally so the function also works on a fresh kernel: the
    # notebook's import cells above this definition do not import mlflow.
    import mlflow
    from mlflow.entities import ViewType

    mlflow.set_experiment(experiment_name)

    ### MLFLOW EXPERIMENT RUN
    # The context manager ends the run on exit (also on failure), so no
    # explicit mlflow.end_run() is needed afterwards.
    with mlflow.start_run():

        # Log hyperparameters first so that runs failing during fit still
        # record what they were configured with.
        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(df)

        mlflow.spark.log_model(model, artifact_path="artifacts", dfs_tmpdir=dfs_tmpdir)

    experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
    # search_runs expects a list of experiment ids; ACTIVE_ONLY replaces the
    # former magic number 1.
    runs_df = mlflow.search_runs([experiment_id], run_view_type=ViewType.ACTIVE_ONLY)

    return runs_df

In [14]:
# Fit the pipeline on the training data, log the run, and display the table of
# runs returned by experimentRun.
experimentRun(training_df)

23/12/09 00:16:06 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
23/12/09 00:16:07 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
23/12/09 00:16:08 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
23/12/09 00:16:08 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.


Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.regParam,params.maxIter,tags.mlflow.user,tags.mlflow.source.git.commit,tags.mlflow.source.type,tags.engineID,tags.mlflow.source.name,tags.mlflow.log-model.history
0,jnya-231a-5rj0-pc2p,svxe-63lj-gllb-n2yp,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/jn...,2023-12-09 00:14:21.919896064+00:00,2023-12-09 00:14:21.924000+00:00,,,pauldefusco,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,LOCAL,j0t6i3wyfqccdmo3,/usr/local/lib/python3.9/site-packages/ipykern...,
1,poqe-9vto-wsxr-u1dn,svxe-63lj-gllb-n2yp,EXPERIMENT_RUN_FAILED,/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/po...,2023-12-09 00:14:53.322520064+00:00,2023-12-09 00:15:53.132000+00:00,0.01,8.0,pauldefusco,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,LOCAL,j0t6i3wyfqccdmo3,/usr/local/lib/python3.9/site-packages/ipykern...,
2,ovr3-iznv-lgjy-4wi8,svxe-63lj-gllb-n2yp,EXPERIMENT_RUN_FINISHED,/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ov...,2023-12-09 00:16:02.336599040+00:00,2023-12-09 00:16:24.988999936+00:00,0.01,8.0,pauldefusco,710c7231a095ef42b4102f2e7c7f0b57abdd1fa9,LOCAL,j0t6i3wyfqccdmo3,/usr/local/lib/python3.9/site-packages/ipykern...,"[{""run_id"": ""ovr3-iznv-lgjy-4wi8"", ""artifact_p..."


### Create Inference Data

In [16]:
class LabeledTextInferenceGen:
    """Generate synthetic text data for inference.

    Produces the same kind of data as the labeled generator, but without the
    label column: the result only has the columns ``text`` and ``id``.
    """

    def __init__(self, spark):
        # Spark session used both for configuration and for data generation.
        self.spark = spark

    def dataGen(self, shuffle_partitions_requested = 8, partitions_requested = 8, data_rows = 10000):
        """Build the unlabeled dataset.

        Args:
            shuffle_partitions_requested: value written to the session's
                ``spark.sql.shuffle.partitions`` setting.
            partitions_requested: number of partitions handed to the DataGenerator.
            data_rows: number of rows to generate.

        Returns:
            Spark DataFrame with columns ``text`` and ``id`` (id cast to string).
        """
        # Faker provider factory restricted to the US locale.
        us_faker = FakerTextFactory(locale=['en_US'])

        # Session-level shuffle partitioning for the generation job.
        self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

        # Column specification: numeric id plus a fake address as text.
        spec = DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
        spec = spec.withColumnSpec("id", minValue=1, maxValue=data_rows, step=1)
        spec = spec.withColumn("text", text=us_faker("address"))
        generated = spec.build()

        # Cast id to string through a temporary column; re-creating it this way
        # moves id behind text in the column order.
        id_as_string = F.col("id").cast(StringType())
        with_scratch = generated.withColumn("idStr", id_as_string)
        without_numeric_id = with_scratch.drop("id")
        return without_numeric_id.withColumnRenamed("idStr", "id")

In [17]:
# Use a dedicated name for the generator instance: `dg` is the alias of the
# dbldatagen module (`import dbldatagen as dg`) and must not be shadowed.
inference_text_gen = LabeledTextInferenceGen(spark)

# Unlabeled data to score (columns: text, id).
inference_df = inference_text_gen.dataGen()

In [18]:
# Preview the generated inference data.
inference_df.show()

[Stage 47:>                                                         (0 + 1) / 1]

+--------------------+---+
|                text| id|
+--------------------+---+
|580 Watson Avenue...|  0|
|061 Keith Prairie...|  1|
|223 John Camp Sui...|  2|
|290 Chad Cape Sui...|  3|
|417 Pacheco Sprin...|  4|
|2732 Briggs Mount...|  5|
|Unit 3618 Box 670...|  6|
|4390 Pierce Passa...|  7|
|154 Benjamin Squa...|  8|
|467 Nicholas Forg...|  9|
|249 Davis Loaf Su...| 10|
|90120 Parker Key\...| 11|
|18194 Davis Freew...| 12|
|00431 Lisa Cliffs...| 13|
|152 Alexis Path\n...| 14|
|37300 Christina T...| 15|
|88829 Flores Pass...| 16|
|934 Gonzalez Mano...| 17|
|674 Miller Place ...| 18|
|6504 Angela Park\...| 19|
+--------------------+---+
only showing top 20 rows



                                                                                

In [20]:
# Column names of the inference data; passed to the model UDF in a later cell.
column_names = inference_df.columns
print(column_names)

['text', 'id']


In [22]:
import mlflow

# Artifact directory of one specific, previously logged run. The run id is part
# of the path, so this literal has to be updated whenever the experiment is re-run.
logged_model = '/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts'

# Load model as a Spark UDF.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model)

# Predict on a Spark DataFrame.
# Every column in column_names is passed to the UDF, and collect() pulls all
# scored rows back to the driver.
# NOTE(review): the pipeline's tokenizer only reads the text column - confirm
# whether passing id as well is intended.
inference_df.withColumn('predictions', loaded_model(*column_names)).collect()

 - mlflow (current: 2.4.1, required: mlflow==2.4)
 - pandas (current: 2.1.3, required: pandas<2)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.
2023/12/09 00:21:24 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
23/12/09 00:21:42 WARN TaskSetManager: Lost task 1.0 in stage 48.0 (TID 208) (100.100.37.123 executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cmladdons/python/site-packages/mlflow/pyfunc/__init__.py", line 1273, in udf
    loaded_model, _ = SparkModelCache.get_or_load(archive_path)
  File "/opt/cmladdons/python/site-packages/mlflow/pyfunc/spark_model_cache.py", line 47, in get_or_load
    SparkModelCache._models[archive_path] = (load_model(local_model_dir), local_model_dir)
  File "/opt/cmladdons/python/site-packages/mlflow/pyfunc/__init__.py", line 597, i

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/cmladdons/python/site-packages/mlflow/pyfunc/__init__.py", line 1273, in udf
    loaded_model, _ = SparkModelCache.get_or_load(archive_path)
  File "/opt/cmladdons/python/site-packages/mlflow/pyfunc/spark_model_cache.py", line 47, in get_or_load
    SparkModelCache._models[archive_path] = (load_model(local_model_dir), local_model_dir)
  File "/opt/cmladdons/python/site-packages/mlflow/pyfunc/__init__.py", line 597, in load_model
    model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
  File "/opt/cmladdons/python/site-packages/mlflow/spark.py", line 842, in _load_pyfunc
    return _PyFuncModelWrapper(spark, _load_model(model_uri=path))
  File "/opt/cmladdons/python/site-packages/mlflow/spark.py", line 748, in _load_model
    return PipelineModel.load(model_uri)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/util.py", line 332, in load
    return cls.read().load(path)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 256, in load
    metadata = DefaultParamsReader.loadMetadata(path, self.sc)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/util.py", line 525, in loadMetadata
    metadataStr = sc.textFile(metadataPath, 1).first()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1591, in first
    raise ValueError("RDD is empty")
ValueError: RDD is empty


23/12/09 00:22:30 WARN TaskSetManager: Lost task 5.2 in stage 48.0 (TID 229) (100.100.232.107 executor 2): TaskKilled (Stage cancelled)
23/12/09 00:22:31 WARN TaskSetManager: Lost task 3.2 in stage 48.0 (TID 230) (100.100.232.110 executor 8): TaskKilled (Stage cancelled)
23/12/09 00:22:31 WARN TaskSetManager: Lost task 4.3 in stage 48.0 (TID 236) (100.100.232.109 executor 4): TaskKilled (Stage cancelled)
23/12/09 00:22:32 WARN TaskSetManager: Lost task 0.3 in stage 48.0 (TID 233) (100.100.117.194 executor 6): TaskKilled (Stage cancelled)
23/12/09 00:22:32 WARN TaskSetManager: Lost task 6.3 in stage 48.0 (TID 234) (100.100.117.206 executor 3): TaskKilled (Stage cancelled)
23/12/09 00:22:32 WARN TaskSetManager: Lost task 1.3 in stage 48.0 (TID 232) (100.100.37.123 executor 5): TaskKilled (Stage cancelled)
23/12/09 00:22:33 WARN TaskSetManager: Lost task 2.3 in stage 48.0 (TID 235) (100.100.232.105 executor 1): TaskKilled (Stage cancelled)


### Using Pandas Option

In [25]:
# Convert the Spark DataFrame to a pandas DataFrame (all rows are brought to the
# driver) so it can be scored through the pandas-based pyfunc API below.
iDfPandas = inference_df.toPandas()

                                                                                

In [26]:
import mlflow

# Same run-specific artifact directory as in the Spark UDF attempt; the run id
# is part of the path.
logged_model = '/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts'

# Load model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(logged_model)

# Predict on a Pandas DataFrame.
import pandas as pd
loaded_model.predict(iDfPandas)

 - mlflow (current: 2.4.1, required: mlflow==2.4)
 - pandas (current: 2.1.3, required: pandas<2)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.
2023/12/09 00:23:27 INFO mlflow.spark: File '/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts/sparkml' is already on DFS, copy is not necessary.


ValueError: RDD is empty

### Loading Directly from /home/cdsw/.experiments with PySpark

In [27]:
# Top-level MLflow artifact directory of the logged model (run-specific path).
mPath = '/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts'

In [28]:
# Load the persisted pipeline directly with the Spark ML PipelineModel API,
# bypassing MLflow's loaders.
from pyspark.ml.pipeline import PipelineModel
persistedModel = PipelineModel.load(mPath)

# predict
predictionsDF = persistedModel.transform(inference_df)

Py4JJavaError: An error occurred while calling o802.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:300)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:240)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:328)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Input path does not exist: file:/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:274)
	... 25 more


Trying a different folder: loading from the `sparkml/` subdirectory of the logged artifacts.

In [29]:
# Point at the sparkml/ subdirectory inside the logged artifacts instead of the
# top-level artifact directory.
mPath = '/home/cdsw/.experiments/svxe-63lj-gllb-n2yp/ovr3-iznv-lgjy-4wi8/artifacts/artifacts/sparkml/'

# PipelineModel is imported in the previous cell.
persistedModel = PipelineModel.load(mPath)

# predict
predictionsDF = persistedModel.transform(inference_df)

ValueError: RDD is empty