In [0]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import statistics as sts
import math
import re
import operator

In [0]:
file_path = "/FileStore/tables/fake_job_postings.csv"

In [0]:
# Step 2: Load the dataset
data = spark.read.csv(file_path, header=True, inferSchema=True)

In [0]:
# Step 3: Clean the dataset
# Encode the label column and remove invalid data
data = data.filter((col("fraudulent") == 0) | (col("fraudulent") == 1))
data = data.withColumn("fraudulent", col("fraudulent").cast("int"))

In [0]:
fraud_data = data[data['fraudulent'] == 1]  #fraudulent
not_fraud_data = data[data['fraudulent'] == 0]  # non fraudulent

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col

def give_null_stats(data,fraud_data):

    column_names = data.columns

    # Create an empty list to store the results
    results = []
    original_null_count=0
    original_not_null_count=0
    # Loop through each column
    for column_name in column_names:
        # Calculate the count of null and non-null values in the column
        original_null_count = data.where(col(column_name).isNull()).count()
        original_not_null_count = data.where(col(column_name).isNotNull()).count()
        
        fraud_null_count = fraud_data.where(col(column_name).isNull()).count()
        fraud_not_null_count = fraud_data.where(col(column_name).isNotNull()).count()
        fraud_count=fraud_data.count()

        # Calculate the null percentage
        original_null_percentage = (original_null_count / data.count()) * 100
        fraud_null_precentage=(fraud_null_count/data.count()*100) # fraud null/ total data that is fraud null+ not null +
        # Append the results as a tuple (column name, null percentage)
        results.append((column_name, original_null_percentage,fraud_null_precentage))

    schema = StructType([
    StructField("Column_Name", StringType(), True),  # String column
    StructField("Original_Null", DoubleType(), True),  # Double column
    StructField("Fraud_Null", DoubleType(), True)  # Double column
    ])
    # Create a new DataFrame from the results
    data_null_stats = spark.createDataFrame(results, schema=schema)

    return data_null_stats

# Show the results DataFrame
data_stats=give_null_stats(data,fraud_data)
data_stats.show()
from pyspark.sql.functions import col

filtered_df = data_stats.where(col("Original_Null") > 1)
display(filtered_df.show())
column_name = "Column_Name"

# Select the column and collect its values as a list
column_values_list = filtered_df.select(column_name).rdd.flatMap(lambda x: x).collect()

# Show the list
print(len(column_values_list))

(column_values_list)
columns_to_keep = [col(column) for column in data.columns if column not in column_values_list]

# Select the columns to keep and create a new DataFrame
new_df = data.select(*columns_to_keep)
new_df=new_df.drop('job_id')
# Show the new DataFrame
new_df.show()

+-------------------+------------------+-------------------+
|        Column_Name|     Original_Null|         Fraud_Null|
+-------------------+------------------+-------------------+
|             job_id|               0.0|                0.0|
|              title|               0.0|                0.0|
|           location|1.9863255923611929|0.11788282447247435|
|         department| 65.06542496758222|  3.241777672993045|
|       salary_range| 84.03866556642697| 3.8783449251444067|
|    company_profile| 18.89661676293764| 3.4009194860308853|
|        description|               0.0|                0.0|
|       requirements|15.153837085936578| 0.8841211835435576|
|           benefits| 40.95838736296122| 2.0806318519391724|
|      telecommuting|               0.0|                0.0|
|   has_company_logo|               0.0|                0.0|
|      has_questions|               0.0|                0.0|
|    employment_type| 19.29152422492043|  1.337970057762584|
|required_experience| 39

In [0]:
def clean_text(text):
    # Remove non-letter characters and reduce multiple spaces to a single space
    cleaned_text = regexp_replace(text, "[^a-zA-Z ]+", " ").alias("cleaned_text")
    # Convert to lowercase
    cleaned_text = lower(cleaned_text)
    return cleaned_text

# Apply the cleaning function to 'title' and 'description' columns
from pyspark.sql.functions import when, lit, col

def replace_non_binary_with_zero(df, column_name):
    """
    Check if a column in a PySpark DataFrame has data other than '0' or '1' and replace
    non-binary values with '0'.

    Args:
        df (DataFrame): The PySpark DataFrame.
        column_name (str): The name of the column to check and modify.

    Returns:
        DataFrame: The DataFrame with non-binary values replaced by '0'.
    """
    # Define a condition to check for non-binary values
    condition = (~(col(column_name) == '0') & ~(col(column_name) == '1'))

    # Replace non-binary values with '0'
    df = df.withColumn(column_name, when(condition, lit('0')).otherwise(col(column_name)))

    return df

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace

cleaned_df = new_df.withColumn("title", clean_text(col("title"))) \
               .withColumn("description", clean_text(col("description")))

cleaned_df= replace_non_binary_with_zero(cleaned_df,'telecommuting')
cleaned_df= replace_non_binary_with_zero(cleaned_df,'has_company_logo')
cleaned_df= replace_non_binary_with_zero(cleaned_df,'has_questions')

# Show the cleaned DataFrame
display(cleaned_df.show())

+--------------------+--------------------+-------------+----------------+-------------+----------+
|               title|         description|telecommuting|has_company_logo|has_questions|fraudulent|
+--------------------+--------------------+-------------+----------------+-------------+----------+
|    marketing intern|food  a fast grow...|            0|               1|            0|         0|
|customer service ...|organised   focus...|            0|               1|            0|         0|
|commissioning mac...|our client  locat...|            0|               1|            0|         0|
|account executive...|the company  esri...|            0|               1|            0|         0|
| bill review manager|job title  itemiz...|            0|               1|            1|         0|
|    accounting clerk|job overviewapex ...|            0|               0|            0|         0|
|head of content  ...|your responsibili...|            0|               1|            1|         0|


In [0]:
column_names_to_check = ["telecommuting", "has_company_logo", "has_questions"]

# Initialize a flag to track non-binary values
has_non_binary_values = False

# Iterate through the column names and check for non-binary values
for column_name in column_names_to_check:
    non_binary_values_count = cleaned_df.filter(~col(column_name).isin(['0', '1'])).count()
    if non_binary_values_count > 0:
        print(f"The column '{column_name}' contains values other than '0' or '1'.")
        has_non_binary_values = True

# If no column contains non-binary values, print a message
if not has_non_binary_values:
    print("All columns only contain '0' and '1'.")

All columns only contain '0' and '1'.


In [0]:
from pyspark.sql.functions import col


undersampling_ratio = 0.0565

# Split the DataFrame into two DataFrames: one for each class
fraudulent_df = cleaned_df.filter(col("fraudulent") == 1)
non_fraudulent_df = cleaned_df.filter(col("fraudulent") == 0)

# Undersample the majority class (non-fraudulent)
undersampled_non_fraudulent_df = non_fraudulent_df.sampleBy("fraudulent", fractions={0: undersampling_ratio, 1: 1.0})
 
# Combine the undersampled majority class with the minority class
undersampled_df = fraudulent_df.union(undersampled_non_fraudulent_df)

# Show the resulting undersampled DataFrame
undersampled_df.show()

+--------------------+--------------------+-------------+----------------+-------------+----------+
|               title|         description|telecommuting|has_company_logo|has_questions|fraudulent|
+--------------------+--------------------+-------------+----------------+-------------+----------+
|   digital developer|we are a boutique...|            0|               0|            0|         1|
|     ic e technician|ic amp e technici...|            0|               1|            1|         1|
|        forward cap |the group has rai...|            0|               0|            0|         1|
|technician instru...|technician instru...|            0|               1|            1|         1|
|     sales executive|     sales executive|            0|               0|            0|         1|
|ic e technician m...|ic amp e technici...|            0|               1|            1|         1|
|financing auto ca...|if you have exper...|            0|               0|            0|         1|


In [0]:
print(undersampled_df.where(col("fraudulent") == 1).count())
print(undersampled_df.where(col("fraudulent") == 0).count())

886
941


In [0]:
data_rdd=undersampled_df.rdd
data_rdd.toDF().printSchema()

root
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- fraudulent: long (nullable = true)



In [0]:
stopwords = [
    "a", "about", "above", "after", "again", "against", "ain", "all",
    "am", "an", "and", "any", "are", "aren", "aren't", "as", "at",
    "be", "because", "been", "before", "being", "below", "between",
    "both", "but", "by",
    "can", "couldn", "couldn't",
    "d", "did", "didn", "didn't", "do", "does", "doesn", "doesn't",
    "doing", "don", "don't", "down", "during",
    "each",
    "few", "for", "from", "further",
    "had", "hadn", "hadn't", "has", "hasn", "hasn't", "have", "haven", "haven't", "having", "he", "her", "here", "hers", "herself", "him", "himself", "his", "how",
    "i", "if", "in", "into", "is", "isn", "isn't",
    "it", "it's", "its", "itself",
    "just",
    "ll",
    "m", "ma", "me", "mightn", "mightn't", "more", "most", "mustn", "mustn't", "my", "myself",
    "needn", "needn't", "no", "nor", "not", "now",
    "o", "of", "off", "on", "once", "only", "or", "other", "our", "ours", "ourselves", "out", "over", "own",
    "re",
    "s", "same", "shan", "shan't", "she", "she's", "should", "should've", "shouldn", "shouldn't", "so", "some", "such",
    "t", "than", "that", "that'll", "the", "their", "theirs", "them", "themselves", "then", "there", "these", "they", "this", "those", "through", "to", "too", "under", "until", "up",
    "ve", "very",
    "was", "wasn", "wasn't", "we", "were", "weren", "weren't", "what", "when", "where", "which", "while", "who", "whom", "why", "will", "with", "won", "won't", "wouldn", "wouldn't",
    "y", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves"
]


In [0]:
from pyspark.sql import Row

def tokenize(row):
    # Tokenize the "title" column
    tokenized_title = [token for token in row.title.strip().split(" ") if token.strip()]

    # Tokenize the "description" column
    tokenized_description = [token for token in row.description.strip().split(" ") if token.strip()]

     # Remove stopwords from tokenized_title and tokenized_description
    tokenized_title = [word for word in tokenized_title if word.lower() not in stopwords]
    tokenized_description = [word for word in tokenized_description if word.lower() not in stopwords]

    # Convert "hasQuestions," "has_company_logo," and "telecommuting" columns to integers
    has_questions = int(row.has_questions)
    has_company_logo = int(row.has_company_logo)
    telecommuting = int(row.telecommuting)

    # Update the row
    row = row.asDict()
    row['title'] = tokenized_title
    row['description'] = tokenized_description
    row['has_questions'] = has_questions
    row['has_company_logo'] = has_company_logo
    row['telecommuting'] = telecommuting

    # Convert back to a Row
    return Row(**row)

# Tokenize the "title" column
tokenized_rdd = data_rdd.map(tokenize)

In [0]:
tokenized_rdd.collect()

Out[36]: [Row(title=['digital', 'developer'], description=['boutique', 'digital', 'agency', 'based', 'auckland', 'serving', 'wide', 'range', 'clients', 'including', 'new', 'zealand', 'best', 'known', 'loved', 'consumer', 'brands', 'immediate', 'need', 'promising', 'digital', 'developer'], telecommuting=0, has_company_logo=0, has_questions=0, fraudulent=1),
 Row(title=['ic', 'e', 'technician'], description=['ic', 'amp', 'e', 'technician', 'bakersfield', 'ca', 'mt', 'posoprincipal', 'duties', 'responsibilities', 'calibrates', 'tests', 'maintains', 'troubleshoots', 'installs', 'power', 'plant', 'instrumentation', 'control', 'systems', 'electrical', 'equipment', 'performs', 'maintenance', 'motor', 'control', 'centers', 'motor', 'operated', 'valves', 'generators', 'excitation', 'equipment', 'motors', 'performs', 'preventive', 'predictive', 'corrective', 'maintenance', 'equipment', 'coordinating', 'work', 'various', 'team', 'members', 'designs', 'installs', 'new', 'equipment', 'system', 'mod

In [0]:
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.linalg import Vectors
def apply_hashing(rdd):
    num_features = 100  # Choose an appropriate number of features
    hashingTF = HashingTF(numFeatures=num_features)

    # Transform the title and description
    title_features = hashingTF.transform(rdd.title)
    description_features = hashingTF.transform(rdd.description)
    rdd = rdd.asDict()
    rdd['title'] = title_features
    rdd['description'] = description_features

    return Row(**rdd)  # Return the transformed features

In [0]:
tf_rdd=tokenized_rdd.map(apply_hashing) 

In [0]:
tf_rdd.collect()

Out[65]: [Row(title=SparseVector(100, {37: 1.0, 93: 1.0}), description=SparseVector(100, {0: 1.0, 3: 1.0, 4: 1.0, 8: 1.0, 22: 1.0, 23: 1.0, 26: 1.0, 28: 2.0, 37: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 62: 1.0, 74: 1.0, 77: 1.0, 81: 1.0, 85: 1.0, 91: 1.0, 93: 2.0, 99: 1.0}), telecommuting=0, has_company_logo=0, has_questions=0, fraudulent=1),
 Row(title=SparseVector(100, {59: 1.0, 93: 1.0, 95: 1.0}), description=SparseVector(100, {1: 5.0, 2: 3.0, 4: 3.0, 6: 4.0, 7: 4.0, 8: 3.0, 9: 2.0, 10: 8.0, 11: 3.0, 13: 5.0, 14: 6.0, 15: 1.0, 16: 6.0, 18: 4.0, 19: 1.0, 20: 4.0, 22: 5.0, 25: 2.0, 26: 1.0, 27: 1.0, 28: 2.0, 29: 2.0, 30: 3.0, 31: 3.0, 32: 1.0, 33: 6.0, 35: 1.0, 36: 2.0, 37: 5.0, 38: 2.0, 39: 5.0, 40: 10.0, 42: 2.0, 44: 2.0, 46: 3.0, 48: 5.0, 49: 1.0, 51: 1.0, 52: 1.0, 54: 1.0, 55: 1.0, 56: 1.0, 57: 3.0, 59: 2.0, 60: 2.0, 61: 6.0, 62: 3.0, 63: 1.0, 64: 6.0, 65: 3.0, 66: 2.0, 67: 2.0, 70: 3.0, 71: 4.0, 72: 1.0, 73: 3.0, 74: 1.0, 75: 1.0, 76: 12.0, 77: 1.0, 78: 3.0, 79: 1.0, 83: 2.0, 84: 3.0, 85

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
def row_to_labeled_point(row):
    # Extract the label
    label = row.fraudulent

    # Combine the "title" and "description" features into one feature vector
    combined_features = Vectors.sparse(200, 
        dict(enumerate(row.title.indices, 0)) | 
        dict(enumerate(row.description.indices, 100))
    )

    # Append the "telecommuting," "has_company_logo," and "has_questions" columns
    combined_features = Vectors.dense(list(combined_features.toArray()) + [row.telecommuting, row.has_company_logo, row.has_questions])
    labeled_point = LabeledPoint(label, combined_features)
    return labeled_point

In [0]:
from pyspark.mllib.feature import HashingTF, IDF
# Compute IDF
my_rdd =tf_rdd.map(row_to_labeled_point)


In [0]:
my_rdd.collect()

Out[99]: [LabeledPoint(1.0, [37.0,93.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0,4.0,8.0,22.0,23.0,26.0,28.0,37.0,45.0,49.0,52.0,62.0,74.0,77.0,81.0,85.0,91.0,93.0,99.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [59.0,93.0,95.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0

In [0]:
train_data, test_data = my_rdd.randomSplit([0.7, 0.3], seed=123)


In [0]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
model = LogisticRegressionWithLBFGS.train(train_data)
predictions = test_data.map(lambda x: (x.label, model.predict(x.features)))
accuracy = predictions.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
print("Accuracy:", accuracy)

Accuracy: 0.7611940298507462
