In [0]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time
   

In [0]:
!pip install mlflow

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Collecting mlflow
  Using cached mlflow-2.14.1-py3-none-any.whl (25.8 MB)
Collecting sqlalchemy<3,>=1.4.0
  Downloading SQLAlchemy-2.0.31-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.1/3.1 MB 22.4 MB/s eta 0:00:00
Collecting graphene<4
  Downloading graphene-3.3-py2.py3-none-any.whl (128 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 128.2/128.2 kB 13.2 MB/s eta 0:00:00
Collecting gitpython<4,>=3.1.9
  Downloading GitPython-3.1.43-py3-none-any.whl (207 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 207.3/207.3 kB 18.4 MB/s eta 0:00:00
Collecting pyyaml<7,>=5.1
  Downloading PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (705 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 705.5/705.5 kB 39.3 MB/s eta 0:00:00
Collecting sqlparse<1,>=0.4.0
  Downloading sqlparse-0.5.0-py3-none-any

In [0]:
 %sh
 # Recreate the lab folder from scratch and download the penguins dataset.
 # -f keeps rm from failing when the folder does not exist yet (first run);
 # -p keeps mkdir from failing if the folder is already there.
 rm -rf /dbfs/mlflow_lab
 mkdir -p /dbfs/mlflow_lab
 wget -O /dbfs/mlflow_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv

rm: cannot remove '/dbfs/mlflow_lab': No such file or directory
--2024-06-29 19:57:17--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9533 (9.3K) [text/plain]
Saving to: ‘/dbfs/mlflow_lab/penguins.csv’

     0K .........                                             100% 1.73M=0.005s

2024-06-29 19:57:17 (1.73 MB/s) - ‘/dbfs/mlflow_lab/penguins.csv’ saved [9533/9533]



In [0]:
# List the contents of the lab folder on DBFS; the returned FileInfo list is
# shown as the cell output (expected to contain penguins.csv after the download).
dbutils.fs.ls("dbfs:/mlflow_lab/")

[FileInfo(path='dbfs:/mlflow_lab/penguins.csv', name='penguins.csv', size=9533, modificationTime=1719691037000)]

In [0]:
# Read the penguins CSV with a header row. No schema is supplied, so the
# columns are cast explicitly below.
raw_data = spark.read.load("dbfs:/mlflow_lab/penguins.csv", format='csv', header=True)

# Drop incomplete rows, cast each column to its working type, then drop again:
# with Spark's default (non-ANSI) casting a value that cannot be parsed turns
# into null during the cast, and such rows would otherwise reach the
# feature assembler downstream.
data = (
    raw_data.dropna()
    .select(
        col("Island").astype("string"),
        col("CulmenLength").astype("float"),
        col("CulmenDepth").astype("float"),
        col("FlipperLength").astype("float"),
        col("BodyMass").astype("float"),
        col("Species").astype("int"),
    )
    .dropna()
)

# Preview roughly 20% of the rows; the fixed seed keeps the preview stable
# across re-runs.
display(data.sample(fraction=0.2, seed=42))

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,36.6,17.8,185.0,3700.0,0
Biscoe,37.7,18.7,180.0,3600.0,0
Biscoe,35.9,19.2,189.0,3800.0,0
Biscoe,38.8,17.2,180.0,3800.0,0
Biscoe,40.5,17.9,187.0,3200.0,0
Dream,36.4,17.0,195.0,3325.0,0
Dream,42.2,18.5,180.0,3550.0,0
Biscoe,42.0,19.5,200.0,4050.0,0
Biscoe,34.5,18.1,187.0,2900.0,0
Biscoe,39.0,17.5,186.0,3550.0,0


In [0]:
# Split the cleaned data roughly 70/30 into training and test sets.
# The explicit seed makes the split repeatable, so the row counts and the
# metrics computed later do not change from one run to the next.
splits = data.randomSplit([0.7, 0.3], seed=42)
train, test = splits
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

Training Rows: 243  Testing Rows: 99


In [0]:
# Train a logistic-regression classifier on the penguin data inside a single
# MLflow run, logging the hyperparameters, the evaluation metrics and the
# fitted pipeline model.
with mlflow.start_run():
    catFeature = "Island"
    numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

    # Hyperparameters
    maxIterations = 5
    regularization = 0.5

    # Define the feature engineering and model steps.
    # Each stage reads its input column name from the stage that produces it,
    # so changing catFeature (or an output column name) cannot leave a stale
    # hard-coded name behind.
    catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
    numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
    numScaler = MinMaxScaler(inputCol=numVector.getOutputCol(), outputCol="normalizedFeatures")
    featureVector = VectorAssembler(
        inputCols=[catIndexer.getOutputCol(), numScaler.getOutputCol()],
        outputCol="Features",
    )
    algo = LogisticRegression(
        labelCol="Species",
        featuresCol=featureVector.getOutputCol(),
        maxIter=maxIterations,
        regParam=regularization,
    )

    # Chain the steps as stages in a pipeline
    pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

    # Log the parameter values the estimator will actually use, then train
    print ("Training Logistic Regression model...")
    mlflow.log_param('maxIter', algo.getMaxIter())
    mlflow.log_param('regParam', algo.getRegParam())
    model = pipeline.fit(dataset=train)

    # Evaluate the model on the held-out rows and log each metric.
    # One evaluator is created and re-pointed at each metric in turn.
    prediction = model.transform(test)
    metrics = ["accuracy", "weightedRecall", "weightedPrecision"]
    evaluator = MulticlassClassificationEvaluator(
        labelCol=algo.getLabelCol(),
        predictionCol="prediction",
    )
    for metric in metrics:
        evaluator.setMetricName(metric)
        metricValue = evaluator.evaluate(prediction)
        print("%s: %s" % (metric, metricValue))
        mlflow.log_metric(metric, metricValue)

    # Log the model as a run artifact under a timestamp-based name, and also
    # save a copy to an explicit path outside the run.
    unique_model_name = "classifier-" + str(time.time())
    mlflow.spark.log_model(spark_model=model,
                           artifact_path=unique_model_name,
                           conda_env=mlflow.spark.get_default_conda_env()
                           )
    modelpath=f"/model/{unique_model_name}"
    mlflow.spark.save_model(spark_model=model,path=modelpath)

    print("Experiment run complete.")

Training Logistic Regression model...
accuracy: 0.898989898989899
weightedRecall: 0.898989898989899
weightedPrecision: 0.916405433646813




Experiment run complete.
