In [0]:
 %sh
 rm -r /dbfs/ml_lab
 mkdir /dbfs/ml_lab
 wget -O /dbfs/ml_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv


rm: cannot remove '/dbfs/ml_lab': No such file or directory
--2024-03-13 06:40:07--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9533 (9.3K) [text/plain]
Saving to: ‘/dbfs/ml_lab/penguins.csv’

     0K .........                                             100%  589K=0.02s

2024-03-13 06:40:08 (589 KB/s) - ‘/dbfs/ml_lab/penguins.csv’ saved [9533/9533]



In [0]:
df = spark.read.format("csv").option("header", "true").load("/ml_lab/penguins.csv")
display(df.head(5))

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,,,,,0
Torgersen,36.7,19.3,193.0,3450.0,0


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
   
data = df.dropna().select(col("Island").astype("string"),
                          col("CulmenLength").astype("float"),
                          col("CulmenDepth").astype("float"),
                          col("FlipperLength").astype("float"),
                          col("BodyMass").astype("float"),
                          col("Species").astype("int")
                          )

#### Split the data

In [0]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]

print("Training Rows: ", train.count(), "Testing Rows: ", test.count())

Training Rows:  249 Testing Rows:  93


#### Perform feature engineering

In [0]:
# encode the Island categorical column values as numeric indexes
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Island", outputCol="IslandIdx")
indexedData = indexer.fit(train).transform(train).drop("Island")

display(indexedData.head(5))

CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species,IslandIdx
34.5,18.100000381469727,187.0,2900.0,0,0.0
35.0,17.899999618530273,190.0,3450.0,0,0.0
35.0,17.899999618530273,192.0,3725.0,0,0.0
35.29999923706055,18.899999618530277,187.0,3800.0,0,0.0
35.70000076293945,16.899999618530273,185.0,3150.0,0,0.0


In [0]:
# Normalize (scale) numeric features
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Create a vector column containing all numeric features
numericFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(indexedData)

# Use a MinMax scaler to normalize the numeric values in the vector
minMax = MinMaxScaler(inputCol = numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)

# Display the data with numeric feature vectors (before and after scaling)
compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
display(compareNumerics.head(5))

numericFeatures,normalizedFeatures
"Map(vectorType -> dense, length -> 4, values -> List(34.5, 18.100000381469727, 187.0, 2900.0))","Map(vectorType -> dense, length -> 4, values -> List(0.08727278275923295, 0.6172839215462804, 0.2542372881355932, 0.05970149253731343))"
"Map(vectorType -> dense, length -> 4, values -> List(35.0, 17.899999618530273, 190.0, 3450.0))","Map(vectorType -> dense, length -> 4, values -> List(0.10545460094105114, 0.5925924704943777, 0.3050847457627119, 0.22388059701492538))"
"Map(vectorType -> dense, length -> 4, values -> List(35.0, 17.899999618530273, 192.0, 3725.0))","Map(vectorType -> dense, length -> 4, values -> List(0.10545460094105114, 0.5925924704943777, 0.3389830508474576, 0.30597014925373134))"
"Map(vectorType -> dense, length -> 4, values -> List(35.29999923706055, 18.899999618530273, 187.0, 3800.0))","Map(vectorType -> dense, length -> 4, values -> List(0.1163636641068892, 0.7160492548036338, 0.2542372881355932, 0.3283582089552239))"
"Map(vectorType -> dense, length -> 4, values -> List(35.70000076293945, 16.899999618530273, 185.0, 3150.0))","Map(vectorType -> dense, length -> 4, values -> List(0.13090917413884942, 0.4691356861851216, 0.22033898305084745, 0.13432835820895522))"


#### Prepare features and labels for training

In [0]:
featVect = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="featuresVector")
preppedData = featVect.transform(scaledData)[col("featuresVector").alias("features"), col("Species").alias("label")]
display(preppedData.head(5))

features,label
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.08727278275923295, 0.6172839215462804, 0.2542372881355932, 0.05970149253731343))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.10545460094105114, 0.5925924704943777, 0.3050847457627119, 0.22388059701492538))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.10545460094105114, 0.5925924704943777, 0.3389830508474576, 0.30597014925373134))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1163636641068892, 0.7160492548036338, 0.2542372881355932, 0.3283582089552239))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.13090917413884942, 0.4691356861851216, 0.22033898305084745, 0.13432835820895522))",0


#### Train a machine learning model

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print ("Model trained!")

Model trained!


#### Test the model

In [0]:
# Prepare the test data
indexedTestData = indexer.fit(test).transform(test).drop("Island")
vectorizedTestData = numericColVector.transform(indexedTestData)

scaledTestData = minMax.fit(vectorizedTestData).transform(vectorizedTestData)
preppedTestData = featVect.transform(scaledTestData)[col("featuresVector").alias("features"), col("Species").alias("label")]

# Get predictions
prediction = model.transform(preppedTestData)
predicted = prediction.select("features", "probability", col("prediction").astype("Int"), col("label").alias("trueLabel"))
display(predicted.head(4))

features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.08163265306122448, 0.33750009536743164, 0.3269230769230769, 0.1323529411764706))","Map(vectorType -> dense, length -> 3, values -> List(0.7782138409331263, 0.13252960583466478, 0.08925655323220896))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.17142860256895726, 0.3125, 0.09615384615384616, 0.051470588235294115))","Map(vectorType -> dense, length -> 3, values -> List(0.7874150387037901, 0.09374755472069868, 0.11883740657551121))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1755101729412468, 0.8125, 0.23076923076923078, 0.39705882352941174))","Map(vectorType -> dense, length -> 3, values -> List(0.8247181221451012, 0.06948948605407165, 0.10579239180082718))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.17959189901546554, 0.6375000476837158, 0.2884615384615385, 0.007352941176470587))","Map(vectorType -> dense, length -> 3, values -> List(0.8059534061451775, 0.062279257805957305, 0.13176733604886517))",0,0


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
   
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
   
# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)
   
# Individual class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
    print ("Class %s" % (label))
   
    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)
   
    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                             evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)
   
    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                         evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)
   
# Weighted (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 0.9139784946236559

Individual class metrics:
Class 0
	Precision: 0.8048780487804879
	Recall: 1.0
	F1 Score: 0.8918918918918919
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 1.0
	Recall: 0.68
	F1 Score: 0.8095238095238095
Overall Precision: 0.930763178599528
Overall Recall: 0.9139784946236559
Overall F1 Score: 0.9104357814035233


__Accuracy:__ The proportion of overall predictions that were correct.

__Per-class metrics:__ 

                 Precision: The proportion of predictions of this class that were correct.

                 Recall: The proportion of actual instances of this class that were correctly predicted.

                 F1 score: A combined metric for precision and recall



Note: It may initially seem like the overall accuracy metric provides the best way to evaluate a model’s predictive performance. However, consider this. Suppose Gentoo penguins make up 95% of the penguin population in your study location. A model that always predicts the label 1 (the class for Gentoo) will have an accuracy of 0.95. That doesn’t mean it’s a great model for predicting a penguin species based on the features! That’s why data scientists tend to explore additional metrics to get a better understanding of how well a classification model predicts for each possible class label.

#### Use a pipeline

You trained your model by performing the required feature engineering steps and then fitting an algorithm to the data. To use the model with some test data to generate predictions (referred to as inferencing), you had to apply the same feature engineering steps to the test data. A more efficient way to build and use models is to encapsulate the transformers used to prepare the data and the model used to train it in a pipeline.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression



catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]


# Define the feature engineering and model training algorithm steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=10, regParam=0.3)


# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])


# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


Since the feature engineering steps are now encapsulated in the model trained by the pipeline, you can use the model with the test data without needing to apply each transformation (they’ll be applied automatically by the model).

In [0]:
# apply the pipeline to the test data
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))
display(predicted.head(4))

Features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12363641912286931, 0.3827160784537196, 0.3898305084745763, 0.19402985074626866))","Map(vectorType -> dense, length -> 3, values -> List(0.7323519625458714, 0.16621307955154552, 0.101434957902583))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.20363644686612214, 0.3580246274018169, 0.1864406779661017, 0.11194029850746269))","Map(vectorType -> dense, length -> 3, values -> List(0.745251388628206, 0.12218528410677094, 0.1325633272650231))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.2072727550159801, 0.8518517646388412, 0.3050847457627119, 0.4626865671641791))","Map(vectorType -> dense, length -> 3, values -> List(0.7914045535477935, 0.09041002424777782, 0.11818542220442854))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.21090920188210227, 0.6790123137009084, 0.3559322033898305, 0.06716417910447761))","Map(vectorType -> dense, length -> 3, values -> List(0.7748486997058207, 0.07841177605748804, 0.1467395242366913))",0,0


#### Try Decision Tree algorithm

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
   
catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
   
# Define the feature engineering and model steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = DecisionTreeClassifier(labelCol="Species", featuresCol="Features", maxDepth=10)
   
# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
   
# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


In [0]:
# Get predictions
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))
   
# Generate evaluation metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
   
evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction")
   
# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)
   
# Class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
    print ("Class %s" % (label))
   
    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                    evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)
   
    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                             evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)
   
    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                         evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)
   
# Weighed (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 0.989247311827957

Individual class metrics:
Class 0
	Precision: 1.0
	Recall: 0.9696969696969697
	F1 Score: 0.9846153846153847
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 0.9615384615384616
	Recall: 1.0
	F1 Score: 0.9803921568627451
Overall Precision: 0.989660876757651
Overall Recall: 0.989247311827957
Overall F1 Score: 0.9892700173535087


#### Save the model

In [0]:
model.save("/models/penguin.model")

In [0]:
# load the model and use it to predict the species for a new penguin observation

from pyspark.ml.pipeline import PipelineModel

persistedModel = PipelineModel.load("/models/penguin.model")
   
newData = spark.createDataFrame ([{"Island": "Biscoe",
                                  "CulmenLength": 47.6,
                                  "CulmenDepth": 14.5,
                                  "FlipperLength": 215,
                                  "BodyMass": 5400}])
   
   
predictions = persistedModel.transform(newData)
display(predictions.select("Island", "CulmenDepth", "CulmenLength", "FlipperLength", "BodyMass", col("prediction").alias("PredictedSpecies")))

Island,CulmenDepth,CulmenLength,FlipperLength,BodyMass,PredictedSpecies
Biscoe,14.5,47.6,215,5400,1.0


In [0]:
display(predictions.select("Island", "CulmenDepth", "CulmenLength", "FlipperLength", "BodyMass", col("prediction")))

Island,CulmenDepth,CulmenLength,FlipperLength,BodyMass,prediction
Biscoe,14.5,47.6,215,5400,1.0
