In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ResumeJobMatching_SparkXGB") \
    .getOrCreate()

# (Optional) Check your Spark context
print(spark.version)


In [1]:
spark.conf.set("spark.pyspark.virtualenv.enabled", "true")

VBox()

Starting Spark application


ID,Kind,State,Spark UI,Driver log,User,Current session?
2,pyspark,idle,Link,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Read the resume DataFrame (adjust the S3 paths accordingly)
resume_df = spark.read.parquet("s3://resume-matching-new/000000_0-hadoop_20250403011440_9f04e66f-5b4f-4a4a-9eec-883d8ed76fb2-1")
# Read the job listing DataFrame
job_df = spark.read.parquet("s3://resume-matching-new/000000_0-hadoop_20250403003123_1681fb76-9adb-4370-9a1c-5ab7d1d925d9-1")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
print(resume_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[_col0: string, _col1: string, _col2: string]

In [4]:
resume_df = resume_df.selectExpr("_col0 as resumeID", "_col1 as category", "_col2 as resume")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
print(job_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[_col0: string, _col1: string, _col2: string, _col3: string, _col4: string, _col5: string, _col6: string, _col7: string, _col8: int, _col9: string, _col10: string, _col11: int, _col12: int]

In [6]:
job_df = job_df.selectExpr(
    "_col0 as job_posting_id", "_col1 as job_title", "_col2 as company_name", 
    "_col3 as job_location", "_col4 as job_seniority_level", 
    "_col5 as job_employment_type", "_col6 as job_industries",
    "_col7 as job_summary", "_col8 as job_num_applicants", 
    "_col9 as job_posted_date", "_col10 as job_posted_time", 
    "_col11 as job_posted_year", "_col12 as job_posted_month"
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF

# Resume text processing pipeline
tokenizer_resume = Tokenizer(inputCol="resume", outputCol="resume_tokens")
remover_resume = StopWordsRemover(inputCol="resume_tokens", outputCol="resume_clean")
cv_resume = CountVectorizer(inputCol="resume_clean", outputCol="resume_tf", vocabSize=1000)
idf_resume = IDF(inputCol="resume_tf", outputCol="resume_tfidf")

# Job summary text processing pipeline
tokenizer_job = Tokenizer(inputCol="job_summary", outputCol="job_tokens")
remover_job = StopWordsRemover(inputCol="job_tokens", outputCol="job_clean")
cv_job = CountVectorizer(inputCol="job_clean", outputCol="job_tf", vocabSize=1000)
idf_job = IDF(inputCol="job_tf", outputCol="job_tfidf")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
from pyspark.ml import Pipeline
import pyspark.sql.functions as F

# Pipeline for resumes
pipeline_resume = Pipeline(stages=[tokenizer_resume, remover_resume, cv_resume, idf_resume])
resume_model = pipeline_resume.fit(resume_df)
resume_transformed = resume_model.transform(resume_df)
# Add a text length feature for resumes
resume_transformed = resume_transformed.withColumn("resume_length", F.length("resume"))

# Pipeline for job summaries
pipeline_job = Pipeline(stages=[tokenizer_job, remover_job, cv_job, idf_job])
job_model = pipeline_job.fit(job_df)
job_transformed = job_model.transform(job_df)
# Add a text length feature for job summaries
job_transformed = job_transformed.withColumn("job_length", F.length("job_summary"))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
pairs_df = resume_transformed.crossJoin(job_transformed)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.types import DoubleType
import math

def cosine_similarity(v1, v2):
    # Compute the dot product and norms
    dot = float(v1.dot(v2))
    norm1 = math.sqrt(v1.dot(v1))
    norm2 = math.sqrt(v2.dot(v2))
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot / (norm1 * norm2)

cosine_similarity_udf = F.udf(cosine_similarity, DoubleType())

# Compute cosine similarity using the TF–IDF vectors from resume and job text
pairs_df = pairs_df.withColumn(
    "cosine_similarity", 
    cosine_similarity_udf(F.col("resume_tfidf"), F.col("job_tfidf"))
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Index and encode categorical features
indexer_category = StringIndexer(inputCol="category", outputCol="cat_index")
indexer_jobtitle = StringIndexer(inputCol="job_title", outputCol="jobtitle_index")

encoder = OneHotEncoder(inputCols=["cat_index", "jobtitle_index"],
                        outputCols=["cat_vec", "jobtitle_vec"])

# Assemble numeric and encoded features
assembler = VectorAssembler(
    inputCols=["resume_length", "job_length", "cosine_similarity", "cat_vec", "jobtitle_vec"],
    outputCol="features"
)

# Build the feature pipeline
feature_pipeline = Pipeline(stages=[indexer_category, indexer_jobtitle, encoder, assembler])
pairs_prepped = feature_pipeline.fit(pairs_df).transform(pairs_df)

# For demonstration purposes, create a binary label based on a threshold.
# Here, for example, you might label pairs with cosine_similarity above 0.25 as a match.
pairs_prepped = pairs_prepped.withColumn("label", F.when(F.col("cosine_similarity") >= 0.25, 1).otherwise(0))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
name 'Pipeline' is not defined
Traceback (most recent call last):
NameError: name 'Pipeline' is not defined



In [12]:
%%local pip install <xgboost>

/usr/bin/sh: -c: line 0: syntax error near unexpected token `newline'
/usr/bin/sh: -c: line 0: `/usr/bin/python3 -m pip install <xgboost>'
Note: you may need to restart the kernel to use updated packages.


In [1]:
# Split data into training and test sets
(train_df, test_df) = pairs_prepped.randomSplit([0.8, 0.2], seed=42)

# Import the Spark XGBoost classifier. (This example assumes you are using an XGBoost version with Spark integration.)
from xgboost.spark import SparkXGBClassifier

# Initialize the model with parameters.
xgb = SparkXGBClassifier(
    featuresCol="features",
    labelCol="label",
    objective="binary:logistic",
    num_round=100,         # number of boosting rounds; adjust accordingly
    maxDepth=5,
    scalePosWeight=1.0     # set based on class imbalance if necessary
)

# Train the model
xgb_model = xgb.fit(train_df)

# Make predictions on the test set
preds = xgb_model.transform(test_df)


VBox()

Starting Spark application


ID,Kind,State,Spark UI,Driver log,User,Current session?
1,pyspark,idle,Link,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
name 'pairs_prepped' is not defined
Traceback (most recent call last):
NameError: name 'pairs_prepped' is not defined



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
auc = evaluator.evaluate(preds)
print(f"Test AUC: {auc:.2f}")
