# NLP in PySpark's MLlib

## Fake Job Posting Predictions

Create a system that automatically flags suspicious job postings in a dataset.

#### The task
Use NLP to build an algorithm that automatically flags suspicious postings for review.

#### The data
This dataset contains 18K job descriptions, about 800 of which are fake. The data consists of both textual information and meta-information about the jobs.

In [1]:
# Create PySpark Instance

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NLPProject").getOrCreate()
spark

### Data Exploration and Pre-Processing
This phase loads and explores the data, then prepares it for use in MLlib. The steps are:
- Load the .csv file
- Check the DataFrame for negative numbers and for the number of classes

In [10]:
# import libraries and dependencies

from pyspark.ml.feature import * #CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import * #col, udf,regexp_replace,isnull
from pyspark.sql.types import * #StringType,IntegerType
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# For pipeline development
from pyspark.ml import Pipeline 

In [73]:
# Import the data
df =  spark.read.csv("Datasets/fake_job_postings.csv", inferSchema=True, header=True)

In [74]:
# Show the first 5 rows
df.limit(5).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0
1,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",Organised - Focused - Vibrant - Awesome!Do you...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,1,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0
2,3,Commissioning Machinery Assistant (CMA),"US, IA, Wever",,,Valor Services provides Workforce Solutions th...,"Our client, located in Houston, is actively se...",Implement pre-commissioning and commissioning ...,,0,1,0,,,,,,0
3,4,Account Executive - Washington DC,"US, DC, Washington",Sales,,Our passion for improving quality of life thro...,THE COMPANY: ESRI – Environmental Systems Rese...,"EDUCATION: Bachelor’s or Master’s in GIS, busi...",Our culture is anything but corporate—we have ...,0,1,0,Full-time,Mid-Senior level,Bachelor's Degree,Computer Software,Sales,0
4,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,JOB TITLE: Itemization Review ManagerLOCATION:...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0


In [75]:
# See how many categories we have in fraudulent (should be 2)
df.groupBy("fraudulent").count().orderBy(col("count").desc()).show()

+--------------------+-----+
|          fraudulent|count|
+--------------------+-----+
|                   0|16080|
|                   1|  886|
|                null|  176|
|           Full-time|   73|
|Hospital & Health...|   55|
|   Bachelor's Degree|   53|
|         Engineering|   26|
| perform quality ...|   17|
|         Unspecified|   15|
|    Mid-Senior level|   15|
|               Sales|   14|
|           Associate|   14|
|Information Techn...|   13|
| passionate about...|   13|
|           Marketing|   13|
|   Computer Software|   12|
|            Internet|   12|
|      Not Applicable|   11|
|We offer an excel...|   11|
| además con el fi...|   10|
+--------------------+-----+
only showing top 20 rows
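
The stray values in `fraudulent` (for example `Full-time`, or fragments of sentences) suggest that some records were split or shifted while the CSV was parsed. One plausible cause, which is an assumption and has not been verified against this file, is free-text fields that contain quoted line breaks or commas. The sketch below uses only Python's standard `csv` module and a made-up three-column sample to show how a line-by-line split misaligns such a record, while a quote-aware parser keeps it intact.

```python
import csv
import io

# Made-up sample: the description of job 1 contains a line break and commas
raw = (
    'job_id,description,fraudulent\n'
    '1,"We offer\nFull-time,great pay,benefits",0\n'
    '2,"Plain text",1\n'
)

def third_column(fields):
    # Return the value that lands in the "fraudulent" position, if any
    return fields[2] if len(fields) > 2 else None

# Naive parsing: treat every physical line as one record
naive = [third_column(line.split(",")) for line in raw.splitlines()[1:]]

# Quote-aware parsing: quoted line breaks and commas stay inside the field
records = list(csv.reader(io.StringIO(raw)))
proper = [third_column(fields) for fields in records[1:]]

print(naive)   # [None, 'benefits"', '1']
print(proper)  # ['0', '1']
```

The naive result shows both symptoms seen above: a null label and a label made of leftover text. Spark's CSV reader has `multiLine` and `escape` options that may be worth trying for the same reason, although whether they resolve the issue for this dataset is untested here.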



In [76]:
# Drop the rows whose fraudulent value is not 0 or 1
df = df.filter("fraudulent == 1 OR fraudulent == 0")

In [77]:
# See how many categories we have in fraudulent (should be 2)
df.groupBy("fraudulent").count().show()

+----------+-----+
|fraudulent|count|
+----------+-----+
|         0|16080|
|         1|  886|
+----------+-----+



In [78]:
# Show the percentage of rows that contain at least one null value
total_rows = df.count() 
total_drop = df.na.drop().count()
percent_drop = (total_rows-total_drop)/total_rows*100
print("Percentage of rows with Null against the Total Rows: {}%".format(percent_drop ))

Percentage of rows with Null against the Total Rows: 95.72085347164918%
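
A row is counted here as soon as any one of its columns is null, so sparse optional columns (such as `salary_range` or `department` in the preview above) can push this percentage very high even when the text columns are mostly complete. A minimal pure-Python sketch with made-up records illustrates the difference between row-level and column-level missingness:

```python
# Made-up records: only the optional salary_range column has gaps
rows = [
    {"description": "text a", "salary_range": None, "fraudulent": "0"},
    {"description": "text b", "salary_range": "20000-28000", "fraudulent": "0"},
    {"description": "text c", "salary_range": None, "fraudulent": "1"},
    {"description": "text d", "salary_range": None, "fraudulent": "0"},
]

# Row-level view: a row counts if ANY of its values is missing
rows_with_any_null = sum(1 for r in rows if any(v is None for v in r.values()))
pct_rows_with_null = rows_with_any_null / len(rows) * 100

# Column-level view: count missing values per column
nulls_per_column = {c: sum(1 for r in rows if r[c] is None) for c in rows[0]}

print(pct_rows_with_null)  # 75.0
print(nulls_per_column)    # {'description': 0, 'salary_range': 3, 'fraudulent': 0}
```

Looking at the per-column counts first makes it easier to decide which columns to keep before dropping rows.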


In [58]:
# We can't drop every row that has a null value, or we would lose almost 96% of the data.
# Optionally, concatenate the text columns we are interested in (description, requirements and benefits):
#df = df.withColumn('description', concat(col('description'),lit(" "),col('requirements')))

In [79]:
# Select only the columns that we need
df = df.select("description", "fraudulent")
df.limit(5).toPandas()

Unnamed: 0,description,fraudulent
0,"Food52, a fast-growing, James Beard Award-winn...",0
1,Organised - Focused - Vibrant - Awesome!Do you...,0
2,"Our client, located in Houston, is actively se...",0
3,THE COMPANY: ESRI – Environmental Systems Rese...,0
4,JOB TITLE: Itemization Review ManagerLOCATION:...,0


In [80]:
# Show the percentage of rows with null values in the new DataFrame
total_rows = df.count() 
total_drop = df.na.drop().count()
percent_drop = (total_rows-total_drop)/total_rows*100
print("Percentage of rows with Null against the Total Rows: {}%".format(percent_drop ))

Percentage of rows with Null against the Total Rows: 0.0%


In [81]:
# Now we can drop the null values
df = df.dropna()

In [82]:
# Rebalance the classes: downsample class 0 relative to class 1 so the model trains better
df = df.sampleBy("fraudulent", fractions={'0': 0.4, '1': 1.0}, seed=10)
# Check the class counts again
df.groupBy("fraudulent").count().show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |6427 |
|1         |886  |
+----------+-----+
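
Even after downsampling, class 0 still dominates, which matters when reading accuracy figures. The short calculation below uses the counts from the table above to work out what a trivial model that always predicts 0 would score. It is computed over the whole sampled set, so the figure on a 30% test split will differ slightly.

```python
# Class counts taken from the table above
counts = {"0": 6427, "1": 886}
total = sum(counts.values())

# A trivial model that always predicts the most frequent class
majority_label = max(counts, key=counts.get)
baseline_accuracy = counts[majority_label] / total * 100

# Such a model never flags a fraudulent posting, so its recall on class 1 is zero
fraud_recall = 100.0 if majority_label == "1" else 0.0

print(majority_label)               # 0
print(f"{baseline_accuracy:.2f}")   # 87.88
print(fraud_recall)                 # 0.0
```

Any accuracy close to this baseline should be read with care, and per-class metrics such as recall on the fraudulent class are likely to be more informative for a flagging task.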



In [85]:
# Replace "/", "(" and ")" in the description column with spaces
df = df.withColumn("description", translate(col("description"), "/()", "   "))

In [86]:
# Remove special characters: drop everything that is not a letter or a space
df = df.withColumn("description",regexp_replace(col('description'), '[^A-Za-z ]+', ''))

In [88]:
# Remove multiple spaces
df = df.withColumn("description",regexp_replace(col('description'), ' +', ' '))

# Lower Case all words
df = df.withColumn("description",lower(col('description')))
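
The cleaning steps above can be mirrored in plain Python to see their effect on a single string. This is only a sketch with the standard `re` module, and the sample sentence is partly made up. Note that removing hyphens without inserting a space fuses hyphenated words together, which is why tokens such as `fastgrowing` appear later on.

```python
import re

def clean(text):
    # 1. Replace "/", "(" and ")" with spaces
    text = text.translate(str.maketrans("/()", "   "))
    # 2. Drop everything that is not a letter or a space
    text = re.sub(r"[^A-Za-z ]+", "", text)
    # 3. Collapse runs of spaces
    text = re.sub(r" +", " ", text)
    # 4. Lowercase
    return text.lower()

sample = "Food52, a fast-growing, James Beard Award-winning (online) kitchen/home brand"
print(clean(sample))
# food a fastgrowing james beard awardwinning online kitchen home brand
```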

In [90]:
df.select("description").show(5,False)


### NLP Data Preparation
Prepare the data to be analyzed by MLlib:
- Tokenize the data
- Transform the text into numeric vectors

In [92]:
# Create an ML Pipeline with three stages: tokenizer, stop-word remover, and zero-indexed label column

# Tokenize
regex_tokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")

# Remove Stop words
remover = StopWordsRemover(inputCol=regex_tokenizer.getOutputCol(), outputCol="filtered")

# Zero Index Label Column
indexer = StringIndexer(inputCol="fraudulent", outputCol="label")

# Create the Pipeline
pipeline = Pipeline(stages=[regex_tokenizer,remover,indexer])
data_prep_pl = pipeline.fit(df)

# Now call on the Pipeline to get our final df
feature_data = data_prep_pl.transform(df)
feature_data.show(1,False)

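
As a rough illustration of what the first two pipeline stages do, the sketch below splits a made-up sentence on non-word characters and filters it against a tiny stop-word set. The stop-word set is a hypothetical subset chosen for the example, not Spark's default list.

```python
import re

# Hypothetical, tiny stop-word list used only for this illustration
STOP_WORDS = {"a", "an", "the", "is", "in", "will", "be"}

def tokenize(text):
    # Split on non-word characters and drop empty tokens
    return [t for t in re.split(r"\W", text.lower()) if t]

def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]

words = tokenize("the job is based in a small office")
filtered = remove_stop_words(words)

print(words)     # ['the', 'job', 'is', 'based', 'in', 'a', 'small', 'office']
print(filtered)  # ['job', 'based', 'small', 'office']
```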

In [127]:
# Hashing TF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=20)
HTFfeaturizedData = hashingTF.transform(feature_data)

# TF-IDF
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'

# Rename the HashingTF output column to "features" for consistency
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' # Used later when printing the results
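
With `numFeatures=20`, every token is hashed into one of only 20 buckets, so many unrelated words share a feature. The sketch below shows the idea in plain Python. It uses `zlib.crc32` as a stand-in hash, which is an assumption made for illustration (Spark's `HashingTF` uses MurmurHash3), and it applies the smoothed IDF formula from the Spark documentation, `log((m + 1) / (df + 1))`. The documents are made up.

```python
import math
import zlib

NUM_FEATURES = 20  # same bucket count as numFeatures=20 above

def bucket(token):
    # Stand-in hash (assumption): Spark uses MurmurHash3, not CRC32
    return zlib.crc32(token.encode("utf-8")) % NUM_FEATURES

def hashed_tf(tokens):
    # Count how many tokens fall into each bucket
    vec = [0.0] * NUM_FEATURES
    for token in tokens:
        vec[bucket(token)] += 1.0
    return vec

def idf_weights(tf_vectors):
    # Smoothed inverse document frequency per bucket
    m = len(tf_vectors)
    weights = []
    for j in range(NUM_FEATURES):
        doc_freq = sum(1 for v in tf_vectors if v[j] > 0)
        weights.append(math.log((m + 1) / (doc_freq + 1)))
    return weights

docs = [
    ["job", "sales", "team"],
    ["job", "remote", "engineer"],
    ["job", "nurse", "hospital"],
]
tf_vectors = [hashed_tf(d) for d in docs]
weights = idf_weights(tf_vectors)
tfidf_vectors = [[tf * w for tf, w in zip(v, weights)] for v in tf_vectors]

# A bucket that is non-zero in every document gets an IDF weight of exactly 0
print(weights[bucket("job")])  # 0.0

# 50 distinct words cannot occupy more than 20 buckets, so collisions are certain
vocabulary = [f"word{i}" for i in range(50)]
used_buckets = {bucket(w) for w in vocabulary}
print(len(used_buckets) <= NUM_FEATURES)  # True
```

With long descriptions and so few buckets, nearly every bucket is likely to be non-zero in nearly every document, so the IDF weights carry little information. Increasing `numFeatures` is one option to consider.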

In [128]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

W2VfeaturizedData = model.transform(feature_data)
# W2VfeaturizedData.show(1,False)

# Word2Vec features typically contain negative values, so we rescale them here so that the Naive Bayes classifier can be used
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

# Rescale each feature to the range [min, max] (the defaults are 0.0 and 1.0)
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('fraudulent','description','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData'
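
The rescaling step maps each vector component into a non-negative range column by column. A minimal pure-Python sketch of min-max scaling on made-up three-dimensional vectors is shown below. Mapping a constant column to the midpoint of the target range follows the behaviour described in the Spark documentation for `MinMaxScaler`.

```python
def min_max_scale(vectors, lo=0.0, hi=1.0):
    # Compute the minimum and maximum of every column
    columns = list(zip(*vectors))
    mins = [min(c) for c in columns]
    maxs = [max(c) for c in columns]

    scaled = []
    for vec in vectors:
        row = []
        for x, mn, mx in zip(vec, mins, maxs):
            if mx == mn:
                # Constant column: use the midpoint of the target range
                row.append(0.5 * (hi + lo))
            else:
                row.append((x - mn) / (mx - mn) * (hi - lo) + lo)
        scaled.append(row)
    return scaled

# Made-up embeddings with negative components and one constant column
vectors = [[-0.5, 0.2, 1.0], [0.0, -0.4, 1.0], [0.5, 0.8, 1.0]]
scaled = min_max_scale(vectors)

# Every value now lies in [0, 1], which Naive Bayes requires (no negatives)
print(all(0.0 <= x <= 1.0 for row in scaled for x in row))  # True
```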

In [129]:
# Shows the Dataframes after the transformation
TFIDFfeaturizedData.limit(4).toPandas()

Unnamed: 0,description,fraudulent,words,filtered,label,rawfeatures,features
0,food a fastgrowing james beard awardwinning on...,0,"[food, a, fastgrowing, james, beard, awardwinn...","[food, fastgrowing, james, beard, awardwinning...",0.0,"(6.0, 6.0, 8.0, 2.0, 4.0, 4.0, 4.0, 5.0, 5.0, ...","(0.5216208179612858, 0.6445722349011209, 0.407..."
1,job title itemization review managerlocation f...,0,"[job, title, itemization, review, managerlocat...","[job, title, itemization, review, managerlocat...",0.0,"(11.0, 4.0, 10.0, 18.0, 8.0, 4.0, 4.0, 7.0, 6....","(0.9563048329290239, 0.42971482326741395, 0.50..."
2,job overviewapex is an environmental consultin...,0,"[job, overviewapex, is, an, environmental, con...","[job, overviewapex, environmental, consulting,...",0.0,"(10.0, 4.0, 25.0, 24.0, 18.0, 10.0, 11.0, 22.0...","(0.8693680299354762, 0.42971482326741395, 1.27..."
3,the customer service associate will be based i...,0,"[the, customer, service, associate, will, be, ...","[customer, service, associate, based, phoenix,...",0.0,"(8.0, 4.0, 8.0, 4.0, 2.0, 5.0, 6.0, 4.0, 10.0,...","(0.695494423948381, 0.42971482326741395, 0.407..."


In [130]:
HTFfeaturizedData.limit(4).toPandas()

Unnamed: 0,description,fraudulent,words,filtered,label,features
0,food a fastgrowing james beard awardwinning on...,0,"[food, a, fastgrowing, james, beard, awardwinn...","[food, fastgrowing, james, beard, awardwinning...",0.0,"(6.0, 6.0, 8.0, 2.0, 4.0, 4.0, 4.0, 5.0, 5.0, ..."
1,job title itemization review managerlocation f...,0,"[job, title, itemization, review, managerlocat...","[job, title, itemization, review, managerlocat...",0.0,"(11.0, 4.0, 10.0, 18.0, 8.0, 4.0, 4.0, 7.0, 6...."
2,job overviewapex is an environmental consultin...,0,"[job, overviewapex, is, an, environmental, con...","[job, overviewapex, environmental, consulting,...",0.0,"(10.0, 4.0, 25.0, 24.0, 18.0, 10.0, 11.0, 22.0..."
3,the customer service associate will be based i...,0,"[the, customer, service, associate, will, be, ...","[customer, service, associate, based, phoenix,...",0.0,"(8.0, 4.0, 8.0, 4.0, 2.0, 5.0, 6.0, 4.0, 10.0,..."


In [131]:
W2VfeaturizedData.limit(4).toPandas()

Unnamed: 0,fraudulent,description,label,features
0,0,food a fastgrowing james beard awardwinning on...,0.0,"[0.24843121147833588, 0.357577434919877, 0.481..."
1,0,job title itemization review managerlocation f...,0.0,"[0.24557787102800416, 0.43791577504755075, 0.3..."
2,0,job overviewapex is an environmental consultin...,0.0,"[0.2555841408061735, 0.3081030392356156, 0.487..."
3,0,the customer service associate will be based i...,0.0,"[0.28457634526134656, 0.8612176372819632, 0.19..."


### Train and Evaluate the Data

Train each classifier on each of the vector transformations.

In [132]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Instantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype == "LogisticRegression":
                paramGrid = (ParamGridBuilder()
                             # .addGrid(classifier.regParam, [0.1, 0.01])
                             .addGrid(classifier.maxIter, [10, 15, 20])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "NaiveBayes":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "RandomForestClassifier":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.maxDepth, [2, 5, 10])
                             # .addGrid(classifier.maxBins, [5, 10, 20])
                             # .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "GBTClassifier":
                paramGrid = (ParamGridBuilder()
                             # .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30])
                             # .addGrid(classifier.maxBins, [10, 20, 40, 80, 100])
                             .addGrid(classifier.maxIter, [10, 15, 50, 100])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "LinearSVC":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.maxIter, [10, 15])
                             .addGrid(classifier.regParam, [0.1, 0.01])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "DecisionTreeClassifier":
                paramGrid = (ParamGridBuilder()
                             # .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30])
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100])
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # predictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a list
        score = [str(accuracy)] # convert to a string and wrap it in a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    # Note: only the accuracy table is returned; the fitted model's importance scores or p-values are not
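
To get a feel for how much work the cross-validation step does, the sketch below expands a parameter grid into its combinations in plain Python, the way a grid search enumerates them, and counts the resulting model fits. The `expand_grid` helper is hypothetical and only mimics the Cartesian-product behaviour of `ParamGridBuilder`. The grid values are the ones used for `LinearSVC` above.

```python
from itertools import product

def expand_grid(grid):
    # Hypothetical helper: Cartesian product of all parameter values
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]

# Same values as the LinearSVC grid in the function above
linear_svc_grid = {"maxIter": [10, 15], "regParam": [0.1, 0.01]}
combos = expand_grid(linear_svc_grid)

num_folds = 2
# Each combination is trained once per fold during the search
fits_during_search = len(combos) * num_folds

print(len(combos))          # 4
print(fits_during_search)   # 8
```

After the search, `CrossValidator` refits the best combination on the full training set, so one more fit is added on top of this count. Adding a parameter to a grid multiplies the cost, which is worth keeping in mind before uncommenting the extra `addGrid` lines.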

In [133]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import functions
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Comment out Naive Bayes if your data still contains negative values
classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]

In [134]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    # Only one row is needed to determine the length of the feature vector
    features = featureDF.select(['features']).limit(1).collect()
    # Count the distinct labels to tell whether this is a binary or a multiclass problem
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    results.show(truncate=False)

HTFfeaturizedData  Results:
+------------------------------+------+
|Classifier                    |Result|
+------------------------------+------+
|LogisticRegression            |86.82 |
|OneVsRest                     |86.77 |
|LinearSVC                     |86.77 |
|NaiveBayes                    |86.41 |
|RandomForestClassifier        |89.36 |
|GBTClassifier                 |90.09 |
|DecisionTreeClassifier        |87.36 |
|MultilayerPerceptronClassifier|87.82 |
+------------------------------+------+

TFIDFfeaturizedData  Results:
+------------------------------+------+
|Classifier                    |Result|
+------------------------------+------+
|LogisticRegression            |86.82 |
|OneVsRest                     |86.77 |
|LinearSVC                     |86.77 |
|NaiveBayes                    |86.77 |
|RandomForestClassifier        |89.36 |
|GBTClassifier                 |90.09 |
|DecisionTreeClassifier        |87.36 |
|MultilayerPerceptronClassifier|87.68 |
