In [1]:
from __future__ import annotations

import asyncio
import re
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
from typing import List, Dict, Any, AsyncGenerator

from openai import OpenAI

# Load the API key from a local file so it is not embedded in the notebook.
# Surrounding whitespace is stripped: text files commonly end with a newline,
# which would otherwise become part of the key handed to the LLM client.
with open('apikeys.txt', 'r', encoding='utf-8') as file:
    key1 = file.read().strip()

In [2]:
class DemandType(Enum):
    """The two intent categories a user request can be assigned to."""

    # Values are spelled out; they match what auto() assigns (1, 2, ...).
    FILE_QUERY = 1  # the user is looking for files/documents
    QA = 2          # the user expects an answer

@dataclass
class Document:
    """One knowledge-base entry as held in memory.

    The three identifying fields are required; every other field has a default.
    """

    # --- required identification ---
    file_id: str    # unique id of the file
    file_name: str  # name of the file
    title: str      # display title

    # --- optional descriptive data ---
    summary: str = ""                                           # free-text summary
    keywords: List[str] = field(default_factory=list)           # tags (fresh list per instance)
    text_length: int = 0                                        # length of the text
    file_type: str = "pdf"                                      # text / pdf / image / others
    extra_fields: Dict[str, Any] = field(default_factory=dict)  # arbitrary metadata (fresh dict per instance)

@dataclass
class QueryResult:
    """One enriched hit produced by a document search. All fields are required."""

    doc_id: str                     # id of the matched document
    title: str                      # title of the matched document
    relevance: float                # relevance, expressed in percent
    summary: str                    # human-readable summary text
    key_fields_summary: str         # condensed view of the document's extra fields
    high_freq_terms: Dict[str, int] # term -> occurrence count

@dataclass
class LLMConfig:
    """Runtime settings for the LLM client; every field has a default."""

    model: str = "deepseek-chat"                # model name sent with each request
    max_tokens: int = 512                       # completion length limit
    api_key: Optional[str] = None               # None means no key configured
    temperature: float = 0.2                    # sampling temperature
    base_url: str = "https://api.deepseek.com"  # API endpoint

def mock_documents() -> List[Document]:
    """Return a freshly built list of three hard-coded sample documents."""
    transformer_paper = Document(
        file_id="75ac7d52",
        file_name="Attention Is All You Need.pdf",
        title="Attention Is All You Need: The Transformer Revolution in Sequence Modeling",
        summary=(
            "This paper introduces the Transformer, a neural network architecture based solely on attention, "
            "removing recurrence and convolution, enabling high parallelization and SOTA results on WMT14."
        ),
        keywords=[
            "transformer",
            "attention",
            "sequence",
            "machine translation",
            "parallelization",
        ],
        text_length=39448,
        file_type="pdf",
        extra_fields={"author": "Vaswani et al.", "year": "2017"},
    )

    company_policy = Document(
        file_id="9f3a21aa",
        file_name="Company Policy 2025.pdf",
        title="Company Policy and Approval Flow 2025",
        summary=(
            "This document describes internal approval flows, reimbursement rules, HR processes, and policy updates for 2025."
        ),
        keywords=["policy", "approval", "hr", "reimbursement"],
        text_length=6800,
        file_type="pdf",
        extra_fields={"department": "HR", "created_at": "2025-10-20"},
    )

    rl_curriculum = Document(
        file_id="abcdef",
        file_name="Deep Reinforcement Curriculum",
        title="Deep Reinforcement Learning & Curriculum Learning",
        summary=(
            "This fake document introduces deep learning, reinforcement learning, curriculum learning."
        ),
        keywords=["deep learning", "reinforcement learning", "curriculum"],
        text_length=1000,
        file_type="pdf",
        extra_fields={},
    )

    # Order is part of the contract: callers receive the documents in this sequence.
    return [transformer_paper, company_policy, rl_curriculum]

In [4]:
class SemanticService:
    """
    A small semantic layer over an in-memory document list.

    1. ``set_demand`` decides the intent (FILE_QUERY vs QA) automatically and
       always searches the documents first.
    2. For a file query, ``get_LLM_reply`` returns the ranked documents.
    3. For QA, ``get_LLM_reply`` calls the LLM with the retrieved context.
    """

    # Keywords used by the offline intent fallback (matched as whole words).
    _FILE_KEYWORDS = ("file", "document", "doc", "list", "show", "open", "report", "pdf", "find", "search")
    _QA_KEYWORDS = ("why", "how", "explain", "difference", "compare", "what is", "what's")

    def __init__(self, documents: Optional[List[Document]] = None):
        """
        Args:
            documents: Knowledge base to search. ``None`` selects the built-in
                mock documents; an explicitly empty list is kept as-is.
        """
        # `documents or mock_documents()` would silently replace an empty KB
        # with mock data, so test for None explicitly.
        self._documents: List[Document] = (
            mock_documents() if documents is None else documents
        )

        self._current_demand_raw: str = ""
        self._current_demand_type: Optional[DemandType] = None
        self._current_query_results: List[QueryResult] = []
        self._stopped: bool = False

        # The API key comes from the module-level `key1` loaded at notebook start.
        self._llm_config: LLMConfig = LLMConfig(api_key=key1)
        self._client = self._build_client(self._llm_config)

    # ======================
    # Public API
    # ======================

    def set_demand(self, user_input: str) -> bool:
        """Store a new request, classify its intent and run the search.

        The stripped text is used for storage, classification and search alike
        so all three stages see the same query. Always returns True.
        """
        query = user_input.strip()
        self._stopped = False
        self._current_demand_raw = query
        self._current_demand_type = self._classify_demand(query)
        self._current_query_results = self._search_and_enrich(query)
        return True

    def stop_current_task(self) -> bool:
        """Frontend can call this to stop streaming."""
        self._stopped = True
        return True

    def redo_task(self, user_input: str) -> bool:
        """Re-run with new input."""
        return self.set_demand(user_input)

    def get_query_file(self) -> List[str]:
        """Return only the titles for UI list."""
        return [r.title for r in self._current_query_results]

    def get_qualified_files_info(self, top_n: int = 5) -> List[Dict[str, str]]:
        """Return structured, string-only info for the top N results.

        A negative ``top_n`` is treated as 0 rather than slicing from the end.
        """
        limit = max(top_n, 0)
        return [
            {
                "doc_id": r.doc_id,
                "title": r.title,
                "relevance_percent": f"{r.relevance:.2f}%",
                "summary": r.summary,
                "key_fields_summary": r.key_fields_summary,
                "high_freq_terms": ", ".join(f"{k}:{v}" for k, v in r.high_freq_terms.items()),
            }
            for r in self._current_query_results[:limit]
        ]

    def get_query_task_result(self) -> List[Dict[str, str]]:
        """Return all results (for debugging / inspection)."""
        return self.get_qualified_files_info(top_n=len(self._current_query_results))

    def get_LLM_reply(self) -> Any:
        """Produce the response for the current demand.

        Returns:
            A dict in one of three shapes:
            - ``{"error": ...}`` when no demand is set, or QA is requested
              without an API key;
            - ``{"type": "file_query", "query", "results"}`` for file queries;
            - ``{"type": "qa", "prompt_sent", "reply"}`` for QA. If the LLM call
              raises, ``reply`` carries the failure text instead of an answer.
        """
        if not self._current_demand_raw:
            return {"error": "no demand set"}

        if self._current_demand_type == DemandType.FILE_QUERY:
            return {
                "type": "file_query",
                "query": self._current_demand_raw,
                "results": self.get_qualified_files_info(top_n=10),
            }

        if not self._llm_config.api_key:
            return {"error": "QA without api key"}

        context_text = self._build_context_from_results(self._current_query_results)
        prompt = self._build_llm_prompt(
            query=self._current_demand_raw,
            context=context_text
        )

        try:
            resp = self._client.chat.completions.create(
                model=self._llm_config.model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=self._llm_config.max_tokens,
                temperature=self._llm_config.temperature,
            )
            # `content` may be None; treat that as an empty reply.
            reply_text = (resp.choices[0].message.content or "").strip()
        except Exception as e:
            # Deliberate best-effort: report the failure as the reply text.
            reply_text = f"(LLM call failed) {e}"

        return {
            "type": "qa",
            "prompt_sent": prompt,
            "reply": reply_text,
        }

    async def stream_LLM_reply(self) -> AsyncGenerator[str, None]:
        """
        Async streaming version of ``get_LLM_reply``.

        Yields the ``reply`` text one character at a time and stops as soon as
        the stop flag is set. Responses without a ``reply`` key (file queries,
        errors) yield nothing.
        """
        # get_LLM_reply performs a blocking call; run it in the default
        # executor so the event loop stays responsive while waiting.
        loop = asyncio.get_running_loop()
        reply_obj = await loop.run_in_executor(None, self.get_LLM_reply)
        text = reply_obj.get("reply", "")
        for ch in text:
            if self._stopped:
                break
            yield ch
            await asyncio.sleep(0.01)

    def set_llm_config(
        self,
        *,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        api_key: Optional[str] = None,
        temperature: Optional[float] = None,
        base_url: Optional[str] = None,
    ) -> bool:
        """Update LLM runtime config and rebuild the client.

        Only arguments that are not ``None`` overwrite the stored value.
        Always returns True.
        """
        if model is not None:
            self._llm_config.model = model
        if max_tokens is not None:
            self._llm_config.max_tokens = max_tokens
        if api_key is not None:
            self._llm_config.api_key = api_key
        if temperature is not None:
            self._llm_config.temperature = temperature
        if base_url is not None:
            self._llm_config.base_url = base_url

        # The client is rebuilt on every call, whether or not api_key/base_url changed.
        self._client = self._build_client(self._llm_config)
        return True

    # ======================
    # Internal: LLM & Intent
    # ======================

    def _build_client(self, cfg: LLMConfig) -> Optional[OpenAI]:
        """Create an OpenAI client for ``cfg``, or None when no API key is set."""
        if not cfg.api_key:
            return None
        return OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)

    def _classify_demand(self, user_input: str) -> DemandType:
        """Classify the request, asking the LLM first and falling back to keywords.

        The keyword fallback prefers FILE_QUERY when both kinds of keyword are
        present, and also defaults to FILE_QUERY when neither is.
        """
        label = self._classify_with_llm(user_input)
        if label == "FILE":
            return DemandType.FILE_QUERY
        if label == "QA":
            return DemandType.QA

        # fallback: keyword-based
        text = user_input.lower()
        if self._contains_keyword(text, self._FILE_KEYWORDS):
            return DemandType.FILE_QUERY
        if self._contains_keyword(text, self._QA_KEYWORDS):
            return DemandType.QA
        return DemandType.FILE_QUERY

    @staticmethod
    def _contains_keyword(text: str, keywords) -> bool:
        """Return True if any keyword occurs in ``text`` as a whole word.

        Plain substring tests misfire ("profile" contains "file", "show"
        contains "how"), so each keyword must be delimited by non-alphanumeric
        characters. A few common suffixes (s, es, d, ed, ing) are accepted so
        that "documents" or "searching" still match.
        """
        for kw in keywords:
            pattern = (
                r"(?<![0-9a-zA-Z])"
                + re.escape(kw)
                + r"(?:s|es|d|ed|ing)?(?![0-9a-zA-Z])"
            )
            if re.search(pattern, text, flags=re.IGNORECASE):
                return True
        return False

    @staticmethod
    def _parse_intent_label(raw: Optional[str]) -> Optional[str]:
        """Normalize a raw LLM answer to "FILE" / "QA", or None if unrecognized.

        Everything except ASCII letters is discarded first, so answers such as
        "'FILE'", "qa." or " File\n" are accepted.
        """
        label = re.sub(r"[^A-Za-z]", "", raw or "").upper()
        return label if label in ("FILE", "QA") else None

    def _classify_with_llm(self, user_input: str) -> Optional[str]:
        """Ask the LLM for the intent label.

        Returns "FILE" or "QA", or None when no client/key is configured, the
        answer is not recognized, or the request fails (the caller then uses
        the keyword fallback).
        """
        if not self._client or not self._llm_config.api_key:
            return None

        system_prompt = (
            "You are an intent classifier. "
            "You must answer with EXACTLY ONE WORD: 'FILE' or 'QA'. "
            "Do NOT explain.\n"
            "- If the user wants to search/list/view/find/open documents/files/reports -> answer FILE.\n"
            "- If the user asks for explanation/analysis/how-to/reasoning -> answer QA.\n"
            "- If it is mixed, prefer FILE."
        )
        user_prompt = f"User query:\n{user_input}\n\nYour answer (FILE or QA):"

        try:
            resp = self._client.chat.completions.create(
                model=self._llm_config.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                max_tokens=4,
                temperature=0.0,
            )
            return self._parse_intent_label(resp.choices[0].message.content)
        except Exception:
            # Best-effort: any failure just means "no LLM verdict".
            return None

    # ======================
    # Internal: Search Layer
    # ======================

    def _search_and_enrich(self, query: str) -> List[QueryResult]:
        """Very simple term-based search + enrichment.

        Documents with a zero score are dropped. Relevance is normalized so the
        best hit is 100%, and results are sorted by relevance, highest first.
        """
        query_tokens = self._tokenize(query)

        scored_docs = []
        for doc in self._documents:
            score = self._compute_relevance(query_tokens, doc)
            if score > 0:
                scored_docs.append((doc, score))

        if not scored_docs:
            return []

        max_score = max(s for _, s in scored_docs) or 1.0

        results: List[QueryResult] = [
            QueryResult(
                doc_id=doc.file_id,
                title=doc.title,
                relevance=(score / max_score) * 100.0,
                summary=self._summarize_document(doc),
                key_fields_summary=self._summarize_key_fields(doc),
                high_freq_terms=self._extract_high_freq_terms(doc, query_tokens),
            )
            for doc, score in scored_docs
        ]

        results.sort(key=lambda x: x.relevance, reverse=True)
        return results

    def _tokenize(self, text: str) -> List[str]:
        """Lowercase ``text`` and split it into ASCII alphanumeric tokens."""
        # split on non-alphanum; include basic CJK if needed later
        return [t for t in re.split(r"[^0-9a-zA-Z]+", text.lower()) if t]

    def _compute_relevance(self, query_tokens: List[str], doc: Document) -> float:
        """Score ``doc`` against the query tokens; 0.0 for an empty query.

        The score is the sum of:
        - 0.6 x the fraction of distinct query tokens found in the document's
          summary, title or keywords;
        - 0.2 per matching tag, capped at two tags;
        - 0.3 per distinct query token found in the title.
        """
        if not query_tokens:
            return 0.0

        doc_tokens = self._tokenize(
            doc.summary + " " + doc.title + " " + " ".join(doc.keywords)
        )

        q_set = set(query_tokens)
        d_set = set(doc_tokens)

        # 1) keyword overlap
        keyword_matches = len(q_set & d_set)
        keyword_score = keyword_matches / len(q_set)

        # 2) tag bonus
        # Query tokens are single words, so a multi-word tag such as
        # "machine translation" must be tokenized too: it matches when all of
        # its words appear in the query. Single-word tags behave as before.
        tag_matches = 0
        for tag in doc.keywords:
            tag_tokens = set(self._tokenize(tag))
            if tag_tokens and tag_tokens <= q_set:
                tag_matches += 1
        tag_score = min(tag_matches, 2) * 0.2  # cap

        # 3) title bonus
        title_tokens = self._tokenize(doc.title)
        title_matches = len(q_set & set(title_tokens))
        title_score = title_matches * 0.3

        total_score = keyword_score * 0.6 + tag_score + title_score
        return total_score

    # ======================
    # Internal: Summaries
    # ======================

    def _summarize_document(self, doc: Document) -> str:
        """Return a human-readable summary; now all-English.

        For "text" files this is the first 120 characters of the stored
        summary (with "..." when truncated). For every other file type it is a
        description built from the type, the title and any extra fields.
        """
        if doc.file_type == "text":
            short = doc.summary[:120]
            return short + ("..." if len(doc.summary) > 120 else "")
        else:
            base = f"This file is of type '{doc.file_type}', with title '{doc.title}'."
            if doc.extra_fields:
                base += " Extra fields: " + ", ".join(
                    [f"{k}: {v}" for k, v in doc.extra_fields.items()]
                )
            return base

    def _summarize_key_fields(self, doc: Document) -> str:
        """Render the extra fields as "k=v; k=v", or "No key fields." when empty."""
        if not doc.extra_fields:
            return "No key fields."
        return "; ".join([f"{k}={v}" for k, v in doc.extra_fields.items()])

    def _extract_high_freq_terms(
        self,
        doc: Document,
        query_tokens: List[str],
        top_k: int = 5
    ) -> Dict[str, int]:
        """Return the ``top_k`` most common terms with their counts.

        Counts are taken over the summary tokens, the query tokens and the
        lowercased keywords (each keyword counted as a single term).
        """
        all_tokens = (
            self._tokenize(doc.summary)
            + query_tokens
            + [t.lower() for t in doc.keywords]
        )
        counter = Counter(all_tokens)
        return dict(counter.most_common(top_k))

    # ======================
    # Internal: Prompt
    # ======================

    def _build_context_from_results(self, results: List[QueryResult]) -> str:
        """Build the DOCUMENTS section of the QA prompt.

        Each block holds the id, the title and the result summary. For
        non-text files that summary only describes type/title/extra fields, so
        the document's stored summary is appended as well whenever it is not
        already part of the block. Without it the model receives no actual
        content to answer from.
        """
        stored_by_id = {doc.file_id: doc.summary for doc in self._documents}
        blocks = []
        for r in results:
            block = f"[{r.doc_id}] {r.title}\n{r.summary}\n"
            stored = (stored_by_id.get(r.doc_id) or "").strip()
            if stored and stored not in r.summary:
                block += f"{stored}\n"
            blocks.append(block)
        return "\n".join(blocks)

    def _build_llm_prompt(self, *, query: str, context: str) -> str:
        """
        English version of your previous rule-based prompt to avoid hallucination.

        Embeds ``context`` under [DOCUMENTS] and ``query`` under
        [USER QUESTION]; the result has no leading/trailing whitespace.
        """
        prompt = f"""
You are an enterprise internal knowledge-base assistant. You can ONLY use the information in the following DOCUMENTS to answer the user's question. If the documents do not contain enough information, you MUST answer: "No valid reference."

[Answering rules]
1. Be concise and accurate.
2. If you cite a document, add its id in square brackets at the end of the sentence, e.g. [75ac7d52].
3. Do NOT invent information that is not in the documents.
4. If multiple documents mention the same thing, you can merge them and cite multiple ids, e.g. [75ac7d52][9f3a21aa].

[DOCUMENTS]
{context}

[USER QUESTION]
{query}

Start answering now:
""".strip()
        return prompt

# ======================
# Demo
# ======================

if __name__ == "__main__":
    # Demo: run a few example queries through the service and print the outcome.
    service = SemanticService()

    # just some example user queries
    user_queries = [
        "Find me documents about RAG-based LLM question answering system design.",
        "What is the difference between RAG and standard LLM QA? Please explain based on documents.",
        "Show me the company policy for 2025.",
        "Explain the Transformer architecture.",
    ]

    for q in user_queries:
        print("\n==============================")
        print(f"User query: {q}")

        # 1) send to service
        service.set_demand(q)

        # 2) let the service decide (FILE_QUERY vs QA)
        resp = service.get_LLM_reply()

        # 3) branch on what the service decided
        reply_type = resp.get("type")
        if reply_type == "file_query":
            print("→ Detected intent: FILE_QUERY")
            print("→ Top matched documents:")
            for item in resp.get("results", []):
                print(f"  - {item['title']} (relevance={item['relevance_percent']})")
        elif reply_type == "qa":
            print("→ Detected intent: QA")
            print("→ Answer:")
            print(resp.get("reply", ""))
        else:
            # Error responses carry no "type"; report them instead of
            # presenting them as a QA answer with empty text.
            print(f"→ Error: {resp.get('error', 'unknown error')}")


User query: Find me documents about RAG-based LLM question answering system design.
→ Detected intent: FILE_QUERY
→ Top matched documents:
  - Attention Is All You Need: The Transformer Revolution in Sequence Modeling (relevance=100.00%)

User query: What is the difference between RAG and standard LLM QA? Please explain based on documents.
→ Detected intent: QA
→ Answer:
No valid reference.

User query: Show me the company policy for 2025.
→ Detected intent: FILE_QUERY
→ Top matched documents:
  - Company Policy and Approval Flow 2025 (relevance=100.00%)
  - Attention Is All You Need: The Transformer Revolution in Sequence Modeling (relevance=26.73%)

User query: Explain the Transformer architecture.
→ Detected intent: QA
→ Answer:
The Transformer architecture uses stacked encoder and decoder layers with multi-head self-attention mechanisms and position-wise feed-forward networks, eliminating recurrence and convolutions [75ac7d52].
