In [14]:
#spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName ("Logistic Regression Iris").getOrCreate ()

In [15]:
#loading the dataset
data = spark.read.csv ("iris.csv", inferSchema=True, header=True)
data.printSchema ()

root
 |-- SEPAL_LENGTH: double (nullable = true)
 |-- SEPAL_WIDTH: double (nullable = true)
 |-- PETAL_LENGTH: double (nullable = true)
 |-- PETAL_WIDTH: double (nullable = true)
 |-- CLASS: string (nullable = true)



In [16]:
#creating indexer
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer (inputCol="CLASS", outputCol="classIndex")

indexed_df = indexer.fit (data).transform (data)
indexed_df.printSchema ()

root
 |-- SEPAL_LENGTH: double (nullable = true)
 |-- SEPAL_WIDTH: double (nullable = true)
 |-- PETAL_LENGTH: double (nullable = true)
 |-- PETAL_WIDTH: double (nullable = true)
 |-- CLASS: string (nullable = true)
 |-- classIndex: double (nullable = false)



In [17]:
#assemble dataset for machine learning
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler (inputCols=["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH"], outputCol="features")

df = assembler.transform (indexed_df).select ("features", "classIndex")
df.printSchema ()

root
 |-- features: vector (nullable = true)
 |-- classIndex: double (nullable = false)



In [18]:
#creating logistic regressor
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression (featuresCol="features", labelCol="classIndex", maxIter=10)

In [19]:
#train-test-splitting
(train, test) = df.randomSplit ([0.7, 0.3])
df.printSchema ()

root
 |-- features: vector (nullable = true)
 |-- classIndex: double (nullable = false)



In [20]:
#fitting the model to data and predicting values
model = lr.fit (train)
unlabeled_train = train.select ("features")
unlabeled_test = test.select ("features")
train_prediction = model.transform (unlabeled_train)
test_prediction = model.transform (unlabeled_test)
test_prediction.printSchema ()

root
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [21]:
#evaluating the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator (labelCol="prediction")
AUC_train = evaluator.evaluate (train_prediction)
AUC = evaluator.evaluate (test_prediction)
print (AUC_train)
print (AUC)

1.0
1.0


In [22]:
test_prediction.select ("features", "probability", "prediction").toPandas ()

Unnamed: 0,features,probability,prediction
0,"[4.3, 3.0, 1.1, 0.1]","[0.9999957320484468, 4.2679477033963405e-06, 3...",0.0
1,"[4.4, 2.9, 1.4, 0.2]","[0.9998943811435351, 0.0001056185031090084, 3....",0.0
2,"[4.6, 3.2, 1.4, 0.2]","[0.999993620652776, 6.37934099104795e-06, 6.23...",0.0
3,"[4.8, 3.4, 1.6, 0.2]","[0.9999975285292191, 2.471469430289015e-06, 1....",0.0
4,"[4.9, 3.0, 1.4, 0.2]","[0.9996318057310466, 0.00036819378396433025, 4...",0.0
5,"[5.0, 2.3, 3.3, 1.0]","[9.824050606399043e-06, 0.9657917736273739, 0....",1.0
6,"[5.0, 3.3, 1.4, 0.2]","[0.9999865639141851, 1.3436079984882497e-05, 5...",0.0
7,"[5.0, 3.4, 1.5, 0.2]","[0.9999949485673423, 5.0514308085441005e-06, 1...",0.0
8,"[5.0, 3.5, 1.3, 0.3]","[0.9999986315803272, 1.3684191860659795e-06, 4...",0.0
9,"[5.0, 3.6, 1.4, 0.2]","[0.9999997039468346, 2.9605311757215375e-07, 4...",0.0


In [23]:
#saving model to disk
model.save ("models/first_model")

In [24]:
#deleting model from memory
del model
model

NameError: name 'model' is not defined

In [0]:
#loading model from disk
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load ("models/first_model")
model

In [0]:
#veryfing model is ok
prediction = model.transform (test)
prediction.select ("classIndex", "prediction").toPandas ()