In [7]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('ibm-hr-analytics-attrition.csv', header = True, inferSchema = True)

df = df.selectExpr("PerformanceRating as label","Age","Attrition","JobSatisfaction","OverTime","Gender")
df.count()

1470

In [8]:
df = df.na.drop()
df

DataFrame[label: int, Age: int, Attrition: string, JobSatisfaction: int, OverTime: string, Gender: string]

In [9]:
from pyspark.ml.feature import  StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.linalg import Vectors

#AttritionIndexer  = StringIndexer(inputCol="Attrition", outputCol="AttritionIndex")
#OverTimeIndexer  = StringIndexer(inputCol="OverTime", outputCol="OverTimeIndex")
#GenderIndexer  = StringIndexer(inputCol="Gender", outputCol="GenderIndex")

#AttritionEncoder = OneHotEncoder(inputCol="AttritionIndex", outputCol="AttritionVec")
#OverTimeEncoder = OneHotEncoder(inputCol="OverTimeIndex", outputCol="OverTimeVec")
#GenderEncoder = OneHotEncoder(inputCol="GenderIndex", outputCol="GenderVec")


categorical_variables = [ "Attrition",  "OverTime", "Gender"]

indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(), 
    outputCol="features"
)
#assembler = VectorAssembler(inputCols=["Age", "Attrition", "JobSatisfaction", "OverTime", "Gender"], outputCol="features")


In [19]:
train,test = df.randomSplit([0.7,0.3],seed=12345)
test.printSchema()

root
 |-- label: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- Gender: string (nullable = true)



In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

pipeline = Pipeline(stages=indexers + [encoder, assembler])

In [21]:
result = pipeline.fit(train).transform(train) 
test = pipeline.fit(test).transform(test)
test.printSchema()

root
 |-- label: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Attrition-index: double (nullable = false)
 |-- OverTime-index: double (nullable = false)
 |-- Gender-index: double (nullable = false)
 |-- Attrition-index-encoded: vector (nullable = true)
 |-- OverTime-index-encoded: vector (nullable = true)
 |-- Gender-index-encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



In [22]:
scaled_data = MinMaxScaler(inputCol="features", outputCol="scaled-features").fit(result).transform(result)
scaled_data.printSchema()

scaled_test_data = MinMaxScaler(inputCol="features", outputCol="scaled-features").fit(test).transform(test)
scaled_test_data.printSchema()

root
 |-- label: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Attrition-index: double (nullable = false)
 |-- OverTime-index: double (nullable = false)
 |-- Gender-index: double (nullable = false)
 |-- Attrition-index-encoded: vector (nullable = true)
 |-- OverTime-index-encoded: vector (nullable = true)
 |-- Gender-index-encoded: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled-features: vector (nullable = true)

root
 |-- label: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Attrition-index: double (nullable = false)
 |-- OverTime-index: double (nullable = false)
 |-- Gender-index: doubl

In [23]:
lr = LogisticRegression(featuresCol = 'scaled-features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(scaled_data)   

In [28]:
predictions = lrModel.transform(scaled_test_data)
predictions.select('Age',  'label',  'prediction', 'probability').show(10)
predictions.printSchema()

+---+-----+----------+--------------------+
|Age|label|prediction|         probability|
+---+-----+----------+--------------------+
| 18|    3|       3.0|[5.35973664113289...|
| 20|    3|       3.0|[4.10672249726723...|
| 20|    3|       3.0|[4.10672249726723...|
| 21|    3|       3.0|[1.68607575559453...|
| 21|    3|       3.0|[4.72002827902979...|
| 21|    3|       3.0|[4.10672249726723...|
| 22|    3|       3.0|[1.15029127643826...|
| 23|    3|       3.0|[4.10672249726723...|
| 23|    3|       3.0|[1.15029127643826...|
| 24|    3|       3.0|[4.72002827902979...|
+---+-----+----------+--------------------+
only showing top 10 rows

root
 |-- label: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Attrition-index: double (nullable = false)
 |-- OverTime-index: double (nullable = false)
 |-- Gender-index