In [0]:
import pandas as pd
import sparknlp
import matplotlib.pyplot as plt
import seaborn as sns
from sparknlp.annotator import BertEmbeddings
from pyspark.sql.types import StringType
from sentence_transformers import SentenceTransformer
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.linalg import DenseVector
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf, array, concat, col
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Preprocess Kaggle dataset

In [0]:
# Read the original job-postings CSV into a pandas DataFrame
df = pd.read_csv(filepath_or_buffer="fake_job_postings.csv")

In [0]:
# Load the Gemini dataset: one row per job_id with "<column>_gemini" suggestion columns
gemini_data = pd.read_csv("fake_job_postings_with_AI.csv")

# Placeholder strings in the Gemini columns that stand for "no value".
# `mask` is used rather than `replace(..., value=None)`: on older pandas releases
# a `None` value makes `replace` forward-fill from the previous row instead of
# inserting a missing value.
gemini_placeholders = ["None", "None specified"]
for c in ["required_experience", "required_education", "employment_type", "requirements"]:
    gemini_col = f"{c}_gemini"
    gemini_data[gemini_col] = gemini_data[gemini_col].mask(
        gemini_data[gemini_col].isin(gemini_placeholders)
    )

enriched_columns = ["required_experience", "required_education", "employment_type", "requirements", "industry", "function"]

# Enrich the dataset with the Gemini results: only cells that are missing in the
# original data are filled, and only when Gemini provides a non-missing value.
# Done as a vectorized lookup per column (instead of a row-by-row loop) so that
# job_ids present in only one of the two files are simply left untouched.
for c in enriched_columns:
    # `GroupBy.first()` returns the first non-missing suggestion for each job_id
    suggestion_by_job = gemini_data.groupby("job_id")[f"{c}_gemini"].first()
    df[c] = df[c].fillna(df["job_id"].map(suggestion_by_job))

# Remaining gaps in the free-text columns become empty strings
for c in ["benefits", "company_profile", "description", "requirements"]:
    df[c] = df[c].fillna("")

In [0]:
# Text columns used for preprocessing; they are also concatenated below into the embedding input
cols = ['title', 'department', 'company_profile', 'description', 'requirements', 'benefits', 'employment_type', 'required_experience', 'required_education', 'industry', 'function']

# Select relevant columns for preprocessing.
# `.copy()` gives an independent frame, so the column assignments below do not
# write into a slice of the previously loaded data.
df = df[cols + ['fraudulent']].copy()

# Replace missing values with empty strings and lower-case the text columns.
# The result is assigned back explicitly: an in-place `replace` on `df[c]`
# operates on a temporary Series and is not guaranteed to update `df`.
for c in df.columns:
    df[c] = df[c].fillna('')
    if pd.api.types.is_string_dtype(df[c]):
        df[c] = df[c].str.lower()

# Add word count features to specific columns
count_cols = ['company_profile', 'description', 'requirements', 'benefits']
for c in count_cols:
    df[c + '_word_count'] = df[c].str.split().str.len()

# Total number of whitespace-separated words over all text columns;
# non-string cells contribute 0.
df['total_word_count'] = sum(
    df[c].map(lambda value: len(value.split()) if isinstance(value, str) else 0)
    for c in cols
)

# Concatenate all text columns into a single column for embedding creation
df['all_text'] = df[cols].astype(str).apply(' '.join, axis=1)
df = df[['all_text', 'company_profile_word_count', 'description_word_count', 'requirements_word_count', 'benefits_word_count', 'total_word_count', 'fraudulent']]
processed_kaggle_dataset = df
processed_kaggle_dataset.to_csv('processed_fake_job_postings.csv', index=False)

In [None]:
# Create a Spark DataFrame from the processed pandas DataFrame
df = spark.createDataFrame(processed_kaggle_dataset)
# Preview a handful of rows only, matching the preview of the feature frame later on
display(df.limit(5))

In [None]:

# Pre-trained sentence-transformer checkpoint used for the text embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')


def embed_text(text):
    """Encode a single string with the module-level ``model``.

    The text is encoded as a one-element batch and the first (only) row of
    the result is returned as a plain Python list.
    """
    encoded_batch = model.encode([text])
    return encoded_batch.tolist()[0]


# Spark UDF around `embed_text`; the declared return type is array<float>
embed_udf = udf(embed_text, ArrayType(FloatType()))

# Add the embedding of `all_text` as a new column
df = df.withColumn("embeddings_vector", embed_udf(col("all_text")))

# Normalize word count columns with min-max scaling
word_count_cols = ["company_profile_word_count", "description_word_count", "requirements_word_count", "benefits_word_count", "total_word_count"]

# Fetch every minimum and maximum in one aggregation (a single Spark job)
# instead of running two separate jobs per column.
bounds = df.selectExpr(
    *[f"min(`{c}`) AS `{c}_min`" for c in word_count_cols],
    *[f"max(`{c}`) AS `{c}_max`" for c in word_count_cols],
).first()

for c in word_count_cols:
    min_value = bounds[f"{c}_min"]
    max_value = bounds[f"{c}_max"]
    # A constant column has max == min. Fall back to a denominator of 1 so the
    # column normalizes to 0 instead of dividing by zero.
    value_range = (max_value - min_value) or 1

    df = df.withColumn(
        f"{c}_normalized",
        (col(c) - min_value) / value_range
    )

# Create a numeric array from the normalized word count columns
df = df.withColumn(
    "numeric_features_array",
    array(*[col(f"{c}_normalized") for c in word_count_cols])
)


# Concatenate the numeric array with embeddings_vector (numeric features come first)
df = df.withColumn(
    "concatenated_features",
    concat(col("numeric_features_array"), col("embeddings_vector"))
    )

def array_to_vector(array):
    """Build a dense ML vector from a sequence of numbers."""
    dense = Vectors.dense(array)
    return dense


# UDF form of `array_to_vector`, returning the Spark ML vector type
array_to_vector_udf = udf(array_to_vector, VectorUDT())

# Turn the concatenated array column into the `features` vector column
df = df.withColumn(
    "features",
    array_to_vector_udf(col("concatenated_features")),
)
# Show the first five rows
display(df.limit(5))

In [0]:
# Keep a handle on the frame with all columns, then narrow `df`
# down to the model inputs: the feature vector and the label.
full_df = df
df = full_df.select("features", "fraudulent")

## Test models
Train and evaluate the models on a training/validation split that maintains the
original class proportions, to account for the class imbalance.

In [0]:
# Separate the non-fraudulent and fraudulent data
non_fraud = df.filter(col("fraudulent") == 0)
fraud = df.filter(col("fraudulent") == 1)

# Split the data into training and validation sets, with label ratios similar to the original data (5% fraudulent and 95% non-fraudulent).
# Each class is split 80/20 separately, so both sets keep the class proportions.
train_non_fraud, val_non_fraud = non_fraud.randomSplit([0.8, 0.2], seed=42)
train_fraud, val_fraud = fraud.randomSplit([0.8, 0.2], seed=42)

# Cache both splits. Spark DataFrames are evaluated lazily, so without caching
# every fit / transform / evaluation on these frames would re-run the whole
# upstream pipeline, including the Python UDFs that build `features`.
train_df = train_non_fraud.union(train_fraud).cache()
val_df = val_non_fraud.union(val_fraud).cache()


In [0]:
# Initialize evaluators
evaluator_auc = BinaryClassificationEvaluator(labelCol="fraudulent", metricName="areaUnderROC")
evaluator_pr = BinaryClassificationEvaluator(labelCol="fraudulent", metricName="areaUnderPR")
results = {}

# The first layer of the feed-forward network must match the length of the
# feature vectors. Read it from the data (previously hard-coded as 389) so the
# network keeps working if the feature set changes.
num_features = len(train_df.select("features").first()["features"])

# Define classifiers dictionary with parameters directly
classifiers = {
    "RandomForest": RandomForestClassifier(labelCol="fraudulent", featuresCol="features", maxDepth=10, numTrees=10, seed=1),
    "GBTClassifier": GBTClassifier(labelCol="fraudulent", featuresCol="features", maxDepth=10, maxIter=10, seed=1),
    "LogisticRegression": LogisticRegression(labelCol="fraudulent", featuresCol="features", regParam=0.1, maxIter=10),
    "FFNetwork": MultilayerPerceptronClassifier(labelCol="fraudulent", featuresCol="features", layers=[num_features, 32, 2], maxIter=6, seed=1)
}


def _confusion_counts(predictions):
    """Return ``(tp, fp, fn, tn)`` for a predictions DataFrame.

    All (label, prediction) combinations are counted in one grouped
    aggregation, i.e. one Spark job instead of one filtered count per cell.
    Combinations that do not occur count as 0.
    """
    counts = {
        (row["fraudulent"], row["prediction"]): row["count"]
        for row in predictions.groupBy("fraudulent", "prediction").count().collect()
    }
    # Keys are (label, prediction); 1 and 1.0 hash and compare equal in Python,
    # so integer lookups also match double-typed prediction values.
    return (
        counts.get((1, 1), 0),
        counts.get((0, 1), 0),
        counts.get((1, 0), 0),
        counts.get((0, 0), 0),
    )


# Train and evaluate the models
for name, classifier in classifiers.items():
    print(f"Model: {name}")

    model = classifier.fit(train_df)
    predictions = model.transform(val_df)

    # Compute AUC-ROC
    auc_score = evaluator_auc.evaluate(predictions)

    # Compute AUC-PR
    pr_score = evaluator_pr.evaluate(predictions)

    # Compute F1 score manually from the confusion matrix
    tp, fp, fn, tn = _confusion_counts(predictions)

    # Each ratio falls back to 0.0 when its denominator is empty
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
    tnr = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    # Compute Balanced Accuracy (mean of recall and true-negative rate)
    balanced_accuracy = (recall + tnr) / 2

    results[name] = {
        "auc": auc_score,
        "pr": pr_score,
        "f1": f1_score,
        "balanced_accuracy": balanced_accuracy
    }

    print(f"Result of Model {name}:")
    print(f"  - AUC (ROC): {auc_score}")
    print(f"  - AUC (PR): {pr_score}")
    print(f"  - F1: {f1_score}")
    print(f"  - Balanced Accuracy: {balanced_accuracy}")

# Print out the results for all models
print("Results for all models:")
for name, result in results.items():
    print(f"{name}: AUC (ROC) = {result['auc']}, AUC (PR) = {result['pr']}, F1 = {result['f1']}, Balanced Accuracy = {result['balanced_accuracy']}")


Model: RandomForest


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Result of Model RandomForest:
  - AUC (ROC): 0.9350638511814978
  - AUC (PR): 0.6730406208208748
  - F1: 0.5217391304347826
  - Balanced Accuracy: 0.6795475113122171
Model: GBTClassifier


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Result of Model GBTClassifier:
  - AUC (ROC): 0.9477516339869297
  - AUC (PR): 0.6854486604991734
  - F1: 0.6044776119402985
  - Balanced Accuracy: 0.7644193061840121
Model: LogisticRegression


Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Result of Model LogisticRegression:
  - AUC (ROC): 0.9146385118149828
  - AUC (PR): 0.5656679716403981
  - F1: 0.2598870056497175
  - Balanced Accuracy: 0.5760633484162896
Model: FFNetwork
Result of Model FFNetwork:
  - AUC (ROC): 0.835743589743589
  - AUC (PR): 0.26424125673583426
  - F1: 0
  - Balanced Accuracy: 0.5
Results for all models:
RandomForest: AUC (ROC) = 0.9350638511814978, AUC (PR) = 0.6730406208208748, F1 = 0.5217391304347826, Balanced Accuracy = 0.6795475113122171
GBTClassifier: AUC (ROC) = 0.9477516339869297, AUC (PR) = 0.6854486604991734, F1 = 0.6044776119402985, Balanced Accuracy = 0.7644193061840121
LogisticRegression: AUC (ROC) = 0.9146385118149828, AUC (PR) = 0.5656679716403981, F1 = 0.2598870056497175, Balanced Accuracy = 0.5760633484162896
FFNetwork: AUC (ROC) = 0.835743589743589, AUC (PR) = 0.26424125673583426, F1 = 0, Balanced Accuracy = 0.5
