In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.regression import LinearRegression
spark = SparkSession.builder.appName('logistic_reg').getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/02 01:42:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### DataFrame

In [12]:
df = spark.read.csv('./irisdata.csv', header=True, inferSchema=True)

In [13]:
df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_widthCm: double (nullable = true)
 |-- species: string (nullable = true)



In [14]:
df.show(5)

+------------+-----------+------------+-------------+-----------+
|sepal_length|sepal_width|petal_length|petal_widthCm|    species|
+------------+-----------+------------+-------------+-----------+
|         5.1|        3.5|         1.4|          0.2|Iris-setosa|
|         4.9|        3.0|         1.4|          0.2|Iris-setosa|
|         4.7|        3.2|         1.3|          0.2|Iris-setosa|
|         4.6|        3.1|         1.5|          0.2|Iris-setosa|
|         5.0|        3.6|         1.4|          0.2|Iris-setosa|
+------------+-----------+------------+-------------+-----------+
only showing top 5 rows



### Feature Transformers

#### StringIndexer

In [15]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='species', outputCol='speciesIndex')
iris = indexer.fit(df).transform(df)
iris.sample(fraction=0.1).show()

+------------+-----------+------------+-------------+---------------+------------+
|sepal_length|sepal_width|petal_length|petal_widthCm|        species|speciesIndex|
+------------+-----------+------------+-------------+---------------+------------+
|         4.6|        3.4|         1.4|          0.3|    Iris-setosa|         0.0|
|         4.4|        2.9|         1.4|          0.2|    Iris-setosa|         0.0|
|         5.2|        3.4|         1.4|          0.2|    Iris-setosa|         0.0|
|         5.0|        3.2|         1.2|          0.2|    Iris-setosa|         0.0|
|         5.1|        3.8|         1.6|          0.2|    Iris-setosa|         0.0|
|         6.7|        3.1|         4.4|          1.4|Iris-versicolor|         1.0|
|         5.8|        2.7|         4.1|          1.0|Iris-versicolor|         1.0|
|         5.4|        3.0|         4.5|          1.5|Iris-versicolor|         1.0|
|         5.5|        2.6|         4.4|          1.2|Iris-versicolor|         1.0|
|   

#### OneHotEncoder

In [16]:
from pyspark.ml.feature import OneHotEncoder

encoded = OneHotEncoder(inputCol="speciesIndex", outputCol="species_vec")
new_df = encoded.fit(iris).transform(iris)
new_df.show()

+------------+-----------+------------+-------------+-----------+------------+-------------+
|sepal_length|sepal_width|petal_length|petal_widthCm|    species|speciesIndex|  species_vec|
+------------+-----------+------------+-------------+-----------+------------+-------------+
|         5.1|        3.5|         1.4|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|
|         4.9|        3.0|         1.4|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|
|         4.7|        3.2|         1.3|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|
|         4.6|        3.1|         1.5|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|
|         5.0|        3.6|         1.4|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|
|         5.4|        3.9|         1.7|          0.4|Iris-setosa|         0.0|(2,[0],[1.0])|
|         4.6|        3.4|         1.4|          0.3|Iris-setosa|         0.0|(2,[0],[1.0])|
|         5.0|        3.4|         1.5|          0.2|Iris-setosa|     

### Vector Assembler

In [17]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_widthCm"], 
    outputCol="features")

dataset = assembler.transform(new_df)
dataset.show()

+------------+-----------+------------+-------------+-----------+------------+-------------+-----------------+
|sepal_length|sepal_width|petal_length|petal_widthCm|    species|speciesIndex|  species_vec|         features|
+------------+-----------+------------+-------------+-----------+------------+-------------+-----------------+
|         5.1|        3.5|         1.4|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|          0.2|Iris-setosa|         0.0|(2,[0],[1.0])|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|          0.4|Iris-setosa|         0.0|(2,[0],[1.0])|[5.4,3.9,1.7,0.4]|
|

In [18]:
dataset.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_widthCm: double (nullable = true)
 |-- species: string (nullable = true)
 |-- speciesIndex: double (nullable = false)
 |-- species_vec: vector (nullable = true)
 |-- features: vector (nullable = true)



### Estimator

In [20]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='speciesIndex')
trainData, testData = dataset.randomSplit([0.7, 0.3])


In [21]:
model = lr.fit(trainData)
summary = model.evaluate(testData)

22/12/02 01:47:52 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/12/02 01:47:53 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/12/02 01:47:53 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


[Stage 19:>                                                         (0 + 1) / 1]                                                                                

### Evaluator

In [22]:
summary.accuracy

0.9361702127659575

### Pipeline

In [29]:
#Using Pipeline #import module
from pyspark.ml import Pipeline

df = spark.read.csv('./irisdata.csv', header=True, inferSchema=True)

#feature transformers
indexer = StringIndexer(inputCol='species', outputCol='speciesIndex')

#Apply OneHotEncoder to Species column
encoded = OneHotEncoder(inputCol="speciesIndex", outputCol="species_vec")

#Merge multiple columns into a vector column
assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_widthCm"], 
    outputCol="features")

#model
lr = LogisticRegression(featuresCol='features', labelCol='speciesIndex')

#Create pipeline and pass it to stages
pipeline = Pipeline(stages=[ indexer, encoded,
           assembler, lr
])

trainData, testData = df.randomSplit([0.7, 0.3])

#Use .fit() and .transform() on the pipeline
df_transformed = pipeline.fit(trainData).transform(trainData)

df_transformed.select("species", "features", "probability", "prediction").show()

+---------------+-----------------+--------------------+----------+
|        species|         features|         probability|prediction|
+---------------+-----------------+--------------------+----------+
|    Iris-setosa|[4.3,3.0,1.1,0.1]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.4,2.9,1.4,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.4,3.0,1.3,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.4,3.2,1.3,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.5,2.3,1.3,0.3]|[1.0,5.1584556500...|       0.0|
|    Iris-setosa|[4.6,3.1,1.5,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.6,3.4,1.4,0.3]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.6,3.6,1.0,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.7,3.2,1.3,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.7,3.2,1.6,0.2]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.8,3.0,1.4,0.1]|       [1.0,0.0,0.0]|       0.0|
|    Iris-setosa|[4.8,3.0,1.4,0.3]|       [1.0,0