In [57]:

import findspark
findspark.init()
#importing all essential libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [58]:
#create a spark session
spark = SparkSession.builder.master("local").appName('Assignment2').getOrCreate()
sc=spark.sparkContext

In [59]:
#loaded the data here
data = spark.read.csv("fake_job_postings.csv", header=True)

In [60]:
data.show()

+------+--------------------+--------------------+----------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------+---------------+-------------------+--------------------+--------------------+--------------------+----------+
|job_id|               title|            location|department| salary_range|     company_profile|         description|        requirements|            benefits|       telecommuting|has_company_logo|has_questions|employment_type|required_experience|  required_education|            industry|            function|fraudulent|
+------+--------------------+--------------------+----------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------+---------------+-------------------+--------------------+--------------------+--------------------+----------+
|     1|    Marketing Intern|    U

In [61]:
data.printSchema()

root
 |-- job_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: string (nullable = true)



In [62]:
#converted label column to int
data = data.withColumn("fraudulent", data["fraudulent"].cast("int"))
data = data.drop("job_id")
data.printSchema()

root
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: integer (nullable = true)



In [63]:
# removed any other fraudulent data other than 0 or 1 value
data = data.filter((data["fraudulent"] == 0) | (data["fraudulent"] == 1))
data.select('fraudulent').distinct().show()

+----------+
|fraudulent|
+----------+
|         1|
|         0|
+----------+



In [64]:
#calculating the total rows present in database
data.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in data.columns)).show()
rows = data.count()
print(rows)

+-----+--------+----------+------------+---------------+-----------+------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+--------+----------+
|title|location|department|salary_range|company_profile|description|requirements|benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|required_education|industry|function|fraudulent|
+-----+--------+----------+------------+---------------+-----------+------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+--------+----------+
|    0|     337|     11039|       14258|           3206|          0|        2571|    6949|            0|               0|            0|           3273|               6675|              7661|    4667|    6158|         0|
+-----+--------+----------+------------+---------------+-----------+------------+--------+-------------+----------------

In [65]:
#To check for missing values and removing columns which are having missing values greater than 1%
amount_missing_df = data.select([(count(when(isnan(c) |col(c).isNull(),c ))).alias(c) for c in data.columns]).first()
print("the missing values are", amount_missing_df)
amount_missing_df_percent = data.select([(count(when(isnan(c) |col(c).isNull(),c))/(rows/100)).alias(c) for c in data.columns]).first()
print("the missing value percent is",amount_missing_df_percent)
cols_to_dop = [c for c in amount_missing_df.asDict() if amount_missing_df[c] >= (rows/100)]
print("the dropped column is",cols_to_dop)
data = data.drop(*cols_to_dop)

the missing values are Row(title=0, location=337, department=11039, salary_range=14258, company_profile=3206, description=0, requirements=2571, benefits=6949, telecommuting=0, has_company_logo=0, has_questions=0, employment_type=3273, required_experience=6675, required_education=7661, industry=4667, function=6158, fraudulent=0)
the missing value percent is Row(title=0.0, location=1.986325592361193, department=65.06542496758223, salary_range=84.03866556642697, company_profile=18.89661676293764, description=0.0, requirements=15.15383708593658, benefits=40.95838736296122, telecommuting=0.0, has_company_logo=0.0, has_questions=0.0, employment_type=19.29152422492043, required_experience=39.34339266768832, required_education=45.155015914181305, industry=27.507957090651892, function=36.296121655074856, fraudulent=0.0)
the dropped column is ['location', 'department', 'salary_range', 'company_profile', 'requirements', 'benefits', 'employment_type', 'required_experience', 'required_education', '

In [66]:
data.printSchema()

root
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- fraudulent: integer (nullable = true)



In [67]:
#combining data of all columns in a single column called alldata. the data is sepereated by a space " "
data=data.withColumn("alldata",concat_ws(" ",col("title"),col("description")))

In [68]:
#turning 0,1 data into int
data = data.withColumn("telecommuting", data["telecommuting"].cast("int"))
data = data.withColumn("has_company_logo", data["has_company_logo"].cast("int"))
data = data.withColumn("has_questions", data["has_questions"].cast("int"))

In [69]:
#removing anything that is not a letter , removing multiple spaces and converting all text to lower case letters
data = data.withColumn("alldata",regexp_replace("alldata", "[^A-Za-z_]", " "))
data = data.withColumn("alldata",regexp_replace("alldata", " +", " "))
data = data.withColumn("alldata", lower(col("alldata")))
data.show()

+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+
|               title|         description|telecommuting|has_company_logo|has_questions|fraudulent|             alldata|
+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+
|    Marketing Intern|Food52, a fast-gr...|            0|               1|            0|         0|marketing intern ...|
|Customer Service ...|Organised - Focus...|            0|               1|            0|         0|customer service ...|
|Commissioning Mac...|Our client, locat...|            0|               1|            0|         0|commissioning mac...|
|Account Executive...|THE COMPANY: ESRI...|            0|               1|            0|         0|account executive...|
| Bill Review Manager|JOB TITLE: Itemiz...|            0|               1|            1|         0|bill review manag...|
|    Accounting Clerk|Job Overvi

In [70]:
#undersampling the fraudulent column as number of fraudulent value 0 is high
major_df = data.filter(col("fraudulent") == 0)
minor_df = data.filter(col("fraudulent") == 1)
ratio = int(major_df.count()/minor_df.count())
ratio -= 1
print("ratio: {}".format(ratio))

data = data.sampleBy("fraudulent", fractions={0:(1/ratio),1:1}, seed=0 )
data.groupBy("fraudulent").count().orderBy(desc("count")).show()

ratio: 17
+----------+-----+
|fraudulent|count|
+----------+-----+
|         0|  921|
|         1|  886|
+----------+-----+



In [71]:
#spliting the data into words seperated by spaces.
data = data.withColumn("alldata",split(col("alldata")," "))
data.show()

+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+
|               title|         description|telecommuting|has_company_logo|has_questions|fraudulent|             alldata|
+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+
|Customer Service ...|The Customer Serv...|            0|               1|            0|         0|[customer, servic...|
|Talent Management...|(We have more tha...|            0|               0|            0|         0|[talent, manageme...|
|Graduates: Englis...|Play with kids, g...|            0|               1|            1|         0|[graduates, engli...|
|   Digital Developer|We are a boutique...|         null|            null|         null|         1|[digital, develop...|
|Ruby Automation E...|Position # 1Ruby ...|            0|               0|            1|         0|[ruby, automation...|
|Customer Service ...|The Custom

In [72]:
#removing all the stopwords from the text.
sga = StopWordsRemover(inputCol="alldata",outputCol="stopall");
data = sga.transform(data)
data = data.drop("alldata")
data = data.withColumnRenamed("stopall","alldata")
data.show()

+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+
|               title|         description|telecommuting|has_company_logo|has_questions|fraudulent|             alldata|
+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+
|Customer Service ...|The Customer Serv...|            0|               1|            0|         0|[customer, servic...|
|Talent Management...|(We have more tha...|            0|               0|            0|         0|[talent, manageme...|
|Graduates: Englis...|Play with kids, g...|            0|               1|            1|         0|[graduates, engli...|
|   Digital Developer|We are a boutique...|         null|            null|         null|         1|[digital, develop...|
|Ruby Automation E...|Position # 1Ruby ...|            0|               0|            1|         0|[ruby, automation...|
|Customer Service ...|The Custom

In [73]:
#converting words into vector form for training with ml models
cv = CountVectorizer(inputCol="alldata", outputCol="vec")
model = cv.fit(data)
data = model.transform(data)
data.show()

+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+--------------------+
|               title|         description|telecommuting|has_company_logo|has_questions|fraudulent|             alldata|                 vec|
+--------------------+--------------------+-------------+----------------+-------------+----------+--------------------+--------------------+
|Customer Service ...|The Customer Serv...|            0|               1|            0|         0|[customer, servic...|(15535,[1,2,8,9,1...|
|Talent Management...|(We have more tha...|            0|               0|            0|         0|[talent, manageme...|(15535,[2,3,4,5,6...|
|Graduates: Englis...|Play with kids, g...|            0|               1|            1|         0|[graduates, engli...|(15535,[14,61,108...|
|   Digital Developer|We are a boutique...|         null|            null|         null|         1|[digital, develop...|(15535,[2,17,43,5...|
|Ruby 

In [74]:
#removing possible null values from int columns
data = data.na.fill(0)

In [75]:
#encoding the int columns and using mixing the vectors into one vector column
encoder = OneHotEncoder(inputCols=["telecommuting", "has_company_logo", "has_questions"],outputCols=["Vec1", "Vec2","Vec3"])
model = encoder.fit(data)
encoded = model.transform(data)

assembler1 = VectorAssembler(inputCols=["vec", "Vec1", "Vec2","Vec3"], outputCol="final")

data = assembler1.transform(encoded)
data = data.select("final","fraudulent")
data.show()

+--------------------+----------+
|               final|fraudulent|
+--------------------+----------+
|(15538,[1,2,8,9,1...|         0|
|(15538,[2,3,4,5,6...|         0|
|(15538,[14,61,108...|         0|
|(15538,[2,17,43,5...|         1|
|(15538,[2,7,14,19...|         0|
|(15538,[1,8,9,10,...|         0|
|(15538,[0,1,2,3,8...|         1|
|(15538,[0,1,10,11...|         0|
|(15538,[2,14,22,2...|         1|
|(15538,[1,2,17,18...|         0|
|(15538,[2,3,4,5,6...|         0|
|(15538,[0,2,3,8,9...|         1|
|(15538,[20,193,15...|         1|
|(15538,[2,15,87,9...|         0|
|(15538,[1,2,7,8,1...|         0|
|(15538,[0,1,2,3,8...|         1|
|(15538,[1,2,3,4,5...|         0|
|(15538,[0,1,8,9,1...|         0|
|(15538,[0,2,13,14...|         0|
|(15538,[1,8,9,10,...|         0|
+--------------------+----------+
only showing top 20 rows



In [76]:
#using a random split of 7:3 ration for training and test dataset respectively
data = data.withColumnRenamed("final","features")
data = data.select("features","fraudulent")
trainTest = data.randomSplit([0.7, 0.3])
traindf = trainTest[0]
testdf = trainTest[1]

In [77]:
testdf.show()

+--------------------+----------+
|            features|fraudulent|
+--------------------+----------+
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         1|
|(15538,[0,1,2,3,4...|         0|
|(15538,[0,1,2,3,4...|         1|
+--------------------+----------+
only showing top 20 rows



In [78]:
#Here we run the Logistic Regression model
lr = LogisticRegression(labelCol="fraudulent", featuresCol="features", maxIter=100)
model=lr.fit(traindf)
predict_test=model.transform(testdf)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(predict_test)
print("Prediction Accuracy: ", acc)
evaluator5=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator5.evaluate(predict_test)
print("Prediction f1: ", acc2)

Prediction Accuracy:  0.8614800759013282
Prediction f1:  0.8614201003972137


In [79]:
# crossvalidation is used here to try different parameters that best fit the model with highest accuracy and f1 scrore
paramGrid = ParamGridBuilder()\
    .addGrid(lr.maxIter,[10, 20, 30])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator2, numFolds=10)
cvModel = cv.fit(traindf)
bmodel= cvModel.bestModel
# running this model can take some time
predict_train=bmodel.transform(traindf)
predict_test=bmodel.transform(testdf)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(predict_test)
print("Prediction Accuracy: ", acc)
evaluator5=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator5.evaluate(predict_test)
print("Prediction f1: ", acc2)

Prediction Accuracy:  0.9089184060721063
Prediction f1:  0.9088881769716683


In [80]:
#with this we print the best parameters for the model.
print("the best maxIter is: ", bmodel._java_obj.getMaxIter())
print("the best elasticNetParam is: ", bmodel._java_obj.getElasticNetParam())
print("the best regParam is: ", bmodel._java_obj.getRegParam())

the best maxIter is:  10
the best elasticNetParam is:  0.5
the best regParam is:  0.01


In [81]:
#LinearSVC is done here
lsvc = LinearSVC(labelCol="fraudulent", featuresCol="features", maxIter=100)
lsvc1 = lsvc.fit(traindf)
pred = lsvc1.transform(testdf)
#pred.show(3)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(pred)
print("Prediction Accuracy: ", acc)
evaluator6=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator6.evaluate(pred)
print("Prediction f1: ", acc2)

Prediction Accuracy:  0.8747628083491461
Prediction f1:  0.8747943765059446


In [94]:
#for LinearSVC, parameters are changed and best parameter and accuracy is calculated.
paramGrid = ParamGridBuilder()\
    .addGrid(lsvc.maxIter,[10, 20, 50])\
    .addGrid(lsvc.threshold,[0.0, 0.5, 1.0])\
    .addGrid(lsvc.regParam,[0.005, 0.01, 0.5]) \
    .build()

cv = CrossValidator(estimator=lsvc, estimatorParamMaps=paramGrid, evaluator=evaluator2, numFolds=10)
cvModel = cv.fit(traindf)
bmodel= cvModel.bestModel
# running this model can take some time
predict_train=bmodel.transform(traindf)
predict_test=bmodel.transform(testdf)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(predict_test)
print("Prediction Accuracy: ", acc)
evaluator5=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator5.evaluate(predict_test)
print("Prediction f1: ", acc2)
print("the best maxIter is: ", bmodel._java_obj.getMaxIter())
print("the best threshold is: ", bmodel._java_obj.getThreshold())
print("the best regParam is: ", bmodel._java_obj.getRegParam())

Prediction Accuracy:  0.8785578747628083
Prediction f1:  0.8785884863087947
the best maxIter is:  50
the best threshold is:  0.0
the best regParam is:  0.005


In [83]:
#Random Forest Classifier is run here
rf = RandomForestClassifier(labelCol="fraudulent", featuresCol="features")
model = rf.fit(traindf)
predrd = model.transform(testdf)
#pred.show(3)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(predrd)
print("Prediction Accuracy: ", acc)
evaluator7=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator7.evaluate(predrd)
print("Prediction f1: ", acc2)

Prediction Accuracy:  0.7419354838709677
Prediction f1:  0.7342268951708071


In [84]:
#parameters are again changed and best parameters and accuracy is printed
paramGrid = ParamGridBuilder()\
    .addGrid(rf.maxDepth,[10, 15, 20])\
    .addGrid(rf.numTrees,[5, 10, 20, 30])\
    .build()

cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator2, numFolds=10)
cvModel = cv.fit(traindf)
bmodel= cvModel.bestModel
# running this model can take some time
predict_train=bmodel.transform(traindf)
predict_test=bmodel.transform(testdf)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(predict_test)
print("Prediction Accuracy: ", acc)
evaluator5=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator5.evaluate(predict_test)
print("Prediction f1: ", acc2)
print("the best maxDepth is: ", bmodel._java_obj.getMaxDepth())
print("the best numTrees is: ", bmodel._java_obj.getNumTrees())

Prediction Accuracy:  0.8538899430740038
Prediction f1:  0.8532729175712896
the best maxDepth is:  20
the best numTrees is:  30


In [91]:
#layers are present in multiplayer perceptron classifier with 1st and last number to be properly put for model to run
layers=[15538,2,2,2]
trainer = MultilayerPerceptronClassifier(labelCol="fraudulent", featuresCol="features", maxIter=30,layers=layers)
# train the model
model2 = trainer.fit(traindf)
# compute accuracy on the test set
result2 = model2.transform(testdf)
evaluator4 = MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator4.evaluate(result2)
print("Prediction Accuracy: ",accuracy)
evaluator8 = MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol="prediction", metricName="f1")
accuracy2 = evaluator8.evaluate(result2)
print("Prediction f1: ",accuracy2)

Prediction Accuracy:  0.8519924098671727
Prediction f1:  0.8520020024780092


In [92]:
#cross-validation for MultiplayerPerceptronClassifier
paramGrid = ParamGridBuilder()\
    .addGrid(trainer.maxIter,[10, 25, 50])\
    .addGrid(trainer.stepSize,[0.01, 0.02, 0.04])\
    .addGrid(trainer.blockSize,[10, 25, 50]) \
    .build()

cv = CrossValidator(estimator=trainer, estimatorParamMaps=paramGrid, evaluator=evaluator4, numFolds=10)
cvModel = cv.fit(traindf)
bmodel= cvModel.bestModel
# running this model can take some time
predict_train=bmodel.transform(traindf)
predict_test=bmodel.transform(testdf)
evaluator2=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="accuracy")
acc = evaluator2.evaluate(predict_test)
print("Prediction Accuracy: ", acc)
evaluator5=MulticlassClassificationEvaluator(labelCol="fraudulent", predictionCol = "prediction",metricName="f1")
acc2 = evaluator5.evaluate(predict_test)
print("Prediction f1: ", acc2)
print("the best maxIter is: ", bmodel._java_obj.getMaxIter())
print("the best stepSize is: ", bmodel._java_obj.getStepSize())
print("the best blockSize is: ", bmodel._java_obj.getBlockSize())

Prediction Accuracy:  0.8614800759013282
Prediction f1:  0.8614591277086479
the best maxIter is:  50
the best stepSize is:  0.01
the best blockSize is:  10
