# Global setup and package installation used in most phases

## Colab + GPU Detection Utilities

In [1]:
import subprocess

def is_running_in_colab():
    """Report whether the notebook is executing inside Google Colab.

    The check simply attempts to import ``google.colab``; an ``ImportError``
    is taken to mean the code is running somewhere else.

    Returns:
        bool: True when ``google.colab`` can be imported, False otherwise.
    """
    try:
        import google.colab  # noqa: F401  (imported only as a probe)
    except ImportError:
        in_colab = False
    else:
        in_colab = True
    return in_colab

def get_available_gpu_memory_gb():
    """Return the free memory reported by ``nvidia-smi`` for the first GPU line.

    The tool's ``memory.free`` figure is divided by 1024 to convert it to GB.
    Any failure (tool missing, non-zero exit, unparsable output) yields 0.0.

    Returns:
        float: free GPU memory in GB, or 0.0 when it cannot be determined.
    """
    query_cmd = [
        "nvidia-smi",
        "--query-gpu=memory.free",
        "--format=csv,nounits,noheader",
    ]
    try:
        raw_output = subprocess.check_output(query_cmd, encoding="utf-8")
        # Only the first line (first GPU listed) is considered.
        first_line = raw_output.strip().split("\n")[0]
        return int(first_line) / 1024
    except Exception:
        return 0.0


## install dependencies

In [None]:
# Install the notebook's dependencies; the package set differs between Colab
# and any other kernel (see the two branches below).
if is_running_in_colab():
    # Colab: install the required packages through the shell
    !pip install kagglehub pandas
    !pip install -q transformers accelerate bitsandbytes sentencepiece pydantic huggingface_hub xformers
    !pip install regex json5
    !pip install sentence-transformers scikit-learn
    !pip install rapidfuzz unidecode

else:
    # Non-Colab kernel: %pip installs into the running kernel's environment.
    %pip install kagglehub pandas
    %pip install -q transformers accelerate sentencepiece pydantic huggingface_hub xformers
    # Earlier torch install variants, kept for reference:
    #%pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cu121
    #%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
    # Currently used: pre-release torch wheels from the nightly/cu128 index.
    %pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu128
    # bitsandbytes is installed separately here (and upgraded) rather than with the transformers line.
    %pip install -U bitsandbytes
    %pip install regex json5
    %pip install sentence-transformers scikit-learn
    %pip install rapidfuzz unidecode


## Log in to Hugging Face

In [2]:
from huggingface_hub import login
import os

# Set your token here securely or prompt for it in Colab
# Recommended: store in Colab secrets or environment variable
# Resolve the Hugging Face token in this order:
#   1. the HUGGINGFACE_TOKEN environment variable,
#   2. the 'HF_TOKEN' Colab secret (when running in Colab),
#   3. a hidden interactive prompt (everywhere else).
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")


if not HF_TOKEN:
    if is_running_in_colab():
        # If running in Colab, use the Colab secrets
        try:
            from google.colab import userdata
            HF_TOKEN = userdata.get('HF_TOKEN')
            if not HF_TOKEN:
                raise ValueError("⚠️ Hugging Face token not found in Colab secrets.")
            print("🔑 Hugging Face token found in Colab secrets.")
        except ImportError:
            print("⚠️ Unable to authenticate in Colab. Please set your Hugging Face token manually.")
    else:
        # Prompt for the token if it is not set in the environment.
        # getpass is used instead of input() so the secret is not echoed into
        # the cell output (which would be saved with the notebook).
        from getpass import getpass
        HF_TOKEN = getpass("🔑 Enter your Hugging Face token: ").strip()

# NOTE: HF_TOKEN may still be None here if the Colab secrets import failed;
# it is passed through to login() unchanged in that case.
login(token=HF_TOKEN)


## Setup Kaggle Credentials

In [3]:
import shutil

def setup_kaggle_credentials():
    """Ensure ``~/.kaggle/kaggle.json`` exists, uploading it through Colab if needed.

    When the credentials file is already present nothing is changed.
    Otherwise the Colab upload widget is opened, one uploaded file is moved
    to ``~/.kaggle/kaggle.json`` and its mode is set to ``0o600``. A file
    named ``kaggle.json`` is preferred; if none has that name the first
    uploaded file is used. Other uploaded files are left untouched.

    Raises:
        ImportError: the file is missing and ``google.colab`` cannot be
            imported (i.e. there is no upload widget to fall back on).
        FileNotFoundError: the upload dialog returned no files.
    """
    kaggle_path = os.path.expanduser('~/.kaggle/kaggle.json')
    if os.path.exists(kaggle_path):
        print(f"✅ Kaggle credentials already exist at {kaggle_path}")
        return

    try:
        from google.colab import files
    except ImportError as exc:
        raise ImportError(
            f"Kaggle credentials not found at {kaggle_path} and the Colab upload "
            "helper is unavailable; place kaggle.json at that path manually."
        ) from exc

    print("📂 Upload kaggle.json file...")
    uploaded = files.upload()
    if not uploaded:
        # Without this guard the failure would surface later as an opaque chmod error.
        raise FileNotFoundError(f"No file was uploaded; expected kaggle.json for {kaggle_path}")

    uploaded_names = list(uploaded.keys())
    # Moving every upload onto the same destination would silently destroy
    # all but the last one, so exactly one file is chosen.
    source_name = "kaggle.json" if "kaggle.json" in uploaded_names else uploaded_names[0]

    os.makedirs(os.path.dirname(kaggle_path), exist_ok=True)
    shutil.move(source_name, kaggle_path)
    os.chmod(kaggle_path, 0o600)
    print(f"✅ Kaggle credentials setup at {kaggle_path}")

setup_kaggle_credentials()

✅ Kaggle credentials already exist at C:\Users\rubyj/.kaggle/kaggle.json


## Mount Google Drive (Colab)

In [None]:
# Google Drive is mounted only when running in Colab; elsewhere this cell is a no-op.
if is_running_in_colab():
   from google.colab import drive
   # Mount point used for Drive inside the Colab VM.
   drive.mount('/content/drive')

##  Load Qwen-Instruct with Fallback to Quantized

In [4]:
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
import torch

def load_model_pipeline(model_name: str, hf_token: str):
    """Load a causal LM plus tokenizer and wrap them in a text-generation pipeline.

    When CUDA is available and GPU 0 has less than 24 GB of *total* memory,
    the model is loaded 4-bit quantized (NF4, double quantization, float16
    compute). Otherwise it is loaded unquantized in float16. Without CUDA the
    model is placed on the CPU.

    Args:
        model_name: Hugging Face model identifier passed to ``from_pretrained``.
        hf_token: access token forwarded to the tokenizer and model loaders.

    Returns:
        A ``transformers`` text-generation pipeline with ``batch_size=1``.
    """
    has_cuda = torch.cuda.is_available()
    # total_memory is the card's capacity, not the amount currently free.
    total_mem_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3) if has_cuda else 0
    print(f"💻 CUDA: {has_cuda} | GPU Memory: {total_mem_gb:.2f} GB")

    device_map = {"": 0} if has_cuda else "cpu"
    use_4bit = has_cuda and total_mem_gb < 24

    # Quantization config is only built when 4-bit loading is selected.
    quant_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_quant_type="nf4"
    ) if use_4bit else None

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token, trust_remote_code=True)
    # Fall back to EOS as padding only when the tokenizer defines no pad token;
    # an existing pad token is left as the model author configured it.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Load model
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map=device_map,
        quantization_config=quant_config,
        # dtype is left to the quantization config when quantizing.
        torch_dtype=torch.float16 if not quant_config else None,
        trust_remote_code=True,
        token=hf_token
    )

    print(f"✅ Model loaded on {next(model.parameters()).device}")
    return pipeline("text-generation", model=model, tokenizer=tokenizer, batch_size=1)


In [5]:
# Build the shared text-generation pipeline once; HF_TOKEN comes from the login cell.
llm_pipeline = load_model_pipeline(
    model_name="Qwen/Qwen2-7B-Instruct",
    hf_token=HF_TOKEN
)

💻 CUDA: True | GPU Memory: 15.92 GB


Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Device set to use cuda:0


✅ Model loaded on cuda:0


# Global utilities

### Utility to save json to a folder

In [6]:
import json
import os
# 📦 Save JSON Output with Safety
def save_json_output(data, output_path: str, indent: int = 4, overwrite: bool = True):
    """Serialize ``data`` as UTF-8 JSON to ``output_path``.

    Missing parent directories are created. Non-ASCII characters are written
    as-is (``ensure_ascii=False``).

    Args:
        data: any JSON-serializable object.
        output_path: destination file; may be a bare filename in the
            current directory.
        indent: indentation passed to ``json.dump``.
        overwrite: when False, an existing file is never replaced.

    Raises:
        FileExistsError: ``output_path`` exists and ``overwrite`` is False.
    """
    if os.path.exists(output_path) and not overwrite:
        raise FileExistsError(f"File {output_path} already exists and overwrite=False.")

    output_dir = os.path.dirname(output_path)
    # A bare filename has an empty directory part, and os.makedirs("") raises.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Mode "w" truncates an existing file, so no explicit removal is needed.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=indent, ensure_ascii=False)

    print(f"✅ Saved output to {output_path}")


### Utility to load file

In [7]:
from typing import Any
import json

# 📂 Load normalized JSON data
def load_json_file(file_path: str) -> Any:
    """Parse and return the JSON document stored at ``file_path``.

    Raises:
        FileNotFoundError: ``file_path`` is not an existing regular file.
    """
    if os.path.isfile(file_path):
        with open(file_path, 'r', encoding='utf-8') as handle:
            parsed = json.load(handle)
        return parsed
    raise FileNotFoundError(f"File not found: {file_path}")

In [8]:
from pathlib import Path
from typing import List, Dict
# ─────────────────────────────────────────────────────────────
# Utility: Load structured JSON records from one or more files
# ─────────────────────────────────────────────────────────────
def load_json_records(paths: List[Path]) -> List[Dict]:
    """Read every JSON file in ``paths`` and merge the contents into one list.

    A file whose top-level value is a list contributes its elements in order;
    any other top-level value is added as a single record.
    """
    collected = []
    for json_path in paths:
        with open(json_path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
            # Wrap a lone object so both shapes can be merged the same way.
            if not isinstance(payload, list):
                payload = [payload]
            collected.extend(payload)
    return collected

### Configurations  

In [9]:
# ==============================
# 🛠 CONFIGURATION
# ==============================

class Config:
    """Directory layout for the JSON artifacts read and written by this notebook."""

    # Root directory; every other path below is nested under it.
    JSON_OUTPUT_DIR = "json_outputs_all_data"

    # Normalized records, split into job descriptions and resumes.
    JSON_OUTPUT_NORMALIZED_DIR = JSON_OUTPUT_DIR + "/normalized"
    JSON_OUTPUT_NORMALIZED_JD = JSON_OUTPUT_NORMALIZED_DIR + "/jd"
    JSON_OUTPUT_NORMALIZED_RESUME = JSON_OUTPUT_NORMALIZED_DIR + "/resume"

    # Scoring outputs.
    JSON_OUTPUT_SCORING_DIR = JSON_OUTPUT_DIR + "/scoring"
    JSON_OUTPUT_SCORING_SPLIT_DIR = JSON_OUTPUT_SCORING_DIR + "/split"
  


# Pre Phase3 Embedding-Based Resume-JD Relevance Generator

## Generate semantic relevance file

In [None]:
from sentence_transformers import SentenceTransformer, util
from typing import Tuple

# Load embedding model once (lightweight)
domain_model = SentenceTransformer("all-MiniLM-L6-v2")

def compute_domain_similarity(domain1: str, domain2: str, threshold: float = 0.5) -> Tuple[float, bool]:
    """Compare two domain strings by the cosine similarity of their embeddings.

    Both strings are encoded with the module-level ``domain_model``.

    Returns:
        (similarity_score, is_similar) where:
            - similarity_score is the cosine similarity rounded to 4 decimals
            - is_similar is True if the unrounded similarity >= threshold
        If either domain is empty or missing, ``(1.0, True)`` is returned
        without encoding anything.
    """
    if not (domain1 and domain2):
        # An absent domain is treated as compatible with anything.
        return 1.0, True

    pair_vectors = domain_model.encode([domain1, domain2], convert_to_tensor=True)
    cosine = util.cos_sim(pair_vectors[0], pair_vectors[1]).item()
    is_similar = cosine >= threshold
    return round(cosine, 4), is_similar


In [None]:
# Quick manual check of compute_domain_similarity on two related domain names
# (uses the default threshold of 0.5).
sim_score, is_similar = compute_domain_similarity("healthcare analytics", "medical data science")
print(f"Similarity: {sim_score}, Match: {is_similar}")


In [None]:
from typing import List, Dict
from pathlib import Path
import json
from sentence_transformers import SentenceTransformer, util
import pandas as pd
from collections import defaultdict

def generate_relevance_map(
    resume_files: List[Path],
    jd_files: List[Path],
    model_name: str = "all-MiniLM-L6-v2",
    min_score: float = 0.2,
    top_k: int = 3
) -> Dict[str, object]:
    """Score every resume–JD pair by embedding similarity and keep the relevant ones.

    Records are loaded with ``load_json_records``; only those with a truthy
    ``input_text`` and ``output_json`` are used. A pair is kept when the cosine
    similarity of the two ``input_text`` embeddings is at least ``min_score``
    and ``compute_domain_similarity`` (threshold 0.5) reports the two
    ``domain`` values as matching. A missing or non-string domain is treated
    as empty, which ``compute_domain_similarity`` counts as a match.

    Kept pairs are labelled "strong" (score >= 0.65), "medium" (>= 0.4) or
    "weak".

    Args:
        resume_files: JSON files containing resume records.
        jd_files: JSON files containing job-description records.
        model_name: sentence-transformers model used for the text embeddings.
        min_score: minimum resume–JD similarity for a pair to be kept.
        top_k: number of best-scoring JDs retained per resume.

    Returns:
        A dict with two entries:
            - "semantic_relevance_scores": list of all kept pair dicts
            - "resume_top_matches": resume_id -> its ``top_k`` best pair dicts
    """
    def _usable(records):
        return [r for r in records if r.get("input_text") and r.get("output_json")]

    def _domain_key(record):
        value = record.get("domain")
        return value.strip().lower() if isinstance(value, str) else ""

    resumes = _usable(load_json_records(resume_files))
    jds = _usable(load_json_records(jd_files))

    results = []
    # With no usable records on either side there is nothing to encode.
    if resumes and jds:
        model = SentenceTransformer(model_name)
        resume_embeds = model.encode([r["input_text"] for r in resumes], convert_to_tensor=True)
        jd_embeds = model.encode([j["input_text"] for j in jds], convert_to_tensor=True)
        scores = util.cos_sim(resume_embeds, jd_embeds).cpu().numpy()

        resume_domains = [_domain_key(r) for r in resumes]
        jd_domains = [_domain_key(j) for j in jds]
        # Domain similarity needs a model call; cache it per distinct domain
        # pair instead of recomputing it for every resume–JD combination.
        domain_cache = {}

        for i, resume in enumerate(resumes):
            for j, jd in enumerate(jds):
                # Cheap numeric filter first, so rejected pairs cost no model call.
                score = float(scores[i][j])
                if score < min_score:
                    continue

                domain_pair = (resume_domains[i], jd_domains[j])
                if domain_pair not in domain_cache:
                    domain_cache[domain_pair] = compute_domain_similarity(
                        domain_pair[0], domain_pair[1], threshold=0.5
                    )
                domain_score, domain_match = domain_cache[domain_pair]
                if not domain_match:
                    continue  # Skip this resume–JD pair

                label = (
                    "strong" if score >= 0.65 else
                    "medium" if score >= 0.4 else
                    "weak"
                )

                # Append result with extended metadata
                results.append({
                    "resume_id": resume.get("record_id"),
                    "jd_id": jd.get("record_id"),
                    "resume_domain": resume.get("domain"),
                    "jd_domain": jd.get("domain"),
                    "domain_similarity": round(domain_score, 3),
                    "resume_jd_similarity": round(score, 3),
                    "semantic_match_label": label
                })

    print(f"Generated relevance map with {len(results)} pairs.")
    # print count of weak, medium, strong matches
    weak_count = sum(1 for r in results if r["semantic_match_label"] == "weak")
    medium_count = sum(1 for r in results if r["semantic_match_label"] == "medium")
    strong_count = sum(1 for r in results if r["semantic_match_label"] == "strong")
    print(f"Weak matches: {weak_count}, Medium matches: {medium_count}, Strong matches: {strong_count}")

    # Group by resume_id, then keep each resume's top_k pairs by similarity.
    reverse_map = defaultdict(list)
    for r in results:
        reverse_map[r["resume_id"]].append(r)

    resume_top_matches = {
        resume_id: sorted(matches, key=lambda x: x["resume_jd_similarity"], reverse=True)[:top_k]
        for resume_id, matches in reverse_map.items()
    }

    return {
        "semantic_relevance_scores": results,
        "resume_top_matches": resume_top_matches
    }



In [None]:
from pathlib import Path

# Collect the normalized resume / JD files by filename pattern, build the
# relevance map, and save it under the scoring output directory.
resume_paths = list(Path(Config.JSON_OUTPUT_NORMALIZED_RESUME).glob("resumes_*.json"))
jd_paths = list(Path(Config.JSON_OUTPUT_NORMALIZED_JD).glob("jds_*.json"))
relevance_data = generate_relevance_map(resume_paths, jd_paths)
relevance_map_file = os.path.join(Config.JSON_OUTPUT_SCORING_DIR, 'semantic_relevance_scores.json')
save_json_output(relevance_data, relevance_map_file)


## Generate smaller semantic file for future scoring

In [None]:
from typing import List, Dict, Tuple, Set
from collections import defaultdict, Counter
import random

# === 1. FILTERING ===
def filter_pairs_by_score(pairs: List[Dict], threshold: float) -> List[Dict]:
    """Keep the pairs whose ``resume_jd_similarity`` is at least ``threshold``.

    A pair without that key is treated as having similarity 0.0.
    Input order is preserved.
    """
    kept = []
    for pair in pairs:
        if pair.get("resume_jd_similarity", 0.0) >= threshold:
            kept.append(pair)
    return kept


# === 2. GROUPING ===
def group_pairs_by_quality_and_resume(pairs: List[Dict]) -> Tuple[
    Dict[str, List[Dict]],
    Dict[int, Dict[str, List[Dict]]],
    Dict[str, List[Dict]]
]:
    """Index the pairs three ways: by match label, by resume, and by domain.

    Returns:
        A 3-tuple of ``defaultdict`` objects:
            - label -> pairs with that ``semantic_match_label`` (lower-cased,
              "unknown" when absent)
            - resume_id -> label -> pairs
            - domain -> pairs; each pair is listed under both its resume
              domain and its JD domain (lower-cased, "unknown" when absent),
              so it appears twice under one key when the two are equal
    """
    by_quality = defaultdict(list)
    by_resume = defaultdict(lambda: defaultdict(list))
    by_domain = defaultdict(list)

    for pair in pairs:
        # Resolve every key first, then record the pair under each of them.
        label = pair.get("semantic_match_label", "unknown").lower()
        resume_id = pair["resume_id"]
        domains = [
            pair.get(field, "unknown").lower()
            for field in ("resume_domain", "jd_domain")
        ]

        by_quality[label].append(pair)
        by_resume[resume_id][label].append(pair)
        for domain in domains:
            by_domain[domain].append(pair)

    return by_quality, by_resume, by_domain


# === 3. RESUME-BALANCED SAMPLING ===
def resume_balanced_sampling(
    resume_to_qualities: Dict[int, Dict[str, List[Dict]]],
    target_qualities: List[str],
    seen_pairs: Set[Tuple[int, int]]
) -> Tuple[Dict[str, List[Dict]], int]:
    """Pick at most one random pair per resume for each target quality.

    For every resume and every quality in ``target_qualities``, one pair is
    drawn at random from that resume's pairs of that quality which are not
    already in ``seen_pairs``. Qualities with no unseen pair are skipped.

    Args:
        resume_to_qualities: resume_id -> quality label -> pairs.
        target_qualities: quality labels to sample for.
        seen_pairs: set of ``(resume_id, jd_id)`` keys already selected;
            updated in place with every pair chosen here.

    Returns:
        ``(selected, resume_count)`` where ``selected`` maps quality label to
        the chosen pairs and ``resume_count`` is the number of resumes that
        contributed at least one pair.
    """
    selected = defaultdict(list)
    resume_count = 0

    for resume_id, qmap in resume_to_qualities.items():
        used = 0
        for q in target_qualities:
            # Draw only from unseen pairs, so a slot is not wasted when the
            # random pick would otherwise land on an already-selected pair.
            candidates = [
                p for p in qmap.get(q, [])
                if (p["resume_id"], p["jd_id"]) not in seen_pairs
            ]
            if not candidates:
                continue
            pair = random.choice(candidates)
            selected[q].append(pair)
            seen_pairs.add((pair["resume_id"], pair["jd_id"]))
            used += 1
        if used > 0:
            resume_count += 1

    return selected, resume_count


# === 4. DOMAIN QUOTA ENFORCEMENT ===
def enforce_domain_quota(
    quality_buckets: Dict[str, List[Dict]],
    seen_pairs: Set[Tuple[int, int]],
    target_counts: Dict[str, int],
    current_counts: Dict[str, int],
    min_per_domain: int
) -> Dict[str, List[Dict]]:
    """Sample up to ``min_per_domain`` unseen pairs per (quality, domain) group.

    Each pair belongs to the group of its resume domain and of its JD domain
    (lower-cased, "unknown" when absent). Groups are visited in insertion
    order and sampling for a quality stops once its target count is reached.
    Qualities that are not keys of ``target_counts`` are ignored.

    Args:
        quality_buckets: quality label -> all pairs with that label.
        seen_pairs: ``(resume_id, jd_id)`` keys already selected; updated in place.
        target_counts: quality label -> desired total number of pairs.
        current_counts: quality label -> number already selected; updated
            in place as pairs are added (a missing label counts as 0).
        min_per_domain: maximum number of pairs taken from a single group.

    Returns:
        quality label -> pairs newly selected by this function.
    """
    selected = defaultdict(list)
    domain_pair_cache = defaultdict(list)

    for q, bucket in quality_buckets.items():
        # The data may contain labels nobody asked for; they have no quota.
        if q not in target_counts:
            continue
        for p in bucket:
            d1 = p.get("resume_domain", "unknown").lower()
            d2 = p.get("jd_domain", "unknown").lower()
            domain_pair_cache[(q, d1)].append(p)
            # Avoid listing the same pair twice in one group.
            if d2 != d1:
                domain_pair_cache[(q, d2)].append(p)

    for (q, domain), group in domain_pair_cache.items():
        room_left = target_counts[q] - current_counts.get(q, 0)
        if room_left <= 0:
            continue  # already filled this quality

        candidates = [p for p in group if (p["resume_id"], p["jd_id"]) not in seen_pairs]
        take_count = max(0, min(min_per_domain, len(candidates), room_left))

        for p in random.sample(candidates, take_count):
            key = (p["resume_id"], p["jd_id"])
            # Guards against duplicate records that share one key.
            if key in seen_pairs:
                continue
            selected[q].append(p)
            seen_pairs.add(key)
            current_counts[q] = current_counts.get(q, 0) + 1

    return selected



# === 5. FILL REMAINING WITH DIVERSITY ===
def fill_remaining_by_diverse_domains(
    quality_buckets: Dict[str, List[Dict]],
    selected_by_quality: Dict[str, List[Dict]],
    target_counts: Dict[str, int],
    seen_pairs: Set[Tuple[int, int]]
) -> Dict[str, List[Dict]]:
    """Top up each quality to its target count, rotating across domains.

    For every quality still short of its target, the unseen pairs of that
    quality are grouped by resume domain and by JD domain. Domains are
    shuffled and visited round-robin; each visit takes one random unseen pair
    from the domain. A domain with no unseen pair left drops out of the
    rotation. The loop ends when the target is met or no domain remains.

    ``selected_by_quality`` and ``seen_pairs`` are updated in place; the
    former is also returned.
    """
    def pair_key(pair):
        return (pair["resume_id"], pair["jd_id"])

    for quality in target_counts:
        shortfall = target_counts[quality] - len(selected_by_quality[quality])
        if shortfall <= 0:
            continue

        # Index the still-unseen pairs of this quality under both of their domains.
        by_domain = defaultdict(list)
        for pair in quality_buckets[quality]:
            if pair_key(pair) in seen_pairs:
                continue
            by_domain[pair.get("resume_domain", "unknown").lower()].append(pair)
            by_domain[pair.get("jd_domain", "unknown").lower()].append(pair)

        rotation = list(by_domain.keys())
        random.shuffle(rotation)

        picked = []
        while shortfall > 0 and rotation:
            domain = rotation.pop(0)
            # A pair may have been taken through its other domain meanwhile.
            unseen = [pair for pair in by_domain[domain] if pair_key(pair) not in seen_pairs]
            if unseen:
                choice = random.choice(unseen)
                picked.append(choice)
                seen_pairs.add(pair_key(choice))
                by_domain[domain].remove(choice)
                # Only a domain that yielded a pair goes back into the rotation.
                rotation.append(domain)
                shortfall -= 1

        selected_by_quality[quality].extend(picked)

    return selected_by_quality


In [None]:
# === 6. MAIN WRAPPER ===
def generate_balanced_sample(
    all_pairs: List[Dict],
    target_counts: Dict[str, int],
    score_threshold: float = 0.1,
    min_per_domain: int = 50,
    seed: "int | None" = None
) -> List[Dict]:
    """Draw a sample of pairs balanced across match quality, resumes and domains.

    Steps:
        1. drop pairs whose ``resume_jd_similarity`` is below ``score_threshold``
        2. resume-balanced pass: one random pair per resume and quality
        3. domain-quota pass: up to ``min_per_domain`` pairs per
           (quality, domain) group
        4. fill any shortfall per quality by rotating across domains
    Progress and coverage statistics are printed along the way.

    Args:
        all_pairs: pair dicts to sample from.
        target_counts: quality label -> desired number of pairs.
        score_threshold: minimum similarity for a pair to be considered.
        min_per_domain: per-group cap used by the domain-quota pass.
        seed: when given, seeds the global ``random`` generator first so the
            sample is reproducible; ``None`` leaves the generator untouched.

    Returns:
        The selected pairs, concatenated in the key order of ``target_counts``.
    """
    if seed is not None:
        random.seed(seed)

    print(f"🔍 Total pairs in input: {len(all_pairs)}")
    filtered = filter_pairs_by_score(all_pairs, score_threshold)
    print(f"✅ After score ≥ {score_threshold}: {len(filtered)}")

    # The third return value (domain coverage index) is not needed here.
    quality_buckets, resume_to_qualities, _ = group_pairs_by_quality_and_resume(filtered)
    print("📊 Match quality counts:")
    for q in target_counts:
        print(f"  {q.upper():<7}: {len(quality_buckets[q])}")
    print(f"👥 Unique resumes: {len(resume_to_qualities)}")

    # Shared across all passes so no pair is selected twice.
    seen_pairs = set()

    # Resume-balanced
    selected_by_quality, resume_count = resume_balanced_sampling(resume_to_qualities, list(target_counts.keys()), seen_pairs)
    print(f"👤 Resume-balanced resumes: {resume_count}")

    # Domain quotas
    current_counts = {q: len(selected_by_quality[q]) for q in target_counts}
    domain_quota_selected = enforce_domain_quota(
        quality_buckets,
        seen_pairs,
        target_counts,
        current_counts,
        min_per_domain
    )

    for q in target_counts:
        selected_by_quality[q].extend(domain_quota_selected.get(q, []))

    # Fill remaining
    selected_by_quality = fill_remaining_by_diverse_domains(quality_buckets, selected_by_quality, target_counts, seen_pairs)

    # Final merge
    final_sample = []
    print("\n📦 Final sampled count:")
    for q in target_counts:
        group = selected_by_quality[q]
        print(f"  {q.upper():<7}: {len(group)}")
        final_sample.extend(group)

    print(f"\n🎯 Total selected: {len(final_sample)}")

    # Domain coverage
    resume_domains = [p.get("resume_domain", "unknown").lower() for p in final_sample]
    jd_domains = [p.get("jd_domain", "unknown").lower() for p in final_sample]
    print("\n📊 Resume domain coverage (top 10):")
    for dom, count in Counter(resume_domains).most_common(10):
        print(f"  {dom:<30} {count}")

    print("\n📊 JD domain coverage (top 10):")
    for dom, count in Counter(jd_domains).most_common(10):
        print(f"  {dom:<30} {count}")

    return final_sample

In [None]:
# 🎯 Define how many samples per match type
# Desired number of sampled pairs for each semantic match label.
target_counts = {
    "strong": 20000,
    "medium": 25000,
    "weak": 15000
}

# 📥 Load the filtered list only
# (the saved file also holds "resume_top_matches", which is not used here)
input_path = os.path.join(Config.JSON_OUTPUT_SCORING_DIR, 'semantic_relevance_scores.json')
full_data  = load_json_file(input_path)
# NOTE: this rebinds `relevance_data` from the full dict produced earlier to
# just its list of pair dicts.
relevance_data = full_data.get("semantic_relevance_scores", [])

# 🧠 Generate the balanced subset
sampled = generate_balanced_sample(
    all_pairs=relevance_data,
    target_counts=target_counts,
    score_threshold=0.2
)

# Wrap in valid JSON object structure
# (same top-level key as the full relevance file)
relevance_wrapped = {
    "semantic_relevance_scores": sampled
}

# 💾 Save the sampled subset
relevance_map_file = os.path.join(Config.JSON_OUTPUT_SCORING_DIR, 'relevant_pairs.json')
save_json_output(relevance_wrapped, relevance_map_file)


# Phase 3 Rubric-Based Scoring Engine

## Rule-Based Scoring Functions

In [10]:
# Tunable parameters for the rule-based scorers.
#   "matching": fuzzy-match thresholds read by hybrid_match
#   "weights":  per-section credit for each match type, read by the score_* rules
scoring_config = {
    "matching": {
        # Global fuzzy threshold for fuzzy matching (0–100)
        # Used for any section without an entry in "section_thresholds".
        "fuzzy_threshold_default": 85,
        # Per-section overrides of the fuzzy threshold.
        "section_thresholds": {
            "skills": 85,
            "tools": 80,
            "certifications": 88,
            "responsibilities": 83,
            "education": 87
        }
    },
    # Credit given to one matched JD term, by section and match type.
    "weights": {
        "skills": {
            "exact": 1.0,
            "substring": 0.8,
            "fuzzy": 0.5
        },
        "tools": {
            "exact": 1.0,
            "substring": 0.7,
            "fuzzy": 0.4
        },
        "certifications": {
            "exact": 1.0,
            "substring": 0.9,
            "fuzzy": 0.5
        },
        "responsibilities": {
            "exact": 1.0,
            "substring": 0.85,
            "fuzzy": 0.5
        },
        "education": {
            "exact": 1.0,
            "substring": 0.75,
            "fuzzy": 0.5
        }
    }
}


In [11]:
from rapidfuzz import fuzz
import re
from typing import List, Tuple, Dict, Any
from unidecode import unidecode


In [12]:
def normalize(text: str) -> str:
    """Canonicalize text for matching.

    The text is lower-cased and passed through ``unidecode``, every character
    that is neither a word character nor whitespace is replaced by a space,
    and runs of whitespace are collapsed to a single space with the ends
    trimmed.
    """
    lowered_ascii = unidecode(text.lower())
    without_punctuation = re.sub(r"[^\w\s]", " ", lowered_ascii)
    collapsed = re.sub(r"\s+", " ", without_punctuation)
    return collapsed.strip()

def normalize_score(score: float) -> float:
    """Round ``score`` to 3 decimals and clamp the result to the range [0, 1]."""
    rounded = round(score, 3)
    capped = min(rounded, 1.0)
    return max(0.0, capped)

In [13]:
def hybrid_match(jd_terms: List[str], resume_text: str, section: str) -> Tuple[Dict[str, int], List[Dict[str, str]]]:
    """
    Classifies JD terms matched in resume_text as 'exact', 'substring', or 'fuzzy'.

    For each term, in order of precedence:
        - 'exact': the normalized term equals one of the resume's tokens
          (tokens are runs of 3+ word/``+-.#`` characters of the lower-cased text)
        - 'substring': the normalized term occurs inside the normalized resume text
        - 'fuzzy': ``fuzz.partial_ratio`` of the two is at least the section's
          threshold from ``scoring_config`` (falling back to the default threshold)
    Terms that are not strings, or that normalize to an empty string, are
    skipped: an empty string is a substring of any text and would otherwise
    be counted as a match.

    Args:
        jd_terms: terms taken from the job description.
        resume_text: free text assembled from the resume.
        section: section name used to look up the fuzzy threshold.

    Returns:
        ``(counts, matched_terms)``: a dict with the number of 'exact',
        'substring' and 'fuzzy' matches, and a list of
        ``{"term": ..., "type": ...}`` entries in ``jd_terms`` order.
    """
    counts = {"exact": 0, "substring": 0, "fuzzy": 0}
    matched_terms = []

    # Nothing can match without terms or without resume text.
    if not jd_terms or not resume_text:
        return counts, matched_terms

    matching_cfg = scoring_config["matching"]
    fuzzy_threshold = matching_cfg.get("section_thresholds", {}).get(
        section,
        matching_cfg.get("fuzzy_threshold_default", 85)
    )
    resume_tokens = set(re.findall(r"\b[\w\+\-\.#]{3,}\b", resume_text.lower()))
    normalized_text = normalize(resume_text)

    for term in jd_terms:
        if not isinstance(term, str):
            continue
        normalized_term = normalize(term)
        if not normalized_term:
            continue

        if normalized_term in resume_tokens:
            match_type = "exact"
        elif normalized_term in normalized_text:
            match_type = "substring"
        elif fuzz.partial_ratio(normalized_term, normalized_text) >= fuzzy_threshold:
            match_type = "fuzzy"
        else:
            continue

        counts[match_type] += 1
        matched_terms.append({"term": term, "type": match_type})

    return counts, matched_terms

In [14]:
def score_from_match_counts(counts: Dict[str, int], total: int, weights: Dict[str, float]) -> float:
    """Turn match counts into a weighted fraction of ``total``.

    Each match type contributes ``weight * count``; weights missing from
    ``weights`` default to 1.0 (exact), 0.8 (substring) and 0.5 (fuzzy).
    The sum is divided by ``total`` and passed through ``normalize_score``.
    When ``total`` is zero the score is 1.0.
    """
    fallback_weights = (("exact", 1.0), ("substring", 0.8), ("fuzzy", 0.5))

    weighted_sum = 0
    for match_type, fallback in fallback_weights:
        weighted_sum += weights.get(match_type, fallback) * counts[match_type]

    if not total:
        return 1.0
    return normalize_score(weighted_sum / total)

In [15]:
from typing import List, Dict, Any, Union

def flatten_field_values(records: List[Dict[str, Any]], field_name: str) -> str:
    """Join the values of ``field_name`` across ``records`` into one string.

    A field holding a list contributes each of its str/int/float items; a
    field holding a single str/int/float contributes that value. Anything
    else (missing field, None, nested containers) is ignored. Values are
    converted with ``str`` and separated by single spaces. ``records`` may
    be None or empty, which yields an empty string.
    """
    scalar_types = (str, int, float)
    parts: List[str] = []

    for record in (records or []):
        value = record.get(field_name)
        if isinstance(value, list):
            parts += [str(item) for item in value if isinstance(item, scalar_types)]
        elif isinstance(value, scalar_types):
            parts.append(str(value))

    return " ".join(parts)


In [16]:
def score_skills_rule(resume_skills, resume_other, jd_required, jd_optional):
    """Score how well the resume covers the JD's required and optional skills.

    The resume text searched is the skills list joined with the ``content``
    of the "other" sections. Required and optional skills are matched
    separately with ``hybrid_match`` and combined as
    ``0.8 * required_score + 0.2 * optional_score``. An empty (or missing)
    skill list scores 1.0 for its part, per ``score_from_match_counts``.

    Args:
        resume_skills: list of skill strings from the resume (may be None).
        resume_other: list of "other" section dicts (may be None).
        jd_required: required skills from the JD (None is treated as empty).
        jd_optional: optional skills from the JD (None is treated as empty).

    Returns:
        ``(score, reason)``: the combined score and a text summary of the
        match counts and matched terms.
    """
    # A null list in the source JSON arrives as None; len(None) would raise.
    jd_required = jd_required or []
    jd_optional = jd_optional or []

    resume_text = (
        " ".join(resume_skills or []) + " " +
        flatten_field_values(resume_other, "content")
    )

    weights = scoring_config["weights"].get("skills", {"exact": 1.0, "substring": 0.8, "fuzzy": 0.5})

    r_counts, r_matches = hybrid_match(jd_required, resume_text, section="skills")
    o_counts, o_matches = hybrid_match(jd_optional, resume_text, section="skills")

    r_score = score_from_match_counts(r_counts, len(jd_required), weights)
    o_score = score_from_match_counts(o_counts, len(jd_optional), weights)

    final_score = normalize_score(0.8 * r_score + 0.2 * o_score)
    reason = (
        f"Required: {r_counts}, Optional: {o_counts}. "
        f"Matched skills: {[m['term'] + ' (' + m['type'] + ')' for m in r_matches + o_matches]}"
    )
    return final_score, reason


In [17]:
def score_certifications_rule(resume_certs, resume_other, jd_certs):
    """Score how many of the JD's certifications appear in the resume.

    The text searched is the ``certification`` value of every resume
    certification entry, joined with the ``content`` of the "other"
    sections. Entries whose certification name is missing, null or not a
    string are ignored; an entry that is itself a plain string is used as
    the name.

    Args:
        resume_certs: list of certification entries from the resume (may be None).
        resume_other: list of "other" section dicts (may be None).
        jd_certs: certifications listed by the JD.

    Returns:
        ``(score, reason)``. When the JD lists no certifications the score is 1.0.
    """
    if not jd_certs:
        return 1.0, "No certifications required by JD."

    cert_names = []
    for cert in resume_certs or []:
        name = cert.get("certification") if isinstance(cert, dict) else cert
        # A null name would make str.join raise.
        if isinstance(name, str):
            cert_names.append(name)

    cert_text = (
        " ".join(cert_names) + " " +
        flatten_field_values(resume_other, "content")
    )

    weights = scoring_config["weights"].get("certifications", {"exact": 1.0, "substring": 0.9, "fuzzy": 0.5})

    counts, matches = hybrid_match(jd_certs, cert_text, section="certifications")
    score = score_from_match_counts(counts, len(jd_certs), weights)

    reason = f"Certifications matched: {counts}. Matched: {[m['term'] + ' (' + m['type'] + ')' for m in matches]}"
    return score, reason


In [19]:
def score_education_rule(resume_education, jd_degrees):
    """Score how many of the JD's preferred degrees appear in the resume.

    The ``degree`` values of the resume's education entries (missing or null
    values count as empty) are joined into one text and matched against
    ``jd_degrees`` with ``hybrid_match``.

    Returns:
        ``(score, reason)``: 1.0 when the JD lists no degrees, 0.0 when the
        resume has no education entries, otherwise the weighted match score.
    """
    if not jd_degrees:
        return 1.0, "No preferred degrees listed in JD."
    if not resume_education:
        return 0.0, "No education information found in resume."

    degree_names = [(entry.get("degree", "") or "") for entry in resume_education]
    resume_text = " ".join(degree_names)

    fallback_weights = {"exact": 1.0, "substring": 0.75, "fuzzy": 0.5}
    weights = scoring_config["weights"].get("education", fallback_weights)

    counts, matches = hybrid_match(jd_degrees, resume_text, section="education")
    score = score_from_match_counts(counts, len(jd_degrees), weights)

    matched_labels = [m['term'] + ' (' + m['type'] + ')' for m in matches]
    reason = f"Degrees matched: {counts}. Matched: {matched_labels}"
    return score, reason


In [20]:
def score_experience_rule(resume_years, jd_required_years, cap: float = 40.0):
    """Score the resume's years of experience against the JD's requirement.

    The smallest number found in ``jd_required_years`` is taken as the
    requirement (so "3-5 years" requires 3). The score is
    ``min(resume_years, cap) / required``, clamped to [0, 1] by
    ``normalize_score``.

    Args:
        resume_years: total years of experience from the resume (number, or
            a numeric string); None means unknown.
        jd_required_years: the JD's experience requirement, normally a
            string such as "3+ years"; a bare number is also accepted.
        cap: upper bound applied to ``resume_years`` before scoring.

    Returns:
        ``(score, reason)``. The score is 0.5 when either input is missing
        (or ``resume_years`` is not numeric), and 1.0 when the JD text
        contains no number or requires 0 years.
    """
    if not jd_required_years or resume_years is None:
        return 0.5, "Missing required or actual experience data."

    # str() lets a numeric requirement (e.g. 3) go through the same parsing.
    numbers = re.findall(r'\d+(?:\.\d+)?', str(jd_required_years))
    if not numbers:
        return 1.0, "JD experience string did not specify clear years."

    # Compare numerically: min() over the raw strings is lexicographic and
    # would pick "10" over "5".
    required_years = min(float(n) for n in numbers)
    if required_years == 0:
        return 1.0, "JD required years = 0."

    if isinstance(resume_years, (int, float)):
        years = resume_years
    else:
        try:
            years = float(resume_years)
        except (TypeError, ValueError):
            return 0.5, "Missing required or actual experience data."

    resume_years_capped = min(years, cap)
    score = resume_years_capped / required_years
    reason = f"Resume: {resume_years} yrs (capped to {resume_years_capped}), JD requires: {required_years} yrs."
    return normalize_score(score), reason

In [21]:
def score_tools_rule(resume_skills, resume_experience, resume_other, resume_projects, jd_tools):
    """Score how many of the JD's tools and technologies appear in the resume.

    The text searched is built from the resume's skills, experience
    descriptions, "other" section content and project descriptions, each
    part separated by a space.

    Args:
        resume_skills: list of skill strings (may be None).
        resume_experience: experience entries; their ``description`` is used.
        resume_other: "other" section entries; their ``content`` is used.
        resume_projects: project entries; their ``description`` is used.
        jd_tools: tools and technologies listed by the JD.

    Returns:
        ``(score, reason)``. When the JD lists no tools the score is 1.0.
    """
    if not jd_tools:
        return 1.0, "No tools required by JD."

    # Every part must be space-separated; without the separator the last word
    # of one section fuses with the first word of the next and neither matches.
    resume_text = " ".join([
        " ".join(resume_skills or []),
        flatten_field_values(resume_experience, "description"),
        flatten_field_values(resume_other, "content"),
        flatten_field_values(resume_projects, "description"),
    ])

    weights = scoring_config["weights"].get("tools", {"exact": 1.0, "substring": 0.7, "fuzzy": 0.4})

    counts, matches = hybrid_match(jd_tools, resume_text, section="tools")
    score = score_from_match_counts(counts, len(jd_tools), weights)

    reason = f"Tools matched: {counts}. Matched: {[m['term'] + ' (' + m['type'] + ')' for m in matches]}"
    return score, reason


In [22]:
def score_responsibilities_rule(resume_experience, resume_other, resume_projects, jd_responsibilities):
    """Score how many of the JD's responsibilities appear in the resume.

    The text searched is the resume's experience descriptions, "other"
    section content and project descriptions, joined with single spaces,
    and matched against ``jd_responsibilities`` with ``hybrid_match``.

    Returns:
        ``(score, reason)``. When the JD lists no responsibilities the score is 1.0.
    """
    if not jd_responsibilities:
        return 1.0, "No responsibilities listed in JD."

    text_parts = [
        flatten_field_values(resume_experience, "description"),
        flatten_field_values(resume_other, "content"),
        flatten_field_values(resume_projects, "description"),
    ]
    resume_text = " ".join(text_parts)

    fallback_weights = {"exact": 1.0, "substring": 0.85, "fuzzy": 0.4}
    weights = scoring_config["weights"].get("responsibilities", fallback_weights)

    counts, matches = hybrid_match(jd_responsibilities, resume_text, section="responsibilities")
    score = score_from_match_counts(counts, len(jd_responsibilities), weights)

    matched_labels = [m['term'] + ' (' + m['type'] + ')' for m in matches]
    reason = f"Responsibilities matched: {counts}. Matched: {matched_labels}"
    return score, reason


In [23]:
def compute_all_rule_scores(resume_json: Dict[str, Any], jd_json: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Run every rule-based scorer on one resume / job-description pair.

    Args:
        resume_json: structured resume; the keys read are ``skills``,
            ``other``, ``certifications``, ``education``,
            ``total_experience_years``, ``experience`` and ``projects``.
        jd_json: structured job description; the keys read are
            ``required_skills``, ``optional_skills``, ``certifications``,
            ``preferred_degrees``, ``required_experience_years``,
            ``tools_and_technologies`` and ``job_responsibilities``.
        Missing list fields default to ``[]``; the experience fields default
        to ``0.0`` and ``""``.

    Returns:
        A dict keyed by section name ("skills", "certifications",
        "education", "experience", "tools", "responsibilities"), each value
        being ``{"score": ..., "reason": ...}``.
    """
    # Resume fields shared by several scorers.
    skills = resume_json.get("skills", [])
    other = resume_json.get("other", [])
    experience = resume_json.get("experience", [])
    projects = resume_json.get("projects", [])

    # Scorers run in this order, which is also the key order of the result.
    section_results = (
        ("skills", score_skills_rule(
            skills,
            other,
            jd_json.get("required_skills", []),
            jd_json.get("optional_skills", []),
        )),
        ("certifications", score_certifications_rule(
            resume_json.get("certifications", []),
            other,
            jd_json.get("certifications", []),
        )),
        ("education", score_education_rule(
            resume_json.get("education", []),
            jd_json.get("preferred_degrees", []),
        )),
        ("experience", score_experience_rule(
            resume_json.get("total_experience_years", 0.0),
            jd_json.get("required_experience_years", ""),
        )),
        ("tools", score_tools_rule(
            skills,
            experience,
            other,
            projects,
            jd_json.get("tools_and_technologies", []),
        )),
        ("responsibilities", score_responsibilities_rule(
            experience,
            other,
            projects,
            jd_json.get("job_responsibilities", []),
        )),
    )

    scores = {}
    for section, (section_score, section_reason) in section_results:
        scores[section] = {"score": section_score, "reason": section_reason}
    return scores

### unit test for each scorer

In [None]:
# Manual check of score_skills_rule (prints the result; nothing is asserted).
# Of the required skills, "Python" is in resume_skills and "MongoDB" appears
# only in the "other" section text; of the optional skills, "Tableau" appears
# in that text and "Java" appears nowhere.
print("=== Testing: score_skills_rule ===")
resume_skills = ["Python", "SQL", "Excel"]
resume_other = [{"section_name": "Training", "content": "Completed MongoDB, Tableau, Excel"}]
jd_required_skills = ["Python", "MongoDB"]
jd_optional_skills = ["Tableau", "Java"]

score, reason = score_skills_rule(resume_skills, resume_other, jd_required_skills, jd_optional_skills)
print(f"Score: {score}\nReason: {reason}\n")


In [None]:
# Manual check of score_certifications_rule (prints the result; nothing is asserted).
# "AWS Certified" is a resume certification; "Google Cloud" appears only in
# the "other" section text.
print("=== Testing: score_certifications_rule ===")
resume_certs = [{"certification": "AWS Certified"}, {"certification": "Azure"}]
resume_other = [{"section_name": "Achievements", "content": "Google Cloud certified"}]
jd_certs = ["AWS Certified", "Google Cloud"]

score, reason = score_certifications_rule(resume_certs, resume_other, jd_certs)
print(f"Score: {score}\nReason: {reason}\n")


In [None]:
# Manual check of score_education_rule (prints the result; nothing is asserted).
# "Computer Science" is part of a resume degree name; "Information Technology"
# does not appear in any resume degree.
print("=== Testing: score_education_rule ===")
resume_education = [{"degree": "Bachelor of Computer Science"}, {"degree": "MBA"}]
jd_degrees = ["Computer Science", "Information Technology"]

score, reason = score_education_rule(resume_education, jd_degrees)
print(f"Score: {score}\nReason: {reason}\n")


In [None]:
# Manual check of score_experience_rule (prints the result; nothing is asserted).
# The JD string is a range written with an en dash; the resume's 6.0 years
# exceeds both ends of it.
print("=== Testing: score_experience_rule ===")
resume_years = 6.0
jd_experience = "3–5 years"

score, reason = score_experience_rule(resume_years, jd_experience)
print(f"Score: {score}\nReason: {reason}\n")


In [None]:
# Manual check of score_tools_rule (prints the result; nothing is asserted).
# Of the JD tools, "AWS", "Docker" and "Kubernetes" are mentioned somewhere in
# the resume inputs below; "GCP" is not.
print("=== Testing: score_tools_rule ===")
resume_skills = ["Python", "Docker"]
resume_experience = [
    {"job_title": "DevOps Engineer", "description": ["Used AWS, Docker, and Jenkins"]},
    {"job_title": "Software Engineer", "description": ["Built APIs with Flask"]}
]
resume_other = [{"section_name": "Misc", "content": "Worked on Kubernetes and Terraform"}]
resume_projects = [{"description": "Built ML model with Scikit-learn and deployed on AWS"}]
jd_tools = ["AWS", "Docker", "Kubernetes", "GCP"]

score, reason = score_tools_rule(resume_skills, resume_experience, resume_other, resume_projects, jd_tools)
print(f"Score: {score}\nReason: {reason}\n")


In [None]:
# Manual check of score_responsibilities_rule (prints the result; nothing is asserted).
# The first two JD responsibilities are copied verbatim from the resume's
# experience and project descriptions; "Built ETL workflows" appears nowhere.
print("=== Testing: score_responsibilities_rule ===")
resume_experience = [
    {"job_title": "Data Analyst", "description": ["Created dashboards using Power BI", "Cleaned large datasets"]},
]
resume_other = [{"section_name": "Leadership", "content": "Led team of 5 analysts"}]
resume_projects = [{"description": "Automated data pipeline using Python"}]
jd_responsibilities = [
    "Created dashboards using Power BI",
    "Automated data pipeline using Python",
    "Built ETL workflows"
]

score, reason = score_responsibilities_rule(resume_experience, resume_other, resume_projects, jd_responsibilities)
print(f"Score: {score}\nReason: {reason}\n")


In [None]:
from pprint import pprint

# Smoke test for the rule-based aggregate: a small hand-written resume and JD
# are passed to compute_all_rule_scores and the result is pretty-printed.
print("=== Testing: compute_all_rule_scores ===")

# Minimal parsed-resume fixture (binds the global resume_json).
resume_json = {
    "skills": ["Python", "Docker"],
    "certifications": [{"certification": "AWS Certified"}, {"certification": "Azure"}],
    "education": [{"degree": "Bachelor of Computer Science"}, {"degree": "MBA"}],
    "total_experience_years": 4.5,
    "experience": [
        {"job_title": "DevOps Engineer", "description": ["Used AWS, Docker, and Jenkins"]},
        {"job_title": "Data Analyst", "description": ["Created dashboards using Power BI"]}
    ],
    "other": [
        {"section_name": "Leadership", "content": "Led team of 5 analysts"},
        {"section_name": "Achievements", "content": "Google Cloud certified"}
    ],
    "projects": [
        {"description": "Built ML model with Scikit-learn and deployed on AWS"}
    ]
}

# Minimal parsed-JD fixture (binds the global jd_json).
jd_json = {
    "required_skills": ["Python", "MongoDB"],
    "optional_skills": ["Tableau", "Java"],
    "certifications": ["AWS Certified", "Google Cloud"],
    "preferred_degrees": ["Computer Science", "Information Technology"],
    "required_experience_years": "3+ years",
    "tools_and_technologies": ["AWS", "Docker", "Kubernetes", "GCP"],
    "job_responsibilities": [
        "Created dashboards using Power BI",
        "Automated data pipeline using Python",
        "Built ETL workflows"
    ]
}

results = compute_all_rule_scores(resume_json, jd_json)


pprint(results)


## LLM-Based Scoring Functions (Structured Prompt)

In [24]:
# Output skeleton shown to the LLM: ten sections, each with a numeric "score"
# and a textual "reason". The values are type names (float, str) rather than
# data, so this text is a template for the model and is not itself valid JSON.
# It is substituted into LLM_SCORING_PROMPT_TEMPLATE through the {schema} placeholder.
LLM_SCORING_SCHEMA = """{
  "skills": {
    "score": float,
    "reason": str
  },
  "certifications": {
    "score": float,
    "reason": str
  },
  "education": {
    "score": float,
    "reason": str
  },
  "experience": {
    "score": float,
    "reason": str
  },
  "tools": {
    "score": float,
    "reason": str
  },
  "responsibilities": {
    "score": float,
    "reason": str
  },
  "soft_skills": {
    "score": float,
    "reason": str
  },
  "transferable_skills": {
    "score": float,
    "reason": str
  },
  "leadership": {
    "score": float,
    "reason": str
  },
  "grammar_cleanliness": {
    "score": float,
    "reason": str
  }
}"""


In [25]:
# Prompt for section-wise ATS scoring. score_with_llm fills the three
# placeholders with str.format: {resume_json} and {jd_json} receive the
# JSON-serialised inputs and {schema} receives LLM_SCORING_SCHEMA.
# The template text itself must not contain other literal braces, because
# str.format would treat them as placeholders.
LLM_SCORING_PROMPT_TEMPLATE = """
You are an expert resume evaluator.

Your task is to **compare** a candidate's resume and a job description and assign **section-wise ATS scores**. Each section receives:
- a score between 0.0 and 1.0
- a short reason explaining why

You must return a valid JSON object. Do not return the resume. Do not repeat input. Do not include markdown or explanations.

RESUME:
{resume_json}

JOB DESCRIPTION:
{jd_json}

Output format (STRICTLY FOLLOW THIS STRUCTURE):
{schema}

Now respond ONLY with a JSON object in this format:
"""


In [26]:
import regex
import json5
from typing import Dict

def extract_json_block(text: str) -> Dict:
    """
    Pull the scoring JSON object out of free-form LLM text.

    Typographic quotes are first replaced with ASCII quotes. Every balanced
    ``{...}`` span is then collected with a recursive regex and the spans are
    tried from last to first with the lenient ``json5`` parser. The first span
    that parses to a dict containing at least one of the keys ``skills``,
    ``experience``, ``education`` or ``certifications`` is returned.

    Raises:
        ValueError: if no span qualifies (a short diagnostic is printed first).
    """
    # Swap curly quotes for their ASCII counterparts before parsing.
    for fancy, plain in (("“", "\""), ("”", "\""), ("‘", "'"), ("’", "'")):
        text = text.replace(fancy, plain)

    # (?R) recurses into the whole pattern, so nested braces stay balanced.
    candidates = regex.findall(r"\{(?:[^{}]|(?R))*\}", text, flags=regex.DOTALL)

    section_keys = {"skills", "experience", "education", "certifications"}

    # Walk backwards: the model's answer is expected to be the last object.
    for candidate in candidates[::-1]:
        try:
            decoded = json5.loads(candidate)
        except Exception:
            continue
        if not isinstance(decoded, dict):
            continue
        if not section_keys.isdisjoint(decoded):
            return decoded

    print("❌ No valid JSON block found in LLM output.")
    print("🔎 Last few lines:\n", text[-500:])
    raise ValueError("No valid JSON block found.")


In [27]:
def score_with_llm(resume_json: dict, jd_json: dict, resume_id="resume", jd_id="jd") -> dict:
    """
    Use an LLM pipeline to compute ATS scores with reasoning per section.

    Args:
        resume_json: Parsed resume; serialised with ``json.dumps`` into the prompt.
        jd_json: Parsed job description; serialised the same way.
        resume_id: Identifier used only in the failure message.
        jd_id: Identifier used only in the failure message.

    Returns:
        The dict extracted from the model's answer by ``extract_json_block``,
        or ``{}`` if inference or JSON extraction fails (the error and the raw
        completion are printed in that case).
    """
    prompt = LLM_SCORING_PROMPT_TEMPLATE.format(
        schema=LLM_SCORING_SCHEMA,
        resume_json=json.dumps(resume_json, indent=2),
        jd_json=json.dumps(jd_json, indent=2)
    )

    # Bound before the try so the except handler can always print it, even
    # when the pipeline call itself raises before any text is produced.
    response_text = ""

    try:
        # Greedy decoding; the sampling knobs are explicitly cleared.
        outputs = llm_pipeline(
            prompt,
            max_new_tokens=1024,
            do_sample=False,
            temperature=None,
            top_p=None,
            top_k=None,
            pad_token_id=llm_pipeline.tokenizer.pad_token_id
        )
        response_text = outputs[0]["generated_text"]
        #print("💬 LLM response preview:\n", response_text)

        # The generated text may begin with an echo of the prompt. The prompt
        # embeds the resume and JD JSON, whose keys ("skills", "certifications",
        # ...) would satisfy extract_json_block's key check. Without stripping
        # the echo, an unparsable model answer would silently fall back to the
        # input JSON and be returned as if it were the scores.
        if isinstance(response_text, str) and response_text.startswith(prompt):
            response_text = response_text[len(prompt):]

        return extract_json_block(response_text)

    except Exception as e:
        print(f"❌ LLM inference failed for {resume_id} x {jd_id}: {str(e)}")
        print("🧪 Raw output preview:\n", response_text)
        return {}


In [None]:
from pprint import pprint

# Smoke test for the LLM scorer on a small hand-written resume/JD pair.
# Note: this cell rebinds the globals resume_json, jd_json and llm_scores.
print("=== Testing: score_with_llm ===")

# Sample resume JSON
resume_json = {
    "skills": ["Python", "Docker"],
    "certifications": [{"certification": "AWS Certified"}, {"certification": "Azure"}],
    "education": [{"degree": "Bachelor of Computer Science"}, {"degree": "MBA"}],
    "total_experience_years": 4.5,
    "experience": [
        {"job_title": "DevOps Engineer", "description": ["Used AWS, Docker, and Jenkins"]},
        {"job_title": "Data Analyst", "description": ["Created dashboards using Power BI"]}
    ],
    "other": [
        {"section_name": "Leadership", "content": "Led team of 5 analysts"},
        {"section_name": "Achievements", "content": "Google Cloud certified"}
    ],
    "projects": [
        {"description": "Built ML model with Scikit-learn and deployed on AWS"}
    ]
}

# Sample job description JSON
jd_json = {
    "required_skills": ["Python", "MongoDB"],
    "optional_skills": ["Tableau", "Java"],
    "certifications": ["AWS Certified", "Google Cloud"],
    "preferred_degrees": ["Computer Science", "Information Technology"],
    "required_experience_years": "3+ years",
    "tools_and_technologies": ["AWS", "Docker", "Kubernetes", "GCP"],
    "job_responsibilities": [
        "Created dashboards using Power BI",
        "Automated data pipeline using Python",
        "Built ETL workflows"
    ]
}

# Run LLM-based scoring
# resume_id / jd_id are left at their defaults ("resume", "jd"); they only label error messages.
llm_scores = score_with_llm(resume_json, jd_json)

print("=== LLM Scoring Output ===")
pprint(llm_scores)


In [None]:
from pprint import pprint

print("=== Testing: score_with_llm ===")

# Sample resume JSON
resume_json = {
            "resume_id": 88907739,
            "total_experience_years": 11.3,
            "summary": "High-achieving management professional and effective consultant possessing excellent communication, organizational and analytical capabilities with about 4 years of experience in devising innovative strategies and solutions to resolve complex business challenges.",
            "education": [
                {
                    "degree": "Master of Science",
                    "field": "Software Management",
                    "institution": "Carnegie Mellon University",
                    "year": ",",
                    "gpa": 3.8
                },
                {
                    "degree": "MBA",
                    "field": "International Business",
                    "institution": "Institute of Technology & Management",
                    "year": ",",
                    "gpa": 4.0
                },
                {
                    "degree": "MBA",
                    "field": "International Business",
                    "institution": "International Business Institute of Technology and Management India",
                    "year": ",",
                    "gpa": 4.0
                }
            ],
            "experience": [
                {
                    "job_title": "Consultant",
                    "company": "Company Name",
                    "start_date": "06/2015",
                    "end_date": "Current",
                    "description": [
                        "Managed and delivered a project to implement and integrate a new content management platform to create a unified brand experience, support scalability, growth and enhance digital presence for client's business - post acquisition",
                        "Led cross-functional global teams consisting of technical, business and functional representatives and achieved key milestones on time with quality deliverables",
                        "Prioritized, escalated and resolved issues with internal and external stakeholders",
                        "Directly managed 3rd party vendor and offshore teams."
                    ]
                },
                {
                    "job_title": "Product Strategy Intern",
                    "company": "Company Name",
                    "start_date": "09/2015",
                    "end_date": "12/2015",
                    "description": [
                        "Led a practicum team at Carnegie Mellon University to understand IBM Bluemix (PaaS), cloud based solution and use business frameworks to perform market, competitor and customer journey analysis",
                        "Liaised with cross functional teams to assess opportunities in marketplace, determine synergies and align business unit goals with corporate strategy",
                        "Worked with senior management and stakeholders to develop strategy for to enhance awareness, increase conversion and explore new market opportunities to scale the client's user base."
                    ]
                },
                {
                    "job_title": "Assistant Operations Manager",
                    "company": "Company Name",
                    "start_date": "07/2012",
                    "end_date": "10/2013",
                    "description": [
                        "Business Strategy & Vendor Management: Automation of Hub, typical model and replication",
                        "Reported to Chief Operating Officer to recommend company wide automation strategies and vendor selection",
                        "Conducted gap analysis, market research, competitor and financial analysis to propose short, mid and long term strategies to the Executive team",
                        "Project Management: RFID Project Member of the core project management team responsible for coordinated of cross-functional teams to achieve project milestones",
                        "Focused on process improvement and optimization to enhance team productivity",
                        "Defined the Key Performance Indicator's to evaluate vendors."
                    ]
                }
            ],
            "skills": [
                "Strategy & Operations Process Optimization",
                "Digital Transformation",
                "Cross Functional Team Management",
                "Project/Product Management",
                "Agile/Lean Methodologies",
                "Work History",
                "Client",
                "Data Analysis",
                "E-Commerce",
                "senior management",
                "Financial",
                "financial analysis",
                "functional",
                "Google Analytics",
                "Government",
                "Hub",
                "IBM",
                "International Business",
                "investments",
                "IP",
                "Marketing plan",
                "market research",
                "Market Strategy",
                "marketing",
                "market",
                "MBA",
                ".NET",
                "academic",
                "ADA",
                "Adobe",
                "Apple",
                "approach",
                "Automation",
                "business development",
                "Business Process",
                "Business Strategy",
                "Consulting",
                "content management",
                "Conversion",
                "Client",
                "Data Analysis",
                "E-Commerce",
                "senior management",
                "Financial",
                "financial analysis",
                "functional",
                "Google Analytics",
                "Government",
                "Hub",
                "IBM",
                "International Business",
                "investments",
                "IP",
                "Marketing plan",
                "market research",
                "Market Strategy",
                "marketing",
                "market",
                "MBA",
                "C#",
                "Excel",
                "Microsoft Office Suite",
                "Power Point",
                "Word",
                "Network",
                "Object Oriented Analysis and Design",
                "optimization",
                "policies",
                "process improvement",
                "Project Management",
                "proposals",
                "quality",
                "Requirement",
                "Research",
                "RFP",
                "Scrum",
                "SDLC",
                "Speech",
                "MS SQL",
                "Strategy",
                "Strategy Development",
                "Vendor Management",
                "Management",
                "Visio",
                "websites"
            ],
            "certifications": [],
            "projects": [
                {
                    "project_title": "Online E-commerce store",
                    "description": "Conceptualized and launched Online E-commerce store, developed Product Strategy and Roadmap, and produced Engineering, Financial and Marketing plan",
                    "start_date": "08/2014",
                    "end_date": "12/2015"
                },
                {
                    "project_title": "Commercialization of IP",
                    "description": "Developed Go-to-Market Strategy, Product Roadmap and proposed Business Model to launch CMU's Automatic Speech Recognition Technology and presented to Sand Hill Angel Investors",
                    "start_date": "08/2014",
                    "end_date": "12/2015"
                },
                {
                    "project_title": "Survivable Social Network on Chip",
                    "description": "Performed Object Oriented Analysis and Design along with the estimation, planning, development, measurement and tracking of the software project using the hybrid development approach",
                    "start_date": "08/2014",
                    "end_date": "12/2015"
                }
            ],
            "languages": [],
            "other": []
        }

# Sample job description JSON
jd_json = {
            "jd_id": 3906094741,
            "inferred_domain": "consulting",
            "title": "Director, Property Tax",
            "summary": "Director, Property Tax role at Kroll, focusing on tax consulting and valuation projects.",
            "required_experience_years": "7",
            "preferred_degrees": [
                "Accounting",
                "Economics",
                "Finance",
                "Management",
                "Real Estate"
            ],
            "required_skills": [
                "Management",
                "Sales"
            ],
            "optional_skills": [],
            "tools_and_technologies": [
                "Excel",
                "Word",
                "PowerPoint"
            ],
            "certifications": [
                "ASA",
                "CPA",
                "CFA",
                "MAI"
            ],
            "soft_skills": [
                "Leadership",
                "Client Relationship Management",
                "Analytical Skills",
                "Independence",
                "Teamwork",
                "Communication",
                "Diversity Awareness"
            ],
            "job_responsibilities": [
                "Client Research",
                "Data Analysis",
                "Presentation Development",
                "Valuation Techniques",
                "Tax Hearing Preparation",
                "Project Reporting",
                "Tax Projection Scenarios",
                "Business Solution Implementation",
                "Junior Staff Development",
                "Practice Growth"
            ],
            "job_location": "Atlanta, GA",
            "remote_option": ",",
            "employment_type": "full-time",
            "travel_requirements": "N/A",
            "physical_requirements": "N/A",
            "benefits": [],
            "company_information": "Kroll is a global firm providing services in governance, risk, and transparency.",
            "equal_opportunity_policy": "Kroll is committed to creating an inclusive work environment and is an equal opportunity employer.",
            "other": [
                {
                    "section_name": "Experience Level",
                    "content": "Director"
                }
            ]
        }

# Run LLM-based scoring
# score_with_llm returns {} when inference or JSON extraction fails, so an
# empty dict printed below means the run failed rather than scored zero.
llm_scores = score_with_llm(resume_json, jd_json)

print("=== LLM Scoring Output ===")
pprint(llm_scores)


## Combine Section Scores

In [28]:
# Per-section blend between the rule-based score and the LLM score, passed to
# merge_scores as weights_config. In every entry "rule" + "llm" equals 1.0.
# The last four sections give the rule side a weight of 0.0, so their merged
# score is the LLM score alone.
merge_weights_config = {
    "skills": {"rule": 0.6, "llm": 0.4},
    "certifications": {"rule": 0.5, "llm": 0.5},
    "education": {"rule": 0.5, "llm": 0.5},
    "experience": {"rule": 0.5, "llm": 0.5},
    "tools": {"rule": 0.6, "llm": 0.4},
    "responsibilities": {"rule": 0.5, "llm": 0.5},
    "soft_skills": {"rule": 0.0, "llm": 1.0},
    "transferable_skills": {"rule": 0.0, "llm": 1.0},
    "leadership": {"rule": 0.0, "llm": 1.0},
    "grammar_cleanliness": {"rule": 0.0, "llm": 1.0}
}


In [29]:
from typing import Dict, Any

def _coerce_score(value: Any) -> float:
    """Return ``value`` as a float, or 0.0 when it is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def merge_scores(rule_scores: Dict[str, Dict[str, Any]],
                 llm_scores: Dict[str, Dict[str, Any]],
                 weights_config: Dict[str, Dict[str, float]]) -> Dict[str, Dict[str, Any]]:
    """
    Merges rule-based and LLM-based scores using weighted averages and combines reasons.

    Args:
        rule_scores: ``{section: {"score": ..., "reason": ...}}`` from the rule engine.
        llm_scores: Same shape, parsed from the LLM answer. Because this comes
            from model-generated JSON it is handled defensively: a section
            value that is not a dict is treated as absent, and a score that
            cannot be converted to ``float`` (e.g. ``None`` or free text)
            counts as 0.0. Numeric strings such as ``"0.8"`` are accepted.
        weights_config: ``{section: {"rule": w_r, "llm": w_l}}``; sections
            without an entry are blended 0.5 / 0.5.

    Returns:
        ``{section: {"score": rounded weighted score, "reason": combined text}}``
        for every section present in either input, in sorted section order.
    """
    merged = {}
    all_sections = set(rule_scores) | set(llm_scores)

    for section in sorted(all_sections):
        rule = rule_scores.get(section, {})
        llm = llm_scores.get(section, {})

        # Model output may carry extra top-level scalars (e.g. "overall": 0.7);
        # treat anything that is not a section dict as "no data".
        if not isinstance(rule, dict):
            rule = {}
        if not isinstance(llm, dict):
            llm = {}

        rule_score = _coerce_score(rule.get("score", 0.0))
        llm_score = _coerce_score(llm.get("score", 0.0))
        rule_reason = rule.get("reason", "")
        llm_reason = llm.get("reason", "")

        weights = weights_config.get(section, {"rule": 0.5, "llm": 0.5})
        final_score = (rule_score * weights["rule"]) + (llm_score * weights["llm"])

        merged_reason = f"(Rule {weights['rule']:.1f}): {rule_reason} | (LLM {weights['llm']:.1f}): {llm_reason}"

        merged[section] = {
            "score": round(final_score, 3),
            "reason": merged_reason
        }

    return merged


In [30]:
def compute_total_ats_score(merged_scores: Dict[str, Dict[str, Any]],
                            section_weights: Dict[str, float]) -> Dict[str, Any]:
    """
    Collapse merged section scores into one weighted ATS score.

    Only sections listed in ``section_weights`` contribute; a listed section
    that is missing from ``merged_scores`` (or has no ``"score"``) counts as 0.0.
    The weighted sum is divided by the sum of the weights, so the weights do
    not need to add up to 1.

    Returns:
        ``{"final_ats_score": float rounded to 3 places, "explanation": str}``.

    Raises:
        ValueError: if the weights sum to zero.
    """
    weight_total = sum(section_weights.values())
    if weight_total == 0:
        raise ValueError("Total section weights must be greater than zero.")

    running_total = 0.0
    parts = []
    for name in section_weights:
        section_weight = section_weights[name]
        section_entry = merged_scores.get(name, {})
        section_score = section_entry.get("score", 0.0)
        running_total += section_score * section_weight
        parts.append(f"{name}: {section_score:.2f} × {section_weight:.2f}")

    return {
        "final_ats_score": round(running_total / weight_total, 3),
        "explanation": f"Weighted average across sections → {' | '.join(parts)}"
    }


In [31]:
# Relative importance of each section in the final ATS score; passed to
# compute_total_ats_score as section_weights. The values below sum to 1.0,
# although compute_total_ats_score divides by the total weight, so that is
# not strictly required.
section_weights_config = {
    "skills": 0.15,
    "certifications": 0.10,
    "education": 0.10,
    "experience": 0.20,
    "tools": 0.10,
    "responsibilities": 0.15,
    "soft_skills": 0.05,
    "transferable_skills": 0.05,
    "leadership": 0.05,
    "grammar_cleanliness": 0.05
}


In [32]:
from pprint import pprint

# End-to-end check of the combination step using hand-written scores, so no
# rule engine or LLM call is needed.
# Note: this cell rebinds the globals rule_scores, llm_scores, merged_scores and final_result.
print("=== Testing: merge_scores and compute_total_ats_score ===")

# Sample rule-based scores
# Only the six sections the rule engine covers; the other four are LLM-only.
rule_scores = {
    "skills": {"score": 0.6, "reason": "Rule: Matched some required skills."},
    "certifications": {"score": 0.4, "reason": "Rule: No direct certification match."},
    "education": {"score": 0.9, "reason": "Rule: Degree aligned well."},
    "experience": {"score": 1.0, "reason": "Rule: Resume years > JD years."},
    "tools": {"score": 0.5, "reason": "Rule: Partial tool match."},
    "responsibilities": {"score": 0.3, "reason": "Rule: Low overlap on tasks."}
}

# Sample LLM-based scores
llm_scores = {
    "skills": {"score": 0.8, "reason": "LLM: Python and Docker match JD."},
    "certifications": {"score": 0.6, "reason": "LLM: AWS match, missing GCP."},
    "education": {"score": 0.7, "reason": "LLM: One degree matches preferred list."},
    "experience": {"score": 0.7, "reason": "LLM: Related roles, less domain alignment."},
    "tools": {"score": 0.4, "reason": "LLM: Excel mentioned, others missing."},
    "responsibilities": {"score": 0.2, "reason": "LLM: Few relevant duties matched."},
    "soft_skills": {"score": 0.5, "reason": "LLM: Communication and teamwork evident."},
    "transferable_skills": {"score": 0.4, "reason": "LLM: PM and cross-functional skills."},
    "leadership": {"score": 0.6, "reason": "LLM: Led global teams in resume."},
    "grammar_cleanliness": {"score": 0.9, "reason": "LLM: Clean formatting and language."}
}


# Merge section scores
merged_scores = merge_scores(rule_scores, llm_scores, merge_weights_config)

# Print merged scores
print("\n=== Merged Scores ===")
pprint(merged_scores)

# Compute total ATS score
final_result = compute_total_ats_score(merged_scores, section_weights_config)

# Print final ATS score
print("\n=== Final ATS Score ===")
pprint(final_result)


=== Testing: merge_scores and compute_total_ats_score ===

=== Merged Scores ===
{'certifications': {'reason': '(Rule 0.5): Rule: No direct certification '
                              'match. | (LLM 0.5): LLM: AWS match, missing '
                              'GCP.',
                    'score': 0.5},
 'education': {'reason': '(Rule 0.5): Rule: Degree aligned well. | (LLM 0.5): '
                         'LLM: One degree matches preferred list.',
               'score': 0.8},
 'experience': {'reason': '(Rule 0.5): Rule: Resume years > JD years. | (LLM '
                          '0.5): LLM: Related roles, less domain alignment.',
                'score': 0.85},
 'grammar_cleanliness': {'reason': '(Rule 0.0):  | (LLM 1.0): LLM: Clean '
                                   'formatting and language.',
                         'score': 0.9},
 'leadership': {'reason': '(Rule 0.0):  | (LLM 1.0): LLM: Led global teams in '
                          'resume.',
                'score': 0.6},
 

## Utilities to load the semantic Resume–JD relevance map and find matching pairs

In [33]:
from typing import List, Tuple
import os

# ✅ Updated: Load and filter relevant resume–JD pairs
def find_matching_pairs(
    relevance_json_path: str,
    match_labels: List[str] = ["strong", "medium", "weak"]
) -> List[Tuple[int, int]]:
    """
    Read the relevance map at ``relevance_json_path`` and collect the
    ``(resume_id, jd_id)`` pair of every entry under
    ``"semantic_relevance_scores"`` whose ``"semantic_match_label"`` is one of
    ``match_labels``. Entries keep their order in the file.
    """
    relevance = load_json_file(relevance_json_path)

    selected = []
    for record in relevance.get("semantic_relevance_scores", []):
        # Skip entries whose label was not requested (or is missing).
        if record.get("semantic_match_label") not in match_labels:
            continue
        selected.append((record["resume_id"], record["jd_id"]))
    return selected


In [34]:
# Collect every pair labelled strong, medium or weak from the relevance file.
# NOTE(review): this cell reads 'relevant_pairs.json', while the single-pair test
# cell later rebinds relevance_map_file to 'semantic_relevance_scores.json' —
# confirm that both files exist and which one is intended here.
relevance_map_file = os.path.join(Config.JSON_OUTPUT_SCORING_DIR, 'relevant_pairs.json')
matching_pairs = find_matching_pairs(relevance_map_file, match_labels=["strong", "medium", "weak"])

print(f"✅ Found {len(matching_pairs)} matching resume–JD pairs")
print(matching_pairs[:5])


✅ Found 7911 matching resume–JD pairs
[(32140087, 3904068266), (58428843, 3904068266), (92200491, 3904068266), (15581242, 3904068266), (23296286, 3904068266)]


In [35]:
def get_match_metadata(resume_id: str, jd_id: str, relevance_map: Dict[str, List[Dict]]) -> Dict:
    """
    Look up the semantic-relevance entry for one resume/JD pair.

    IDs are compared as strings, so int and str identifiers are interchangeable.
    The first matching entry under ``"semantic_relevance_scores"`` wins.

    Returns:
        A dict with ``domain``, ``resume_jd_similarity`` and
        ``semantic_match_label`` (defaulting to ``""``, ``0.0`` and ``"weak"``
        when the entry lacks them).

    Raises:
        ValueError: if no entry matches the pair.
    """
    hit = next(
        (
            row
            for row in relevance_map.get("semantic_relevance_scores", [])
            if str(row["resume_id"]) == str(resume_id) and str(row["jd_id"]) == str(jd_id)
        ),
        None,
    )
    if hit is None:
        raise ValueError(f"No semantic match metadata found for resume_id={resume_id}, jd_id={jd_id}")

    return {
        "domain": hit.get("resume_domain", ""),
        "resume_jd_similarity": hit.get("resume_jd_similarity", 0.0),
        "semantic_match_label": hit.get("semantic_match_label", "weak")
    }


## Phase 3: Scoring Loop

### Checkpoint Handling (JSON)

In [36]:
from datetime import datetime, timezone

def load_resume_checkpoint(path: str) -> int:
    """
    Return the ``last_index`` stored in the JSON checkpoint at ``path``.

    Yields 0 when the file does not exist or has no ``last_index`` key.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            checkpoint = json.load(handle)
        return checkpoint.get("last_index", 0)
    return 0

def save_resume_checkpoint(path: str, index: int):
    """
    Persist a checkpoint to ``path`` via ``save_json_output``.

    The payload holds ``last_index`` (the given ``index``) and ``timestamp``
    (the current UTC time in ISO-8601 form).
    """
    saved_at = datetime.now(timezone.utc).isoformat()
    save_json_output({"last_index": index, "timestamp": saved_at}, path)
   

### Scoring a Single Resume-JD Pair

In [37]:
from typing import List, Dict, Optional, Any
from pathlib import Path

def find_record_by_id(
    records: List[Dict[str, Any]],
    record_id: str,
    id_field: str = "record_id"
) -> Optional[Dict[str, Any]]:
    """
    Return the first dict in ``records`` whose ``id_field`` value equals
    ``record_id`` when both are converted to strings, or ``None`` if there is
    no such record.
    """
    for candidate in records:
        # String comparison lets int and str identifiers match each other.
        if str(candidate.get(id_field)) == str(record_id):
            return candidate
    return None


In [39]:
from pathlib import Path

def score_resume_vs_jd(
    resume_id: str,
    jd_id: str,
    resume_json_dir: str,
    jd_json_dir: str,
    relevance_map: Dict[str, List[Dict]],
) -> Dict:
    """
    Score one resume against one job description.

    The parsed resume and JD are located on disk by filename pattern
    (``resumes_<resume_id>_*.json`` / ``jds_<jd_id>_*.json``), scored by the
    rule engine and by the LLM, merged per section with
    ``merge_weights_config`` and reduced to a single score with
    ``section_weights_config``.

    Args:
        resume_id: Resume identifier, used both in the filename pattern and to
            find the record inside the file.
        jd_id: Job-description identifier, used the same way.
        resume_json_dir: Directory searched for the resume file.
        jd_json_dir: Directory searched for the JD file.
        relevance_map: Loaded semantic relevance map; supplies the domain,
            similarity and semantic label copied into the result.

    Returns:
        Dict with ``resume_id``, ``jd_id``, ``domain``, ``resume_jd_similarity``,
        ``semantic_match_label``, ``section_scores``, ``match_quality``
        (``"strong"`` / ``"medium"`` / ``"weak"``) and ``final_ats_score``.

    Raises:
        FileNotFoundError: if no resume or JD file matches the pattern.
        ValueError: if the record is absent from its file, or the pair has no
            entry in ``relevance_map``.
    """
    # Load resume + JD parsed JSONs
    resume_files = list(Path(resume_json_dir).glob(f"resumes_{resume_id}_*.json"))
    jd_files = list(Path(jd_json_dir).glob(f"jds_{jd_id}_*.json"))

    if not resume_files:
        raise FileNotFoundError(f"No resume file found for resume_id: {resume_id}")
    if not jd_files:
        raise FileNotFoundError(f"No JD file found for jd_id: {jd_id}")

    # When several files match, the first one returned by glob is used.
    resume_records = load_json_file(str(resume_files[0]))
    jd_records = load_json_file(str(jd_files[0]))

    resume_record = find_record_by_id(resume_records, resume_id)
    jd_record = find_record_by_id(jd_records, jd_id)

    if not resume_record:
        raise ValueError(f"Resume record_id {resume_id} not found in file {resume_files[0].name}")
    if not jd_record:
        raise ValueError(f"JD record_id {jd_id} not found in file {jd_files[0].name}")

    resume_data = resume_record.get("output_json", {})
    jd_data = jd_record.get("output_json", {})

    # Resolve the semantic-map metadata before any scoring: the lookup is cheap
    # and raises for unknown pairs, so doing it first avoids spending an LLM
    # inference on a pair that cannot produce a result anyway.
    match_meta = get_match_metadata(resume_id, jd_id, relevance_map)

    # Rule-based scores
    rule_scores = compute_all_rule_scores(resume_data, jd_data)

    # LLM-based scores (an empty dict when inference or parsing failed)
    llm_scores = score_with_llm(resume_data, jd_data, resume_id=resume_id, jd_id=jd_id)

    # Merge section-wise scores
    section_scores = merge_scores(rule_scores, llm_scores, merge_weights_config)

    # Compute final ATS score
    final_score_result = compute_total_ats_score(section_scores, section_weights_config)
    final_score = final_score_result["final_ats_score"]

    # Derive match quality from final ATS score
    if final_score >= 0.75:
        match_quality = "strong"
    elif final_score >= 0.5:
        match_quality = "medium"
    else:
        match_quality = "weak"

    return {
        "resume_id": resume_id,
        "jd_id": jd_id,
        "domain": match_meta["domain"],
        "resume_jd_similarity": match_meta["resume_jd_similarity"],
        "semantic_match_label": match_meta["semantic_match_label"],  # Relevance from Phase 1
        "section_scores": section_scores,
        "match_quality": match_quality,  # ATS score-based match
        "final_ats_score": final_score
    }


### Test Single Resume-JD Pair

In [None]:
from pprint import pprint
import os

# === Setup file paths ===
# Note: this rebinds the global relevance_map_file, which an earlier cell set to
# a different file name ('relevant_pairs.json').
relevance_map_file = os.path.join(Config.JSON_OUTPUT_SCORING_DIR, 'semantic_relevance_scores.json')
resume_json_dir = Config.JSON_OUTPUT_NORMALIZED_RESUME
jd_json_dir = Config.JSON_OUTPUT_NORMALIZED_JD

# === Load relevance map ===
relevance_map = load_json_file(relevance_map_file)

# === Pick a strong match from the top for testing ===
# Takes the first entry labelled "strong" in file order.
sample_pair = None
for item in relevance_map["semantic_relevance_scores"]:
    if item["semantic_match_label"] == "strong":
        sample_pair = (item["resume_id"], item["jd_id"])
        break

if not sample_pair:
    raise ValueError("No strong match found in relevance map to test.")

resume_id, jd_id = sample_pair
print(f"=== Testing score_resume_vs_jd() for resume_id={resume_id}, jd_id={jd_id} ===")

# === Run scoring ===
# This runs the full pipeline for one pair, including an LLM inference.
result = score_resume_vs_jd(
    resume_id=resume_id,
    jd_id=jd_id,
    resume_json_dir=resume_json_dir,
    jd_json_dir=jd_json_dir,
    relevance_map=relevance_map
)

# === Show result ===
pprint(result)


# Persist the single-pair result next to the other scoring outputs for inspection.
test_file = os.path.join(Config.JSON_OUTPUT_SCORING_DIR, 'test_phase3_scoring.json')

save_json_output(result, test_file)


### Score pairs in batches

In [40]:
import os
from typing import Dict, List
from pathlib import Path
from datetime import datetime
from pprint import pprint

def score_and_save_in_batches(
    relevance_path: str,
    resume_json_dir: str,
    jd_json_dir: str,
    output_dir: str,
    limit: Optional[int] = None,
    checkpoint_every: int = 10
):
    """Score resume–JD pairs from a relevance map, resuming from a checkpoint.

    Each successfully scored pair is written to its own JSON file in
    ``output_dir``. Pairs that raise an ``Exception`` are skipped and recorded
    in ``ats_scoring_errors.json``. Progress (the index of the next pair to
    attempt) is stored in ``ats_scoring_checkpoint.json``.

    Progress and pending error records are persisted every
    ``checkpoint_every`` pairs and once more when the loop exits for any
    reason, so an interrupted run can be resumed instead of restarting from
    the previous checkpoint.

    Args:
        relevance_path: JSON file whose ``"semantic_relevance_scores"`` key
            holds the list of pairs (each with ``resume_id`` and ``jd_id``).
        resume_json_dir: Directory passed through to ``score_resume_vs_jd``.
        jd_json_dir: Directory passed through to ``score_resume_vs_jd``.
        output_dir: Directory for result files, the checkpoint and the error
            log; created if missing.
        limit: Maximum number of pairs to attempt in this call, or ``None``
            to run to the end of the list. Negative values are treated as 0.
        checkpoint_every: Persist progress after this many attempted pairs.
            Values below 1 are treated as 1.

    Raises:
        ValueError: If the relevance map contains no pairs.
    """
    os.makedirs(output_dir, exist_ok=True)
    checkpoint_path = os.path.join(output_dir, "ats_scoring_checkpoint.json")
    error_log_path = os.path.join(output_dir, "ats_scoring_errors.json")

    # Load checkpoint or start at 0
    start_index = load_resume_checkpoint(checkpoint_path)

    # Load relevance map
    relevance_data = load_json_file(relevance_path)
    relevant_pairs: List[Dict] = relevance_data.get("semantic_relevance_scores", [])

    if not relevant_pairs:
        raise ValueError("No valid resume–JD pairs found in relevance map.")

    total = len(relevant_pairs)
    # A stale checkpoint (list shrank) or a negative limit must never yield a
    # negative pair count or move the checkpoint backwards.
    start_index = min(max(start_index, 0), total)
    end_index = total if limit is None else min(start_index + max(limit, 0), total)
    print(f"🔁 Scoring {end_index - start_index} pairs starting from index {start_index}")

    processed = 0
    pending_errors: List[Dict] = []
    next_index = start_index  # first pair that has not been attempted yet
    flush_interval = max(1, checkpoint_every)

    def _persist_progress() -> None:
        # Append not-yet-written error records to the error log, then store the checkpoint.
        if pending_errors:
            logged_errors = []
            if os.path.exists(error_log_path):
                with open(error_log_path, "r", encoding="utf-8") as f:
                    logged_errors = json.load(f)
            logged_errors.extend(pending_errors)
            save_json_output(logged_errors, error_log_path)
            # Cleared only after a successful write so nothing is logged twice or lost.
            pending_errors.clear()
        save_resume_checkpoint(checkpoint_path, next_index)

    try:
        for idx in range(start_index, end_index):
            pair = relevant_pairs[idx]
            resume_id = jd_id = None

            try:
                # Read the ids inside the try: a malformed record is logged and
                # skipped like any other per-pair failure instead of aborting
                # the batch (and every resumed run) at the same index.
                resume_id = str(pair["resume_id"])
                jd_id = str(pair["jd_id"])

                result = score_resume_vs_jd(
                    resume_id=resume_id,
                    jd_id=jd_id,
                    resume_json_dir=resume_json_dir,
                    jd_json_dir=jd_json_dir,
                    relevance_map=relevance_data
                )
                score = result["final_ats_score"]
                quality = result["match_quality"]
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

                output_name = f"{resume_id}_{jd_id}_{quality}_{timestamp}_{score:.2f}.json"
                output_path = os.path.join(output_dir, output_name)
                save_json_output(result, output_path)

                processed += 1
            except Exception as e:
                print(f"⚠️ Skipping pair (resume_id={resume_id}, jd_id={jd_id}): {e}")
                pending_errors.append({
                    "resume_id": resume_id,
                    "jd_id": jd_id,
                    "error": str(e)
                })

            next_index = idx + 1
            if (next_index - start_index) % flush_interval == 0:
                _persist_progress()
    finally:
        # Also runs on KeyboardInterrupt, which `except Exception` does not
        # catch, so a manually stopped run still records how far it got.
        _persist_progress()

    print(f"✅ Processed {processed} resume–JD pairs. New checkpoint index: {end_index}")


## Execute Scoring

In [41]:
# Run ATS scoring over the pairs listed in relevant_pairs.json. One result file
# is written per successfully scored pair into the scoring directory, which is
# also the directory the input file is read from.
# limit=None scores everything from the saved checkpoint to the end of the list.
# NOTE(review): the logged timestamps in this cell's output show roughly 10-20 s
# per pair, so the full 7911-pair run takes many hours — pass a small `limit`
# for a trial run.
# NOTE(review): the single-pair test cell reads 'semantic_relevance_scores.json'
# while this cell reads "relevant_pairs.json" — confirm both files are intended.
score_and_save_in_batches(
    relevance_path=os.path.join(Config.JSON_OUTPUT_SCORING_DIR, "relevant_pairs.json"),
    resume_json_dir=Config.JSON_OUTPUT_NORMALIZED_RESUME,
    jd_json_dir=Config.JSON_OUTPUT_NORMALIZED_JD,
    output_dir=Config.JSON_OUTPUT_SCORING_DIR,
    limit=None
)


🔁 Scoring 7911 pairs starting from index 0
✅ Saved output to json_outputs_all_data/scoring\32140087_3904068266_medium_20250618_175527_0.61.json
✅ Saved output to json_outputs_all_data/scoring\58428843_3904068266_weak_20250618_175546_0.39.json
✅ Saved output to json_outputs_all_data/scoring\92200491_3904068266_weak_20250618_175602_0.34.json
✅ Saved output to json_outputs_all_data/scoring\15581242_3904068266_weak_20250618_175618_0.42.json
✅ Saved output to json_outputs_all_data/scoring\23296286_3904068266_weak_20250618_175633_0.38.json
✅ Saved output to json_outputs_all_data/scoring\98108571_3904068266_weak_20250618_175643_0.16.json
✅ Saved output to json_outputs_all_data/scoring\13348915_3904068266_weak_20250618_175702_0.44.json
✅ Saved output to json_outputs_all_data/scoring\28198029_3904068266_weak_20250618_175716_0.36.json
✅ Saved output to json_outputs_all_data/scoring\25839123_3904068266_weak_20250618_175734_0.27.json


You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


✅ Saved output to json_outputs_all_data/scoring\15620421_3904068266_weak_20250618_175749_0.49.json
✅ Saved output to json_outputs_all_data/scoring\16511249_3904068266_weak_20250618_175806_0.48.json
✅ Saved output to json_outputs_all_data/scoring\55104715_3904068266_weak_20250618_175821_0.42.json
✅ Saved output to json_outputs_all_data/scoring\85918100_3904068266_medium_20250618_175831_0.52.json
✅ Saved output to json_outputs_all_data/scoring\18062906_3904068266_weak_20250618_175843_0.33.json
✅ Saved output to json_outputs_all_data/scoring\31199035_3904068266_weak_20250618_175851_0.15.json
✅ Saved output to json_outputs_all_data/scoring\27607632_3904068266_weak_20250618_175900_0.35.json
✅ Saved output to json_outputs_all_data/scoring\51349448_3904068266_weak_20250618_175914_0.38.json
✅ Saved output to json_outputs_all_data/scoring\19156751_3904068266_weak_20250618_175925_0.44.json
✅ Saved output to json_outputs_all_data/scoring\34131484_3904068266_weak_20250618_175937_0.37.json
✅ Saved 