In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4f62ab8859aac7cc62380b5c37e3ff162360b3bfbedb45e811b3ae38f27df392
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:

from pyspark.ml.feature import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import *
from pyspark.ml import Pipeline

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession, or reuse the one that is already active in this
# kernel (getOrCreate), so re-running the cell does not start a second one.
spark = SparkSession.builder.getOrCreate()

In [None]:
from IPython.display import display, HTML
# Inject CSS that forces <pre> blocks to keep their whitespace unwrapped
# (presumably so wide .show() tables stay aligned in the notebook output).
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# read data

In [None]:
# Read the postings with RFC-4180 style quoting: quotes inside a quoted field
# are escaped by doubling them ("") and a quoted field may span several lines.
# With Spark's defaults (backslash escape, single-line records) free-text
# fields can spill into the wrong columns, which would explain why a later
# cell has to discard rows whose `fraudulent` value is not 0/1.
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("quote", '"')
    .option("escape", '"')
    .csv("fake_job_postings.csv")
)

In [None]:
# Mark the DataFrame for caching so the repeated counts and filters in the
# following cells can reuse it.
df.cache()

DataFrame[job_id: string, title: string, location: string, department: string, salary_range: string, company_profile: string, description: string, requirements: string, benefits: string, telecommuting: string, has_company_logo: string, has_questions: string, employment_type: string, required_experience: string, required_education: string, industry: string, function: string, fraudulent: string]

In [None]:
# Show every column's name, data type and nullability.
df.printSchema()

root
 |-- job_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: string (nullable = true)



In [None]:
# Remove job_id before the analysis (presumably because it is only a row
# identifier with no predictive value — confirm).
df = df.drop('job_id')

In [None]:

# Report the dataset shape as (row count, column count).
print((df.count(), len(df.columns)))

(17880, 17)


In [None]:
def null_value_calc(df):
    """Summarise the null values in each column of a Spark DataFrame.

    Args:
        df: The Spark DataFrame to inspect.

    Returns:
        A list of ``(column_name, null_count, null_percent)`` tuples in
        ``df.columns`` order, containing only the columns that have at least
        one null. ``null_percent`` is ``null_count / row_count * 100``.
    """
    # Local import keeps the function independent of the notebook's star
    # imports, which can be shadowed by later cells.
    from pyspark.sql import functions as F

    # Count the rows and every column's nulls in ONE aggregation job instead
    # of launching a separate filter-and-count job per column.
    null_counters = [
        F.count(F.when(F.col(name).isNull(), F.lit(1))).alias(f"nulls_{idx}")
        for idx, name in enumerate(df.columns)
    ]
    stats = df.agg(F.count(F.lit(1)).alias("num_rows"), *null_counters).first()
    numRows, null_counts = stats[0], stats[1:]

    # A column can only have nulls when numRows > 0, so the division is safe.
    return [
        (name, nullRows, (nullRows / numRows) * 100)
        for name, nullRows in zip(df.columns, null_counts)
        if nullRows > 0
    ]

# Tabulate the per-column null statistics of the raw data.
null_columns_calc_list = null_value_calc(df)
spark.createDataFrame(
    null_columns_calc_list,
    ['Column_Name', 'Null_Values_Count', 'Null_Value_Percent'],
).show()

+-------------------+-----------------+--------------------+
|        Column_Name|Null_Values_Count|  Null_Value_Percent|
+-------------------+-----------------+--------------------+
|           location|              346|  1.9351230425055927|
|         department|            11547|   64.58053691275167|
|       salary_range|            15011|   83.95413870246085|
|    company_profile|             3308|  18.501118568232663|
|        description|                1|0.005592841163310962|
|       requirements|             2573|  14.390380313199106|
|           benefits|             6966|   38.95973154362416|
|      telecommuting|               89| 0.49776286353467564|
|   has_company_logo|               29|  0.1621923937360179|
|      has_questions|               30| 0.16778523489932887|
|    employment_type|             3292|   18.41163310961969|
|required_experience|             6723|  37.600671140939596|
| required_education|             7748|  43.333333333333336|
|           industry|   

# Drop any row whose `fraudulent` label is not 0 or 1


In [None]:
# Keep only the rows whose label is exactly the string '0' or '1'.
df2 = df.filter(col("fraudulent").isin("0", "1"))
# Make sure it worked
(
    df2.groupBy("fraudulent")
    .count()
    .orderBy(col("count").desc())
    .show(truncate=False)
)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |16080|
|1         |886  |
+----------+-----+



In [None]:

# Down-sample the majority class: keep roughly 40% of the '0' rows and every
# '1' row (seeded).
df3 = df2.sampleBy(
    "fraudulent",
    fractions={"0": 0.4, "1": 1.0},
    seed=10,
)
# QA again
(
    df3.groupBy("fraudulent")
    .count()
    .show(truncate=False)
)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |6485 |
|1         |886  |
+----------+-----+



In [None]:
from pyspark.sql.functions import *

# null_value_calc is already defined in an earlier cell of this notebook;
# reuse it here instead of redefining an identical copy that would silently
# shadow the first definition.
null_columns_calc_list = null_value_calc(df3)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-------------------+-----------------+------------------+
|        Column_Name|Null_Values_Count|Null_Value_Percent|
+-------------------+-----------------+------------------+
|           location|              151| 2.048568715235382|
|         department|             4868| 66.04259937593271|
|       salary_range|             6160|  83.5707502374169|
|    company_profile|             1620|21.978021978021978|
|       requirements|             1134|15.384615384615385|
|           benefits|             3032|  41.1341744675078|
|    employment_type|             1449| 19.65811965811966|
|required_experience|             2961| 40.17094017094017|
| required_education|             3328| 45.14991181657848|
|           industry|             2030| 27.54036087369421|
|           function|             2661|36.100936100936096|
+-------------------+-----------------+------------------+



Fraudulent: The Target


# Since the percentage of nulls is too high to simply drop every affected row, we ignore the unnecessary columns and consider only the important ones


Include only:

*   Location: Frauds may be associated with some locations over others
*   Description: Contains the actual text of the job.
* Fraudulent: The Target






In [None]:
# Drop ROWS (not columns) that have a null in any of the fields we rely on;
# every column is kept. Column names are spelled exactly as in the schema so
# the lookup does not depend on Spark's case-insensitive name resolution.
filtered = df3.na.drop(subset=["location", "description", "fraudulent"])
print((filtered.count(), len(filtered.columns)))

(7220, 17)


In [None]:

# Cast the label and the binary flag columns from string to integer.
# The columns are resolved by name on `filtered` itself (via col) rather than
# through `df`: `df` is a different frame and is rebound to a pandas
# DataFrame later in this notebook, which would break a re-run of this cell.
filter2 = (
    filtered
    .withColumn("fraudulent", col("fraudulent").cast(IntegerType()))
    .withColumn("has_questions", col("has_questions").cast(IntegerType()))
    .withColumn("has_company_logo", col("has_company_logo").cast(IntegerType()))
    .withColumn("telecommuting", col("telecommuting").cast(IntegerType()))
)

# printSchema() prints the schema itself and returns None, so it must not be
# wrapped in print() (that would add a stray "None" line).
filter2.printSchema()

root
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: integer (nullable = true)
 |-- has_company_logo: integer (nullable = true)
 |-- has_questions: integer (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: integer (nullable = true)

None


In [None]:

# Preview five rows as a pandas DataFrame so the notebook renders them as a table.
filter2.limit(5).toPandas()

Unnamed: 0,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
0,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0
1,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,JOB TITLE: Itemization Review ManagerLOCATION:...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0
2,Accounting Clerk,"US, MD,",,,,Job OverviewApex is an environmental consultin...,,,0,0,0,,,,,,0
3,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",The Customer Service Associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,1,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0
4,Talent Sourcer (6 months fixed-term contract),"GB, LND, London",HR,,Want to build a 21st century financial service...,TransferWise is the clever new way to move mon...,We’re looking for someone who:Proven track rec...,You will join one of Europe’s most hotly tippe...,0,1,0,,,,,,0


# Check class balance
* The data is clearly imbalanced. This can be handled in several ways, for example:

1. Use an evaluation metric that tracks true/false positives and negatives rather than plain accuracy
2. K-fold cross-validation
3. Resampling the data (e.g. dropping some of the rows labeled 0 to restore balance)

In [None]:
# Rows per class, largest class first.
(
    filter2.groupBy("fraudulent")
    .count()
    .orderBy(col("count").desc())
    .show(truncate=False)
)


+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |6354 |
|1         |866  |
+----------+-----+



In [None]:
# Share of each class as a percentage of all rows. Both the total and the
# per-class counts come from the same frame (filter2) so they always agree.
tot = filter2.count()
(
    filter2.groupBy("fraudulent")
    .count()
    .withColumnRenamed('count', 'cnt_per_group')
    .withColumn('perc_of_count_total', (col('cnt_per_group') / tot) * 100)
    .show(100)
)

+----------+-------------+-------------------+
|fraudulent|cnt_per_group|perc_of_count_total|
+----------+-------------+-------------------+
|         0|         6354|  88.00554016620498|
|         1|          866| 11.994459833795014|
+----------+-------------+-------------------+



# Preprocessing the data: both the labels and the text

In [None]:
# Keep only the text feature (description) and the label (fraudulent).
selected_df = filter2.select("description", "fraudulent")

In [None]:
# Confirm the two remaining columns and their types.
selected_df.printSchema()

root
 |-- description: string (nullable = true)
 |-- fraudulent: integer (nullable = true)



# Apply text vectorization techniques:


*   HashingTF
*   Word2Vec
*   TF-IDF



In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import NaiveBayes


In [None]:
# Mark the two-column modelling frame for caching before it is split and
# used for training.
selected_df.cache()

DataFrame[description: string, fraudulent: int]

In [None]:
# Tokenizer stage: reads the "description" text and writes its tokens to "words".
tokenizer = Tokenizer(inputCol="description", outputCol="words")

In [None]:
# Stop-word stage: reads "words" and writes the remaining tokens to
# "filtered_words" (default stop-word list, since none is passed).
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")


In [None]:
# Term-count stage: turns "filtered_words" into count vectors in "raw_features".
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")


In [None]:
# IDF stage: rescales the raw term counts into "tf_idf_features".
idf = IDF(inputCol="raw_features", outputCol="tf_idf_features")

In [None]:
# Naive Bayes classifier stage: learns from "tf_idf_features" to predict the
# "fraudulent" label.
nb = NaiveBayes(featuresCol="tf_idf_features", labelCol="fraudulent")

In [None]:
# Split the data 80/20 into training and testing sets (seed fixed at 42).
train_data, test_data = selected_df.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml import Pipeline

# Chain the stages in execution order: tokens -> stop-word filter ->
# term counts -> IDF weighting -> classifier.
pipeline = Pipeline(
    stages=[
        tokenizer,
        stop_words_remover,
        count_vectorizer,
        idf,
        nb,
    ]
)

In [None]:
# Train the model: fit the whole pipeline on the training split only.
model = pipeline.fit(train_data)

In [None]:
# Apply the fitted pipeline to the held-out test split.
predictions = model.transform(test_data)

In [None]:
# "f1" is the F1 averaged over both classes weighted by class size, so on this
# imbalanced data it is dominated by the non-fraudulent majority class.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="fraudulent", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 Score:", f1_score)

# Also report F1 for the fraudulent class alone (label 1), which is the class
# this analysis actually cares about detecting.
fraud_f1_score = evaluator.evaluate(
    predictions,
    {evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1.0},
)
print("F1 Score (fraudulent class only):", fraud_f1_score)

F1 Score: 0.9461963358183296


In [None]:
# using DL

# using DL

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import re

In [None]:
# Reload the raw CSV with pandas for the deep-learning section.
# NOTE(review): this rebinds `df`, which held the Spark DataFrame in the
# earlier cells; re-running those cells after this one will see a pandas frame.
df = pd.read_csv('fake_job_postings.csv')

In [None]:
# Work on a 1000-row subset; the fixed random_state makes the sample (and
# every result derived from it) the same on each run.
random_sample = df.sample(n=1000, random_state=42)

In [None]:
# Keep only the text column and the label.
# NOTE(review): this rebinds `df2`, which named a Spark DataFrame in the
# earlier cells.
df2 = random_sample[['description', 'fraudulent']]

In [None]:
# Check the non-null counts and dtypes of the sampled frame.
df2.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1000 entries, 12287 to 5271
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   description  1000 non-null   object
 1   fraudulent   1000 non-null   int64 
dtypes: int64(1), object(1)
memory usage: 23.4+ KB


In [None]:
import re
def process_text(text):
    """Normalise a piece of text for tokenisation.

    Removes every character that is neither an ASCII letter nor whitespace,
    then lower-cases the result.

    Args:
        text: The raw text. Non-string values (e.g. ``None`` or the float NaN
            that pandas uses for a missing cell) are treated as empty text.

    Returns:
        The cleaned, lower-cased string; ``''`` for non-string input.
    """
    # re.sub raises TypeError on non-strings, so guard against missing values.
    if not isinstance(text, str):
        return ''
    letters_only = re.sub(r'[^a-zA-Z\s]', '', text)
    return letters_only.lower()

In [None]:
# Build a new frame with the cleaned text instead of assigning into `df2`
# in place: `df2` is a column slice of another DataFrame, and in-place
# assignment on such a slice raises pandas' SettingWithCopyWarning.
df2 = df2.assign(description=df2['description'].apply(process_text))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df2['description'] = df2['description'].apply(process_text)


In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import pad_sequences
from tensorflow.keras.layers import LSTM,GRU,Bidirectional,Dense,Embedding
from tensorflow.keras import Sequential

In [None]:
# Create a Keras text Tokenizer with its default settings.
tk = Tokenizer()

In [None]:
# Fit the tokenizer on the cleaned descriptions.
# NOTE(review): this runs on all sampled rows, before the train/test split
# further down, so the tokenizer also sees the test texts.
tk.fit_on_texts(df2['description'])

In [None]:
# Integer encoding: convert each description with the fitted tokenizer.
seq = tk.texts_to_sequences(df2['description'])

In [None]:

# Pad the sequences (at the end, padding='post') to a common length of 50 so
# that every input has the same dimension. Longer sequences are cut to 50;
# the truncation side is left at the library default.
vec = pad_sequences(seq,padding='post',maxlen=50)

In [None]:
# Separate the features (padded token-id matrix) from the target (0/1 label).
import numpy as np
x = np.array(vec)
y = np.array(df2['fraudulent'])

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 15% for testing. The labels are heavily imbalanced, so stratify on
# y to give the train and test sets the same fraud rate; an unstratified
# split of so few rows can leave the test set with almost no fraud cases.
x_train,x_test,y_train,y_test = train_test_split(x,y,test_size=0.15,random_state=32,stratify=y)

In [None]:
# Sequential model: token embedding -> bidirectional LSTM -> single sigmoid
# unit for the binary label.
model = Sequential([
    Embedding(
        input_dim=len(tk.word_index)+1,
        output_dim=100,
        input_length=50,
    ),
    Bidirectional(LSTM(units=100)),
    Dense(1, activation='sigmoid'),
])

In [None]:
# Adam optimizer with binary cross-entropy loss (the model ends in a single
# sigmoid unit); accuracy is tracked as the reported metric.
model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])

In [None]:
# Train for 2 epochs in batches of 32.
# NOTE(review): the test split is passed as validation data and is also the
# split evaluated afterwards, so there is no separate untouched hold-out set.
history = model.fit(x_train,y_train,epochs=2,batch_size=32,
                    validation_data=(x_test,y_test))

Epoch 1/2
Epoch 2/2


In [None]:
# Evaluate on the test split; the result lists the loss followed by the
# accuracy metric configured in compile().
model.evaluate(x_test,y_test)



[0.23514424264431, 0.9333333373069763]

