In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [2]:
import mlflow.spark

In [3]:
import os, warnings, sys
import mlflow
import pandas as pd
import numpy as np

In [4]:
import logging, json, shutil, datetime
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.linalg import DenseVector
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [5]:
import cml.data_v1 as cmldata

# Sample in-code customization of spark configurations
from pyspark import SparkContext
# Name of the CML data connection that provides the Spark session.
CONNECTION_NAME = "go01-aw-dl"

# Executor sizing is passed as Java system properties; per the PySpark docs
# these have to be registered before the SparkContext is instantiated.
SparkContext.setSystemProperty("spark.executor.cores", "2")
SparkContext.setSystemProperty("spark.executor.memory", "2g")

conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/07 01:28:43 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
23/12/07 01:28:45 WARN JettyUtils: GET /jobs/ failed: java.util.NoSuchElementException: Failed to get the application information. If you are starting up Spark, please wait a while until it's ready.
java.util.NoSuchElementException: Failed to get the application information. If you are starting up Spark, please wait a while until it's ready.
	at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:51)
	at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPa

In [6]:
import os
# Link to this session's Spark UI, built from the CML engine id and domain.
print("https://spark-{0}.{1}".format(os.environ["CDSW_ENGINE_ID"], os.environ["CDSW_DOMAIN"]))

https://spark-trfv30j0nr2dlyrh.ml-b74f8940-b97.go01-dem.ylcu-atmi.cloudera.site


In [7]:
# Database holding the banking tables; the project owner's name is used as a
# per-user suffix on the table names queried and written below.
dbname = "MLOPS"
username = os.environ["PROJECT_OWNER"]

In [8]:
# Load the full per-user transactions table as the raw modelling input.
bankingDf = spark.sql(f"SELECT * FROM {dbname}.BANKING_TRANSACTIONS_{username}")

23/12/07 01:28:51 WARN HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = cabec0ab-c5d6-4599-95f9-ba6b332857dd
23/12/07 01:28:53 WARN HiveMetaStoreClient: Failed to connect to the MetaStore Server...


In [9]:
# Show the source table's column names and Spark types before selecting features.
bankingDf.printSchema()

root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: decimal(10,0) (nullable = true)
 |-- credit_card_balance: decimal(10,0) (nullable = true)
 |-- bank_account_balance: decimal(10,0) (nullable = true)
 |-- mortgage_balance: decimal(10,0) (nullable = true)
 |-- sec_bank_account_balance: decimal(10,0) (nullable = true)
 |-- savings_account_balance: decimal(10,0) (nullable = true)
 |-- sec_savings_account_balance: decimal(10,0) (nullable = true)
 |-- total_est_nworth: decimal(10,0) (nullable = true)
 |-- primary_loan_balance: decimal(10,0) (nullable = true)
 |-- secondary_loan_balance: decimal(10,0) (nullable = true)
 |-- college_loan_balance: decimal(10,0) (nullable = true)
 |-- aba_routing: string (nullable = true)
 |-- bank_country: string (nullable = true)
 |-- account_no: string (nullable = true)
 |-- int_account_no: string (nullable = true)
 |-- swift11: string (nullable = true)
 |-- credit_card_number: strin

In [10]:
# Columns kept from the source table, in the order they will later be fed to
# the VectorAssembler. The last entry, "fraud", is the target, not a model
# input; it is carried along here and label-encoded further down.
features = [
    "age", "credit_card_balance", "bank_account_balance", "mortgage_balance",
    "primary_loan_balance", "sec_bank_account_balance", "savings_account_balance",
    "sec_savings_account_balance", "secondary_loan_balance", "total_est_nworth",
    "college_loan_balance", "transaction_amount", "latitude", "longitude",
    "fraud",
]

In [11]:
# Keep only the modelling columns (inputs + "fraud" target), in list order.
bankingDf = bankingDf.select(*features)

In [12]:
def defineColTypes(df):
    """
    Method to assign df columns into two lists based on type.

    A column is categorical when its Spark type string is exactly "string";
    every other type (decimal, float, double, ...) is treated as numeric.

    Input: Spark DF (anything exposing ``.dtypes`` as (name, type) pairs)
    Output: tuple (numeric, cat) of column-name lists, each in DF column order
    """
    numeric = []
    cat = []

    # Read the argument's dtypes; previously this read the notebook-global
    # `bankingDf`, silently ignoring `df`.
    for name, dtype in df.dtypes:
        if dtype == "string":
            cat.append(name)
        else:
            numeric.append(name)
    return numeric, cat

In [13]:
# Partition the selected columns by type; the string "fraud" column lands in `cat`,
# everything else in `numeric`.
numeric, cat = defineColTypes(bankingDf)

In [14]:
def convertToFloat(df, num):
    """
    Method to cast the given Spark DF columns to float.

    Only the columns named in ``num`` are cast; all other columns are left
    untouched, so the input DF does not need to be all-numeric.

    Input: df - Spark DF; num - list of column names to cast
    Output: new Spark DF with those columns cast to float
    """
    # Iterate over the `num` argument; previously this read the
    # notebook-global `numeric` and ignored the parameter entirely.
    for c in num:
        df = df.withColumn(c, df[c].cast("float"))

    return df

In [15]:
# Cast every non-string column (those collected in `numeric`) to float,
# then print the schema to confirm the new types.
bankingDf = convertToFloat(bankingDf, numeric)
bankingDf.printSchema()

root
 |-- age: float (nullable = true)
 |-- credit_card_balance: float (nullable = true)
 |-- bank_account_balance: float (nullable = true)
 |-- mortgage_balance: float (nullable = true)
 |-- primary_loan_balance: float (nullable = true)
 |-- sec_bank_account_balance: float (nullable = true)
 |-- savings_account_balance: float (nullable = true)
 |-- sec_savings_account_balance: float (nullable = true)
 |-- secondary_loan_balance: float (nullable = true)
 |-- total_est_nworth: float (nullable = true)
 |-- college_loan_balance: float (nullable = true)
 |-- transaction_amount: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- fraud: string (nullable = true)



In [16]:
def labelEncoder(df, labelCol):
    """
    Method to replace a categorical label column with a numeric "label" column.

    A StringIndexer is fitted on ``df`` and applied to the same DF; the
    original ``labelCol`` is dropped afterwards. The fitted indexer is not
    returned, so the string-to-index mapping is not kept.
    NOTE(review): with StringIndexer's default ordering (frequencyDesc) the
    most frequent class is mapped to 0.0 — confirm this is the intended
    encoding for the target.
    """
    indexerModel = StringIndexer(inputCol=labelCol, outputCol="label").fit(df)
    return indexerModel.transform(df).drop(labelCol)

In [17]:
# Replace the string "fraud" column with a numeric "label" column,
# then print the schema to confirm it.
bankingDf = labelEncoder(bankingDf, "fraud")
bankingDf.printSchema()

                                                                                

root
 |-- age: float (nullable = true)
 |-- credit_card_balance: float (nullable = true)
 |-- bank_account_balance: float (nullable = true)
 |-- mortgage_balance: float (nullable = true)
 |-- primary_loan_balance: float (nullable = true)
 |-- sec_bank_account_balance: float (nullable = true)
 |-- savings_account_balance: float (nullable = true)
 |-- sec_savings_account_balance: float (nullable = true)
 |-- secondary_loan_balance: float (nullable = true)
 |-- total_est_nworth: float (nullable = true)
 |-- college_loan_balance: float (nullable = true)
 |-- transaction_amount: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- label: double (nullable = false)



In [18]:
def trainTestSplit(df, trainPercentage, seed=200):
    """
    Method to randomly split data into train and test sets.

    Inputs:
        df: Spark DF to split
        trainPercentage: approximate fraction (0..1) of rows for the train
            set; the remainder goes to the test set
        seed: random seed passed to ``randomSplit`` so the split is
            repeatable (default 200, the value previously hard-coded)
    Returns: (train, test) as Spark DFs
    Raises: ValueError if trainPercentage is outside [0, 1] (which would
        otherwise produce a negative split weight)
    """
    if not 0 <= trainPercentage <= 1:
        raise ValueError(
            "trainPercentage must be between 0 and 1, got {0!r}".format(trainPercentage))

    testPercentage = 1 - trainPercentage

    train, test = df.randomSplit(weights=[trainPercentage, testPercentage], seed=seed)

    return train, test

In [19]:
# 80/20 train/test split of the encoded data.
train, test = trainTestSplit(bankingDf, trainPercentage=0.8)

In [20]:
def makeCv(df, cols, numFolds=3, seed=None, parallelism=1):
    """
    Method to build a CrossValidator around a Spark MLlib pipeline.

    Pipeline: VectorAssembler -> StandardScaler -> LogisticRegression.
    Nothing is fitted here; call ``.fit(train_df)`` on the returned object.

    Inputs:
        df: unused; kept so existing callers keep working
        cols: names of the numeric input columns to assemble into "features"
        numFolds: number of cross-validation folds (default 3)
        seed: optional seed for fold assignment; when None, the
            CrossValidator's own default seed is left in place
        parallelism: number of models evaluated in parallel during the
            grid search (default 1)
    Returns: an unfitted pyspark.ml.tuning.CrossValidator (not a Pipeline)
    """
    #Assemble the input columns into a single vector column
    assembler = VectorAssembler(inputCols=cols, outputCol="features")

    #Standardize the vector (zero mean, unit variance)
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

    #Logistic Regression on the scaled vector; expects a numeric "label" column
    lr = LogisticRegression(featuresCol='scaledFeatures', labelCol='label')

    #Define (but do not run) the pipeline
    pipeline = Pipeline(stages=[assembler, scaler, lr])

    # Model-selection metric.
    # NOTE(review): accuracy can look high on imbalanced fraud data even for a
    # trivial classifier — consider a PR-AUC / F1 based evaluator.
    evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

    # 5 x 3 x 4 = 60 parameter combinations, each fitted numFolds times.
    grid = (ParamGridBuilder()
            .addGrid(lr.maxIter, [2, 4, 6, 8, 10])
            .addGrid(lr.regParam, [0.1, 0.4, 0.7])
            .addGrid(lr.elasticNetParam, [0.2, 0.4, 0.6, 0.8])
            .build())

    # Only forward `seed` when given, so CrossValidator keeps its own default otherwise.
    seedKwargs = {} if seed is None else {"seed": seed}

    cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=grid,
                        numFolds=numFolds, parallelism=parallelism, **seedKwargs)

    return cv

In [21]:
# Model inputs: every column of the training set except the encoded target.
features = [name for name in train.columns if name != "label"]

In [22]:
# Unfitted cross-validator over the assembler/scaler/logistic-regression pipeline.
cv = makeCv(train, cols=features)

In [25]:
import mlflow.spark

mlflow.set_experiment('MySparkMlClf')

TRAIN_TABLE = "spark_catalog.{0}.BANKING_TRANSACTIONS_TRAIN_{1}".format(dbname, username)
TEST_TABLE = "spark_catalog.{0}.BANKING_TRANSACTIONS_TEST_{1}".format(dbname, username)


def latestSnapshot(table):
    """
    Return (snapshot_id, committed_at as 'MM/DD/YYYY', parent_id) for the most
    recently committed snapshot of an Iceberg table.

    Columns are selected by name and rows are ordered by commit time. The
    Iceberg ``snapshots`` metadata table lists ``committed_at`` first, so
    positional access like ``row[0]`` returns the timestamp, not the snapshot
    id, and ``tail(1)`` without an ordering does not guarantee the newest row.

    Raises: ValueError if the table has no snapshots.
    """
    row = (spark.read.format("iceberg").load(table + ".snapshots")
           .select("snapshot_id", "committed_at", "parent_id")
           .orderBy("committed_at", ascending=False)
           .first())
    if row is None:
        raise ValueError("Iceberg table {0} has no snapshots".format(table))
    return row["snapshot_id"], row["committed_at"].strftime('%m/%d/%Y'), row["parent_id"]


with mlflow.start_run():

    # Persist the exact split as Iceberg tables so the run can reference its data snapshots.
    train.writeTo(TRAIN_TABLE).using("iceberg").createOrReplace()
    test.writeTo(TEST_TABLE).using("iceberg").createOrReplace()

    train_snapshot_id, train_committed_at, train_parent_id = latestSnapshot(TRAIN_TABLE)
    test_snapshot_id, test_committed_at, test_parent_id = latestSnapshot(TEST_TABLE)

    tags = {
      "train_iceberg_snapshot_id": train_snapshot_id,
      "train_iceberg_snapshot_committed_at": train_committed_at,
      "train_iceberg_parent_id": train_parent_id,
      "train_row_count": train.count(),
      "test_iceberg_snapshot_id": test_snapshot_id,
      "test_iceberg_snapshot_committed_at": test_committed_at,
      "test_iceberg_parent_id": test_parent_id,
      "test_row_count": test.count()
    }
    # Record data lineage before the long CV fit so it is kept even if fitting fails.
    mlflow.set_tags(tags)

    # Run the cross validation on the training dataset. The cv.fit() call returns the best model it found.
    cvModel = cv.fit(train)
    # Reuse the CV's own evaluator so the test metric matches the selection metric.
    evaluator = cv.getEvaluator()

    # Evaluate the best model's performance on the test dataset and log the result.
    test_metric = evaluator.evaluate(cvModel.transform(test))
    mlflow.log_metric('test_' + evaluator.getMetricName(), test_metric)

    # Log the best model.
    mlflow.spark.log_model(spark_model=cvModel.bestModel, artifact_path='best-model')
    # No explicit mlflow.end_run(): leaving the `with` block ends the run.

23/12/07 02:47:05 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
io.fabric8.kubernetes.client.WatcherException: too old resource version: 54653178 (54664805)
	at io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.onStatus(AbstractWatchManager.java:265)
	at io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.onMessage(AbstractWatchManager.java:249)
	at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.onMessage(WatcherWebSocketListener.java:93)
	at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
	at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
	at okhttp3.RealCall$AsyncCall.execute(Re

In [24]:
#mlflow.end_run()