In [0]:
# dbfs:/FileStore/irisdata.csv

Data Load

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Get the active Spark session, or build one under the app name "mlops"
spark = SparkSession.builder.appName("mlops").getOrCreate()

# The four measurement columns are all doubles, so their fields are generated
# from one list; "class" (the species name) is the string-typed target column.
numeric_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
schema_fields = [
    StructField(column_name, DoubleType(), False) for column_name in numeric_columns
]
schema_fields.append(StructField("class", StringType(), False))
schema = StructType(schema_fields)

# Read the CSV with the explicit schema instead of inferring column types
csv_reader = spark.read.format("csv").schema(schema)
df = csv_reader.load("dbfs:/FileStore/irisdata.csv")

In [0]:
# Render the loaded DataFrame as a table using the Databricks notebook display helper
display(df)

sepal_length,sepal_width,petal_length,petal_width,class
5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa


Data Transformation

In [0]:
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler
# Binary target: rows whose class is "Iris-setosa" get label 1, every other row gets 0
is_setosa = df["class"] == "Iris-setosa"
df = df.withColumn("label", when(is_setosa, 1).otherwise(0))

# Pack the four measurement columns into a single vector column named "features".
# feature_assembler is kept at notebook level because the inference cells reuse it.
feature_assembler = VectorAssembler(
    outputCol="features",
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
)

# NOTE: df is rebound here and keeps only "features" and "label" (the "class"
# column is dropped), so re-run the load cell before re-running this one.
assembled_df = feature_assembler.transform(df)
df = assembled_df.select("features", "label")


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-4199294914396520>:4[0m
[1;32m      2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m VectorAssembler
[1;32m      3[0m [38;5;66;03m# Convert class to binary label: Iris-setosa = 1, Others = 0[39;00m
[0;32m----> 4[0m df [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mlabel[39m[38;5;124m"[39m, when(df[[38;5;124m"[39m[38;5;124mclass[39m[38;5;124m"[39m] [38;5;241m==[39m [38;5;124m"[39m[38;5;124mIris-setosa[39m[38;5;124m"[39m, [38;5;241m1[39m)[38;5;241m.[39motherwise([38;5;241m0[39m))
[1;32m      6[0m [38;5;66;03m# Assemble features into a single column[39;00m
[1;32m      7[0m feature_assembler [38;5;241

Model Training

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 70% / 30% train-test partition, using a fixed seed (42) for the random split
split_weights = [0.7, 0.3]
train_data, test_data = df.randomSplit(split_weights, seed=42)

# Fit a logistic-regression classifier on the training partition
lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(train_data)

# Score the held-out partition and compute the area under the ROC curve
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
predictions = model.transform(test_data)
roc_auc = evaluator.evaluate(predictions)

print(f"ROC AUC Score: {roc_auc}")


ROC AUC Score: 1.0


In [0]:
!pip install mlflow

Collecting mlflow
  Downloading mlflow-2.21.0-py3-none-any.whl (28.2 MB)
[?25l[K     |                                | 10 kB 10.5 MB/s eta 0:00:03[K     |                                | 20 kB 4.4 MB/s eta 0:00:07[K     |                                | 30 kB 6.4 MB/s eta 0:00:05[K     |                                | 40 kB 3.6 MB/s eta 0:00:08[K     |                                | 51 kB 4.2 MB/s eta 0:00:07[K     |                                | 61 kB 5.1 MB/s eta 0:00:06[K     |                                | 71 kB 5.2 MB/s eta 0:00:06[K     |                                | 81 kB 5.9 MB/s eta 0:00:05[K     |                                | 92 kB 6.0 MB/s eta 0:00:05[K     |▏                               | 102 kB 6.3 MB/s eta 0:00:05[K     |▏                               | 112 kB 6.3 MB/s eta 0:00:05[K     |▏                               | 122 kB 6.3 MB/s eta 0:00:05[K     |▏                               | 133 kB 6.3 MB/s eta 0:00:05[K

In [0]:
%pip install --upgrade typing-extensions
# NOTE: per this cell's recorded output, the Python interpreter is restarted after the install,
# so variables defined by earlier cells (e.g. model, roc_auc) must be recreated by re-running them.

Python interpreter will be restarted.
Python interpreter will be restarted.


Logging the model without registering it, because of Databricks Community Edition feature limitations

In [0]:
# import mlflow
# import mlflow.spark

# # Start MLflow experiment
# mlflow.set_experiment("/Users/aniketpandey0796@gmail.com/iris_binary_classification")

# with mlflow.start_run():
#     mlflow.log_param("model_type", "LogisticRegression")
#     mlflow.log_metric("roc_auc", roc_auc)
    
#     # Log the model
#     mlflow.spark.log_model(model, "iris_model")

import mlflow
import mlflow.spark

# Point MLflow at the experiment used by this notebook (modify the path as needed)
experiment_name = "/Users/aniketpandey0796@gmail.com/iris_binary_classification"
mlflow.set_experiment(experiment_name)

with mlflow.start_run() as run:
    # Record which kind of model was trained and the score it reached
    mlflow.log_param(key="model_type", value="LogisticRegression")
    mlflow.log_metric(key="roc_auc", value=roc_auc)

    # Store the fitted Spark model as a run artifact only (no registration)
    model_path = "iris_binary_classification"
    mlflow.spark.log_model(model, model_path)

    # Keep the run ID: later cells use it to build the runs:/ URI for loading
    run_id = run.info.run_id
    print(f"Model logged under run ID: {run_id}")



[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-4199294914396523>:22[0m
[1;32m     20[0m [38;5;28;01mwith[39;00m mlflow[38;5;241m.[39mstart_run() [38;5;28;01mas[39;00m run:
[1;32m     21[0m     mlflow[38;5;241m.[39mlog_param([38;5;124m"[39m[38;5;124mmodel_type[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mLogisticRegression[39m[38;5;124m"[39m)
[0;32m---> 22[0m     mlflow[38;5;241m.[39mlog_metric([38;5;124m"[39m[38;5;124mroc_auc[39m[38;5;124m"[39m, roc_auc)
[1;32m     24[0m     [38;5;66;03m# Log model without registration[39;00m
[1;32m     25[0m     model_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124miris_binary_classification[39m[38;5;124m"[39m

[0;31mNameError[0m: name 'roc_auc' is not defined

Loading Model and getting predictions

In [0]:
# Build the model URI from the run ID and the artifact path used when logging.
# Reusing model_path (set in the logging cell) keeps this URI in sync with the
# path passed to mlflow.spark.log_model() instead of repeating the literal.
logged_model_uri = f"runs:/{run_id}/{model_path}"
loaded_model = mlflow.spark.load_model(logged_model_uri)

# Score the held-out test split with the reloaded model and show a sample of rows
predictions = loaded_model.transform(test_data)
predictions.select("features", "label", "prediction").show()




Performing Batch Inference 

In [0]:
# Two unlabeled example measurements to score together as one batch
measurement_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
example_rows = [
    (6.1, 2.9, 4.7, 1.4),
    (5.4, 3.4, 1.5, 0.4),
]
batch_data = spark.createDataFrame(example_rows, measurement_columns)

# Assemble the raw columns into the "features" vector the model was trained on
assembled_batch = feature_assembler.transform(batch_data)
batch_data = assembled_batch.select("features")

# Run the reloaded model over the batch and print feature/prediction pairs
batch_predictions = loaded_model.transform(batch_data)
batch_predictions.select("features", "prediction").show()



Performing Real-time Inference Through a Function

In [0]:
def real_time_inference(sepal_length, sepal_width, petal_length, petal_width):
    """Score one measurement with the loaded model and return its predicted label.

    Builds a one-row DataFrame from the four arguments, assembles it into the
    "features" vector column with the notebook-level ``feature_assembler``,
    runs the notebook-level ``loaded_model`` on it, then prints and returns
    the value found in the "prediction" column of the first row.
    """
    from pyspark.sql import Row

    # One Row holding the four measurements under their training column names
    measurement = Row(
        sepal_length=sepal_length,
        sepal_width=sepal_width,
        petal_length=petal_length,
        petal_width=petal_width,
    )
    single_row_df = spark.createDataFrame([measurement])

    # Same feature assembly as used for training, then model scoring
    features_df = feature_assembler.transform(single_row_df).select("features")
    scored_df = loaded_model.transform(features_df)

    # Only one row was scored, so the first collected row carries the answer
    first_row = scored_df.select("prediction").collect()[0]
    predicted_label = first_row["prediction"]

    print(f"Real-Time Prediction: {predicted_label}")
    return predicted_label

# Example real-time inference calls. The model is binary (label 1 = Iris-setosa,
# 0 = any other class), so the function returns 1.0 or 0.0, not a species name.
real_time_inference(5.1, 3.5, 1.4, 0.2)  # Expected: 1.0 (Iris-setosa)
real_time_inference(6.5, 3.0, 5.2, 2.0)  # Expected: 0.0 (not Iris-setosa; an Iris-virginica-like sample)



Code to register the model (model registration is not supported in the Databricks Community Edition)

In [0]:
# from mlflow.tracking import MlflowClient

# client = MlflowClient()
# model_name = "iris_classification_model"

# # # Register model
# # mlflow_model_uri = f"runs:/{mlflow.active_run().info.run_id}/iris_model"
# # client.create_registered_model(model_name)
# # client.create_model_version(name=model_name, source=mlflow_model_uri, run_id=mlflow.active_run().info.run_id)

# print(f"Model '{model_name}' registered successfully!")




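Execute the cells below, in order, to run the code smoothly (the install cell restarts the Python interpreter, so it must run before the pipeline cell)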

In [0]:
!pip install mlflow
%pip install --upgrade typing-extensions

Collecting mlflow
  Downloading mlflow-2.21.0-py3-none-any.whl (28.2 MB)
[?25l[K     |                                | 10 kB 17.0 MB/s eta 0:00:02[K     |                                | 20 kB 17.7 MB/s eta 0:00:02[K     |                                | 30 kB 17.2 MB/s eta 0:00:02[K     |                                | 40 kB 5.7 MB/s eta 0:00:05[K     |                                | 51 kB 6.9 MB/s eta 0:00:05[K     |                                | 61 kB 7.0 MB/s eta 0:00:04[K     |                                | 71 kB 6.9 MB/s eta 0:00:05[K     |                                | 81 kB 7.5 MB/s eta 0:00:04[K     |                                | 92 kB 8.4 MB/s eta 0:00:04[K     |▏                               | 102 kB 6.2 MB/s eta 0:00:05[K     |▏                               | 112 kB 6.2 MB/s eta 0:00:05[K     |▏                               | 122 kB 6.2 MB/s eta 0:00:05[K     |▏                               | 133 kB 6.2 MB/s eta 0:00:05

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Get the active Spark session, or build one under the app name "mlops"
spark = SparkSession.builder.appName("mlops").getOrCreate()


# ---- Load data ---------------------------------------------------------------
# Explicit schema for the iris CSV: four double-typed measurements plus the
# species name in the string column "class".
schema = StructType([
    StructField("sepal_length", DoubleType(), False),
    StructField("sepal_width", DoubleType(), False),
    StructField("petal_length", DoubleType(), False),
    StructField("petal_width", DoubleType(), False),
    StructField("class", StringType(), False)
])

df = spark.read.format("csv").schema(schema).load("dbfs:/FileStore/irisdata.csv")


# ---- Transform ---------------------------------------------------------------
# Convert class to binary label: Iris-setosa = 1, Others = 0
df = df.withColumn("label", when(df["class"] == "Iris-setosa", 1).otherwise(0))

# Assemble the four measurement columns into a single "features" vector column.
# feature_assembler is reused below for batch and single-row inference.
feature_assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features"
)
df = feature_assembler.transform(df).select("features", "label")


# ---- Train and evaluate ------------------------------------------------------
# 70% / 30% train-test split, using a fixed seed (42) for the random split
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Train Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

# Evaluate the model on the held-out split using area under the ROC curve
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

print(f"ROC AUC Score: {roc_auc}")


# ---- Log to MLflow -----------------------------------------------------------
# Set MLflow experiment (modify as needed)
mlflow.set_experiment("/Users/aniketpandey0796@gmail.com/iris_binary_classification")

with mlflow.start_run() as run:
    mlflow.log_param("model_type", "LogisticRegression")
    mlflow.log_metric("roc_auc", roc_auc)

    # Log model without registration; model_path is the artifact path inside the run
    model_path = "iris_binary_classification"
    mlflow.spark.log_model(model, model_path)

    run_id = run.info.run_id  # Capture run ID
    print(f"Model logged under run ID: {run_id}")

# ---- Reload the logged model -------------------------------------------------
# Build the URI from run_id and model_path so it always matches the artifact
# path passed to log_model() above, instead of repeating the literal.
logged_model_uri = f"runs:/{run_id}/{model_path}"
loaded_model = mlflow.spark.load_model(logged_model_uri)

# Predict on test data with the reloaded model
predictions = loaded_model.transform(test_data)
predictions.select("features", "label", "prediction").show()

# ---- Batch inference ---------------------------------------------------------
batch_data = spark.createDataFrame([
    (6.1, 2.9, 4.7, 1.4),  # Example new data points
    (5.4,3.4,1.5,0.4)
], ["sepal_length", "sepal_width", "petal_length", "petal_width"])

# Same feature assembly as used for training, then score the batch
batch_data = feature_assembler.transform(batch_data).select("features")
batch_predictions = loaded_model.transform(batch_data)

batch_predictions.select("features", "prediction").show()

def real_time_inference(sepal_length, sepal_width, petal_length, petal_width):
    """Score one measurement with the loaded model and return its predicted label.

    Builds a one-row DataFrame from the four arguments, assembles it into the
    "features" vector column with the notebook-level ``feature_assembler``,
    runs the notebook-level ``loaded_model`` on it, then prints and returns
    the value found in the "prediction" column of the first row.
    """
    from pyspark.sql import Row

    # One Row holding the four measurements under their training column names
    measurement = Row(
        sepal_length=sepal_length,
        sepal_width=sepal_width,
        petal_length=petal_length,
        petal_width=petal_width,
    )
    single_row_df = spark.createDataFrame([measurement])

    # Same feature assembly as used for training, then model scoring
    features_df = feature_assembler.transform(single_row_df).select("features")
    scored_df = loaded_model.transform(features_df)

    # Only one row was scored, so the first collected row carries the answer
    first_row = scored_df.select("prediction").collect()[0]
    predicted_label = first_row["prediction"]

    print(f"Real-Time Prediction: {predicted_label}")
    return predicted_label

# Example real-time inference calls. The model is binary (label 1 = Iris-setosa,
# 0 = any other class), so the function returns 1.0 or 0.0, not a species name.
real_time_inference(5.1, 3.5, 1.4, 0.2)  # Expected: 1.0 (Iris-setosa)
real_time_inference(6.5, 3.0, 5.2, 2.0)  # Expected: 0.0 (not Iris-setosa; an Iris-virginica-like sample)

ROC AUC Score: 1.0


2025/03/21 16:59:05 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Model logged under run ID: ef99dca95a2c4731a6bfdb556e9b66a3


2025/03/21 16:59:30 INFO mlflow.spark: URI 'runs:/ef99dca95a2c4731a6bfdb556e9b66a3/iris_binary_classification/sparkml' does not point to the current DFS.
2025/03/21 16:59:30 INFO mlflow.spark: File 'runs:/ef99dca95a2c4731a6bfdb556e9b66a3/iris_binary_classification/sparkml' not found on DFS. Will attempt to upload the file.
2025/03/21 16:59:34 INFO mlflow.spark: Copied SparkML model to /tmp/mlflow/eab90a2c-c6c7-4def-83d3-3641ed19b162


+-----------------+-----+----------+
|         features|label|prediction|
+-----------------+-----+----------+
|[4.4,3.0,1.3,0.2]|    1|       1.0|
|[4.6,3.2,1.4,0.2]|    1|       1.0|
|[4.6,3.6,1.0,0.2]|    1|       1.0|
|[4.7,3.2,1.3,0.2]|    1|       1.0|
|[4.8,3.1,1.6,0.2]|    1|       1.0|
|[4.8,3.4,1.6,0.2]|    1|       1.0|
|[4.8,3.4,1.9,0.2]|    1|       1.0|
|[4.9,3.1,1.5,0.1]|    1|       1.0|
|[4.9,3.1,1.5,0.1]|    1|       1.0|
|[5.0,2.3,3.3,1.0]|    0|       0.0|
|[5.0,3.0,1.6,0.2]|    1|       1.0|
|[5.0,3.4,1.6,0.4]|    1|       1.0|
|[5.0,3.5,1.3,0.3]|    1|       1.0|
|[5.0,3.5,1.6,0.6]|    1|       1.0|
|[5.1,2.5,3.0,1.1]|    0|       0.0|
|[5.1,3.4,1.5,0.2]|    1|       1.0|
|[5.1,3.5,1.4,0.2]|    1|       1.0|
|[5.1,3.8,1.6,0.2]|    1|       1.0|
|[5.2,3.4,1.4,0.2]|    1|       1.0|
|[5.2,3.5,1.5,0.2]|    1|       1.0|
+-----------------+-----+----------+
only showing top 20 rows

+-----------------+----------+
|         features|prediction|
+-----------------+-----

In [0]:
# Show the runs:/ URI under which the model was logged for the captured run ID
print("runs:/{}/iris_binary_classification".format(run_id))

runs:/ef99dca95a2c4731a6bfdb556e9b66a3/iris_binary_classification
