In [1]:
import findspark
findspark.init()

In [2]:
import pyspark 
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("NLP").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [4]:
from pyspark.ml.feature import * 
from pyspark.sql.functions import * 
from pyspark.sql.types import StringType,IntegerType

In [5]:
postings = spark.read.csv('fake_job_postings.csv',inferSchema=True,header=True)

postings.limit(4).toPandas()
postings.filter("fraudulent=1").show(1,False)
postings.printSchema()

+------+-----------------+---------------+----------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
postings.count()

17880

In [7]:
from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(postings)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()


+-------------------+-----------------+--------------------+
|        Column_Name|Null_Values_Count|  Null_Value_Percent|
+-------------------+-----------------+--------------------+
|           location|              346|  1.9351230425055927|
|         department|            11547|   64.58053691275167|
|       salary_range|            15011|   83.95413870246085|
|    company_profile|             3308|  18.501118568232663|
|        description|                1|0.005592841163310962|
|       requirements|             2573|  14.390380313199106|
|           benefits|             6966|   38.95973154362416|
|      telecommuting|               89| 0.49776286353467564|
|   has_company_logo|               29|  0.1621923937360179|
|      has_questions|               30| 0.16778523489932887|
|    employment_type|             3293|     18.417225950783|
|required_experience|             6725|   37.61185682326622|
| required_education|             7753|   43.36129753914989|
|           industry|   

In [8]:
og_len = postings.count()
drop_len = postings.na.drop().count()
print("Total Null Rows:",og_len-drop_len)
print("Percentage Null Rows", (og_len-drop_len)/og_len)

Total Null Rows: 17094
Percentage Null Rows 0.9560402684563758


In [9]:
df = postings.na.drop(subset=["fraudulent","description"])
df.count()

17704

In [10]:
df.groupBy("fraudulent").count().orderBy(col("count").desc()).show(10)
df = df.filter("fraudulent IN('0','1')")

df.groupBy("fraudulent").count().show(truncate=False)


+--------------------+-----+
|          fraudulent|count|
+--------------------+-----+
|                   0|16087|
|                   1|  886|
|           Full-time|   72|
|Hospital & Health...|   55|
|   Bachelor's Degree|   53|
|         Engineering|   24|
| perform quality ...|   17|
|         Unspecified|   15|
|    Mid-Senior level|   15|
|           Associate|   14|
+--------------------+-----+
only showing top 10 rows

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |16087|
|1         |886  |
+----------+-----+



In [11]:
df = df.sampleBy("fraudulent", fractions={'0': 0.3, '1': 1.0}, seed=100)

df.groupBy("fraudulent").count().show(truncate=False)


indexer = StringIndexer(inputCol="fraudulent", outputCol="label")
df = indexer.fit(df).transform(df)
df.limit(6).toPandas()


df.select("description").show(1,False)


+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |4830 |
|1         |886  |
+----------+-----+

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
#Removing anything that is not a letter
df = df.withColumn("description",regexp_replace(df["description"], '[^A-Za-z ]+', ''))
# Remove multiple spaces
df = df.withColumn("description",regexp_replace(df["description"], ' +', ' '))
# Lower case everything
df = df.withColumn("description",lower(df["description"]))

In [13]:
df.limit(5).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label
0,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",organised focused vibrant awesomedo you have a...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,1,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0,0.0
1,6,Accounting Clerk,"US, MD,",,,,job overviewapex is an environmental consultin...,,,0,0,0,,,,,,0,0.0
2,7,Head of Content (m/f),"DE, BE, Berlin",ANDROIDPIT,20000-28000,"Founded in 2009, the Fonpit AG rose with its i...",your responsibilitiesmanage the englishspeakin...,Your Know-How: ...,Your Benefits: Being part of a fast-growing co...,0,1,1,Full-time,Mid-Senior level,Master's Degree,Online Media,Management,0,0.0
3,8,Lead Guest Service Specialist,"US, CA, San Francisco",,,Airenvy’s mission is to provide lucrative yet ...,who is airenvyhey there we are seasoned entrep...,"Experience with CRM software, live chat, and p...",Competitive Pay. You'll be able to eat steak e...,0,1,1,,,,,,0,0.0
4,14,Installers,"US, FL, Orlando",,,Growing event production company providing sta...,event industry installers needed orlando fl ne...,"Valid driver's license,Somewhat Clean driving ...",,0,1,1,Full-time,Not Applicable,Unspecified,Events Services,Other,0,0.0


In [14]:
regex_tokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")
df = regex_tokenizer.transform(df)

df.limit(5).toPandas()


Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label,words
0,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",organised focused vibrant awesomedo you have a...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,1,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0,0.0,"[organised, focused, vibrant, awesomedo, you, ..."
1,6,Accounting Clerk,"US, MD,",,,,job overviewapex is an environmental consultin...,,,0,0,0,,,,,,0,0.0,"[job, overviewapex, is, an, environmental, con..."
2,7,Head of Content (m/f),"DE, BE, Berlin",ANDROIDPIT,20000-28000,"Founded in 2009, the Fonpit AG rose with its i...",your responsibilitiesmanage the englishspeakin...,Your Know-How: ...,Your Benefits: Being part of a fast-growing co...,0,1,1,Full-time,Mid-Senior level,Master's Degree,Online Media,Management,0,0.0,"[your, responsibilitiesmanage, the, englishspe..."
3,8,Lead Guest Service Specialist,"US, CA, San Francisco",,,Airenvy’s mission is to provide lucrative yet ...,who is airenvyhey there we are seasoned entrep...,"Experience with CRM software, live chat, and p...",Competitive Pay. You'll be able to eat steak e...,0,1,1,,,,,,0,0.0,"[who, is, airenvyhey, there, we, are, seasoned..."
4,14,Installers,"US, FL, Orlando",,,Growing event production company providing sta...,event industry installers needed orlando fl ne...,"Valid driver's license,Somewhat Clean driving ...",,0,1,1,Full-time,Not Applicable,Unspecified,Events Services,Other,0,0.0,"[event, industry, installers, needed, orlando,..."


In [15]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
feature_data = remover.transform(df)
    
feature_data.limit(5).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,...,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label,words,filtered
0,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",organised focused vibrant awesomedo you have a...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,...,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0,0.0,"[organised, focused, vibrant, awesomedo, you, ...","[organised, focused, vibrant, awesomedo, passi..."
1,6,Accounting Clerk,"US, MD,",,,,job overviewapex is an environmental consultin...,,,0,...,0,,,,,,0,0.0,"[job, overviewapex, is, an, environmental, con...","[job, overviewapex, environmental, consulting,..."
2,7,Head of Content (m/f),"DE, BE, Berlin",ANDROIDPIT,20000-28000,"Founded in 2009, the Fonpit AG rose with its i...",your responsibilitiesmanage the englishspeakin...,Your Know-How: ...,Your Benefits: Being part of a fast-growing co...,0,...,1,Full-time,Mid-Senior level,Master's Degree,Online Media,Management,0,0.0,"[your, responsibilitiesmanage, the, englishspe...","[responsibilitiesmanage, englishspeaking, edit..."
3,8,Lead Guest Service Specialist,"US, CA, San Francisco",,,Airenvy’s mission is to provide lucrative yet ...,who is airenvyhey there we are seasoned entrep...,"Experience with CRM software, live chat, and p...",Competitive Pay. You'll be able to eat steak e...,0,...,1,,,,,,0,0.0,"[who, is, airenvyhey, there, we, are, seasoned...","[airenvyhey, seasoned, entrepreneurs, heart, s..."
4,14,Installers,"US, FL, Orlando",,,Growing event production company providing sta...,event industry installers needed orlando fl ne...,"Valid driver's license,Somewhat Clean driving ...",,0,...,1,Full-time,Not Applicable,Unspecified,Events Services,Other,0,0.0,"[event, industry, installers, needed, orlando,...","[event, industry, installers, needed, orlando,..."


In [16]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

In [17]:

W2VfeaturizedData = model.transform(feature_data)

In [18]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")


In [19]:
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

In [20]:
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('fraudulent','description','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData' # We will need this to print later



In [21]:

def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):

        if Mtype in("LogisticRegression","RandomForestClassifier","LinearSVC","DecisionTreeClassifier"):
  
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder().addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder().addGrid(classifier.maxDepth, [2, 5, 10])
                                .addGrid(classifier.maxBins, [5, 10, 20])
                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
                
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder().addGrid(classifier.maxIter, [10, 15]).addGrid(classifier.regParam, [0.1, 0.01])                              .build())
            
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder().addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            

            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2)
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    
    if fitModel is not None:
        

        if Mtype in("DecisionTreeClassifier","RandomForestClassifier"):
            
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel

            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result

In [22]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import functions
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [23]:

classifiers = [
                LogisticRegression()
               ,LinearSVC()
               ,RandomForestClassifier()
               ,DecisionTreeClassifier()
              ] 

featureDF_list = [W2VfeaturizedData]



In [24]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    features = featureDF.select(['features']).collect()
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    print(results.show(truncate=False))


[1mW2VfeaturizedData  Results:[0m
 
[1mLogisticRegression  Coefficient Matrix[0m
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-1.23336405, -1.39634348,  1.28240701]])

Intercept: [-1.3596502327972995]
 
[1mLinearSVC  Coefficients[0m
You should compares these relative to eachother
Coefficients: 
[0.0,-0.09217599428487948,-0.028624116430602246]
 
[1mRandomForestClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.2984048421765647,0.3337409755894872,0.3678541822339482])
 
[1mDecisionTreeClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.33737568139611807,0.33360552745426525,0.32901879114961663])
+----------------------+------+
|Classifier            |Result|
+----------------------+------+
|LogisticRegression    |83.83 |
|LinearSVC             |83.83 |
|RandomForestClassifier|87.58 |
|DecisionTreeClassifier|86.29 |
+---------------------

In [25]:
class_count = featureDF.select(countDistinct("label")).collect()
classes = class_count[0][0]

In [27]:
predictions = RF_BestModel.transform(test)
print("Predicted Fraudulent:")
predictions.select("fraudulent","description").filter("prediction=1").orderBy(predictions["prediction"].desc()).show(3,False)
print(" ")
print("Predicted Not Fraudulent:")
predictions.select("fraudulent","description").filter("prediction=0").orderBy(predictions["prediction"].desc()).show(3,False)


Predicted Fraudulent:
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|fraudulent|description                                                                                                                                                                                                                                                                                                                                         |
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------