1-Cài đặt thư viện và khởi tạo Session

In [0]:
%pip install nltk
import nltk
nltk.download('stopwords')
nltk.download('wordnet')

Python interpreter will be restarted.
Collecting nltk
  Downloading nltk-3.9.1-py3-none-any.whl (1.5 MB)
Collecting regex>=2021.8.3
  Downloading regex-2024.11.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (780 kB)
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm, regex, nltk
Successfully installed nltk-3.9.1 regex-2024.11.6 tqdm-4.67.1
Python interpreter will be restarted.


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


Out[1]: True

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import ArrayType, StringType, IntegerType, DoubleType, StructType, StructField
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.linalg import SparseVector, VectorUDT
import re
import string
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

In [0]:
# Khởi tạo Spark session trong Databricks
spark = SparkSession.builder.appName("JobRecommender").getOrCreate()

# Load stopwords và lemmatizer
lemmatizer = WordNetLemmatizer()
stopwords_nltk = set(stopwords.words("english"))

# Hàm làm sạch và tokenize kỹ năng
def clean_tokenize_skills(skills):
    if skills is None:
        return []
    skills = re.sub(r'[^a-zA-Z,\s]', '', skills).lower()
    skills_list = [lemmatizer.lemmatize(skill.strip()) for skill in skills.split(',') if skill.strip() not in stopwords_nltk]
    return skills_list

clean_tokenize_udf = udf(clean_tokenize_skills, ArrayType(StringType()))

# Hàm tính cosine similarity
def cosine_similarity(v1, v2):
    if v1 is None or v2 is None:
        return 0.0
    dot_product = float(v1.dot(v2))
    norm_v1 = float(v1.norm(2))
    norm_v2 = float(v2.norm(2))
    return dot_product / (norm_v1 * norm_v2) if norm_v1 * norm_v2 != 0 else 0.0

cosine_similarity_udf = udf(cosine_similarity, DoubleType())

In [0]:
# Liệt kê các file trong /FileStore/tables/
dbutils.fs.ls("/FileStore/tables/")

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/job_skills-1.csv', name='job_skills-1.csv', size=672718092, modificationTime=1744013063000),
 FileInfo(path='dbfs:/FileStore/tables/job_skills.csv', name='job_skills.csv', size=672718092, modificationTime=1743994752000),
 FileInfo(path='dbfs:/FileStore/tables/linkedin_job_postings-1.csv', name='linkedin_job_postings-1.csv', size=415328702, modificationTime=1744012726000),
 FileInfo(path='dbfs:/FileStore/tables/linkedin_job_postings.csv', name='linkedin_job_postings.csv', size=415328702, modificationTime=1743994181000)]

2-Load dữ liệu và chuyển thành Delta Table

In [0]:
# Đọc file job_skills.csv từ DBFS
df_job_skills = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dbfs:/FileStore/tables/job_skills.csv")

print("Dữ liệu của job_skills.csv (5 dòng đầu):")
df_job_skills.show(5)

print("Schema của job_skills.csv:")
df_job_skills.printSchema()

print("Số dòng trong job_skills.csv:", df_job_skills.count())

# Đọc file linkedin_job_postings.csv từ DBFS
df_job_postings = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dbfs:/FileStore/tables/linkedin_job_postings.csv")

print("\nDữ liệu của linkedin_job_postings.csv (5 dòng đầu):")
df_job_postings.show(5)

print("Schema của linkedin_job_postings.csv:")
df_job_postings.printSchema()

print("Số dòng trong linkedin_job_postings.csv:", df_job_postings.count())

Dữ liệu của job_skills.csv (5 dòng đầu):
+--------------------+--------------------+
|            job_link|          job_skills|
+--------------------+--------------------+
|https://www.linke...|Building Custodia...|
|https://www.linke...|Customer service,...|
|https://www.linke...|Applied Behavior ...|
|https://www.linke...|Electrical Engine...|
|https://www.linke...|Electrical Assemb...|
+--------------------+--------------------+
only showing top 5 rows

Schema của job_skills.csv:
root
 |-- job_link: string (nullable = true)
 |-- job_skills: string (nullable = true)

Số dòng trong job_skills.csv: 1296381

Dữ liệu của linkedin_job_postings.csv (5 dòng đầu):
+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+--------+
|            job_link| last_processed_time|got_summary|got_ner|is_being_worked|           job_title|          

In [0]:
# Đọc file job_skills.csv và job_postings.csv rồi chuyển thành Delta Table
df_job_skills.write.format("delta") \
    .mode("overwrite") \
    .save("/FileStore/delta/job_skills")

df_job_postings.write.format("delta") \
    .mode("overwrite") \
    .save("/FileStore/delta/linkedin_job_postings")

# Đọc Delta Table từ đường dẫn
delta_job_skills = spark.read.format("delta").load("/FileStore/delta/job_skills")
delta_job_postings = spark.read.format("delta").load("/FileStore/delta/linkedin_job_postings")

# Gộp hai Delta Table trên cột job_link
df_combined = delta_job_postings.join(delta_job_skills, on="job_link", how="inner")

# Xóa các cột không cần thiết: got_summary, got_ner, is_being_worked
columns_to_drop = ["got_summary", "got_ner", "is_being_worked"]
df_combined = df_combined.drop(*columns_to_drop)

# Xóa thư mục Delta Table cũ để đảm bảo schema được cập nhật
dbutils.fs.rm("/FileStore/delta/job_posts_skills", recurse=True)

# Lưu Delta Table gộp với tên job_posts_skills
df_combined.write.format("delta") \
    .mode("overwrite") \
    .save("/FileStore/delta/job_posts_skills")

# Đọc Delta Table gộp để kiểm tra
df_combined_final = spark.read.format("delta").load("/FileStore/delta/job_posts_skills")
df_combined_final.show(5)
df_combined_final.printSchema()
print("Số dòng trong Delta Table job_posts_skills:", df_combined_final.count())

+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+--------------+------------------+----------+--------+--------------------+
|            job_link| last_processed_time|           job_title|             company|        job_location|first_seen| search_city|search_country|   search_position| job_level|job_type|          job_skills|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+--------------+------------------+----------+--------+--------------------+
|https://ae.linked...|2024-01-21 07:21:...|        EVS Operator|              Sundus|Abu Dhabi, Abu Dh...|2024-01-17|Saint Joseph| United States|     Unit Operator|Mid senior|  Onsite|EVS, EVS Operator...|
|https://ae.linked...|2024-01-21 14:09:...|Operations Superv...|       Skydive Dubai|Dubai, United Ara...|2024-01-17|  Gloucester| United States|  Supervisor Rides| Associate| 

3-Load và tiền xử lý dữ liệu job_posts_skills

In [0]:
# Bước 3: Load dữ liệu từ Delta Lake
job_post_skills_path = "/FileStore/delta/job_posts_skills"
df_posts_skills = spark.read.format("delta").load(job_post_skills_path)

print("Schema của df_posts_skills sau khi load từ Delta Table:")
df_posts_skills.printSchema()

#Làm sạch dữ liệu
df_posts_skills = df_posts_skills.na.drop(subset=["job_skills"])
df_posts_skills = df_posts_skills.filter(col("job_skills") != "")

# Áp dụng xử lý kỹ năng
df_posts_skills = df_posts_skills.withColumn("cleaned_job_skills", clean_tokenize_udf(col("job_skills")))

# Vector hóa dữ liệu
vectorizer = CountVectorizer(inputCol="cleaned_job_skills", outputCol="skills_frequency", minDF=10)
vectorizer_model = vectorizer.fit(df_posts_skills)
df_posts_skills = vectorizer_model.transform(df_posts_skills)

# Tính toán TF-IDF
idf = IDF(inputCol="skills_frequency", outputCol="skills_tfidf")
idf_model = idf.fit(df_posts_skills)
df_posts_skills = idf_model.transform(df_posts_skills)

# Đăng ký DataFrame như một bảng tạm thời
df_posts_skills.createOrReplaceTempView("job_posts")

Schema của df_posts_skills sau khi load từ Delta Table:
root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- job_skills: string (nullable = true)



4-Nhập profile ứng viên

In [0]:
# Bước 4: Nhập dữ liệu ứng viên thủ công
schema = StructType([
    StructField("sn", StringType(), True),
    StructField("search_city", StringType(), True),
    StructField("search_country", StringType(), True),
    StructField("job_level", StringType(), True),
    StructField("job_type", StringType(), True),
    StructField("job_skills", StringType(), True)
])

# Nhập thông tin ứng viên mới
user_input = [
    ("001", "Grand Haven", "United States", "Mid senior", "Onsite", "Nursing, Bachelor of Science in Nursing, Masters Degree in Nursing, Care management experience, Clinical experience in nursing, Licensure to practice nursing in Michigan, Population management, Selfmanagement, Education, Oversight of registries, Patient education, Patient selfmanagement, Patient care coordination, Chronic condition management"),
    ("002", "Coronado", "United States", "Mid senior", "Onsite", "RF power amplifier design, Analogue circuit design, OFDM/OFDMA 802.11 4G 5G, Cadence schematic capture layout and simulation tools, EM simulations, EMX RFPro/Momentum HFSS,Wireless communications, MATLAB, Python, Data analysis"),
    ("003", "Milwaukee", "United States", "Mid senior", "Onsite", "Chef de Cuisine, Executive Sous Chef, Hospitality, Culinary, Food and Beverage, Menu Development, Menu Engineering, Food Preparation, Sanitation, Food and Labor Cost Control, Multitasking, Time Management, ServSafe Certification, High School Diploma, Culinary Schooling, Apprenticeship, Work Experience"),
    ("004", "Manitoba", "Canada", "Mid senior", "Onsite", "AUTOCAD, MS Project, Building Code Compliance, Construction methods, Sustainability design concepts, Barrierfree access, Environmental risks, Engineering, Project management, Construction management, Medical gases, Plumbing, Steam"), #Engineering Project Coordinator(9)
    ("005", "Nebraska", "United States", "Associate", "Onsite", "Control Systems Integration, PLC Programming, HMI Programming, Operator Interface Terminal Programming, Control System Software Installation, Lead Programming, Automation Staff Supervision, Mentorship, System Startup, Client Relationship Development, Selection and Specification of PLCs and HMIs, Instrumentation, Schematic Development"),#Control Systems Integration Engineer(13)
    ("006", "Carmel", "United States", "Mid senior", "Onsite", "Cashier, Stocker, Lead capacity, Planograms, Customer service, Unloading trucks, Receiving, Opening cartons, Unpacking cartons and totes, Stocking merchandise, Rotating merchandise, Facing merchandise, Building merchandise displays, Restock merchandise"),#LEAD SALES ASSOCIATE-PT(14)
    ("007", "Calumet City", "United States", "Associate", "Onsite", "Microsoft Office Suite, UML, Flow Charting Software, Change Control/Defect Tracking Software, Project Management Methodologies, Design Tools, Bachelor's Degree in Computer Science, Understanding of SOA (Service Oriented Architecture), ASP.NET, SQL, XML, Design Patterns, Researching, Planning, Recommending Software and Systems Solutions"),#Product Analyst(16)
    ("008", "Basildon", "United Kingdom", "Mid senior", "Onsite", "English (language), CV (document), Europe (location), River Thames (location), Greenwich Peninsula (location)"),#Property Manager(27)
    ("009", "Lawton", "United States", "Associate", "Onsite", "Public relations, Planning and zoning, Building codes, Land use regulations, Surveying, Geometry, Mathematics, AutoCAD"),#Development Services Technician(50)
    ("010", "Ridgewood", "United States", "Mid senior", "Onsite", "Sales, Leadership, Mentoring, Communication, Coaching, Strategic Planning, Execution, Business Development, Team Management, Customer Relationship Management, Product Knowledge, Compliance, Regulatory, Pharmaceutical Sales, Management, Strategic Thinking, Decision Making, Talent Management, Excel, Word, Outlook, Database Applications, Bachelor's Degree, Business, Marketing, Life Sciences, CRM, Salesforce"),#Regional Sales Manager(54)
]
df_queries = spark.createDataFrame(user_input, schema)

# Làm sạch dữ liệu
df_queries = df_queries.na.drop(subset=["job_skills"])
df_queries = df_queries.filter(col("job_skills") != "")

# Áp dụng xử lý kỹ năng cho ứng viên
df_queries = df_queries.withColumn("cleaned_job_skills", clean_tokenize_udf(col("job_skills")))

# Vector hóa và tính TF-IDF cho ứng viên
df_queries = vectorizer_model.transform(df_queries)
df_queries = idf_model.transform(df_queries)

# Đăng ký DataFrame như một bảng tạm thời
df_queries.createOrReplaceTempView("df_queries")

5-Tính toán độ tương đồng và tìm top 5 công việc

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Sử dụng bí danh khi join để phân biệt cột
df_queries_alias = df_queries.alias("queries")
job_posts_alias = spark.table("job_posts").alias("posts")

# Join df_queries với job_posts dựa trên các tiêu chí
df_combined = df_queries_alias.join(
    job_posts_alias,
    (df_queries_alias.search_city== job_posts_alias.search_city) &
    (df_queries_alias.search_country == job_posts_alias.search_country) &
    (df_queries_alias.job_level == job_posts_alias.job_level) &
    (df_queries_alias.job_type == job_posts_alias.job_type),
    how="inner"
)

# Đổi tên cột để tránh xung đột và chỉ chọn các cột cần thiết
df_combined = df_combined.select(
    col("queries.sn").alias("query_sn"),
    col("posts.job_link"),
    col("posts.company"),
    col("posts.job_location"),
    col("posts.skills_tfidf"),
    col("queries.skills_tfidf").alias("query_skills_tfidf")
)

# Kiểm tra dữ liệu trước khi áp dụng UDF
df_combined = df_combined.filter(
    col("skills_tfidf").isNotNull() & col("query_skills_tfidf").isNotNull()
)

# Tính similarity_score
df_combined = df_combined.withColumn(
    "similarity_score",
    cosine_similarity_udf(col("skills_tfidf"), col("query_skills_tfidf"))
)

# Sắp xếp và lấy top 5 công việc cho mỗi ứng viên
window_spec = Window.partitionBy("query_sn").orderBy(col("similarity_score").desc())
df_combined = df_combined.withColumn(
    "job_suitability_rank",
    row_number().over(window_spec)
).filter(col("job_suitability_rank") <= 5)

# Chọn các cột yêu cầu để tạo kết quả cuối
query_skills_best_jobs = df_combined.select(
    "job_suitability_rank",
    "query_sn",
    "company",
    "job_link",
    "job_location",
    "similarity_score"
).collect()

# Đặt ngưỡng similarity_score
threshold = 0.15
predictions = []
for row in query_skills_best_jobs:
    predicted_relevant = 1 if row["similarity_score"] >= threshold else 0
    predictions.append({
        "candidate_id": row["query_sn"],
        "job_link": row["job_link"],
        "predicted_relevant": predicted_relevant
    })

# In kết quả với các trường yêu cầu
for job in query_skills_best_jobs:
    print({
        "job_suitability_rank": job["job_suitability_rank"],
        "query_sn": job["query_sn"],
        "company": job["company"],
        "job_link": job["job_link"],
        "job_location": job["job_location"],
        "similarity_score": job["similarity_score"]
    })

{'job_suitability_rank': 1, 'query_sn': '001', 'company': 'Trinity Health MI', 'job_link': 'https://www.linkedin.com/jobs/view/registered-nurse-rn-care-manager-at-trinity-health-mi-3803386312', 'job_location': 'Norton Shores, MI', 'similarity_score': 1.0}
{'job_suitability_rank': 2, 'query_sn': '001', 'company': 'Trinity Health', 'job_link': 'https://www.linkedin.com/jobs/view/registered-nurse-rn-care-manager-at-trinity-health-3799201324', 'job_location': 'Norton Shores, MI', 'similarity_score': 0.5573762752845485}
{'job_suitability_rank': 3, 'query_sn': '001', 'company': 'Hackley Community Care', 'job_link': 'https://www.linkedin.com/jobs/view/registered-nurse-rn-at-hackley-community-care-3777279642', 'job_location': 'Muskegon, MI', 'similarity_score': 0.21822716116317176}
{'job_suitability_rank': 4, 'query_sn': '001', 'company': 'Saint Joseph Mercy Health System', 'job_link': 'https://www.linkedin.com/jobs/view/registered-nurse-emergency-department-7a-7-30p-24hrs-per-week-at-saint-jo

6-Lưu kết quả và Sử dụng Ground Truth

In [0]:
# Bước 6: Lưu kết quả ra Delta Lake

# Định nghĩa schema rõ ràng cho DataFrame
output_schema = StructType([
    StructField("job_suitability_rank", IntegerType(), True),
    StructField("query_sn", StringType(), True),
    StructField("company", StringType(), True),
    StructField("job_link", StringType(), True),
    StructField("job_location", StringType(), True),
    StructField("similarity_score", DoubleType(), True)
])

# Tạo DataFrame với schema đã định nghĩa
output_df = spark.createDataFrame(query_skills_best_jobs, schema=output_schema)

# Xóa thư mục Delta Table cũ để đảm bảo schema được cập nhật
output_file_path = "/FileStore/delta/output_job_skills_match"
dbutils.fs.rm(output_file_path, recurse=True)

# Lưu vào Delta Lake
output_df.write.format("delta").mode("overwrite").save(output_file_path)

# Kiểm tra schema sau khi lưu
output_check = spark.read.format("delta").load(output_file_path)
print("Schema của Delta Table sau khi lưu:")
output_check.printSchema()

# Hiển thị dữ liệu để xác nhận
print("Dữ liệu trong Delta Table:")
output_check.show()

print(f"Dữ liệu đã được lưu thành công vào {output_file_path}")


# Bước 7: Đánh giá mô hình với Ground Truth
# Thu thập tất cả các job_link và candidate_ids từ query_skills_best_jobs
job_links = set(job["job_link"] for job in query_skills_best_jobs)
candidate_ids = set(job["query_sn"] for job in query_skills_best_jobs)

# In thông tin để hỗ trợ việc gán nhãn thủ công
print("Thông tin để tạo ground truth:")
for candidate_id in candidate_ids:
    candidate_info = spark.sql(f"""
        SELECT job_skills FROM df_queries WHERE sn = '{candidate_id}'
    """).collect()[0]
    print(f"Candidate {candidate_id}:")
    print(f"  Skills: {candidate_info['job_skills']}")
    print("---")

for job_link in job_links:
    job_info = spark.sql(f"""
        SELECT job_title, job_skills FROM job_posts WHERE job_link = '{job_link}'
    """).collect()[0]
    print(f"Job: {job_info['job_title']}")
    print(f"  Link: {job_link}")
    print(f"  Skills: {job_info['job_skills']}")
    print("---")

# Tạo ground truth thủ công dựa trên thông tin trên
# Dựa trên kết quả và kỹ năng, mình gán nhãn is_relevant (1: phù hợp, 0: không phù hợp)
ground_truth_data = [
    # Candidate 001:
    ("001", "https://www.linkedin.com/jobs/view/registered-nurse-rn-care-manager-at-trinity-health-mi-3803386312", 1),  # Phù hợp (Registered Nurse)
    ("001", "https://www.linkedin.com/jobs/view/registered-nurse-rn-care-manager-at-trinity-health-3799201324", 1),
    ("001", "https://www.linkedin.com/jobs/view/registered-nurse-rn-at-hackley-community-care-3777279642", 1),
    ("001", "https://www.linkedin.com/jobs/view/registered-nurse-emergency-department-7a-7-30p-24hrs-per-week-at-saint-joseph-mercy-health-system-3784380723", 0),
    ("001", "https://www.linkedin.com/jobs/view/registered-nurse-rn-care-manager-at-saint-joseph-mercy-health-system-3803344133", 1),

    # Candidate 002:
    ("002", "https://www.linkedin.com/jobs/view/principal-rfic-power-amplifier-design-engineer-at-quantalrf-3787746484", 1),
    ("002", "https://www.linkedin.com/jobs/view/digital-signal-process-engineer-sr-23-033-with-security-clearance-at-clearancejobs-3753472438", 0),
    ("002", "https://www.linkedin.com/jobs/view/senior-bioinformatics-scientist-at-exact-sciences-3792998507", 0),
    ("002", "https://www.linkedin.com/jobs/view/lead-autopilot-gnc-engineer-at-natilus-3638525392", 0),
    ("002", "https://www.linkedin.com/jobs/view/senior-principal-electro-optical-engineer-at-leonardo-drs-3793889578", 0),

    # Candidate 003: Kỹ năng liên quan đến Teaching, Classroom Management
    ("003", "https://www.linkedin.com/jobs/view/chef-de-cuisine-at-goodwin-recruiting-3805761853", 1), #Phù hợp (Chef)
    ("003", "https://www.linkedin.com/jobs/view/sous-chef-at-the-union-house-fine-dining-3787838849", 1),
    ("003", "https://www.linkedin.com/jobs/view/bravo-host-at-bertucci-s-3670729205", 0),
    ("003", "https://www.linkedin.com/jobs/view/bravo-host-bravo-brookfield-at-earl-enterprises-3670725900", 0), 
    ("003", "https://www.linkedin.com/jobs/view/restaurant-manager-at-maggiano-s-little-italy-3586346752", 0),

    # Candidate 004: Project Management, Engineering Coordination
    ("004", "https://ca.linkedin.com/jobs/view/engineering-project-coordinator-at-shared-health-soins-communs-3767503688", 1),  # Phù hợp
    ("004", "https://ca.linkedin.com/jobs/view/senior-project-manager-at-bird-construction-3800438330", 0),
    ("004", "https://ca.linkedin.com/jobs/view/project-manager-at-colliers-project-leaders-canada-3767684577", 0),
    ("004", "https://ca.linkedin.com/jobs/view/senior-project-manager-at-dmc-recruitment-group-3801585855", 0),
    ("004", "https://ca.linkedin.com/jobs/view/charg%C3%A9-de-projet-s%C3%A9nior-at-bgis-3785854187", 0),

    # Candidate 005: Control Systems, Integration Engineering
    ("005", "https://www.linkedin.com/jobs/view/control-systems-integration-engineer-at-olsson-3793857817", 1),  # Phù hợp
    ("005", "https://www.linkedin.com/jobs/view/control-systems-integration-engineer-at-olsson-3793858841", 1),  # Phù hợp
    ("005", "https://www.linkedin.com/jobs/view/ae-i-lead-at-cargill-3801439972", 0),  # Không phù hợp
    ("005", "https://www.linkedin.com/jobs/view/ae-i-technician-at-cargill-3745037053", 0),  # Không phù hợp
    ("005", "https://www.linkedin.com/jobs/view/maintenance-technician-iii-at-cargill-3807747882", 0),  # Không phù hợp

    # Candidate 006: Retail Sales, Customer Service
    ("006", "https://www.linkedin.com/jobs/view/lead-sales-associate-pt-at-dollar-general-3580065885", 1),  # Phù hợp
    ("006", "https://www.linkedin.com/jobs/view/retail-associate-part-time-west-washington-st-indianapolis-at-goodwill-of-central-southern-indiana-3796424455", 0),
    ("006", "https://www.linkedin.com/jobs/view/retail-associate-part-time-crawfordsville-in-at-goodwill-of-central-southern-indiana-3796421574", 0),
    ("006", "https://www.linkedin.com/jobs/view/purchasing-and-receiving-manager-at-davidson-hospitality-group-3777074000", 0),
    ("006", "https://www.linkedin.com/jobs/view/retail-associate-part-time-avon-in-at-goodwill-of-central-southern-indiana-3796424444", 0),

    # Candidate 007: Tax Analysis, Product Analysis
    ("007", "https://www.linkedin.com/jobs/view/senior-associate-tax-product-analyst-at-kpmg-us-3764869128", 1),  # Phù hợp
    ("007", "https://www.linkedin.com/jobs/view/pacs-cpacs-hemo-implementation-specialist-at-technogen-inc-3800249519", 0),
    ("007", "https://www.linkedin.com/jobs/view/property-manager-at-sudler-property-management-3806867162", 0), 
    ("007", "https://www.linkedin.com/jobs/view/senior-analyst-bangkok-based-relocation-provided-at-agoda-3748492368", 0),
    ("007", "https://www.linkedin.com/jobs/view/volunteer-executive-director-at-black-building-leadership-and-community-knowledge-3803987946", 0),  

    # Candidate 008: Property Management
    ("008", "https://uk.linkedin.com/jobs/view/property-manager-at-knight-dragon-ltd-3805009895", 1),  # Phù hợp
    ("008", "https://uk.linkedin.com/jobs/view/rrpa-diary-manager-london-approved-premises-ref-76835-at-hm-prison-and-probation-service-3772076193", 1),
    ("008", "https://uk.linkedin.com/jobs/view/cable-transport-installation-manager-japan-at-rwe-3789009132", 1),
    ("008", "https://uk.linkedin.com/jobs/view/production-team-leaders-days-at-energy-jobline-3803177048", 1),
    ("008", "https://uk.linkedin.com/jobs/view/general-practitioner-at-jupiter-recruitment-corporation-ltd-3804404066", 1),

    # Candidate 009: Development Services, Planning
    ("009", "https://www.linkedin.com/jobs/view/development-services-technician-at-county-of-san-joaquin-3769465536", 1),  # Phù hợp
    ("009", "https://www.linkedin.com/jobs/view/planner-iii-at-county-of-calaveras-3700430020", 1),
    ("009", "https://www.linkedin.com/jobs/view/electrical-engineer-at-rtm-engineering-consultants-3669230001", 0),
    ("009", "https://www.linkedin.com/jobs/view/building-inspector-i-ii-at-merced-county-3803413040", 0), 
    ("009", "https://www.linkedin.com/jobs/view/construction-manager-at-m-i-homes-inc-3755121888", 0),

    # Candidate 010: Sales Management, Regional Sales
    ("010", "https://www.linkedin.com/jobs/view/regional-sales-manager-mid-atlantic-at-alfasigma-usa-inc-3806588517", 1),  # Phù hợp
    ("010", "https://www.linkedin.com/jobs/view/territory-manager-at-jobot-3805148755", 1), 
    ("010", "https://www.linkedin.com/jobs/view/assistant-vice-president-transplant-at-atrium-health-3745432228", 0),
    ("010", "https://www.linkedin.com/jobs/view/store-manager-at-community-choice-financial-family-of-brands-3797850347", 0),
    ("010", "https://www.linkedin.com/jobs/view/director-of-sales-marketing-at-aimbridge-hospitality-3763398286", 0),
]
# Tạo DataFrame ground truth
ground_truth_schema = StructType([
    StructField("candidate_id", StringType(), True),
    StructField("job_link", StringType(), True),
    StructField("is_relevant", IntegerType(), True)
])

df_ground_truth = spark.createDataFrame(ground_truth_data, ground_truth_schema)

# Lưu ground truth vào Delta Table
dbutils.fs.rm("/FileStore/delta/ground_truth", recurse=True)
df_ground_truth.write.format("delta").mode("overwrite").save("/FileStore/delta/ground_truth")
print("Ground Truth đã được lưu vào /FileStore/delta/ground_truth")

# Chuyển ground truth thành dictionary để tra cứu nhanh
ground_truth = {(row["candidate_id"], row["job_link"]): row["is_relevant"] 
                for row in df_ground_truth.collect()}

# So sánh dự đoán với ground truth và tính các chỉ số
true_positives = 0
false_positives = 0
false_negatives = 0
true_negatives = 0

for pred in predictions:
    candidate_id = pred["candidate_id"]
    job_link = pred["job_link"]
    predicted_relevant = pred["predicted_relevant"]
    actual_relevant = ground_truth.get((candidate_id, job_link), 0)

    if predicted_relevant == 1 and actual_relevant == 1:
        true_positives += 1
    elif predicted_relevant == 1 and actual_relevant == 0:
        false_positives += 1
    elif predicted_relevant == 0 and actual_relevant == 1:
        false_negatives += 1
    elif predicted_relevant == 0 and actual_relevant == 0:
        true_negatives += 1


# Tính precision, recall, F1-score
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

# In kết quả đánh giá
# In kết quả đánh giá
print("Kết quả đánh giá mô hình:")
print(f"True Positives: {true_positives}")
print(f"False Positives: {false_positives}")
print(f"False Negatives: {false_negatives}")
print(f"True Negatives: {true_negatives}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1_score:.4f}")

Schema của Delta Table sau khi lưu:
root
 |-- job_suitability_rank: integer (nullable = true)
 |-- query_sn: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_link: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- similarity_score: double (nullable = true)

Dữ liệu trong Delta Table:
+--------------------+--------+--------------------+--------------------+--------------------+-------------------+
|job_suitability_rank|query_sn|             company|            job_link|        job_location|   similarity_score|
+--------------------+--------+--------------------+--------------------+--------------------+-------------------+
|                   3|     003|          Bertucci's|https://www.linke...|      Brookfield, WI|0.14662195167403572|
|                   4|     003|    Earl Enterprises|https://www.linke...|      Brookfield, WI| 0.1412224158312091|
|                   5|     003|Maggiano's Little...|https://www.linke...|       Milwaukee, W

In [0]:
from pyspark.sql import SparkSession

# Khởi tạo Spark session
spark = SparkSession.builder.appName("ViewSavedResults").getOrCreate()

# Đọc dữ liệu từ Delta Table chứa kết quả gợi ý
output_df = spark.read.format("delta").load("/FileStore/delta/output_job_skills_match")

# Hiển thị kết quả
print("Kết quả gợi ý công việc từ Delta Table:")
output_df.show()

# Đọc dữ liệu Ground Truth để kiểm tra
ground_truth_df = spark.read.format("delta").load("/FileStore/delta/ground_truth")

# Hiển thị Ground Truth
print("Ground Truth từ Delta Table:")
ground_truth_df.show()

# Lấy danh sách dự đoán từ output_df
predictions = []
for row in output_df.collect():
    predicted_relevant = 1 if row["similarity_score"] >= 0.15 else 0
    predictions.append({
        "candidate_id": row["query_sn"],
        "job_link": row["job_link"],
        "predicted_relevant": predicted_relevant
    })

# Chuyển Ground Truth thành dictionary
ground_truth = {(row["candidate_id"], row["job_link"]): row["is_relevant"] 
                for row in ground_truth_df.collect()}

# Tính các chỉ số
true_positives = 0
false_positives = 0
false_negatives = 0
true_negatives = 0

for pred in predictions:
    candidate_id = pred["candidate_id"]
    job_link = pred["job_link"]
    predicted_relevant = pred["predicted_relevant"]
    actual_relevant = ground_truth.get((candidate_id, job_link), 0)

    if predicted_relevant == 1 and actual_relevant == 1:
        true_positives += 1
    elif predicted_relevant == 1 and actual_relevant == 0:
        false_positives += 1
    elif predicted_relevant == 0 and actual_relevant == 1:
        false_negatives += 1
    elif predicted_relevant == 0 and actual_relevant == 0:
        true_negatives += 1

# Tính các chỉ số đánh giá
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

# In kết quả đánh giá
print("Kết quả đánh giá mô hình (ngưỡng = 0.15):")
print(f"True Positives: {true_positives}")
print(f"False Positives: {false_positives}")
print(f"False Negatives: {false_negatives}")
print(f"True Negatives: {true_negatives}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1_score:.4f}")

Kết quả gợi ý công việc từ Delta Table:
+--------------------+--------+--------------------+--------------------+--------------------+-------------------+
|job_suitability_rank|query_sn|             company|            job_link|        job_location|   similarity_score|
+--------------------+--------+--------------------+--------------------+--------------------+-------------------+
|                   3|     003|          Bertucci's|https://www.linke...|      Brookfield, WI|0.14662195167403572|
|                   4|     003|    Earl Enterprises|https://www.linke...|      Brookfield, WI| 0.1412224158312091|
|                   5|     003|Maggiano's Little...|https://www.linke...|       Milwaukee, WI|0.13826915112079338|
|                   1|     004|Shared Health-Soi...|https://ca.linked...|Winnipeg, Manitob...| 0.6227528616809993|
|                   2|     004|   Bird Construction|https://ca.linked...|Winnipeg, Manitob...|0.14062922621939664|
|                   3|     004|Colliers 