In [None]:
import os
import re
from dotenv import load_dotenv
from langchain_neo4j import Neo4jGraph
from pathlib import Path
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


In [2]:
# Resolve `.env` against the current working directory and load it into the process
# environment. The recorded cell output (`True`) is the return value of `load_dotenv`.
env_path = Path(".env").resolve()         
load_dotenv(env_path)


True

In [7]:
from __future__ import annotations

import json
import logging
import os
import re
import numpy as np
from dataclasses import dataclass
from difflib import SequenceMatcher
from langchain_openai import ChatOpenAI
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

In [3]:
# Module-level Neo4j handle, created with schema refresh disabled.
# NOTE(review): no connection arguments are passed, so the connection settings presumably
# come from the environment loaded earlier — confirm against the langchain-neo4j docs.
graph = Neo4jGraph(refresh_schema=False)


In [8]:
try:  # Optional dependency guard for clearer import errors downstream
    from langchain_neo4j import Neo4jGraph
except ImportError as exc:  # pragma: no cover - provides actionable hint if package missing
    raise ImportError(
        "langchain-neo4j is required. Install it with `pip install langchain-neo4j`."
    ) from exc

try:  # Same idea for langchain OpenAI helpers
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
except ImportError as exc:  # pragma: no cover - provides actionable hint if package missing
    raise ImportError(
        "langchain-openai is required. Install it with `pip install langchain-openai`."
    ) from exc

from langchain_core.messages import HumanMessage, SystemMessage

# Module-level logger used by the graph client and the manual demo guard defined below.
logger = logging.getLogger(__name__)

In [9]:
def _load_environment() -> None:
    """Load environment variables from a ``.env`` file, if one can be found.

    The file is looked up next to the defining module when ``__file__`` is
    available. In a notebook or other interactive session ``__file__`` is not
    defined, so the current working directory is used instead (referencing
    ``__file__`` directly would raise ``NameError`` there).

    When no ``.env`` exists at that location, ``load_dotenv()`` is called
    without arguments so python-dotenv applies its own default lookup.
    """
    # `globals().get` avoids the NameError a bare `__file__` raises in a notebook kernel.
    module_file = globals().get("__file__")
    base_dir = Path(module_file).resolve().parent if module_file else Path.cwd()

    env_path = base_dir / ".env"
    if env_path.exists():
        load_dotenv(env_path)
    else:
        load_dotenv()


In [10]:
@dataclass
class SkillRecord:
    """One skill descriptor row as returned by the occupation skill query."""

    # Identifier and display fields of the skill element.
    element_id: str
    name: str
    description: str
    # Relationship weights; the graph query coerces both to floats.
    importance: float
    level: float

    @property
    def text(self) -> str:
        """Text used for embedding: ``"<name>: <description>"``, or the bare name
        when the description is empty."""
        return f"{self.name}: {self.description}" if self.description else self.name


class SkillGraphClient:
    """Wrapper around the Neo4j queries used by the RAG pipeline.

    The client either reuses an injected ``Neo4jGraph`` or builds one from the
    ``NEO4J_URI`` / ``NEO4J_USERNAME`` / ``NEO4J_PASSWORD`` / ``NEO4J_DATABASE``
    environment variables.
    """

    # Shape of an occupation code such as ``15-2051.00`` (two digits, four digits, two digits).
    CODE_PATTERN = re.compile(r"^\d{2}-\d{4}\.\d{2}$")

    def __init__(
        self,
        graph: Optional[Neo4jGraph] = None,
        *,
        database: Optional[str] = None,
    ) -> None:
        """Create the client.

        Args:
            graph: Existing graph handle to reuse. When omitted, a new
                connection is built from environment variables.
            database: Database name; falls back to ``NEO4J_DATABASE`` and then
                to ``"neo4j"``. Only used when ``graph`` is omitted.

        Raises:
            EnvironmentError: If no ``graph`` is given and any of the URI,
                username or password variables is unset or empty.
        """
        _load_environment()

        if graph is not None:
            self.graph = graph
        else:
            uri = os.getenv("NEO4J_URI")
            username = os.getenv("NEO4J_USERNAME")
            password = os.getenv("NEO4J_PASSWORD")
            database = database or os.getenv("NEO4J_DATABASE", "neo4j")

            if not all([uri, username, password]):
                raise EnvironmentError(
                    "Neo4j credentials are missing. Set NEO4J_URI, NEO4J_USERNAME, and NEO4J_PASSWORD."
                )

            self.graph = Neo4jGraph(url=uri, username=username, password=password, database=database)

    # ---------------------------------------------------------------------
    # Occupation lookups
    # ---------------------------------------------------------------------
    def resolve_occupation(self, title_or_code: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Return candidate occupations for a user supplied title or O*NET code.

        Args:
            title_or_code: Free-text job title, or a code matching
                ``CODE_PATTERN``. Surrounding whitespace is ignored.
            limit: Maximum number of title candidates to return.

        Returns:
            * ``[]`` for blank input (no query is issued).
            * For a code: the raw rows of the exact-code lookup
              (``code``, ``title``, ``description``), without a ``score``.
            * For a title: up to ``limit`` rows, each with an added ``score``
              (``difflib.SequenceMatcher`` ratio against the title), best first.
        """
        candidate_title = (title_or_code or "").strip()
        if not candidate_title:
            # An empty string satisfies `CONTAINS ''` for every occupation, which would
            # return arbitrary rows as "candidates"; treat blank input as no match.
            return []

        if self.CODE_PATTERN.match(candidate_title):
            logger.debug("Resolving occupation by exact code: %s", candidate_title)
            query = (
                "MATCH (o:Occupation {code: $code})\n"
                "RETURN o.code AS code, o.title AS title, o.description AS description"
            )
            return self.graph.query(query, {"code": candidate_title})

        # Over-fetch (5x) so the fuzzy re-ranking below has more than `limit` rows to choose from.
        params: Dict[str, Any] = {"title": candidate_title, "limit": limit * 5}
        query = (
            "MATCH (o:Occupation)\n"
            "WHERE toLower(o.title) CONTAINS toLower($title)\n"
            "   OR toLower($title) CONTAINS toLower(o.title)\n"
            "RETURN o.code AS code, o.title AS title, o.description AS description\n"
            "LIMIT toInteger($limit)"
        )
        records = self.graph.query(query, params)

        if not records:
            logger.debug("No direct substring match for '%s'; broadening search via all occupations.", candidate_title)
            # Wider scan – still limited to keep latency reasonable.
            # NOTE(review): the query has no ORDER BY, so which `limit * 20` occupations are
            # scanned is up to the database — confirm this sample is acceptable.
            broad_query = (
                "MATCH (o:Occupation)\n"
                "RETURN o.code AS code, o.title AS title, o.description AS description\n"
                "LIMIT toInteger($limit)"
            )
            records = self.graph.query(broad_query, {"limit": limit * 20})

        needle = candidate_title.lower()
        scored: List[Dict[str, Any]] = [
            {
                **record,
                # A NULL/missing title scores as an empty string instead of crashing on `.lower()`.
                "score": SequenceMatcher(None, needle, (record.get("title") or "").lower()).ratio(),
            }
            for record in records
        ]

        scored.sort(key=lambda item: item["score"], reverse=True)
        return scored[:limit]

    # ---------------------------------------------------------------------
    # Skill retrieval
    # ---------------------------------------------------------------------
    def occupation_skills(self, occupation_code: str, *, limit: Optional[int] = None) -> List[SkillRecord]:
        """Fetch skill descriptors tied to an occupation ordered by importance.

        Args:
            occupation_code: Value matched against ``Occupation.code``.
            limit: Maximum number of skills. A falsy value (``None`` or ``0``)
                means no ``LIMIT`` clause is added.

        Returns:
            ``SkillRecord`` objects in descending importance. Missing
            descriptions become ``""`` and missing importance/level become ``0.0``.
        """
        params = {"code": occupation_code}
        cypher = (
            "MATCH (o:Occupation {code: $code})-[r:REQUIRES_SKILL]->(skill:ContentElement)\n"
            "WHERE 'Skill' IN labels(skill)\n"
            "RETURN skill.element_id AS element_id,\n"
            "       skill.name AS name,\n"
            "       coalesce(skill.description, '') AS description,\n"
            "       toFloat(coalesce(r.importance, 0)) AS importance,\n"
            "       toFloat(coalesce(r.level, 0)) AS level\n"
            "ORDER BY importance DESC"
        )

        if limit:
            cypher += "\nLIMIT toInteger($limit)"
            params["limit"] = limit

        records = self.graph.query(cypher, params)
        return [
            SkillRecord(
                element_id=record["element_id"],
                name=record["name"],
                description=record["description"],
                # `or 0.0` guards against a NULL surviving the Cypher-side coalesce/toFloat.
                importance=float(record["importance"] or 0.0),
                level=float(record["level"] or 0.0),
            )
            for record in records
        ]

In [None]:
class SkillMatcher:
    """Embeds and aligns free-form user skills against graph descriptors.

    Graph skill embeddings are cached per instance, keyed by ``element_id``, so
    repeated matches against the same skills do not re-embed them.
    """

    def __init__(self, embedder: Optional[OpenAIEmbeddings] = None) -> None:
        """Create the matcher.

        Args:
            embedder: Object exposing ``embed_documents`` and ``embed_query``.
                Defaults to ``OpenAIEmbeddings(model="text-embedding-3-small")``.
        """
        _load_environment()
        self.embedder = embedder or OpenAIEmbeddings(model="text-embedding-3-small")
        self._skill_embedding_cache: Dict[str, np.ndarray] = {}

    def match(
        self,
        user_skill_texts: Sequence[str],
        graph_skills: Sequence[SkillRecord],
        *,
        similarity_threshold: float = 0.7,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Return skills the user already covers and those likely missing.

        Each graph skill is paired with the user skill of highest cosine
        similarity; it counts as matched when that similarity is at least
        ``similarity_threshold``.

        Args:
            user_skill_texts: Free-form skill phrases. Blank and non-string
                entries are ignored; duplicates are collapsed case-insensitively.
            graph_skills: Skills to compare against.
            similarity_threshold: Minimum cosine similarity for a match.

        Returns:
            ``{"matched": [...], "missing": [...]}`` where every item has the
            keys ``skill``, ``score`` and ``matched_user_skill``. When there is
            no usable user skill, every graph skill is reported as missing with
            ``score`` and ``matched_user_skill`` set to ``None``.
        """
        if not graph_skills:
            return {"matched": [], "missing": []}

        normalised_user_skills = self._normalise_user_skills(user_skill_texts)
        if not normalised_user_skills:
            # Covers both "no skills given" and "only blank entries given". Embedding an
            # empty list would yield a 1-D array and break the axis=1 norm below.
            return {"matched": [], "missing": self._all_missing(graph_skills)}

        user_vectors = self.embedder.embed_documents(normalised_user_skills)
        user_matrix = np.asarray(user_vectors, dtype=np.float32)

        skill_matrix_np = np.asarray(
            [self._cached_skill_embedding(skill) for skill in graph_skills],
            dtype=np.float32,
        )

        # Row-normalise both matrices so the dot product is cosine similarity;
        # zero vectors get a tiny norm to avoid division by zero.
        user_norms = np.linalg.norm(user_matrix, axis=1, keepdims=True)
        skill_norms = np.linalg.norm(skill_matrix_np, axis=1, keepdims=True)
        user_norms[user_norms == 0] = 1e-12
        skill_norms[skill_norms == 0] = 1e-12
        similarity_matrix = (user_matrix / user_norms) @ (skill_matrix_np / skill_norms).T

        # Rows are user skills, columns are graph skills: pick the best row per column.
        best_user_indices = np.argmax(similarity_matrix, axis=0)
        best_scores = similarity_matrix[best_user_indices, np.arange(similarity_matrix.shape[1])]

        matched: List[Dict[str, Any]] = []
        missing: List[Dict[str, Any]] = []
        for idx, skill in enumerate(graph_skills):
            score = float(best_scores[idx])
            is_match = score >= similarity_threshold
            payload = {
                "skill": skill,
                "score": round(score, 3),
                # Below the threshold the closest phrase is not reported as evidence.
                "matched_user_skill": normalised_user_skills[int(best_user_indices[idx])] if is_match else None,
            }
            (matched if is_match else missing).append(payload)

        matched.sort(key=lambda item: (item["skill"].importance, item["score"]), reverse=True)
        missing.sort(key=lambda item: item["skill"].importance, reverse=True)
        return {"matched": matched, "missing": missing}

    # ------------------------------------------------------------------ #
    @staticmethod
    def _normalise_user_skills(user_skill_texts: Optional[Sequence[str]]) -> List[str]:
        """Strip, drop blank/non-string entries and de-duplicate case-insensitively, keeping order."""
        seen = set()
        normalised: List[str] = []
        for skill_text in user_skill_texts or ():
            if not isinstance(skill_text, str):
                continue
            stripped = skill_text.strip()
            key = stripped.lower()
            if key and key not in seen:
                normalised.append(stripped)
                seen.add(key)
        return normalised

    @staticmethod
    def _all_missing(graph_skills: Sequence[SkillRecord]) -> List[Dict[str, Any]]:
        """Build the "missing" payload used when there is nothing to compare against."""
        return [
            {
                "skill": skill,
                "score": None,
                "matched_user_skill": None,
            }
            for skill in graph_skills
        ]

    def _cached_skill_embedding(self, skill: SkillRecord) -> np.ndarray:
        """Return the embedding of ``skill.text``, computing and caching it on first use.

        NOTE(review): the cache key is ``element_id`` alone, so skills sharing an
        id (or a changed description under the same id) reuse the first vector —
        confirm ids are unique and stable.
        """
        cached = self._skill_embedding_cache.get(skill.element_id)
        if cached is not None:
            return cached
        embedding = self.embedder.embed_query(skill.text)
        vector = np.asarray(embedding, dtype=np.float32)
        self._skill_embedding_cache[skill.element_id] = vector
        return vector


In [None]:
class SkillGraphRAG:
    """High-level orchestrator that produces grounded skill profiles.

    For each requested role it resolves an occupation in the graph, fetches the
    occupation's skills, matches them against the user's skills and optionally
    asks the LLM for a narrative summary.
    """

    def __init__(
        self,
        *,
        graph_client: Optional[SkillGraphClient] = None,
        matcher: Optional[SkillMatcher] = None,
        llm: Optional[ChatOpenAI] = None,
        similarity_threshold: float = 0.72,
    ) -> None:
        """Wire up the collaborators, creating defaults for any that are omitted.

        Args:
            graph_client: Occupation/skill lookup client.
            matcher: Embedding-based skill matcher.
            llm: Chat model used for the summary; defaults to
                ``ChatOpenAI(model="gpt-4o-mini", temperature=0.2)``.
            similarity_threshold: Threshold forwarded to the matcher.
        """
        _load_environment()
        self.graph_client = graph_client or SkillGraphClient()
        self.matcher = matcher or SkillMatcher()
        self.similarity_threshold = similarity_threshold

        self.llm = llm or ChatOpenAI(model="gpt-4o-mini", temperature=0.2)

    def generate_skill_profile(
        self,
        parsed_resume: Dict[str, Any],
        target_roles: Sequence[str],
        *,
        qna: Optional[Sequence[Dict[str, str]]] = None,
        max_skills_per_role: int = 25,
        summarise: bool = True,
    ) -> Dict[str, Any]:
        """Build a structured, graph-grounded skill profile for the user.

        Args:
            parsed_resume: Resume dict; ``technical_skills`` and ``soft_skills``
                are read as lists of phrases (missing or ``None`` means none).
            target_roles: Role titles or occupation codes. A single string is
                treated as one role rather than iterated character by character.
            qna: Optional questionnaire entries passed through to the summary.
            max_skills_per_role: Upper bound on skills fetched per occupation.
            summarise: Whether to request the LLM narrative.

        Returns:
            Dict with ``user_skills``, ``profiles`` (one entry per requested
            role, all sharing the same keys) and ``analysis`` (``None`` when
            ``summarise`` is false).
        """
        parsed_resume = parsed_resume or {}
        # `or []` also covers keys that are present but explicitly set to None.
        technical_skills = parsed_resume.get("technical_skills") or []
        soft_skills = parsed_resume.get("soft_skills") or []
        user_skill_texts = [*technical_skills, *soft_skills]

        if isinstance(target_roles, str):
            target_roles = [target_roles]

        role_profiles: List[Dict[str, Any]] = []
        for requested_role in target_roles:
            occupation_matches = self.graph_client.resolve_occupation(requested_role, limit=3)
            if not occupation_matches:
                # Same keys as a matched profile so consumers need no special-casing.
                role_profiles.append(
                    {
                        "requested_role": requested_role,
                        "occupation_match": None,
                        "alternate_matches": [],
                        "skills_covered": [],
                        "skill_gaps": [],
                        "all_skills": [],
                    }
                )
                continue

            best_match = occupation_matches[0]
            occ_code = best_match["code"]
            graph_skills = self.graph_client.occupation_skills(occ_code, limit=max_skills_per_role)
            matching = self.matcher.match(user_skill_texts, graph_skills, similarity_threshold=self.similarity_threshold)

            role_profiles.append(
                {
                    "requested_role": requested_role,
                    "occupation_match": best_match,
                    "alternate_matches": occupation_matches[1:],
                    "skills_covered": matching["matched"],
                    "skill_gaps": matching["missing"],
                    "all_skills": graph_skills,
                }
            )

        analysis = None
        if summarise:
            analysis = self._summarise_profile(role_profiles, user_skill_texts, qna=qna, parsed_resume=parsed_resume)

        return {
            "user_skills": user_skill_texts,
            "profiles": role_profiles,
            "analysis": analysis,
        }

    # ------------------------------------------------------------------
    def _summarise_profile(
        self,
        role_profiles: Sequence[Dict[str, Any]],
        user_skills: Sequence[str],
        *,
        qna: Optional[Sequence[Dict[str, str]]] = None,
        parsed_resume: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Create a narrative summary grounded in the knowledge graph facts.

        Only the first eight covered skills and first eight gaps per role are
        sent to the model, together with the user's skills, the questionnaire
        and (when a resume is given) its experience/education/summary fields.

        Returns:
            The ``content`` of the LLM response.
        """
        summary_payload = {
            "user_skills": list(user_skills),
            "qna": list(qna) if qna else [],
            "profiles": [
                {
                    "requested_role": profile["requested_role"],
                    "occupation_match": profile["occupation_match"],
                    "top_skills_covered": [
                        {
                            "name": item["skill"].name,
                            "importance": item["skill"].importance,
                            "score": item["score"],
                        }
                        for item in profile["skills_covered"][:8]
                    ],
                    "top_skill_gaps": [
                        {
                            "name": item["skill"].name,
                            "importance": item["skill"].importance,
                        }
                        for item in profile["skill_gaps"][:8]
                    ],
                }
                for profile in role_profiles
            ],
        }

        if parsed_resume:
            summary_payload["resume_snapshot"] = {
                "experience": parsed_resume.get("experience"),
                "education": parsed_resume.get("education"),
                "summary": parsed_resume.get("summary"),
            }

        messages = [
            SystemMessage(
                content=(
                    "You are an AI career coach that must ground every assessment in the provided "
                    "knowledge graph facts. Do not hallucinate new skills. When discussing gaps, reference "
                    "the occupation title and highlight why the skill matters."
                )
            ),
            HumanMessage(
                content=(
                    "Using the structured context below, craft a concise skill profile that\n"
                    "1. Highlights the strongest validated skills.\n"
                    "2. Lists the most critical skill gaps per role with actionable guidance.\n"
                    "3. Mentions any notable preferences from the questionnaire if available.\n"
                    # `default=str` keeps non-JSON resume values (e.g. dates) from aborting the summary.
                    "Context: " + json.dumps(summary_payload, indent=2, default=str)
                )
            ),
        ]
        response = self.llm.invoke(messages)
        return response.content

In [None]:
def build_skill_profile(
    parsed_resume: Dict[str, Any],
    target_roles: Sequence[str],
    *,
    qna: Optional[Sequence[Dict[str, str]]] = None,
    summarise: bool = True,
    similarity_threshold: float = 0.72,
    max_skills_per_role: int = 25,
) -> Dict[str, Any]:
    """Run the whole SkillGraph RAG pipeline in a single call.

    A fresh ``SkillGraphRAG`` is created with the given ``similarity_threshold``
    (all other collaborators take their defaults) and its
    ``generate_skill_profile`` result is returned unchanged.
    """
    pipeline = SkillGraphRAG(similarity_threshold=similarity_threshold)
    generation_options = {
        "qna": qna,
        "max_skills_per_role": max_skills_per_role,
        "summarise": summarise,
    }
    return pipeline.generate_skill_profile(parsed_resume, target_roles, **generation_options)


def _demo() -> None:  # pragma: no cover - utility for manual verification
    """Quick demonstration when executing this module directly.

    Builds a profile for a small hard-coded resume and prints the per-role
    profiles plus the LLM analysis as JSON. Requires working graph and model
    credentials because it instantiates ``SkillGraphRAG`` with its defaults.
    """
    from dataclasses import asdict, is_dataclass

    def _to_jsonable(value: Any) -> Any:
        """Fallback for ``json.dumps``: expand dataclass instances, stringify the rest."""
        if is_dataclass(value) and not isinstance(value, type):
            return asdict(value)
        return str(value)

    sample_resume = {
        "technical_skills": ["Python", "Data Analysis", "Machine Learning", "SQL"],
        "soft_skills": ["Collaboration", "Communication"],
        "experience": "2 years as a junior data analyst focusing on reporting and dashboarding.",
        "education": ["B.S. in Information Systems"],
    }
    target_roles = ["Data Scientist", "Business Intelligence Analyst"]

    rag = SkillGraphRAG()
    profile = rag.generate_skill_profile(sample_resume, target_roles, summarise=True)

    # The profiles embed SkillRecord dataclass instances, which the default JSON
    # encoder rejects with TypeError; the `default` hook converts them to dicts.
    print(json.dumps({
        "profiles": profile["profiles"],
        "analysis": profile["analysis"],
    }, indent=2, default=_to_jsonable))


# Manual entry point: run the demo, log any failure, then re-raise so the error stays visible.
# NOTE(review): in a notebook kernel `__name__` is typically "__main__", so executing this
# cell runs the demo (graph + model calls) — confirm that is intended for "Run All".
if __name__ == "__main__":  # pragma: no cover - manual execution guard
    try:
        _demo()
    except Exception as exc:  # Basic diagnostic when running ad hoc from CLI
        logger.error("Demo execution failed: %s", exc)
        raise
