In [1]:
!pip install --quiet pyspark 

In [1]:
import sys
import os
import json
import re
import string
import csv

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, col, lower, desc, split, trim, expr
from pyspark.sql.types import DoubleType, StringType, BooleanType, ArrayType
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer
from pyspark.ml.linalg import Vectors, VectorUDT

import nltk
from nltk.tokenize import sent_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

In [2]:
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [3]:
# Define paths
job_post_skills_path = "linkedin_job_postings.csv"   #sys.argv[1]
stopwords_path = "stopwords.txt"  #sys.argv[2]
job_seeker_path = "test_cases.csv"  #sys.argv[3]
output_file_path = "output_job_skills_match.csv"  #sys.argv[4]

In [4]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("MatchJobSkills") \
    .getOrCreate()

## 2. Load Spark Dataframe

In [5]:
# Load posting & skills CSV into DataFrame
df_posts_skills = spark.read.csv(job_post_skills_path, header=True, inferSchema=True)

In [6]:
# Count number of rows
num_rows = df_posts_skills.count()
print(num_rows)

# Show the DataFrame schema and some sample data
df_posts_skills.printSchema()
df_posts_skills.show(5)  # Show the first 5 rows

1348488
root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)

+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+--------+
|            job_link| last_processed_time|got_summary|got_ner|is_being_worked|           job_title|             company|        job_location|first_seen|search_c

In [7]:
df_posts_skills.tail(1)

[Row(job_link='https://www.linkedin.com/jobs/view/on-demand-guest-advocate-cashier-general-merchandise-fulfillment-food-and-beverage-style-t2632-at-target-3734494804', last_processed_time='2024-01-21 00:38:44.231492+00', got_summary='t', got_ner='t', is_being_worked='f', job_title='On-Demand: Guest Advocate (Cashier), General Merchandise, Fulfillment, Food and Beverage, Style (T2632)', company='Target', job_location='Culver City, CA', first_seen='2024-01-12', search_city='Malibu', search_country='United States', search_position='Cashier Ii', job_level='Mid senior', job_type='Onsite')]

## 3. Pre-process the "job_skills" column

In [8]:
# Initialize NLTK objects
lemmatizer = WordNetLemmatizer()
stopwords_nltk = list(stopwords.words("english"))

# Load stopwords from the file into a list
with open(stopwords_path, "r") as f:
    stopwords_txt = list(f.read().splitlines())

# Combine stopwords from nltk & txt
stop_words = set(stopwords_nltk + stopwords_txt)

In [9]:
# Function to clean job_skills
def clean_tokenize_skills(skills):
    skills = re.sub(r'[^a-zA-Z,\s]', '', skills) # Remove non-alphabetical characters except commas and spaces
    skills = re.sub(r'\s*,\s*', ',', skills) # Remove extra spaces around commas
    skills = re.sub(r'\s+', ' ', skills)  # Remove extra spaces between words
    skills = skills.lower() # Convert to lowercase
    skills_list = skills.split(',') # Split skills by comma
    cleaned_skills = []
    for skill in skills_list:
        skill = skill.strip() # Remove leading and trailing whitespace
        if skill not in stop_words: # Remove stopwords
            skill = lemmatizer.lemmatize(skill) # Lemmatize skill
            skill = skill.replace(string.punctuation, '') # within each skill (element in the list), remove all punctuation including commas (as there should be no punctuation within each element)
            cleaned_skills.append(skill)
    return cleaned_skills

In [10]:
df_posts_skills.printSchema()


root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)



In [11]:
# Define UDF to apply cleaning and tokenization function
clean_tokenize_udf = udf(clean_tokenize_skills, ArrayType(StringType()))

# Apply UDF to clean and tokenize job_skills column
df_posts_skills_cleaned = df_posts_skills.withColumn("cleaned_job_skills", clean_tokenize_udf(col("job_skills")))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `job_skills` cannot be resolved. Did you mean one of the following? [`job_level`, `job_link`, `job_title`, `job_type`, `got_ner`].;
'Project [job_link#17, last_processed_time#18, got_summary#19, got_ner#20, is_being_worked#21, job_title#22, company#23, job_location#24, first_seen#25, search_city#26, search_country#27, search_position#28, job_level#29, job_type#30, clean_tokenize_skills('job_skills)#150 AS cleaned_job_skills#151]
+- Relation [job_link#17,last_processed_time#18,got_summary#19,got_ner#20,is_being_worked#21,job_title#22,company#23,job_location#24,first_seen#25,search_city#26,search_country#27,search_position#28,job_level#29,job_type#30] csv
