In [1]:
# Make the local Spark installation importable before importing pyspark.
# Prefer SPARK_HOME when it is set, falling back to the path this notebook
# was originally written against, so the notebook is not tied to one machine.
import os

import findspark

findspark.init(os.environ.get("SPARK_HOME", "/usr/local/spark"))
import pyspark  # must come after findspark.init(), which puts pyspark on sys.path

*Importing the required packages*

In [2]:
import os
import pandas as pd
from pyspark.sql import SparkSession

*Create the Spark session and load the data*

In [3]:
# Create (or reuse) the Spark session and load the raw CSV.
# SparkSession is already imported in the imports cell above.
# inferSchema=True lets Spark type the numeric columns (see the schema printed below)
# instead of reading every column as a string.
DATA_PATH = 'final_csv.csv'

spark = SparkSession.builder.appName('ml-model').getOrCreate()
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Sno: integer (nullable = true)
 |-- Candidate Ref: integer (nullable = true)
 |-- DOJ Extended: string (nullable = true)
 |-- Duration to accept offer: integer (nullable = true)
 |-- Notice period: integer (nullable = true)
 |-- Offered band: string (nullable = true)
 |-- Pecent hike expected in CTC: double (nullable = true)
 |-- Percent hike offered in CTC: double (nullable = true)
 |-- Percent difference CTC: double (nullable = true)
 |-- Joining Bonus: string (nullable = true)
 |-- Candidate relocate actual: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Candidate Source: string (nullable = true)
 |-- Rex in Yrs: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- LOB_id: integer (nullable = true)
 |-- LOB: string (nullable = true)
 |-- Status: string (nullable = true)



In [4]:


# Check dimensions as (row count, column count)
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))



(8995, 20)


In [5]:
# Preview only the first rows. toPandas() collects every row to the driver,
# so limit on the Spark side instead of materialising the whole dataset in pandas.
df.limit(10).toPandas()

Unnamed: 0,_c0,Sno,Candidate Ref,DOJ Extended,Duration to accept offer,Notice period,Offered band,Pecent hike expected in CTC,Percent hike offered in CTC,Percent difference CTC,Joining Bonus,Candidate relocate actual,Gender,Candidate Source,Rex in Yrs,Location,Age,LOB_id,LOB,Status
0,0,1,2110407,Yes,14,30,E2,-20.79,13.16,42.86,No,No,Female,Agency,7,Noida,34.0,1,ERS,Joined
1,1,2,2112635,No,18,30,E2,50.00,320.00,180.00,No,No,Male,Employee Referral,8,Chennai,34.0,2,INFRA,Joined
2,2,3,2112838,No,3,45,E2,42.84,42.84,0.00,No,No,Male,Agency,4,Noida,27.0,2,INFRA,Joined
3,3,4,2115021,No,26,30,E2,42.84,42.84,0.00,No,No,Male,Employee Referral,4,Noida,34.0,2,INFRA,Joined
4,4,5,2115125,Yes,1,120,E2,42.59,42.59,0.00,No,Yes,Male,Employee Referral,6,Noida,34.0,2,INFRA,Joined
5,5,6,2117167,Yes,17,30,E1,42.83,42.83,0.00,No,No,Male,Employee Referral,2,Noida,34.0,2,INFRA,Joined
6,6,7,2119124,Yes,37,30,E2,31.58,31.58,0.00,No,No,Male,Employee Referral,7,Noida,32.0,2,INFRA,Joined
7,7,9,2127572,Yes,16,0,E1,-20.00,-20.00,0.00,No,No,Female,Direct,8,Noida,34.0,3,Healthcare,Joined
8,8,11,2138169,No,1,30,E1,-22.22,-22.22,0.00,No,No,Female,Employee Referral,3,Gurgaon,26.0,4,BFSI,Joined
9,9,12,2143362,No,6,30,E1,240.00,220.00,-5.88,No,No,Male,Employee Referral,3,Chennai,34.0,5,CSMP,Joined


In [6]:
# Drop the index/ID columns (_c0, Sno, Candidate Ref) and the numeric fields
# that are not used as model inputs.
# Note: 'Pecent hike expected in CTC' is spelled exactly as in the CSV header.
unwanted_columns = [
    '_c0',
    'Sno',
    'Candidate Ref',
    'Percent difference CTC',
    'Percent hike offered in CTC',
    'LOB_id',
    'Age',
    'Pecent hike expected in CTC',
]
data = df.drop(*unwanted_columns)
data.columns

['DOJ Extended',
 'Duration to accept offer',
 'Notice period',
 'Offered band',
 'Joining Bonus',
 'Candidate relocate actual',
 'Gender',
 'Candidate Source',
 'Rex in Yrs',
 'Location',
 'LOB',
 'Status']

In [7]:
# Remaining columns with their Spark SQL types
[(col_name, col_type) for col_name, col_type in data.dtypes]

[('DOJ Extended', 'string'),
 ('Duration to accept offer', 'int'),
 ('Notice period', 'int'),
 ('Offered band', 'string'),
 ('Joining Bonus', 'string'),
 ('Candidate relocate actual', 'string'),
 ('Gender', 'string'),
 ('Candidate Source', 'string'),
 ('Rex in Yrs', 'int'),
 ('Location', 'string'),
 ('LOB', 'string'),
 ('Status', 'string')]

*Transforming the data: replace spaces in column names with underscores*

In [8]:
# Replace spaces in column names with underscores so later stages can refer
# to them as plain identifiers.
rename_map = {
    "DOJ Extended": "DOJ_Extended",
    "Duration to accept offer": "Duration_to_accept_offer",
    "Notice period": "Notice_period",
    "Offered band": "Offered_band",
    "Joining Bonus": "Joining_Bonus",
    "Candidate relocate actual": "Candidate_relocate_actual",
    "Candidate Source": "Candidate_Source",
    "Rex in Yrs": "Rex_in_Yrs",
}
for old_name, new_name in rename_map.items():
    data = data.withColumnRenamed(old_name, new_name)
    

*Encoding categorical data with StringIndexer and OneHotEncoder*

In [9]:
# Encode the categorical columns. StringIndexer maps each string category to a
# numeric index, OneHotEncoder expands each index into a vector, and
# VectorAssembler concatenates the encoded vectors into 'features'.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

categorical_columns = ['DOJ_Extended', 'Offered_band', 'Joining_Bonus', 'Candidate_relocate_actual',
                       'Gender', 'Candidate_Source', 'Location', 'LOB', 'Status']

# Index the string values of every categorical column
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns
]

# One-hot encode each indexed column
encoders = [
    OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(),
                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

# Vectorize the encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],
                            outputCol="features")

pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fill missing values BEFORE fitting, so the pipeline is fit on exactly the frame
# it then transforms. fillna(0) only replaces nulls in numeric columns; string
# columns are left unchanged.
data = data.fillna(0)
model = pipeline.fit(data)
final = model.transform(data)

In [10]:
# Keep only the numeric inputs and the *_indexed columns. Drop the raw categorical
# strings, their one-hot vectors, and the pipeline's 'features' column
# (features are re-assembled from the indexed columns in the next cell).
columns_to_drop = (
    ["{0}_indexed_encoded".format(c) for c in categorical_columns]
    + categorical_columns
    + ['features']
)
final = final.drop(*columns_to_drop)
final.columns

['Duration_to_accept_offer',
 'Notice_period',
 'Rex_in_Yrs',
 'DOJ_Extended_indexed',
 'Offered_band_indexed',
 'Joining_Bonus_indexed',
 'Candidate_relocate_actual_indexed',
 'Gender_indexed',
 'Candidate_Source_indexed',
 'Location_indexed',
 'LOB_indexed',
 'Status_indexed']

*Assemble the required features with VectorAssembler*

In [11]:
from pyspark.ml.feature import VectorAssembler

# Model inputs: numeric columns are used directly, and categorical columns enter
# as their StringIndexer indices. Gender_indexed and LOB_indexed are not included,
# and Status_indexed is the target, so it is excluded too.
required_features = [
    'DOJ_Extended_indexed',
    'Duration_to_accept_offer',
    'Notice_period',
    'Offered_band_indexed',
    'Joining_Bonus_indexed',
    'Candidate_relocate_actual_indexed',
    'Candidate_Source_indexed',
    'Location_indexed',
    'Rex_in_Yrs',
]

# Concatenate the selected columns into a single 'features' vector column
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(final)
transformed_data

DataFrame[Duration_to_accept_offer: int, Notice_period: int, Rex_in_Yrs: int, DOJ_Extended_indexed: double, Offered_band_indexed: double, Joining_Bonus_indexed: double, Candidate_relocate_actual_indexed: double, Gender_indexed: double, Candidate_Source_indexed: double, Location_indexed: double, LOB_indexed: double, Status_indexed: double, features: vector]

In [12]:
import pandas as pd

# Peek at the first five rows, transposed so each column becomes a row
preview_rows = transformed_data.take(5)
pd.DataFrame(preview_rows, columns=transformed_data.columns).T

Unnamed: 0,0,1,2,3,4
Duration_to_accept_offer,14,18,3,26,1
Notice_period,30,30,45,30,120
Rex_in_Yrs,7,8,4,4,6
DOJ_Extended_indexed,1,0,0,0,1
Offered_band_indexed,1,1,1,1,1
Joining_Bonus_indexed,0,0,0,0,0
Candidate_relocate_actual_indexed,0,0,0,0,1
Gender_indexed,1,0,0,0,0
Candidate_Source_indexed,1,2,1,2,2
Location_indexed,1,0,1,1,1


In [13]:


# Model DataFrame: the assembled feature vector plus the indexed target as 'label'.
# The StringIndexer output column is 'Status_indexed' (lower-case 'i'). Using the
# exact name keeps this working even if spark.sql.caseSensitive is enabled.
model_df = transformed_data.select(['features', 'Status_indexed'])
model_df = model_df.withColumnRenamed('Status_indexed', 'label')
model_df.printSchema()



root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)



*Splitting into training & testing DataFrames*

In [14]:


# Split into training (75%) and test (25%) DataFrames. A fixed seed makes the
# split, and every metric computed on it below, reproducible across runs.
training_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)



**Model: Decision Tree**

In [15]:


from pyspark.ml.classification import DecisionTreeClassifier

# Shallow decision tree (maxDepth=3) trained on the training split
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=3)
dtModel = dt.fit(training_df)

# Score the held-out test split and show a sample of the predictions
predictions = dtModel.transform(test_df)
shown_columns = ['label', 'rawPrediction', 'prediction', 'probability']
predictions.select(shown_columns).show(10)



+-----+--------------+----------+--------------------+
|label| rawPrediction|prediction|         probability|
+-----+--------------+----------+--------------------+
|  0.0|[3185.0,586.0]|       0.0|[0.84460355343410...|
|  0.0|[3185.0,586.0]|       0.0|[0.84460355343410...|
|  1.0| [675.0,420.0]|       0.0|[0.61643835616438...|
|  0.0|[3185.0,586.0]|       0.0|[0.84460355343410...|
|  0.0| [675.0,420.0]|       0.0|[0.61643835616438...|
|  0.0| [675.0,420.0]|       0.0|[0.61643835616438...|
|  0.0|[3185.0,586.0]|       0.0|[0.84460355343410...|
|  1.0| [675.0,420.0]|       0.0|[0.61643835616438...|
|  1.0| [675.0,420.0]|       0.0|[0.61643835616438...|
|  0.0|[3185.0,586.0]|       0.0|[0.84460355343410...|
+-----+--------------+----------+--------------------+
only showing top 10 rows



*Evaluating the model*

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy: the fraction of test rows whose prediction equals the label
multi_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
dt_accuracy = multi_evaluator.evaluate(predictions)
print('Decision Tree Accuracy:', dt_accuracy)

Decision Tree Accuracy: 0.8174748398902104


*Gradient Boosted Tree Classifier*

In [17]:


from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees with 10 boosting iterations. featuresCol/labelCol keep
# their defaults ('features' / 'label'), which match the columns of training_df.
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(training_df)

# Score the held-out test split and show a sample of the predictions
predictions = gbtModel.transform(test_df)
predictions.select(['label', 'rawPrediction', 'prediction', 'probability']).show(10)



+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[0.85257949371547...|       0.0|[0.84620732786123...|
|  0.0|[0.84802374523115...|       0.0|[0.84501780914818...|
|  1.0|[0.11463416238998...|       0.0|[0.55706732617137...|
|  0.0|[0.65871521439731...|       0.0|[0.78875387916357...|
|  0.0|[0.09260494384440...|       0.0|[0.54617056604991...|
|  0.0|[0.09260494384440...|       0.0|[0.54617056604991...|
|  0.0|[0.79158688457706...|       0.0|[0.82965353156840...|
|  1.0|[0.09260494384440...|       0.0|[0.54617056604991...|
|  1.0|[0.09260494384440...|       0.0|[0.54617056604991...|
|  0.0|[0.77634712398489...|       0.0|[0.82530252436118...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



*Evaluating the Gradient Boosted Tree Classifier*

In [18]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Evaluate the gradient-boosted tree classifier by area under the ROC curve.
# `evaluator` is reused by the cross-validation cell further down.
evaluator = BinaryClassificationEvaluator()
gbt_auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test Area Under ROC: {}".format(gbt_auc))



Test Area Under ROC: 0.7438560461591125


In [19]:
# List every GBTClassifier parameter with its description and default/current value
param_help = gbt.explainParams()
print(param_help)

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)
featuresCol: features column name. (default: features)
labelCol: label column name. (default: label)
lossType: Loss function which GBT tries to minimize (case-insensitive). Supported options: logistic (default: logistic)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5)
ma

In [20]:


from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Grid over tree depth, number of bins and number of boosting iterations
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

# 5-fold cross-validation scored with `evaluator` (the BinaryClassificationEvaluator
# from the GBT evaluation cell). A fixed seed makes the fold assignment reproducible.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=5, seed=42)

# Cost: 3 x 2 x 2 = 12 parameter combinations x 5 folds = 60 GBT fits, plus one
# refit of the best combination on the full training split. Expect this cell to be slow.
cvModel = cv.fit(training_df)
predictions = cvModel.transform(test_df)
evaluator.evaluate(predictions)



0.7608269242859254