In [0]:
 %sh
 rm -r /dbfs/ml_lab
 mkdir /dbfs/ml_lab
 wget -O /dbfs/ml_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv

--2024-02-06 09:47:01--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9533 (9.3K) [text/plain]
Saving to: ‘/dbfs/ml_lab/penguins.csv’

     0K .........                                             100% 1.03M=0.009s

2024-02-06 09:47:01 (1.03 MB/s) - ‘/dbfs/ml_lab/penguins.csv’ saved [9533/9533]



In [0]:
df = spark.read.format("csv").option("header", "true").load("/ml_lab/penguins.csv")
display(df)

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,,,,,0
Torgersen,36.7,19.3,193.0,3450.0,0
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,38.9,17.8,181.0,3625.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,34.1,18.1,193.0,3475.0,0
Torgersen,42.0,20.2,190.0,4250.0,0


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
   
data = df.dropna().select(col("Island").astype("string"),
                           col("CulmenLength").astype("float"),
                          col("CulmenDepth").astype("float"),
                          col("FlipperLength").astype("float"),
                          col("BodyMass").astype("float"),
                          col("Species").astype("int")
                          )
display(data)

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,36.7,19.3,193.0,3450.0,0
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,38.9,17.8,181.0,3625.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,34.1,18.1,193.0,3475.0,0
Torgersen,42.0,20.2,190.0,4250.0,0
Torgersen,37.8,17.1,186.0,3300.0,0


In [0]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

Training Rows: 235  Testing Rows: 107


In [0]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Island", outputCol="IslandIdx")
indexedData = indexer.fit(train).transform(train).drop("Island")
display(indexedData)

CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species,IslandIdx
35.0,17.9,192.0,3725.0,0,0.0
35.3,18.9,187.0,3800.0,0,0.0
35.7,16.9,185.0,3150.0,0,0.0
35.9,19.2,189.0,3800.0,0,0.0
36.4,17.1,184.0,2850.0,0,0.0
36.5,16.6,181.0,2850.0,0,0.0
37.6,17.0,185.0,3600.0,0,0.0
37.6,19.1,194.0,3750.0,0,0.0
37.7,16.0,183.0,3075.0,0,0.0
37.7,18.7,180.0,3600.0,0,0.0


In [0]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Create a vector column containing all numeric features
numericFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(indexedData)
   
# Use a MinMax scaler to normalize the numeric values in the vector
minMax = MinMaxScaler(inputCol = numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)
   
# Display the data with numeric feature vectors (before and after scaling)
compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
display(compareNumerics)

numericFeatures,normalizedFeatures
"Map(vectorType -> dense, length -> 4, values -> List(35.0, 17.899999618530273, 192.0, 3725.0))","Map(vectorType -> dense, length -> 4, values -> List(0.10545460094105114, 0.5662650242480307, 0.3214285714285714, 0.30597014925373134))"
"Map(vectorType -> dense, length -> 4, values -> List(35.29999923706055, 18.899999618530273, 187.0, 3800.0))","Map(vectorType -> dense, length -> 4, values -> List(0.1163636641068892, 0.6867469491901869, 0.23214285714285712, 0.3283582089552239))"
"Map(vectorType -> dense, length -> 4, values -> List(35.70000076293945, 16.899999618530273, 185.0, 3150.0))","Map(vectorType -> dense, length -> 4, values -> List(0.13090917413884942, 0.44578309930587445, 0.19642857142857142, 0.13432835820895522))"
"Map(vectorType -> dense, length -> 4, values -> List(35.900001525878906, 19.200000762939453, 189.0, 3800.0))","Map(vectorType -> dense, length -> 4, values -> List(0.13818192915482955, 0.7228916645534547, 0.26785714285714285, 0.3283582089552239))"
"Map(vectorType -> dense, length -> 4, values -> List(36.400001525878906, 17.100000381469727, 184.0, 2850.0))","Map(vectorType -> dense, length -> 4, values -> List(0.1563637473366477, 0.46987957621471965, 0.17857142857142855, 0.04477611940298507))"
"Map(vectorType -> dense, length -> 4, values -> List(36.5, 16.600000381469727, 181.0, 2850.0))","Map(vectorType -> dense, length -> 4, values -> List(0.16000005548650567, 0.4096386137436415, 0.125, 0.04477611940298507))"
"Map(vectorType -> dense, length -> 4, values -> List(37.599998474121094, 17.0, 185.0, 3600.0))","Map(vectorType -> dense, length -> 4, values -> List(0.19999999999999998, 0.45783133776029705, 0.19642857142857142, 0.26865671641791045))"
"Map(vectorType -> dense, length -> 4, values -> List(37.599998474121094, 19.100000381469727, 194.0, 3750.0))","Map(vectorType -> dense, length -> 4, values -> List(0.19999999999999998, 0.7108434260990321, 0.3571428571428571, 0.31343283582089554))"
"Map(vectorType -> dense, length -> 4, values -> List(37.70000076293945, 16.0, 183.0, 3075.0))","Map(vectorType -> dense, length -> 4, values -> List(0.20363644686612214, 0.33734941281814085, 0.1607142857142857, 0.11194029850746269))"
"Map(vectorType -> dense, length -> 4, values -> List(37.70000076293945, 18.700000762939453, 180.0, 3600.0))","Map(vectorType -> dense, length -> 4, values -> List(0.20363644686612214, 0.6626507020823765, 0.10714285714285714, 0.26865671641791045))"


In [0]:
featVect = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="featuresVector")
preppedData = featVect.transform(scaledData)[col("featuresVector").alias("features"), col("Species").alias("label")]
display(preppedData)

features,label
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.10545460094105114, 0.5662650242480307, 0.3214285714285714, 0.30597014925373134))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1163636641068892, 0.6867469491901869, 0.23214285714285712, 0.3283582089552239))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.13090917413884942, 0.44578309930587445, 0.19642857142857142, 0.13432835820895522))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.13818192915482955, 0.7228916645534547, 0.26785714285714285, 0.3283582089552239))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1563637473366477, 0.46987957621471965, 0.17857142857142855, 0.04477611940298507))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.16000005548650567, 0.4096386137436415, 0.125, 0.04477611940298507))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.19999999999999998, 0.45783133776029705, 0.19642857142857142, 0.26865671641791045))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.19999999999999998, 0.7108434260990321, 0.3571428571428571, 0.31343283582089554))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.20363644686612214, 0.33734941281814085, 0.1607142857142857, 0.11194029850746269))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.20363644686612214, 0.6626507020823765, 0.10714285714285714, 0.26865671641791045))",0


In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print ("Model trained!")

Model trained!


In [0]:
# Prepare the test data
indexedTestData = indexer.fit(test).transform(test).drop("Island")
vectorizedTestData = numericColVector.transform(indexedTestData)
scaledTestData = minMax.fit(vectorizedTestData).transform(vectorizedTestData)
preppedTestData = featVect.transform(scaledTestData)[col("featuresVector").alias("features"), col("Species").alias("label")]
   
# Get predictions
prediction = model.transform(preppedTestData)
predicted = prediction.select("features", "probability", col("prediction").astype("Int"), col("label").alias("trueLabel"))
display(predicted)

features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.020833333333333332, 0.6172839215462804, 0.2542372881355932, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(0.8826740555209703, 0.04321249858275545, 0.07411344589627426))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.041666666666666664, 0.5925924704943777, 0.3050847457627119, 0.16176470588235292))","Map(vectorType -> dense, length -> 3, values -> List(0.85199812391868, 0.07176089153424155, 0.07624098454707842))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.0625, 0.3827160784537196, 0.3898305084745763, 0.1323529411764706))","Map(vectorType -> dense, length -> 3, values -> List(0.7718228429615217, 0.14107321474696977, 0.08710394229150856))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.16250006357828775, 0.6790123137009084, 0.0, 0.07352941176470587))","Map(vectorType -> dense, length -> 3, values -> List(0.8546349165735935, 0.03494324190472965, 0.11042184152167678))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.17083326975504556, 0.4197530195564449, 0.4406779661016949, 0.27205882352941174))","Map(vectorType -> dense, length -> 3, values -> List(0.6587512395846942, 0.22463823869411279, 0.11661052172119316))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1916666030883789, 0.5061728627629757, 0.4576271186440678, 0.24999999999999997))","Map(vectorType -> dense, length -> 3, values -> List(0.6741720995738901, 0.19580533084978508, 0.13002256957632488))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.23750003178914386, 0.5679012549176037, 0.3559322033898305, 0.08823529411764705))","Map(vectorType -> dense, length -> 3, values -> List(0.7129504513688056, 0.11996639906454865, 0.16708314956664563))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.2708333333333333, 0.5925924704943777, 0.2542372881355932, 0.08823529411764705))","Map(vectorType -> dense, length -> 3, values -> List(0.718680209822298, 0.09899213881452806, 0.1823276513631739))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.2875000635782877, 0.07407399994301501, 0.711864406779661, 0.5147058823529411))","Map(vectorType -> dense, length -> 3, values -> List(0.19880140775762994, 0.729565709333972, 0.07163288290839795))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.29166666666666663, 0.8518517646388412, 0.5254237288135594, 0.5367647058823529))","Map(vectorType -> dense, length -> 3, values -> List(0.6398027253772163, 0.2012192425538446, 0.15897803206893912))",0,0


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
   
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
   
# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)
   
# Individual class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
    print ("Class %s" % (label))
   
    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)
   
    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                             evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)
   
    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                         evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)
   
# Weighted (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 0.9252336448598131

Individual class metrics:
Class 0
	Precision: 0.8596491228070176
	Recall: 1.0
	F1 Score: 0.9245283018867925
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 1.0
	Recall: 0.6521739130434783
	F1 Score: 0.7894736842105263
Overall Precision: 0.9357271683882604
Overall Recall: 0.9252336448598131
Overall F1 Score: 0.9201848741055602


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
   
catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
   
# Define the feature engineering and model training algorithm steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=10, regParam=0.3)
   
# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
   
# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


In [0]:
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))
display(predicted)

Features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.08727278275923295, 0.5903615011568758, 0.23214285714285712, 0.05970149253731343))","Map(vectorType -> dense, length -> 3, values -> List(0.8482318980558763, 0.058384080481285884, 0.09338402146283788))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.10545460094105114, 0.5662650242480307, 0.2857142857142857, 0.22388059701492538))","Map(vectorType -> dense, length -> 3, values -> List(0.810065668765811, 0.09598576512828351, 0.09394856610590535))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12363641912286931, 0.36144588972698605, 0.375, 0.19402985074626866))","Map(vectorType -> dense, length -> 3, values -> List(0.7149079500737566, 0.181641506552482, 0.1034505433737614))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.21090920188210227, 0.6506024636279539, -0.03571428571428571, 0.13432835820895522))","Map(vectorType -> dense, length -> 3, values -> List(0.8281261441846604, 0.043717656618644074, 0.1281561991966957))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.21818181818181817, 0.397590375289219, 0.42857142857142855, 0.3358208955223881))","Map(vectorType -> dense, length -> 3, values -> List(0.5979323176715517, 0.2744561936751808, 0.12761148865326735))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.23636363636363636, 0.48192781466914225, 0.4464285714285714, 0.31343283582089554))","Map(vectorType -> dense, length -> 3, values -> List(0.6171231414265103, 0.24089765177929834, 0.14197920679419138))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.2763637195933949, 0.5421687771402204, 0.33928571428571425, 0.1492537313432836))","Map(vectorType -> dense, length -> 3, values -> List(0.670820955747459, 0.14688726532057056, 0.18229177893197043))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.30545460094105115, 0.5662650242480307, 0.23214285714285712, 0.1492537313432836))","Map(vectorType -> dense, length -> 3, values -> List(0.6838027471520263, 0.11951565786380229, 0.19668159498417143))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.32000011097301134, 0.0602409624710781, 0.7142857142857142, 0.582089552238806))","Map(vectorType -> dense, length -> 3, values -> List(0.16114358141173618, 0.7731122961191076, 0.06574412246915624))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.3236364191228693, 0.8192771125867657, 0.5178571428571428, 0.6044776119402985))","Map(vectorType -> dense, length -> 3, values -> List(0.5875039263481536, 0.2476106445890177, 0.16488542906282866))",0,0


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
   
catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
   
# Define the feature engineering and model steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = DecisionTreeClassifier(labelCol="Species", featuresCol="Features", maxDepth=10)
   
# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
   
# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


In [0]:
# Get predictions
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))
   
# Generate evaluation metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
   
evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction")
   
# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)
   
# Class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
    print ("Class %s" % (label))
   
    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                    evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)
   
    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                             evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)
   
    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                         evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)
   
# Weighed (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 0.9626168224299065

Individual class metrics:
Class 0
	Precision: 0.9787234042553191
	Recall: 0.9387755102040817
	F1 Score: 0.9583333333333333
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 0.88
	Recall: 0.9565217391304348
	F1 Score: 0.9166666666666666
Overall Precision: 0.9644621197057068
Overall Recall: 0.9626168224299065
Overall F1 Score: 0.963006230529595


In [0]:
model.save("/models/penguin.model")

In [0]:
from pyspark.ml.pipeline import PipelineModel

persistedModel = PipelineModel.load("/models/penguin.model")
   
newData = spark.createDataFrame ([{"Island": "Biscoe",
                                  "CulmenLength": 47.6,
                                  "CulmenDepth": 14.5,
                                  "FlipperLength": 215,
                                  "BodyMass": 5400}])
   
   
predictions = persistedModel.transform(newData)
display(predictions.select("Island", "CulmenDepth", "CulmenLength", "FlipperLength", "BodyMass", col("prediction").alias("PredictedSpecies")))

Island,CulmenDepth,CulmenLength,FlipperLength,BodyMass,PredictedSpecies
Biscoe,14.5,47.6,215,5400,1.0
