# CS5344 Group Project - Job Recommender System

**Project Group 14**

The code was executed in Google Colab as a Python notebook (.ipynb).

Key environment versions:
- Python version 3.10.12
- OpenJDK version 11.0.22
- Pyspark version 3.5.1
- Scala version 2.12.18
- nltk version 3.8.1
- numpy version 1.25.2


## 1. Setup

In [None]:
!pip install --quiet pyspark   #  pyspark 3.5.1

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
import sys
import os
import json
import re
import string
import csv

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, col, lower, desc, split, trim, expr
from pyspark.sql.types import DoubleType, StringType, BooleanType, ArrayType
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer
from pyspark.ml.linalg import Vectors, VectorUDT

import nltk
from nltk.tokenize import sent_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

In [None]:
# Download the NLTK resources this notebook relies on.
nltk.download('punkt')      # tokenizer data
nltk.download('stopwords')  # stop-word lists, read below via stopwords.words("english")
nltk.download('wordnet')    # corpus used by WordNetLemmatizer

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


True

In [None]:
# Google Colaboratory only: mount Google Drive and make the project folder the
# working directory, so the relative file paths used in this notebook resolve.
# Outside Colab this whole cell does nothing.
import sys, os
if 'google.colab' in sys.modules:
    # Mount Google Drive under /content/gdrive
    from google.colab import drive
    drive.mount('/content/gdrive')
    path_to_file = '/content/gdrive/MyDrive/CS5344'    # Update to your own project folder
    print(path_to_file)
    # Switch the working directory to the project folder on Google Drive
    os.chdir(path_to_file)
    # Notebook shell escape: print the working directory to confirm the change
    !pwd

Mounted at /content/gdrive
/content/gdrive/MyDrive/CS5344
/content/gdrive/MyDrive/CS5344


In [None]:
# Input and output file locations, relative to the current working directory.
# The trailing "sys.argv[n]" notes appear to record which command-line argument
# each value would map to in a script version of this notebook - TODO confirm.
job_post_skills_path = "linkedin_job_posts_skills.csv"   # job postings with their skills    (sys.argv[1])
stopwords_path = "stopwords.txt"  # extra stop words, one per line    (sys.argv[2])
job_seeker_path = "test_cases.csv"  # job-seeker queries to match    (sys.argv[3])
output_file_path = "output_job_skills_match.csv"  # where the ranked matches are written    (sys.argv[4])

In [None]:
# Create the Spark session for this notebook, or reuse the one already running.
spark = (
    SparkSession.builder
    .appName("MatchJobSkills")
    .getOrCreate()
)

## 2. Load Spark Dataframe

In [None]:
# Load the job postings (with their skills) from CSV into a Spark DataFrame.
# header=True takes the column names from the first line; inferSchema=True lets
# Spark derive the column types from the data instead of reading everything as string.
df_posts_skills = spark.read.csv(job_post_skills_path, header=True, inferSchema=True)

In [None]:
# Sanity check: number of job postings loaded
num_rows = df_posts_skills.count()
print(num_rows)

# Show the inferred schema and a small sample of the data
df_posts_skills.printSchema()
df_posts_skills.show(5)  # Show the first 5 rows

1292937
root
 |-- sn: integer (nullable = true)
 |-- job_link_id: long (nullable = true)
 |-- last_processed_time: timestamp (nullable = true)
 |-- got_summary: boolean (nullable = true)
 |-- got_ner: boolean (nullable = true)
 |-- is_being_worked: boolean (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- job_skills: string (nullable = true)

+---+-----------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+--------+--------------------+
| sn|job_link_id| last_processed_time|got_summary|got_ner|is_

In [None]:
# Inspect the last row of the postings DataFrame
df_posts_skills.tail(1)

[Row(sn=1292936, job_link_id=3734494804, last_processed_time=datetime.datetime(2024, 1, 21, 0, 38, 44, 231492), got_summary=True, got_ner=True, is_being_worked=False, job_title='On-Demand: Guest Advocate (Cashier), General Merchandise, Fulfillment, Food and Beverage, Style (T2632)', company='Target', job_location='Culver City, CA', first_seen='2024-01-12', search_city='Malibu', search_country='United States', search_position='Cashier Ii', job_level='Mid senior', job_type='Onsite', job_skills='Customer service, Communication, Problem solving, Decision making, Adaptability, Sales promotion, Product knowledge, Cash handling, Merchandise handling, Flexible work schedule, Regular attendance, Ability to work independently and with a team, Ability to communicate effectively, Welcoming and helpful attitude, Attention to detail, Willingness to educate guests and sell products and services, Ability to learn and adapt to evolving technology needs, ability to work both independently and with a tea

## 3. Pre-process the "job_skills" column

In [None]:
# Initialize NLTK objects
lemmatizer = WordNetLemmatizer()
stopwords_nltk = list(stopwords.words("english"))

# Load the additional stop words from the text file, one entry per line.
# The encoding is stated explicitly so the result does not depend on the
# platform default. Entries are stripped and lower-cased because skills are
# stripped and lower-cased before they are compared against this list; an
# entry with stray whitespace or capitals would otherwise never match.
with open(stopwords_path, "r", encoding="utf-8") as f:
    stopwords_txt = [line.strip().lower() for line in f.read().splitlines()]

# Combine stopwords from nltk & txt into a set for fast membership tests
stop_words = set(stopwords_nltk + stopwords_txt)

In [None]:
# Function to clean job_skills
def clean_tokenize_skills(skills):
    """
    Turn a comma-separated skills string into a list of normalised skill phrases.

    Each comma-separated phrase is treated as one token. The text is reduced to
    letters, commas and whitespace, whitespace is collapsed, and everything is
    lower-cased. Phrases that are empty or listed in the module-level
    ``stop_words`` set are dropped; the rest are passed through the module-level
    ``lemmatizer``.

    Args:
        skills (str or None): Raw skills text, e.g. "Python, SQL, Data analysis".

    Returns:
        list[str]: Cleaned skill phrases in their original order. Empty when the
        input is None/empty or contains no usable phrase.
    """
    # A null cell reaches a UDF as None; re.sub would raise on it.
    if not skills:
        return []

    skills = re.sub(r'[^a-zA-Z,\s]', '', skills)  # Keep only letters, commas and whitespace
    skills = re.sub(r'\s*,\s*', ',', skills)  # Remove extra spaces around commas
    skills = re.sub(r'\s+', ' ', skills)  # Collapse runs of whitespace between words
    skills = skills.lower()  # Convert to lowercase

    cleaned_skills = []
    for skill in skills.split(','):
        skill = skill.strip()  # Remove leading and trailing whitespace
        # Phrases made only of digits/punctuation, or doubled commas, end up
        # empty here. Skip them so "" never becomes a vocabulary term.
        if not skill or skill in stop_words:
            continue
        # No punctuation can remain at this point (the first regex removed it),
        # so lemmatizing is the only step left.
        cleaned_skills.append(lemmatizer.lemmatize(skill))
    return cleaned_skills

In [None]:
# Wrap clean_tokenize_skills as a Spark UDF that returns an array of strings
clean_tokenize_udf = udf(clean_tokenize_skills, ArrayType(StringType()))

# Add a "cleaned_job_skills" column holding the cleaned, tokenized form of "job_skills"
df_posts_skills_cleaned = df_posts_skills.withColumn("cleaned_job_skills", clean_tokenize_udf(col("job_skills")))


In [None]:
# Alternative inspection calls, kept for reference:
# df_posts_skills_cleaned.show(truncate=False)

# df_posts_skills_cleaned.tail(1)

# Show the DataFrame schema and some sample data
df_posts_skills_cleaned.printSchema()
df_posts_skills_cleaned.show(5)

# Compare the raw "job_skills" text with the new "cleaned_job_skills" column
df_posts_skills_cleaned.select("job_skills", "cleaned_job_skills").show(5, truncate=False)

# Inspect the last row
df_posts_skills_cleaned.tail(1)

root
 |-- sn: integer (nullable = true)
 |-- job_link_id: long (nullable = true)
 |-- last_processed_time: timestamp (nullable = true)
 |-- got_summary: boolean (nullable = true)
 |-- got_ner: boolean (nullable = true)
 |-- is_being_worked: boolean (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- job_skills: string (nullable = true)
 |-- cleaned_job_skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+-----------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+---

[Row(sn=1292936, job_link_id=3734494804, last_processed_time=datetime.datetime(2024, 1, 21, 0, 38, 44, 231492), got_summary=True, got_ner=True, is_being_worked=False, job_title='On-Demand: Guest Advocate (Cashier), General Merchandise, Fulfillment, Food and Beverage, Style (T2632)', company='Target', job_location='Culver City, CA', first_seen='2024-01-12', search_city='Malibu', search_country='United States', search_position='Cashier Ii', job_level='Mid senior', job_type='Onsite', job_skills='Customer service, Communication, Problem solving, Decision making, Adaptability, Sales promotion, Product knowledge, Cash handling, Merchandise handling, Flexible work schedule, Regular attendance, Ability to work independently and with a team, Ability to communicate effectively, Welcoming and helpful attitude, Attention to detail, Willingness to educate guests and sell products and services, Ability to learn and adapt to evolving technology needs, ability to work both independently and with a tea

## 4. Apply CountVectorizer

This step is equivalent to computing the term frequency of each skill phrase: every comma-separated skill is treated as a single term.

In [None]:
# Count how often each skill phrase occurs in every posting.
# Input: the "cleaned_job_skills" arrays; output: a "skills_frequency" vector column.
# minDF=100: per the Spark CountVectorizer documentation, a phrase has to occur in
# at least 100 postings to be included in the vocabulary.
vectorizer = CountVectorizer(inputCol="cleaned_job_skills", outputCol="skills_frequency", minDF=100)

# Fit the vocabulary on the job postings, then vectorise those same postings.
# vectorizer_model is reused later to vectorise the job-seeker queries.
vectorizer_model = vectorizer.fit(df_posts_skills_cleaned)
df_posts_skills_cleaned_vectorise = vectorizer_model.transform(df_posts_skills_cleaned)


In [None]:
# Show the DataFrame schema (now including "skills_frequency") and some sample data
df_posts_skills_cleaned_vectorise.printSchema()
df_posts_skills_cleaned_vectorise.show(5)

# Inspect the last 3 rows
df_posts_skills_cleaned_vectorise.tail(3)

root
 |-- sn: integer (nullable = true)
 |-- job_link_id: long (nullable = true)
 |-- last_processed_time: timestamp (nullable = true)
 |-- got_summary: boolean (nullable = true)
 |-- got_ner: boolean (nullable = true)
 |-- is_being_worked: boolean (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- job_skills: string (nullable = true)
 |-- cleaned_job_skills: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- skills_frequency: vector (nullable = true)

+---+-----------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--

[Row(sn=1292934, job_link_id=3739779610, last_processed_time=datetime.datetime(2024, 1, 21, 7, 40, 0, 304641), got_summary=True, got_ner=True, is_being_worked=False, job_title='Executive Chef, Operations Support', company='NEXDINE Hospitality', job_location='Riverhead, NY', first_seen='2024-01-14', search_city='Eastport', search_country='United States', search_position='Chef', job_level='Mid senior', job_type='Onsite', job_skills='Culinary, Chef Director, Menu writing, Cycle of cost control, Food production, Food presentation, Catering operations, Vendor partner relationships, Food purchasing, Quality control, Staff scheduling, Payroll processing, Safety, Sanitation, Food handling, Uniform guidelines, Productivity, Customer relationship management, Account retention, P&L management, Food costs, Labor costs, Supplies, Uniforms, Equipment, Inventory management, Financial reporting, People management, Recruitment, Hiring, Termination, Review process, Staff management, Customer service, Hi

## 5. Calculate TFIDF

In [None]:
# Compute Inverse Document Frequency (IDF)
# Create an IDF estimator that reads "skills_frequency" and writes "skills_tfidf",
# then call "fit" to compute the IDF weights from the job postings.
idf = IDF(inputCol="skills_frequency", outputCol="skills_tfidf")
idf_model = idf.fit(df_posts_skills_cleaned_vectorise)

# Compute TF-IDF
# Call "transform" on the fitted model to scale the term frequencies by IDF.
df_posts_skills_tfidf = idf_model.transform(df_posts_skills_cleaned_vectorise)

In [None]:
# Show the DataFrame schema (now including "skills_tfidf") and some sample data
df_posts_skills_tfidf.printSchema()
df_posts_skills_tfidf.show(5)

# Inspect the last 3 rows
df_posts_skills_tfidf.tail(3)

root
 |-- sn: integer (nullable = true)
 |-- job_link_id: long (nullable = true)
 |-- last_processed_time: timestamp (nullable = true)
 |-- got_summary: boolean (nullable = true)
 |-- got_ner: boolean (nullable = true)
 |-- is_being_worked: boolean (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- job_skills: string (nullable = true)
 |-- cleaned_job_skills: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- skills_frequency: vector (nullable = true)
 |-- skills_tfidf: vector (nullable = true)

+---+-----------+--------------------+-----------+-------+---------------+--------------------+--------------------+--

[Row(sn=1292934, job_link_id=3739779610, last_processed_time=datetime.datetime(2024, 1, 21, 7, 40, 0, 304641), got_summary=True, got_ner=True, is_being_worked=False, job_title='Executive Chef, Operations Support', company='NEXDINE Hospitality', job_location='Riverhead, NY', first_seen='2024-01-14', search_city='Eastport', search_country='United States', search_position='Chef', job_level='Mid senior', job_type='Onsite', job_skills='Culinary, Chef Director, Menu writing, Cycle of cost control, Food production, Food presentation, Catering operations, Vendor partner relationships, Food purchasing, Quality control, Staff scheduling, Payroll processing, Safety, Sanitation, Food handling, Uniform guidelines, Productivity, Customer relationship management, Account retention, P&L management, Food costs, Labor costs, Supplies, Uniforms, Equipment, Inventory management, Financial reporting, People management, Recruitment, Hiring, Termination, Review process, Staff management, Customer service, Hi

## 6. Preparing the User Profile (Preprocess Skills)

In [None]:
# Load the job-seeker queries (one query per row) from CSV into a DataFrame
df_queries = spark.read.csv(job_seeker_path, header=True, inferSchema=True)

# Display the queries without truncating long values
df_queries.show(truncate=False)

+---+------------------+--------------+----------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|sn |search_city       |search_country|job_level |job_type|job_skills                                                                                                                                                                                                                                                                                             |
+---+------------------+--------------+----------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Run the queries through the same pipeline as the job postings:
# clean "job_skills" -> skills count vector -> TF-IDF vector.

# Apply the same UDF to clean and tokenize job_skills column
df_queries_cleaned = df_queries.withColumn("cleaned_job_skills", clean_tokenize_udf(col("job_skills")))

# Transform into vector, using the vectorizer_model fitted on the list of job postings
df_queries_vectorise = vectorizer_model.transform(df_queries_cleaned)

# Compute the TF-IDF vector by calling "transform" based on the fitted idf_model.
df_queries_tfidf = idf_model.transform(df_queries_vectorise)

In [None]:
# Display the queries together with their cleaned skills and vector columns
df_queries_tfidf.show(truncate=False)

+---+------------------+--------------+----------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## 7. Building the SQL Query & Filtered Item Profile (Filtered Skills Matrix)

In [None]:
# Register the TF-IDF postings DataFrame as a temporary view so that it can be
# queried with SQL under the name "df_posts_skills_tfidf_view"
df_posts_skills_tfidf.createOrReplaceTempView("df_posts_skills_tfidf_view")

In [None]:
# Hand-written trial query against the temporary view, kept for reference:
# query_trial = """
# SELECT *
# FROM df_posts_skills_tfidf_view
# WHERE job_title LIKE '%Data Scientist%'
# AND search_city = 'Seattle';
# """

# spark.sql(query_trial).show(truncate=False)

# NOTE: query_generator is defined in a later cell of this notebook; that cell
# has to be run before this one.
query_generator(search_city="Seattle", search_country="United States", job_level="Mid senior", job_type="Onsite")   # Example: all four filters set

"SELECT * FROM df_posts_skills_tfidf_view WHERE search_city = 'Seattle' AND search_country = 'United States' AND job_level = 'Mid senior' AND job_type = 'Onsite'"

In [None]:
# Define function to build SQL query
def query_generator(search_city=None, search_country=None, job_level=None, job_type=None):
    """
    Build the SQL text that selects job postings matching the given filters.

    The query reads from the temporary view ``df_posts_skills_tfidf_view``. Each
    filter that is set adds one equality condition; the conditions are joined
    with AND. Filters left as None (or empty) are ignored, and when no filter is
    set the query has no WHERE clause and selects every posting.

    Args:
        search_city (str, optional): The search city. Defaults to None.
        search_country (str, optional): The search country. Defaults to None.
        job_level (str, optional): The job level. Defaults to None.
        job_type (str, optional): The job type. Defaults to None.

    Returns:
        str: The SQL query text. This function does not execute the query.
    """

    def _sql_literal(value):
        # Quote the value as a SQL string literal. Backslashes and single quotes
        # are backslash-escaped (Spark SQL's default string-literal escaping), so
        # values such as "St. John's" cannot end the literal early.
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    # Column name -> requested value, in the order the conditions are emitted
    filters = {
        "search_city": search_city,
        "search_country": search_country,
        "job_level": job_level,
        "job_type": job_type,
    }

    # One equality condition per filter that was actually supplied
    conditions = [
        f"{column} = {_sql_literal(value)}"
        for column, value in filters.items()
        if value
    ]

    query = "SELECT * FROM df_posts_skills_tfidf_view"

    # Only add WHERE when there is something to filter on; a bare "WHERE"
    # would be invalid SQL.
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    return query


In [None]:
# Smoke-test the query builder with different combinations of filters and
# print the SQL text generated for each combination.
sample_filters = [
    dict(search_city='Atlanta', search_country='United States', job_level='Associate', job_type='Onsite'),
    dict(search_city='Atlanta', job_level='Associate', job_type='Hybrid'),
    dict(search_country='United Kingdom', job_level='Mid senior', job_type='Remote'),
    dict(search_country='Canada', job_type='Remote'),
    dict(search_country='Australia'),
    dict(search_country='Australia', job_level=None, job_type=None),
]

for filters in sample_filters:
    print(query_generator(**filters))

SELECT * FROM df_posts_skills_tfidf_view WHERE search_city = 'Atlanta' AND search_country = 'United States' AND job_level = 'Associate' AND job_type = 'Onsite'
SELECT * FROM df_posts_skills_tfidf_view WHERE search_city = 'Atlanta' AND job_level = 'Associate' AND job_type = 'Hybrid'
SELECT * FROM df_posts_skills_tfidf_view WHERE search_country = 'United Kingdom' AND job_level = 'Mid senior' AND job_type = 'Remote'
SELECT * FROM df_posts_skills_tfidf_view WHERE search_country = 'Canada' AND job_type = 'Remote'
SELECT * FROM df_posts_skills_tfidf_view WHERE search_country = 'Australia'
SELECT * FROM df_posts_skills_tfidf_view WHERE search_country = 'Australia'


## 8. Cosine Similarity Function

In [None]:
# Define a user-defined function (UDF) to calculate cosine similarity between two SparseVectors
# Formula: cos(a,b) = (a.b) / (||a||*||b||)

def cosine_similarity(v1, v2):
    """
    Return the cosine of the angle between two vectors: (v1 . v2) / (|v1| * |v2|).

    Both arguments must provide ``dot(other)`` and ``norm(2)``. The result is
    0.0 when the product of the two L2 norms is zero, which avoids a division
    by zero for zero vectors.
    """
    numerator = float(v1.dot(v2))                          # v1 . v2
    denominator = float(v1.norm(2)) * float(v2.norm(2))    # |v1| * |v2|

    # A zero vector has no direction; report no similarity instead of dividing by zero
    if denominator == 0:
        return 0.0

    return numerator / denominator

# Wrap cosine_similarity as a Spark UDF returning a double, so that it can be
# applied to DataFrame columns
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

## 9. Matching Most Relevant Jobs Based on User's Skills

In [None]:
# Accumulates one dictionary per recommended job, across all queries
query_skills_best_jobs = []

# FOR EACH QUERY (row): collect() returns the query rows as a local list,
# and each row is processed in turn
for row_query in df_queries_tfidf.collect():

    # Log the query being processed
    # (debugging alternative that takes only the first query:)
    # row_query = df_queries_tfidf.head(1)[0]
    print(row_query)

    # Pull the fields needed for filtering and scoring out of the query row
    row_sn = row_query['sn']
    row_search_city = row_query['search_city']   # <class 'str'>
    row_search_country = row_query['search_country']
    row_job_level = row_query['job_level']
    row_job_type = row_query['job_type']
    row_job_skills = row_query['job_skills']
    row_skills_tfidf = row_query['skills_tfidf']  # <class 'pyspark.ml.linalg.SparseVector'>

    # Build the SQL text that filters postings on city / country / level / type
    row_sql_query = query_generator(search_city=row_search_city, search_country=row_search_country, job_level=row_job_level, job_type=row_job_type)

    # Filter the relevant rows (run SQL query against the temporary view)
    temp_relevant_posts_skills_df = spark.sql(row_sql_query)

    # Expose the query's TF-IDF vector to Spark as a zero-argument UDF returning a vector.
    # This is a workaround: the SparseVector could not be passed directly as a column
    # argument in the cosine-similarity step below.
    # Note that the name is rebound from the plain function to the UDF object.
    def row_skills_tfidf_vector():
        return row_skills_tfidf
    row_skills_tfidf_vector = udf(row_skills_tfidf_vector, VectorUDT())

    # Score every filtered posting: cosine similarity between its TF-IDF vector and the query's
    temp_relevant_posts_skills_df = temp_relevant_posts_skills_df.withColumn("similarity_score", cosine_similarity_udf(col("skills_tfidf"), row_skills_tfidf_vector()))

    # Sort in descending order based on similarity_score (best match first)
    temp_relevant_posts_skills_df = temp_relevant_posts_skills_df.orderBy(col("similarity_score").desc())

    # Identify the top 5 best matches:
    # keep only the output columns, take the first 5 rows and collect them as a list of Rows
    top5_relevant_jobs = temp_relevant_posts_skills_df.select("job_link_id", "job_title", "company", "job_location", "job_level", "job_type", "job_skills", "similarity_score").limit(5).collect()

    # Tag each match with the query it belongs to and its rank (1 = best match)
    for rank, job in enumerate(top5_relevant_jobs, start=1):
        job_dict = {"query_sn": row_sn, "users_skills": row_job_skills, "job_suitability_rank": rank}  # Query tag, the user's raw skills, and the rank
        job_dict.update(job.asDict())  # Add the posting's selected columns; none of them share a key with the tags above
        query_skills_best_jobs.append(job_dict)

    # Progress marker: this query is done
    print("Row complete")


Row(sn=1, search_city='Seattle', search_country='United States', job_level='Mid senior', job_type='Onsite', job_skills='Python, R, SQL, Machine learning, Data analysis, Statistical modeling, SAS, Matlab, Data Science, Statistics, Machine Learning, Operations Research, Communication, Problem Solving, Critical Thinking, Analytical Skills, Research, Modeling, Project Management', cleaned_job_skills=['python', 'sql', 'machine learning', 'data analysis', 'statistical modeling', 'sa', 'matlab', 'data science', 'statistic', 'machine learning', 'operations research', 'communication', 'problem solving', 'critical thinking', 'analytical skills', 'modeling', 'project management'], skills_frequency=SparseVector(21083, {0: 1.0, 5: 1.0, 9: 1.0, 16: 1.0, 25: 1.0, 28: 1.0, 82: 1.0, 96: 1.0, 254: 2.0, 447: 1.0, 575: 1.0, 764: 1.0, 1303: 1.0, 1546: 1.0, 2950: 1.0, 4370: 1.0}), skills_tfidf=SparseVector(21083, {0: 1.25, 5: 2.1811, 9: 2.3772, 16: 2.7725, 25: 3.0685, 28: 3.2001, 82: 3.9456, 96: 4.0874, 254

In [None]:
# Write the recommended jobs to a CSV file, one row per (query, ranked job) pair.
if not query_skills_best_jobs:
    # No posting matched any query. There is no first row to take the column
    # names from, so skip the export instead of failing with an IndexError.
    print(f"No matching jobs were found; {output_file_path} was not written")
else:
    # Every dictionary is built with the same keys, so the first one supplies the header
    column_names = list(query_skills_best_jobs[0].keys())

    # newline='' is what the csv module expects for files it writes; the
    # encoding is stated explicitly so non-ASCII job titles and company names
    # are written the same way on every platform.
    with open(output_file_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=column_names)

        # Write column names to the CSV file
        writer.writeheader()

        # Write each row of data to the CSV file
        writer.writerows(query_skills_best_jobs)

    print(f"Data has been successfully written to {output_file_path}")

Data has been successfully written to output_job_skills_match.csv
