In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import re

In [None]:
# Read the sgjobdata CSV (xz-compressed, stored under ../data) into a DataFrame
df = pd.read_csv('../data/sgjobdata.csv.xz')
# Last expression of the cell: show the first rows for a quick look at the columns
df.head()

In [None]:
# convert category in json format to a string format

def categories_json_to_string(df, col="categories", sep="; "):
    """Replace a JSON category list with a delimited string of category names.

    Each cell of ``col`` is expected to hold a list of objects that carry a
    ``"category"`` key, either as a JSON string or as an already-parsed list.

    Args:
        df: Input frame; it is copied, never modified.
        col: Name of the column holding the category lists.
        sep: Separator placed between category names.

    Returns:
        A copy of ``df`` in which ``col`` holds the joined category names.
        Cells that are missing, are not valid JSON, or do not contain a list
        become ``None``; an empty list becomes ``""``.
    """
    def parse_and_join(x):
        # Dispatch on type first: calling pd.isna() on an already-parsed list
        # returns an array whose truth value is ambiguous and raises.
        if isinstance(x, str):
            try:
                data = json.loads(x)
            except json.JSONDecodeError:
                return None
        else:
            data = x

        # Covers None / NaN as well as any JSON value that is not a list.
        if not isinstance(data, list):
            return None

        # Skip entries without a usable name and stringify the rest, so that a
        # null or numeric "category" value cannot make str.join raise.
        names = [
            str(d["category"])
            for d in data
            if isinstance(d, dict) and d.get("category") is not None
        ]
        return sep.join(names)

    out = df.copy()
    out[col] = out[col].map(parse_and_join)
    return out

###
def clean_title_light(t: str) -> str:
    """Lightly normalise a job title: trim, lower-case and collapse noise.

    Bracket content, seniority words and domain hints are kept; only runs of
    the separator characters ``_``, ``#`` and ``|`` are replaced by a space,
    after which whitespace runs are collapsed to a single space.

    Args:
        t: Raw title. Any value that is not a ``str`` (``None``, the float NaN
            that pandas uses for missing text, ...) is treated as missing.

    Returns:
        The cleaned title, or ``""`` for a missing value.
    """
    # NaN is a truthy float, so a plain ``t or ""`` guard lets it through and
    # ``.strip()`` then raises when this function is used with Series.map.
    if not isinstance(t, str):
        return ""
    t = t.strip().lower()
    t = re.sub(r"[_#|]+", " ", t)
    return re.sub(r"\s+", " ", t).strip()


In [None]:
# Column / dtype overview of the raw frame
df.info()

# Build the text used for title analysis: "<title>-<categories>", lightly cleaned.
# Rows without a title are dropped. Where the categories are missing the title is
# used on its own: concatenating a string with a missing value yields NaN, which
# would otherwise discard the title and hand a float to clean_title_light.
df_categories_str = (
    categories_json_to_string(df)
    .dropna(subset=["title"])
    .assign(
        title_analysis=lambda d: (d["title"] + "-" + d["categories"])
        .where(d["categories"].notna(), d["title"])
        .map(clean_title_light)
    )
)
df_categories_str.head()

In [None]:
def explode_categories(df, col="categories"):
    """Expand a column of category lists into one row per category.

    Each cell of ``col`` may hold a JSON string or an already-parsed list of
    objects with ``"id"`` and ``"category"`` keys. Every list element becomes
    its own row (the other columns are repeated) and its two fields are
    exposed as the columns ``category_id`` and ``category``.

    Args:
        df: Input frame; it is copied, never modified.
        col: Name of the column holding the category lists.

    Returns:
        The exploded frame with a fresh 0..n-1 index, ``col`` removed and
        ``category_id`` / ``category`` added. Rows whose cell was missing,
        was not valid JSON, or whose element is not a dict get ``None`` in
        both new columns.
    """
    def parse(x):
        # Non-strings (lists, None, NaN) pass through untouched.
        if not isinstance(x, str):
            return x
        try:
            return json.loads(x)
        except json.JSONDecodeError:
            # One malformed cell should not abort the whole explode; it is
            # kept as a single row without category information.
            return None

    out = df.copy()
    out[col] = out[col].map(parse)
    out = out.explode(col, ignore_index=True)

    def field(key):
        # Pull one key out of every exploded element; non-dicts give None.
        return out[col].map(lambda d: d.get(key) if isinstance(d, dict) else None)

    out["category_id"] = field("id")
    out["category"] = field("category")

    return out.drop(columns=[col])

# One row per (posting, category); rows that carry no category id or name are dropped.
df_exploded = (
    explode_categories(df)
    .dropna(subset=["category_id", "category"])
)
df_exploded.head()

In [None]:
# Rows whose company name contains 'kpmg' (case-insensitive; missing names never match),
# ordered by original posting date, most recent first.
(
    df_exploded
    .loc[df_exploded['postedCompany_name'].str.contains('kpmg', case=False, na=False)]
    .sort_values('metadata_originalPostingDate', ascending=False)
)


In [None]:
# Total vacancies per company within the 'Accounting / Auditing / Taxation' category,
# largest first.
(
    df_exploded
    .loc[
        df_exploded['category'] == 'Accounting / Auditing / Taxation',
        ['category', 'postedCompany_name', 'metadata_jobPostId', 'numberOfVacancies'],
    ]
    .groupby('postedCompany_name')['numberOfVacancies']
    .sum()
    .sort_values(ascending=False)
)

## Clean job title data and use it to predict the top 3 skill sets with an LLM

In [None]:
import re
import json
import time
import numpy as np
import pandas as pd
from tqdm import tqdm

from sentence_transformers import SentenceTransformer
from sklearn.cluster import MiniBatchKMeans

import torch
from transformers import (
    AutoConfig,
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    pipeline,
)
from transformers.pipelines import PIPELINE_REGISTRY


# ============================================================
# A) Cleaning (minimal, preserve nuance)
# ============================================================
def clean_title_light(t: str) -> str:
    """Lightly normalise a job title: trim, lower-case and collapse noise.

    Bracket content, seniority words and domain hints are kept; only runs of
    the separator characters ``_``, ``#`` and ``|`` are replaced by a space,
    after which whitespace runs are collapsed to a single space.

    Args:
        t: Raw title. Any value that is not a ``str`` (``None``, the float NaN
            that pandas uses for missing text, ...) is treated as missing.

    Returns:
        The cleaned title, or ``""`` for a missing value.
    """
    # NaN is a truthy float, so a plain ``t or ""`` guard lets it through and
    # ``.strip()`` then raises when this function is used with Series.map.
    if not isinstance(t, str):
        return ""
    t = t.strip().lower()
    t = re.sub(r"[_#|]+", " ", t)
    return re.sub(r"\s+", " ", t).strip()


# ============================================================
# B) Embedding + scalable "loose" clustering
# ============================================================
def embed_unique_titles(unique_titles: list[str],
                        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
                        device: str = "cpu",
                        batch_size: int = 256) -> np.ndarray:
    """Encode titles with a SentenceTransformer model.

    Args:
        unique_titles: Texts to encode.
        model_name: Model identifier handed to ``SentenceTransformer``.
        device: Device string handed to ``SentenceTransformer``.
        batch_size: Batch size forwarded to ``encode``.

    Returns:
        Whatever ``encode`` returns when asked for normalised embeddings as a
        NumPy array, with the progress bar enabled.
    """
    encode_options = {
        "batch_size": batch_size,
        "show_progress_bar": True,
        "convert_to_numpy": True,
        "normalize_embeddings": True,
    }
    model = SentenceTransformer(model_name, device=device)
    return model.encode(unique_titles, **encode_options)


def cluster_embeddings_loose(embeddings: np.ndarray,
                             n_clusters: int = 800,
                             batch_size: int = 8192,
                             random_state: int = 42) -> np.ndarray:
    """Assign each embedding row to a cluster with MiniBatchKMeans.

    Args:
        embeddings: One row per item to cluster.
        n_clusters: Requested number of clusters. It is capped at the number
            of rows, because k-means cannot fit more clusters than samples.
        batch_size: Mini-batch size forwarded to ``MiniBatchKMeans``.
        random_state: Seed forwarded to ``MiniBatchKMeans``.

    Returns:
        One cluster label per input row; an empty integer array when
        ``embeddings`` has no rows.
    """
    n_samples = len(embeddings)
    if n_samples == 0:
        # Nothing to fit; return empty labels instead of letting the estimator raise.
        return np.empty(0, dtype=np.int32)

    km = MiniBatchKMeans(
        # Small inputs (e.g. a quick smoke test) may have fewer rows than the
        # requested cluster count, which the estimator rejects.
        n_clusters=min(n_clusters, n_samples),
        batch_size=batch_size,
        random_state=random_state,
        n_init="auto"
    )
    return km.fit_predict(embeddings)


def canonical_title_per_cluster(u: pd.DataFrame) -> pd.DataFrame:
    """
    Pick one representative title for every cluster.

    u must contain: job_title_cleaned, cluster_id, count
    Returns a frame with columns cluster_id and clustered_job_title, where the
    title is the one with the highest count inside that cluster.
    """
    # Within each cluster, put the highest-count title first ...
    ranked = u.sort_values(["cluster_id", "count"], ascending=[True, False])
    # ... then keep only that first row per cluster.
    top_rows = ranked.groupby("cluster_id", as_index=False).head(1)
    mapping = top_rows[["cluster_id", "job_title_cleaned"]]
    return mapping.rename(columns={"job_title_cleaned": "clustered_job_title"})


# ============================================================
# C) LLM backend (supports seq2seq and causal, robust)
# ============================================================
def _supports_pipeline(task_name: str) -> bool:
    try:
        PIPELINE_REGISTRY.check_task(task_name)
        return True
    except KeyError:
        return False


class SkillInferencer:
    """
    A robust local inferencer that works on Mac CPU:
    - Supports causal instruct models (text-generation)
    - Supports seq2seq models (Flan-T5) using pipeline if available,
      otherwise uses direct model.generate() to avoid pipeline task errors.
    """
    def __init__(self,
                 hf_model_name: str,
                 backend: str = "auto",  # "auto" | "seq2seq" | "causal"
                 device: str | None = None):
        self.hf_model_name = hf_model_name
        self.backend = backend
        self.device = device  # "cpu" or "cuda" (we assume cpu on M1)
        self.tokenizer = None
        self.model = None
        self.gen_pipe = None
        self._init_model()

    def _init_model(self):
        cfg = AutoConfig.from_pretrained(self.hf_model_name)
        model_type = getattr(cfg, "model_type", "").lower()

        if self.backend == "auto":
            # Heuristic: t5/mt5/bart/pegasus => seq2seq; else causal
            if model_type in {"t5", "mt5", "bart", "mbart", "pegasus"}:
                self.backend = "seq2seq"
            else:
                self.backend = "causal"

        if self.device is None:
            # M1: CPU
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

        self.tokenizer = AutoTokenizer.from_pretrained(self.hf_model_name)

        if self.backend == "seq2seq":
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.hf_model_name)
            self.model.to(self.device)
            # Use pipeline only if task exists; otherwise we use generate() directly
            if _supports_pipeline("text2text-generation"):
                pipe_device = 0 if self.device == "cuda" else -1
                self.gen_pipe = pipeline(
                    "text2text-generation",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    device=pipe_device
                )
            else:
                self.gen_pipe = None

        elif self.backend == "causal":
            self.model = AutoModelForCausalLM.from_pretrained(
                self.hf_model_name,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
            )
            self.model.to(self.device)
            pipe_device = 0 if self.device == "cuda" else -1
            self.gen_pipe = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device=pipe_device
            )
        else:
            raise ValueError("backend must be 'auto', 'seq2seq', or 'causal'")

    @staticmethod
    def _prompt(title: str) -> str:
        return (
            "Return JSON only.\n"
            "Task: Infer the top 5 core skills required for the job title.\n"
            f"Job title: {title}\n"
            "Rules:\n"
            "- Exactly 5 skills\n"
            "- Short noun phrases (1–3 words)\n"
            "- No explanation\n"
            'JSON schema: {"skills":["skill1","skill2","skill3","skill4","skill5"]}'
        )

    @staticmethod
    def _parse_json(text: str) -> dict:
        # First try: extract first {...}
        s = text.find("{")
        e = text.rfind("}") + 1
        if s != -1 and e != -1 and e > s:
            candidate = text[s:e]
            try:
                obj = json.loads(candidate)
                skills = obj.get("skills", [])
                if isinstance(skills, list):
                    skills = [re.sub(r"\s+", " ", str(x).strip()) for x in skills if str(x).strip()]
                    skills = skills[:3]
                    while len(skills) < 3:
                        skills.append("unknown")
                    return {"skills": skills}
            except Exception:
                pass

        # Fallback: pull quoted strings
        items = [x for x in re.findall(r'"([^"]+)"', text) if x.lower() != "skills"]
        items = [re.sub(r"\s+", " ", x.strip()) for x in items if x.strip()]
        items = items[:3]
        while len(items) < 3:
            items.append("unknown")
        return {"skills": items}

    def infer(self, title: str, max_new_tokens: int = 80) -> dict:
        prompt = self._prompt(title)

        if self.backend == "causal":
            out = self.gen_pipe(
                prompt,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                temperature=0.0,
                return_full_text=False
            )[0]["generated_text"]
            return self._parse_json(out)

        # seq2seq
        if self.gen_pipe is not None:
            out = self.gen_pipe(prompt, max_new_tokens=max_new_tokens, do_sample=False)[0]["generated_text"]
            return self._parse_json(out)

        # seq2seq fallback: direct generate() (avoids pipeline task errors)
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True).to(self.device)
        with torch.no_grad():
            gen_ids = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
        out = self.tokenizer.decode(gen_ids[0], skip_special_tokens=True)
        return self._parse_json(out)


# ============================================================
# D) End-to-end scalable function (1M rows safe)
# ============================================================
def build_title_skill_table(
    df: pd.DataFrame,
    raw_col: str,
    # embedding
    embed_model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
    embed_device: str = "cpu",
    embed_batch_size: int = 256,
    # clustering
    n_clusters: int = 800,
    km_batch_size: int = 8192,
    # HF model
    hf_model_name: str = "Qwen/Qwen2.5-1.5B-Instruct",
    llm_backend: str = "auto",         # "auto"|"causal"|"seq2seq"
    skills_per: str = "cluster",       # "cluster" recommended
    # performance
    max_new_tokens: int = 60,
) -> pd.DataFrame:
    """
    Clean titles, cluster the unique ones, infer skills once per cluster (or
    per unique title) and map the result back onto every input row.

    Args:
        df: Input frame; only ``raw_col`` is read and ``df`` is not modified.
        raw_col: Column holding the raw title text. Missing / non-text values
            are cleaned to an empty title.
        embed_model_name, embed_device, embed_batch_size: Forwarded to
            ``embed_unique_titles``.
        n_clusters: Requested cluster count; capped at the number of unique
            cleaned titles.
        km_batch_size: Forwarded to ``cluster_embeddings_loose``.
        hf_model_name, llm_backend: Forwarded to ``SkillInferencer``.
        skills_per: "cluster" runs one inference per cluster's canonical
            title; "unique_title" runs one per unique cleaned title.
        max_new_tokens: Generation budget per inference call.

    Returns:
        One row per input row, with a fresh 0..n-1 index.
        Output columns (exactly):
        a. job_title_raw (pre-clean)
        b. job_title_cleaned
        c. clustered_job_title
        d. top_3_skills_json (dict)

    Raises:
        ValueError: If ``raw_col`` is missing or ``skills_per`` is invalid.
    """
    if raw_col not in df.columns:
        raise ValueError(f"Missing required column: {raw_col}")
    if skills_per not in {"cluster", "unique_title"}:
        raise ValueError("skills_per must be 'cluster' or 'unique_title'")

    t0 = time.time()
    out_cols = ["job_title_raw", "job_title_cleaned", "clustered_job_title", "top_3_skills_json"]

    # 1) Clean
    # Work on the title column only: nothing else reaches the output, so
    # copying and merging the full frame is wasted memory, and unrelated input
    # columns can no longer collide with the column names added below.
    df_work = df[[raw_col]].copy()
    # Missing values arrive as NaN (a float); hand the cleaner an empty string instead.
    df_work["job_title_cleaned"] = df_work[raw_col].map(
        lambda t: clean_title_light(t if isinstance(t, str) else "")
    )

    # 2) Unique + freq (for canonical titles)
    u = (
        df_work.groupby("job_title_cleaned")
        .size()
        .reset_index(name="count")
        .sort_values("count", ascending=False)
        .reset_index(drop=True)
    )
    unique_titles = u["job_title_cleaned"].tolist()

    print(f"Rows: {len(df_work):,}")
    print(f"Unique cleaned titles: {len(unique_titles):,}")

    if not unique_titles:
        # Empty input: nothing to embed, cluster or infer.
        return pd.DataFrame(columns=out_cols)

    # 3) Embed uniques
    print("\n[1/4] Embedding unique titles...")
    emb = embed_unique_titles(unique_titles, embed_model_name, embed_device, embed_batch_size)
    print("Embeddings:", emb.shape, emb.dtype)

    # 4) Cluster uniques
    print("\n[2/4] Clustering (MiniBatchKMeans)...")
    # k-means cannot fit more clusters than there are samples.
    effective_clusters = min(n_clusters, len(unique_titles))
    if effective_clusters < n_clusters:
        print(f"n_clusters capped at the number of unique titles: {effective_clusters:,}")
    u["cluster_id"] = cluster_embeddings_loose(
        emb, n_clusters=effective_clusters, batch_size=km_batch_size
    )

    # 5) Canonical per cluster
    canon = canonical_title_per_cluster(u)
    u = u.merge(canon, on="cluster_id", how="left")

    # 6) Load local LLM backend
    print("\n[3/4] Loading local HF model...")
    inferencer = SkillInferencer(hf_model_name=hf_model_name, backend=llm_backend, device=None)
    print("LLM backend selected:", inferencer.backend)

    # 7) Decide inference targets
    if skills_per == "cluster":
        targets = canon["clustered_job_title"].drop_duplicates().tolist()
        key_col = "clustered_job_title"
        print(f"Skill inference targets (clusters): {len(targets):,}")
    else:
        targets = unique_titles
        key_col = "job_title_cleaned"
        print(f"Skill inference targets (unique titles): {len(targets):,}")

    # 8) Infer skills once per target (cache)
    print("\n[4/4] Inferring top-3 skills (cached)...")
    cache = {}
    for t in tqdm(targets):
        if t not in cache:
            cache[t] = inferencer.infer(t, max_new_tokens=max_new_tokens)

    u["top_3_skills_json"] = u[key_col].map(cache)

    # 9) Map back to every input row
    df_out = df_work.merge(
        u[["job_title_cleaned", "cluster_id", "clustered_job_title", "top_3_skills_json"]],
        on="job_title_cleaned",
        how="left"
    )

    df_out = df_out[[raw_col, "job_title_cleaned", "clustered_job_title", "top_3_skills_json"]].rename(
        columns={raw_col: "job_title_raw"}
    )

    print(f"\nDone. Total seconds: {round(time.time() - t0, 1)}")
    return df_out


# ============================================================
# E) Run a fail-fast small test first
# ============================================================
def sanity_test(df: pd.DataFrame, raw_col: str) -> pd.DataFrame:
    """Run the full pipeline on a tiny sample so that setup problems surface quickly.

    Takes the first 50 non-null values of ``raw_col`` and calls
    ``build_title_skill_table`` with 10 clusters, per-cluster inference, the
    0.5B Qwen instruct model on the causal backend and a 50-token budget.

    Returns:
        Whatever ``build_title_skill_table`` returns for that sample.
    """
    sample = df[[raw_col]].dropna().head(50).copy()
    smoke_settings = {
        "n_clusters": 10,
        "skills_per": "cluster",
        "hf_model_name": "Qwen/Qwen2.5-0.5B-Instruct",  # fast
        "llm_backend": "causal",
        "max_new_tokens": 50,
    }
    return build_title_skill_table(df=sample, raw_col=raw_col, **smoke_settings)

In [25]:
# Smoke-test the pipeline on a small sample before committing to the long run.
_ = sanity_test(df_categories_str, raw_col="title_analysis")

# Full run over every row; the recorded output below reports about 5,200 seconds in total.
df_out = build_title_skill_table(
    df_categories_str,
    "title_analysis",
    hf_model_name="Qwen/Qwen2.5-1.5B-Instruct",
    llm_backend="causal",
    skills_per="cluster",
    n_clusters=300,                 # 50 is too coarse for real job titles
    max_new_tokens=60,
)

 70%|███████   | 211/300 [1:01:55<1:30:27, 60.99s/it]Both `max_new_tokens` (=60) and `max_length`(=20) seem to have been set. `max_new_tokens` will take precedence. Please refer to the documentation for more information. (https://huggingface.co/docs/transformers/main/en/main_classes/text_generation)
 71%|███████   | 212/300 [1:02:09<1:08:39, 46.81s/it]Both `max_new_tokens` (=60) and `max_length`(=20) seem to have been set. `max_new_tokens` will take precedence. Please refer to the documentation for more information. (https://huggingface.co/docs/transformers/main/en/main_classes/text_generation)
 71%|███████   | 213/300 [1:02:22<53:08, 36.65s/it]  Both `max_new_tokens` (=60) and `max_length`(=20) seem to have been set. `max_new_tokens` will take precedence. Please refer to the documentation for more information. (https://huggingface.co/docs/transformers/main/en/main_classes/text_generation)
 71%|███████▏  | 214/300 [1:02:36<42:38, 29.75s/it]Both `max_new_tokens` (=60) and `max_length`(=


Done. Total seconds: 5195.5


In [26]:
# Write the title/skill table to a Parquet file in the working directory, without the index.
# NOTE(review): the file name says "5skills" while the column is named top_3_skills_json — confirm which is intended.
df_out.to_parquet('sgjobdata_titleskills_v2_5skills.parquet', index=False)