In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext()

In [4]:
spark = SparkSession(sc)

### Read the dataset

In [5]:
data = spark.read.csv('Du_lieu_cung_cap/turnover.csv', header=True, inferSchema=True)

In [6]:
data.show(10)

+-----------+-----+------+----+---------------+----------+-----------+-----+-----------+--------+---+------------+--------+-----------+-------+-------+
|       stag|event|gender| age|       industry|profession|    traffic|coach|head_gender|greywage|way|extraversion|independ|selfcontrol|anxiety|novator|
+-----------+-----+------+----+---------------+----------+-----------+-----+-----------+--------+---+------------+--------+-----------+-------+-------+
|7.030800821|    1|     m|35.0|          Banks|        HR|rabrecNErab|   no|          f|   white|bus|         6.2|     4.1|        5.7|    7.1|    8.3|
| 22.9650924|    1|     m|33.0|          Banks|        HR|      empjs|   no|          m|   white|bus|         6.2|     4.1|        5.7|    7.1|    8.3|
|15.93429158|    1|     f|35.0|PowerGeneration|        HR|rabrecNErab|   no|          m|   white|bus|         6.2|     6.2|        2.6|    4.8|    8.3|
|15.93429158|    1|     f|35.0|PowerGeneration|        HR|rabrecNErab|   no|          m|

In [7]:
data.count()

1129

In [8]:
data.printSchema()

root
 |-- stag: double (nullable = true)
 |-- event: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- industry: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- traffic: string (nullable = true)
 |-- coach: string (nullable = true)
 |-- head_gender: string (nullable = true)
 |-- greywage: string (nullable = true)
 |-- way: string (nullable = true)
 |-- extraversion: double (nullable = true)
 |-- independ: double (nullable = true)
 |-- selfcontrol: double (nullable = true)
 |-- anxiety: double (nullable = true)
 |-- novator: double (nullable = true)



### Categorical variable

In [9]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [10]:
# indexers = [StringIndexer(inputCol=column, outputCol=column+'_idx').fit(data) for column in list(set(data.columns)-set(['stag', 'event',
#                                                                                                                        'age', 'extraversion', 
#                                                                                                                         'independ', 'independ', 
#                                                                                                                         'selfcontrol', 'anxiety',
#                                                                                                                         'novator'
#                                                                                                                        ]))]

In [11]:
categorical_columns= ['gender', 'industry', 'profession', 'traffic','coach','head_gender','greywage','way']

# The index of string values multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol='{0}_indexed'.format(c))
    for c in categorical_columns
]

# The encode of indexed values multiple columns
encoders = [OneHotEncoder(dropLast=True,inputCol=indexer.getOutputCol(),
            outputCol='{0}_encoded'.format(indexer.getOutputCol())) 
    for indexer in indexers
]

In [12]:
pipeline = Pipeline(stages=indexers + encoders)
data_indexed=pipeline.fit(data).transform(data)

In [13]:
# pipeline = Pipeline(stages=indexers)
# data_indexed = pipeline.fit(data).transform(data)

### Assembling columns

In [14]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [15]:
data_indexed.columns

['stag',
 'event',
 'gender',
 'age',
 'industry',
 'profession',
 'traffic',
 'coach',
 'head_gender',
 'greywage',
 'way',
 'extraversion',
 'independ',
 'selfcontrol',
 'anxiety',
 'novator',
 'gender_indexed',
 'industry_indexed',
 'profession_indexed',
 'traffic_indexed',
 'coach_indexed',
 'head_gender_indexed',
 'greywage_indexed',
 'way_indexed',
 'gender_indexed_encoded',
 'industry_indexed_encoded',
 'profession_indexed_encoded',
 'traffic_indexed_encoded',
 'coach_indexed_encoded',
 'head_gender_indexed_encoded',
 'greywage_indexed_encoded',
 'way_indexed_encoded']

In [16]:
data_indexed.show(5)

+-----------+-----+------+----+---------------+----------+-----------+-----+-----------+--------+---+------------+--------+-----------+-------+-------+--------------+----------------+------------------+---------------+-------------+-------------------+----------------+-----------+----------------------+------------------------+--------------------------+-----------------------+---------------------+---------------------------+------------------------+-------------------+
|       stag|event|gender| age|       industry|profession|    traffic|coach|head_gender|greywage|way|extraversion|independ|selfcontrol|anxiety|novator|gender_indexed|industry_indexed|profession_indexed|traffic_indexed|coach_indexed|head_gender_indexed|greywage_indexed|way_indexed|gender_indexed_encoded|industry_indexed_encoded|profession_indexed_encoded|traffic_indexed_encoded|coach_indexed_encoded|head_gender_indexed_encoded|greywage_indexed_encoded|way_indexed_encoded|
+-----------+-----+------+----+---------------+-

In [17]:
assembler = VectorAssembler(inputCols = ['stag',
 'age',
 'extraversion',
 'independ',
 'selfcontrol',
 'anxiety',
 'novator',
 'gender_indexed_encoded',
 'industry_indexed_encoded',
 'profession_indexed_encoded',
 'traffic_indexed_encoded',
 'coach_indexed_encoded',
 'head_gender_indexed_encoded',
 'greywage_indexed_encoded',
 'way_indexed_encoded'],
                            outputCol = 'features')

In [18]:
output = assembler.transform(data_indexed)

In [19]:
final_data = output.select('features', 'event')
final_data.show(5, False)

+------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                          |event|
+------------------------------------------------------------------------------------------------------------------+-----+
|(50,[0,1,2,3,4,5,6,11,23,39,44,47,48],[7.030800821,35.0,6.2,4.1,5.7,7.1,8.3,1.0,1.0,1.0,1.0,1.0,1.0])             |1    |
|(50,[0,1,2,3,4,5,6,11,23,38,44,46,47,48],[22.9650924,33.0,6.2,4.1,5.7,7.1,8.3,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |1    |
|(50,[0,1,2,3,4,5,6,7,16,23,39,44,46,47,48],[15.93429158,35.0,6.2,6.2,2.6,4.8,8.3,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1    |
|(50,[0,1,2,3,4,5,6,7,16,23,39,44,46,47,48],[15.93429158,35.0,5.4,7.6,4.9,2.5,6.7,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1    |
|(50,[0,1,2,3,4,5,6,8,30,37,47,48],[8.410677618,32.0,3.0,4.1,8.0,7.1,3.7,1.0,1.0,1.0,1.0,1.0])                     |1    |
+---------------

In [20]:
final_data.count()

1129

In [21]:
final_data = final_data.na.drop()
final_data.count()

1129

- Không có dữ liệu na

### Chia dữ liệu train-test

In [22]:
train_data, test_data = final_data.randomSplit([0.8, 0.2])

In [23]:
# Rename column name 'event' to 'label' to use ParamGrid later
train_data = train_data.withColumnRenamed('event', 'label')
test_data = test_data.withColumnRenamed('event', 'label')

In [24]:
train_data.describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|               898|
|   mean|0.5267260579064588|
| stddev|0.4995634377414268|
|    min|                 0|
|    max|                 1|
+-------+------------------+



In [25]:
test_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                231|
|   mean|0.42424242424242425|
| stddev|0.49530070982069074|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



### Xây dựng model với LogisticRegression

In [26]:
from pyspark.ml.classification import LogisticRegression

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [28]:
# Create a Logistic Regression Model Object
lr = LogisticRegression(featuresCol = 'features',
                              labelCol = 'label',
                              predictionCol = 'prediction'
                             )

In [29]:
# Create a parameter grid for tuning the model
lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
             #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
             #  .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [30]:
lrevaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', metricName = 'areaUnderROC')

In [31]:
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)

In [32]:
# Fit the model to the data and call this model lrcvModel
lrcvModel = lrcv.fit(train_data)
print(lrcvModel)

CrossValidatorModel_024631bd2e06


In [33]:
# Check test dataset
lrpredictions = lrcvModel.transform(test_data)

In [34]:
# Inspect results
lrpredictions.groupBy('prediction', 'label').count().show(5)

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|    0|   60|
|       0.0|    0|   73|
|       0.0|    1|   33|
|       1.0|    1|   65|
+----------+-----+-----+



### Xây dựng model với DecisionTree

In [35]:
from pyspark.ml.classification import DecisionTreeClassifier

In [36]:
# Create a classifier object and fit to the training data
dt = DecisionTreeClassifier(featuresCol='features',
                              labelCol='label',
                              predictionCol='prediction'
                            )

In [37]:
# Create a parameter grid for tuning the model
dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10])
             .addGrid(dt.maxBins, [5, 10, 15])
             .build())

In [38]:
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')

In [39]:
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)

In [40]:
# Fit the model to the data and call this model dtcvModel
dtcvModel = dtcv.fit(train_data)
print(dtcvModel)

CrossValidatorModel_6e281cdaa120


In [41]:
# Check test dataset
dtpredictions = dtcvModel.transform(test_data)

In [42]:
# Inspect results
dtpredictions.groupBy('prediction', 'label').count().show(5)

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|    0|   63|
|       0.0|    0|   70|
|       0.0|    1|   27|
|       1.0|    1|   71|
+----------+-----+-----+



### Xây dựng model với RandomForest

In [43]:
from pyspark.ml.classification import RandomForestClassifier

In [44]:
# Create a classifier object and fit to the training data
rf = RandomForestClassifier(featuresCol='features',
                              labelCol='label',
                              predictionCol='prediction')

In [45]:
# Create a parameter grid for tuning the model
rfparamGrid = (ParamGridBuilder()

               .addGrid(rf.maxDepth, [2, 5, 10])

               .addGrid(rf.maxBins, [5, 10, 15])

               .addGrid(rf.numTrees, [5, 20, 50])
             .build())

In [46]:
rfevaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')

In [47]:
# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

In [48]:
# Fit the model to the data and call this model rfcvModel
rfcvModel = rfcv.fit(train_data)
print(rfcvModel)

CrossValidatorModel_3d4dab92f244


In [49]:
# Check test dataset
rfpredictions = rfcvModel.transform(test_data)

In [50]:
# Inspect results
rfpredictions.groupBy('prediction', 'label').count().show(5)

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|    0|   45|
|       0.0|    0|   88|
|       0.0|    1|   22|
|       1.0|    1|   76|
+----------+-----+-----+



### Xây dựng model với Gradient-Boosting

In [51]:
from pyspark.ml.classification import GBTClassifier

In [52]:
# Create a classifier object and fit to the training data
gb = GBTClassifier(featuresCol='features',
                              labelCol='label',
                              predictionCol='prediction')

In [53]:
# Create a parameter grid for tuning the model
gbparamGrid = (ParamGridBuilder()
             .addGrid(gb.maxDepth, [2, 5, 10])
             .addGrid(gb.maxBins, [5, 10, 15])
             .addGrid(gb.maxIter, [5, 10, 20])
             .build())

In [54]:
gbevaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')

In [55]:
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator = gb,
                      estimatorParamMaps = gbparamGrid,
                      evaluator = gbevaluator,
                      numFolds = 5)

In [56]:
# Fit the model to the data and call this model gbcvModel
gbcvModel = gbcv.fit(train_data)
print(gbcvModel)

CrossValidatorModel_4de5f6ce03b5


In [57]:
# Check test dataset
gbpredictions = gbcvModel.transform(test_data)

In [58]:
# Inspect results
gbpredictions.groupBy('prediction', 'label').count().show(5)

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|    0|   44|
|       0.0|    0|   89|
|       0.0|    1|   33|
|       1.0|    1|   65|
+----------+-----+-----+



### Đánh giá kết quả của các models

In [59]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [60]:
# Find weighted precision
acc_evaluator = MulticlassClassificationEvaluator(labelCol = 'label',
                                                  predictionCol = 'prediction',
                                                  metricName = 'accuracy')

In [61]:
lr_acc = acc_evaluator.evaluate(lrpredictions)
dtc_acc = acc_evaluator.evaluate(dtpredictions)
rfc_acc = acc_evaluator.evaluate(rfpredictions)
gbt_acc = acc_evaluator.evaluate(gbpredictions)

In [62]:
print('Results: ')
print('-'*80)
print('A logistic regression - accuracy: {0:2.2f}%'.format(lr_acc*100))
print('-'*80)
print('A single decision tree - accuracy: {0:2.2f}%'.format(dtc_acc*100))
print('-'*80)
print('A random forest ensemble - accuracy: {0:2.2f}%'.format(rfc_acc*100))
print('-'*80)
print('A ensemble using GBT - accuracy: {0:2.2f}%'.format(gbt_acc*100))

Results: 
--------------------------------------------------------------------------------
A logistic regression - accuracy: 59.74%
--------------------------------------------------------------------------------
A single decision tree - accuracy: 61.04%
--------------------------------------------------------------------------------
A random forest ensemble - accuracy: 71.00%
--------------------------------------------------------------------------------
A ensemble using GBT - accuracy: 66.67%


In [63]:
# Find AUC
lr_acc2 = lrevaluator.evaluate(lrpredictions)
dtc_acc2 = dtevaluator.evaluate(dtpredictions)
rfc_acc2 = rfevaluator.evaluate(rfpredictions)
gbt_acc2 = gbevaluator.evaluate(gbpredictions)

In [64]:
print('Results')
print('-'*80)
print('A logistic regression - accuracy: {0:2.2f}%'.format(lr_acc2*100))
print('-'*80)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc2*100))
print('-'*80)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc2*100))
print('-'*80)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc2*100))

Results
--------------------------------------------------------------------------------
A logistic regression - accuracy: 65.89%
--------------------------------------------------------------------------------
A single decision tree has an accuracy of: 55.90%
--------------------------------------------------------------------------------
A random forest ensemble has an accuracy of: 76.35%
--------------------------------------------------------------------------------
An ensemble using GBT has an accuracy of: 70.95%


- Random forest has the highest accuracy so choose random forest