<a href="https://colab.research.google.com/github/ratul619/RAG_STA/blob/main/RAG_STA_trial1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
pip install -q chromadb sentence-transformers transformers accelerate safetensors torch --upgrade


In [22]:
pip install -q bitsandbytes

In [None]:
#!/usr/bin/env python3
import os
import json
from typing import List, Dict, Any, Optional

import torch
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM


class LocalTimingRAG:
    """Index timing-path JSON reports in ChromaDB and answer questions about them.

    Every path of a report becomes one ChromaDB record: a text rendering of
    the path, its sentence-transformer embedding, and a small metadata dict
    (file, path index, startpoint, endpoint, slack, clock group, path type).
    ``query`` answers keyword-triggered questions directly from that metadata.
    The causal LM loaded in ``__init__`` is stored on the instance but is not
    called by any method of this class.
    """

    def __init__(self, model_name: Optional[str] = None, db_path: str = "./timing_db"):
        """Load the embedding model and LLM, then open the persistent vector store.

        Args:
            model_name: Hugging Face model id for the causal LM; ``None`` selects
                the default chosen by ``_load_local_llm``.
            db_path: Directory handed to ``chromadb.PersistentClient``.
        """
        # Embeddings (local, CPU friendly)
        self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

        # Local LLM and tokenizer
        self.llm_model, self.tokenizer = self._load_local_llm(model_name)

        # Vector DB (local persistent)
        self.chroma_client = chromadb.PersistentClient(path=db_path, settings=Settings(allow_reset=True))
        self.collection = self.chroma_client.get_or_create_collection("timing_paths")

    def _load_local_llm(self, model_name: Optional[str] = None):
        """Load a causal language model and its tokenizer.

        Args:
            model_name: Hugging Face model id; falls back to
                ``microsoft/DialoGPT-large`` when empty or ``None``.

        Returns:
            A ``(model, tokenizer)`` tuple.
        """
        name = model_name or "microsoft/DialoGPT-large"

        tokenizer = AutoTokenizer.from_pretrained(name, use_fast=False)
        if tokenizer.pad_token is None:
            # The tokenizer ships without a pad token; reuse EOS for padding.
            tokenizer.pad_token = tokenizer.eos_token

        if torch.cuda.is_available():
            # Half precision with automatic device placement on GPU.
            kwargs = {"torch_dtype": torch.float16, "device_map": "auto"}
        else:
            kwargs = {"torch_dtype": torch.float32}

        model = AutoModelForCausalLM.from_pretrained(name, **kwargs)
        return model, tokenizer

    def _read_json_safe(self, filepath: str):
        """Parse a JSON file, trying several text encodings in turn.

        Args:
            filepath: Path of the JSON file.

        Returns:
            The decoded JSON value from the first encoding that both decodes
            and parses.

        Raises:
            ValueError: If no candidate encoding yields valid JSON.
            OSError: If the file cannot be opened (not caught here).
        """
        # 'utf-8-sig' reads UTF-8 with or without a BOM; plain 'utf-8' would
        # leave the BOM in the text and make json.load reject the document.
        encodings = ['utf-8-sig', 'utf-16', 'utf-16-le', 'utf-16-be', 'latin-1', 'cp1252']

        for encoding in encodings:
            try:
                with open(filepath, 'r', encoding=encoding) as f:
                    return json.load(f)
            except (UnicodeError, json.JSONDecodeError):
                # Catch UnicodeError, not just UnicodeDecodeError: the 'utf-16'
                # reader raises a plain UnicodeError when the stream has no
                # BOM, which used to abort the whole fallback chain.
                continue

        raise ValueError(f"Could not read {filepath} with any supported encoding")

    def verify_timing_json(self, filepath: str):
        """Print a short sanity summary of one timing JSON file.

        Prints the number of paths and, for the first two paths, the
        startpoint, endpoint, slack and clock group. Any error while reading
        or inspecting the file is reported on stdout instead of being raised.

        Args:
            filepath: Path of the JSON file to inspect.
        """
        print(f"\n=== Verifying JSON: {filepath} ===")
        try:
            data = self._read_json_safe(filepath)
            paths = data.get('paths', [])

            print(f"File: {filepath}")
            print(f"Paths: {len(paths)}")

            for i, path in enumerate(paths[:2]):
                print(f"  Path {i+1}:")
                print(f"    Startpoint: {path.get('startpoint', {}).get('instance', 'N/A')}")
                print(f"    Endpoint: {path.get('endpoint', {}).get('instance', 'N/A')}")
                print(f"    Slack: {path.get('summary', {}).get('slack', 'N/A')}")
                print(f"    Clock Group: {path.get('report', {}).get('group', 'N/A')}")
        except Exception as e:
            # Deliberately best-effort: verification must never stop the run.
            print(f"Error reading {filepath}: {e}")

    def index_timing_reports(self, reports_dir: str):
        """Index every ``*.json`` file (case-insensitive) found in ``reports_dir``.

        Args:
            reports_dir: Directory containing the timing JSON reports.

        Raises:
            FileNotFoundError: If ``reports_dir`` is not a directory.
        """
        if not os.path.isdir(reports_dir):
            raise FileNotFoundError(f"Directory not found: {reports_dir}")

        # Sorted so that indexing order does not depend on os.listdir order.
        files = sorted(f for f in os.listdir(reports_dir) if f.lower().endswith(".json"))
        print(f"Found {len(files)} JSON files to index")

        for fname in files:
            print(f"Indexing {fname}...")
            self._index_single_report(os.path.join(reports_dir, fname))

    def _index_single_report(self, filepath: str):
        """Embed and store every path of one report file.

        Record ids have the form ``<basename>::<path index>``. Metadata entries
        whose value is ``None`` are left out of the stored dict.

        Args:
            filepath: Path of the JSON report.
        """
        data = self._read_json_safe(filepath)

        paths: List[Dict[str, Any]] = data.get("paths", [])
        if not paths:
            print(f"  Warning: No paths found in {filepath}")
            return

        basename = os.path.basename(filepath)
        documents: List[str] = [self._format_path_for_indexing(path) for path in paths]
        # One batched encode call instead of one model invocation per path.
        embeddings: List[List[float]] = self.embedding_model.encode(documents).tolist()

        metas: List[Dict[str, Any]] = []
        ids: List[str] = []
        for i, path in enumerate(paths):
            meta = {
                "file": basename,
                "path_index": i,
                "startpoint": path.get("startpoint", {}).get("instance", ""),
                "endpoint": path.get("endpoint", {}).get("instance", ""),
                "slack": path.get("summary", {}).get("slack", None),
                "clock_group": path.get("report", {}).get("group", ""),
                "path_type": path.get("report", {}).get("path_type", ""),
            }
            # Drop None values: readers use .get() and already treat a missing
            # key like None, and not every ChromaDB release accepts None.
            metas.append({key: value for key, value in meta.items() if value is not None})
            ids.append(f"{basename}::{i}")

        self.collection.add(documents=documents, embeddings=embeddings, metadatas=metas, ids=ids)
        print(f"  Indexed {len(paths)} paths from {filepath}")

    def _format_path_for_indexing(self, path: Dict[str, Any]) -> str:
        """Render one timing path as the text that gets embedded.

        The first line is a ``" | "``-separated header (startpoint, endpoint,
        slack, arrival, required, clock group, path type; missing values are
        shown as ``N/A``). If the path has data-path stages, a ``Data Path:``
        line follows with one ``- name [cell] delay=Xns`` line per stage.

        Args:
            path: One entry of the report's ``paths`` list.

        Returns:
            The multi-line text description.
        """
        def fmt(v):
            return "N/A" if v is None else str(v)

        summary = path.get('summary', {})
        report = path.get('report', {})
        header = [
            f"Startpoint: {fmt(path.get('startpoint', {}).get('instance'))}",
            f"Endpoint: {fmt(path.get('endpoint', {}).get('instance'))}",
            f"Slack: {fmt(summary.get('slack'))} ns",
            f"Arrival: {fmt(summary.get('data_arrival_time'))} ns",
            f"Required: {fmt(summary.get('data_required_time'))} ns",
            f"Clock Group: {fmt(report.get('group'))}",
            f"Path Type: {fmt(report.get('path_type'))}",
        ]
        lines = [" | ".join(header)]

        # Data path details
        stages = path.get("data_path", {}).get("stages", [])
        if stages:
            lines.append("Data Path:")
            for s in stages:
                line = f"- {s.get('name', 'N/A')}"
                cell = s.get("cell_type", "")
                if cell:
                    line += f" [{cell}]"
                incr = s.get("incr", None)
                if incr is not None:
                    line += f" delay={incr}ns"
                lines.append(line)

        return "\n".join(lines)

    def search_timing_data(self, query: str, top_k: int = 5) -> Dict[str, Any]:
        """Run an embedding similarity search over the indexed paths.

        Args:
            query: Free-text search string.
            top_k: Number of results requested from ChromaDB.

        Returns:
            The raw result of ``collection.query``.
        """
        query_vec = self.embedding_model.encode(query).tolist()
        return self.collection.query(query_embeddings=[query_vec], n_results=top_k)

    def _get_all_slack_data(self) -> List[Dict[str, Any]]:
        """Collect every indexed path that has a numeric slack.

        Returns:
            One dict per path (``slack`` as float, ``startpoint``,
            ``endpoint``, ``file``, ``path_type``, ``clock_group``), sorted by
            ascending slack so the worst path comes first. Records whose slack
            is missing or not convertible to float are skipped.
        """
        all_slack_data = []
        for meta in self.collection.get()['metadatas']:
            slack = meta.get("slack")
            if slack is None:
                continue
            try:
                slack_float = float(slack)
            except (ValueError, TypeError):
                continue
            all_slack_data.append({
                "slack": slack_float,
                "startpoint": meta.get("startpoint", ""),
                "endpoint": meta.get("endpoint", ""),
                "file": meta.get("file", ""),
                "path_type": meta.get("path_type", ""),
                "clock_group": meta.get("clock_group", ""),
            })

        # Sort by slack value (ascending = worst first)
        all_slack_data.sort(key=lambda x: x["slack"])
        return all_slack_data

    def debug_data(self):
        """Print a summary of what is currently stored in the collection.

        Shows the record count and, when numeric slacks exist, the slack
        range, the distinct path types and clock groups (sorted), and the
        five worst and five best slack values.
        """
        print("\n=== DEBUG: Checking indexed data ===")

        all_data = self.collection.get()
        print(f"Total indexed paths: {len(all_data['ids'])}")

        if not all_data['metadatas']:
            return

        all_slacks = []
        path_types = set()
        clock_groups = set()

        for meta in all_data['metadatas']:
            slack = meta.get("slack")
            if slack is not None:
                try:
                    all_slacks.append(float(slack))
                except (ValueError, TypeError):
                    pass
            path_type = meta.get("path_type", "")
            if path_type:
                path_types.add(path_type)
            clock_group = meta.get("clock_group", "")
            if clock_group:
                clock_groups.add(clock_group)

        if all_slacks:
            print(f"Slack range: {min(all_slacks):.4f} to {max(all_slacks):.4f} ns")
            # Sorted: plain set iteration order would change between runs.
            print(f"Path types: {', '.join(sorted(path_types))}")
            print(f"Clock groups: {', '.join(sorted(clock_groups))}")

            sorted_slacks = sorted(all_slacks)
            print(f"Worst 5 slacks: {[f'{s:.4f}' for s in sorted_slacks[:5]]}")
            print(f"Best 5 slacks: {[f'{s:.4f}' for s in sorted_slacks[-5:]]}")

    @staticmethod
    def _frequency_report(heading: str, values: List[str], limit: int = 5) -> str:
        """Format the ``limit`` most common entries of ``values`` as a numbered list."""
        from collections import Counter

        lines = [heading]
        for rank, (value, count) in enumerate(Counter(values).most_common(limit), start=1):
            lines.append(f"{rank}. {value}: {count} times")
        # Every line, including the heading, ends with a newline.
        return "\n".join(lines) + "\n"

    def query(self, question: str, top_k: int = 5) -> str:
        """Answer a question by keyword matching over the indexed slack data.

        The lowercased question is tested against a fixed sequence of keyword
        rules (worst/best slack, path count, path types, clock groups,
        endpoint/startpoint/path frequency, slack statistics, path listing);
        the first rule that matches produces the answer. Questions matching no
        rule get an overall summary. Only paths with a numeric slack are
        considered, including in the reported totals.

        Args:
            question: Free-text question.
            top_k: Accepted for interface compatibility; not used.

        Returns:
            The answer text, or ``"No slack data found."`` when nothing with a
            numeric slack is indexed.
        """
        import statistics

        print(f"DEBUG: Searching for: '{question}'")

        # Get ALL slack data for accurate analysis
        all_slack_data = self._get_all_slack_data()
        if not all_slack_data:
            return "No slack data found."

        print(f"DEBUG: Total slack values: {len(all_slack_data)}")

        question_lower = question.lower()

        def mentions(*phrases):
            return any(phrase in question_lower for phrase in phrases)

        def describe(label, item):
            return (
                f"{label}: {item['slack']:.4f} ns\n"
                f"Path: {item['startpoint']} -> {item['endpoint']}\n"
                f"File: {item['file']}\n"
                f"Clock Group: {item['clock_group']}"
            )

        def distinct(key):
            # Sorted so the answer text is stable from run to run.
            return sorted({item[key] for item in all_slack_data if item[key]})

        if mentions("worst slack", "minimum slack"):
            return describe("Worst slack", all_slack_data[0])  # list is sorted ascending

        if mentions("best slack", "highest slack", "maximum slack"):
            return describe("Best slack", all_slack_data[-1])

        if mentions("how many paths", "total paths"):
            return f"Total paths in report: {len(all_slack_data)}"

        if mentions("min paths", "max paths", "path type"):
            path_types = distinct('path_type')
            return f"Path types in report: {', '.join(path_types) if path_types else 'Unknown'}"

        if mentions("clock group"):
            clock_groups = distinct('clock_group')
            return f"Clock groups in report: {', '.join(clock_groups) if clock_groups else 'Unknown'}"

        wants_ranking = mentions("max", "most", "frequent")

        # Endpoint frequency analysis
        if "endpoint" in question_lower and wants_ranking:
            endpoints = [item['endpoint'] for item in all_slack_data if item['endpoint']]
            return self._frequency_report("Most frequent endpoints:", endpoints)

        # Startpoint frequency analysis
        if "startpoint" in question_lower and wants_ranking:
            startpoints = [item['startpoint'] for item in all_slack_data if item['startpoint']]
            return self._frequency_report("Most frequent startpoints:", startpoints)

        # Path frequency analysis (startpoint -> endpoint pairs)
        if "path" in question_lower and wants_ranking:
            pairs = [
                f"{item['startpoint']} -> {item['endpoint']}"
                for item in all_slack_data
                if item['startpoint'] and item['endpoint']
            ]
            return self._frequency_report("Most frequent paths:", pairs)

        slacks = [item['slack'] for item in all_slack_data]

        # Slack distribution analysis
        if mentions("slack range", "slack statistics"):
            return "\n".join([
                "Slack Statistics:",
                f"- Total paths: {len(all_slack_data)}",
                f"- Worst slack: {min(slacks):.4f} ns",
                f"- Best slack: {max(slacks):.4f} ns",
                f"- Average slack: {sum(slacks)/len(slacks):.4f} ns",
                # statistics.median averages the two middle values for an even
                # count; indexing the sorted list at len//2 did not.
                f"- Median slack: {statistics.median(slacks):.4f} ns",
            ])

        if mentions("show paths", "list paths"):
            # Show first 10 paths (worst slack first)
            result = f"First 10 timing paths (out of {len(all_slack_data)}):\n"
            for i, item in enumerate(all_slack_data[:10]):
                result += f"{i+1}. Slack: {item['slack']:.4f} ns | Path: {item['startpoint']} -> {item['endpoint']}\n"
            return result

        # For other questions, show summary statistics
        path_types = distinct('path_type')
        clock_groups = distinct('clock_group')
        return "\n".join([
            "Timing Report Summary:",
            f"- Total paths: {len(all_slack_data)}",
            f"- Worst slack: {min(slacks):.4f} ns",
            f"- Best slack: {max(slacks):.4f} ns",
            f"- Average slack: {sum(slacks)/len(slacks):.4f} ns",
            f"- Path types: {', '.join(path_types) if path_types else 'Unknown'}",
            f"- Clock groups: {', '.join(clock_groups) if clock_groups else 'Unknown'}",
        ])


def main():
    """Run the end-to-end demo: verify, index, debug-print, test, then chat.

    Builds a ``LocalTimingRAG``, checks and indexes the JSON files in
    ``./timing_reports`` (returning early with a hint if that directory is
    missing), prints the debug summary, runs a fixed list of sample
    questions, and finally enters an interactive prompt that ends on an
    empty line, ``exit``/``quit``, EOF or Ctrl-C.
    """
    # Optional: silence HF symlink warning on Windows
    # os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"

    rag = LocalTimingRAG()

    # The reports directory must already exist; it is never created here.
    reports_dir = "./timing_reports"
    if not os.path.isdir(reports_dir):
        print("Create ./timing_reports and place your timing JSON files there.")
        return

    # Sanity-check each report before indexing anything.
    print("=== Verifying JSON files ===")
    json_names = [name for name in os.listdir(reports_dir) if name.endswith('.json')]
    for name in json_names:
        rag.verify_timing_json(f"{reports_dir}/{name}")

    print("\n=== Indexing timing reports ===")
    rag.index_timing_reports(reports_dir)
    print("Indexing complete.")

    # Show what actually landed in the collection.
    rag.debug_data()

    # Canned questions exercising the frequency-analysis answers.
    print("\n=== Testing queries ===")
    sample_questions = (
        "which is the endpoint occuring max number of times?",
        "which is the startpoint occuring max number of times?",
        "which is the most frequent path?",
        "show me the most common endpoints",
        "what are the most frequent startpoints?",
        "list the most common paths",
    )
    for sample in sample_questions:
        print(f"\nQ: {sample}")
        answer = rag.query(sample)
        print(f"A: {answer}")

    # Interactive prompt.
    banner = "=" * 50
    print("\n" + banner)
    print("Timing RAG ready. Type your question (or 'exit').")
    print(banner)

    stop_words = ("exit", "quit")
    while True:
        try:
            user_text = input("\nQ> ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if user_text == "" or user_text.lower() in stop_words:
            break
        reply = rag.query(user_text)
        print(f"\nA> {reply}")


# Start the demo only when this code runs as the main module, not on import.
if __name__ == "__main__":
    main()

KeyboardInterrupt: 

=== Verifying JSON files ===

=== Verifying JSON: ./timing_reports/caravel.min-hkspi_clk-min_timing_full.json ===
File: ./timing_reports/caravel.min-hkspi_clk-min_timing_full.json
Paths: 659
  Path 1:
    Startpoint: chip_core/housekeeping/_6778_
    Endpoint: chip_core/housekeeping/_6778_
    Slack: 0.3252
    Clock Group: hkspi_clk
  Path 2:
    Startpoint: chip_core/housekeeping/_6656_
    Endpoint: chip_core/housekeeping/_6654_
    Slack: 0.6112
    Clock Group: hkspi_clk

=== Indexing timing reports ===
Found 1 JSON files to index
Indexing caravel.min-hkspi_clk-min_timing_full.json...


InternalError: Query error: Database error: error returned from database: (code: 1032) attempt to write a readonly database

In [None]:
#!/usr/bin/env python3
import os
import json
import chromadb
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from typing import List, Dict, Any, Optional
import shutil

class LocalTimingRAG:
    def __init__(self, model_name: str = "microsoft/DialoGPT-large"):
        """Set up the embedder, the local LLM, the vector store and the history.

        Args:
            model_name: Hugging Face model id that ``_load_local_llm`` tries
                to load first.
        """
        self.model_name = model_name
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        # _load_local_llm reads self.model_name, so it must run after the
        # assignment above.
        loaded_model, loaded_tokenizer = self._load_local_llm()
        self.llm_model = loaded_model
        self.tokenizer = loaded_tokenizer

        # Record which device is available and report it.
        device_kind = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device_kind)
        print(f"Using device: {self.device}")

        # Creates self.client and self.collection.
        self._setup_database()

        # List of {'question': ..., 'answer': ...} dicts appended to by query().
        self.history = []

    def _setup_database(self):
        """Create ``self.client`` and an empty ``timing_reports`` collection.

        Three attempts are made in order; the first that succeeds wins:

        1. an in-memory client, with any existing ``timing_reports``
           collection dropped first;
        2. a persistent client at ``./temp_chroma_db`` (the directory is
           deleted beforehand if it exists);
        3. a second in-memory client, as a last resort. Errors from this
           final attempt propagate to the caller.
        """
        collection_name = "timing_reports"
        fallback_dir = "./temp_chroma_db"

        def fresh_collection(client):
            # Best-effort removal of a leftover collection. The exception type
            # raised for a missing collection differs between ChromaDB
            # releases, hence the broad `except Exception`; unlike a bare
            # `except:` it still lets KeyboardInterrupt/SystemExit through.
            try:
                client.delete_collection(name=collection_name)
            except Exception:
                pass
            return client.create_collection(name=collection_name)

        # Attempt 1: in-memory database, avoids file permission issues.
        try:
            self.client = chromadb.Client()
            self.collection = fresh_collection(self.client)
            print("Using in-memory database")
            return
        except Exception as e:
            print(f"Error setting up database: {e}")

        # Attempt 2: persistent database at a scratch path, wiped first.
        try:
            if os.path.exists(fallback_dir):
                shutil.rmtree(fallback_dir)

            self.client = chromadb.PersistentClient(path=fallback_dir)
            self.collection = self.client.create_collection(name=collection_name)
            print("Using persistent database at ./temp_chroma_db")
            return
        except Exception as e2:
            print(f"Error with persistent database: {e2}")

        # Attempt 3: last resort, a new in-memory client.
        self.client = chromadb.Client()
        self.collection = fresh_collection(self.client)
        print("Using fallback in-memory database")

    def _load_local_llm(self):
        """Load the causal LM named by ``self.model_name`` with its tokenizer.

        If loading fails for any reason, the error is printed and
        ``microsoft/DialoGPT-medium`` is tried instead.

        Returns:
            ``(model, tokenizer)`` on success, or ``(None, None)`` when the
            fallback model cannot be loaded either.
        """
        def load_pair(checkpoint):
            # Tokenizer first, then the model: float16 with automatic device
            # placement when CUDA is available, float32 otherwise.
            tok = AutoTokenizer.from_pretrained(checkpoint)
            on_gpu = torch.cuda.is_available()
            llm = AutoModelForCausalLM.from_pretrained(
                checkpoint,
                torch_dtype=torch.float16 if on_gpu else torch.float32,
                device_map="auto" if on_gpu else None
            )
            return llm, tok

        try:
            return load_pair(self.model_name)
        except Exception as primary_error:
            print(f"Error loading model {self.model_name}: {primary_error}")
            print("Falling back to DialoGPT-medium...")

        try:
            return load_pair("microsoft/DialoGPT-medium")
        except Exception as fallback_error:
            print(f"Error loading fallback model: {fallback_error}")
            return None, None

    def _read_json_file(self, file_path: str) -> Dict[str, Any]:
        """Read JSON file with proper encoding handling"""
        try:
            # Try UTF-8 first
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except UnicodeDecodeError:
            try:
                # Try UTF-16
                with open(file_path, 'r', encoding='utf-16') as f:
                    return json.load(f)
            except UnicodeDecodeError:
                try:
                    # Try UTF-16 with BOM
                    with open(file_path, 'r', encoding='utf-16-sig') as f:
                        return json.load(f)
                except UnicodeDecodeError:
                    try:
                        # Try latin-1 as fallback
                        with open(file_path, 'r', encoding='latin-1') as f:
                            return json.load(f)
                    except Exception as e:
                        print(f"Error reading {file_path}: {e}")
                        return {}

    def index_timing_reports(self, directory: str):
        """Index all JSON files in the directory"""
        print("=== Indexing timing reports ===")

        json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
        print(f"Found {len(json_files)} JSON files to index")

        for filename in json_files:
            file_path = os.path.join(directory, filename)
            print(f"Indexing {filename}...")

            try:
                data = self._read_json_file(file_path)
                if not data:
                    print(f"Failed to read {filename}")
                    continue

                # Extract text and metadata for each path
                for i, path in enumerate(data.get('paths', [])):
                    # Create a comprehensive text representation
                    text_parts = []

                    # Add path metadata
                    if 'startpoint' in path:
                        text_parts.append(f"Startpoint: {path['startpoint'].get('instance', 'N/A')}")
                    if 'endpoint' in path:
                        text_parts.append(f"Endpoint: {path['endpoint'].get('instance', 'N/A')}")
                    if 'report' in path:
                        report = path['report']
                        text_parts.append(f"Group: {report.get('group', 'N/A')}")
                        text_parts.append(f"Path Type: {report.get('path_type', 'N/A')}")

                    # Add summary information
                    if 'summary' in path:
                        summary = path['summary']
                        if 'slack' in summary:
                            text_parts.append(f"Slack: {summary['slack']}")
                        if 'hold_time_requirement' in summary and summary['hold_time_requirement'] is not None:
                            text_parts.append(f"Hold Time Requirement: {summary['hold_time_requirement']}")
                        if 'clock_skew' in summary and summary['clock_skew'] is not None:
                            text_parts.append(f"Clock Skew: {summary['clock_skew']}")

                    # Add launch clock path stages
                    if 'launch_clock_path' in path and 'stages' in path['launch_clock_path']:
                        text_parts.append("Launch Clock Path:")
                        for stage in path['launch_clock_path']['stages']:
                            if stage.get('type') == 'cell_pin':
                                text_parts.append(f"  {stage.get('name', 'N/A')} ({stage.get('cell_type', 'N/A')})")

                    # Add data path stages
                    if 'data_path' in path and 'stages' in path['data_path']:
                        text_parts.append("Data Path:")
                        for stage in path['data_path']['stages']:
                            if stage.get('type') == 'cell_pin':
                                text_parts.append(f"  {stage.get('name', 'N/A')} ({stage.get('cell_type', 'N/A')})")

                    # Add capture clock path stages
                    if 'capture_clock_path' in path and 'stages' in path['capture_clock_path']:
                        text_parts.append("Capture Clock Path:")
                        for stage in path['capture_clock_path']['stages']:
                            if stage.get('type') == 'cell_pin':
                                text_parts.append(f"  {stage.get('name', 'N/A')} ({stage.get('cell_type', 'N/A')})")

                    # Combine all text
                    full_text = "\n".join(text_parts)

                    # Generate embedding
                    embedding = self.embedding_model.encode(full_text).tolist()

                    # Store in ChromaDB
                    self.collection.add(
                        embeddings=[embedding],
                        documents=[full_text],
                        metadatas=[{
                            'filename': filename,
                            'path_index': i,
                            'startpoint': path.get('startpoint', {}).get('instance', 'N/A'),
                            'endpoint': path.get('endpoint', {}).get('instance', 'N/A'),
                            'slack': path.get('summary', {}).get('slack', 'N/A'),
                            'hold_time_requirement': path.get('summary', {}).get('hold_time_requirement', 'N/A'),
                            'clock_skew': path.get('summary', {}).get('clock_skew', 'N/A'),
                            'group': path.get('report', {}).get('group', 'N/A')
                        }],
                        ids=[f"{filename}_{i}"]
                    )

                print(f"Successfully indexed {filename}")

            except Exception as e:
                print(f"Error indexing {filename}: {e}")

        print("Indexing complete!")

    def _get_all_slack_data(self) -> List[Dict]:
        """Get all slack-related data from the collection"""
        all_data = []
        results = self.collection.get()

        for i, metadata in enumerate(results['metadatas']):
            if metadata.get('slack') != 'N/A':
                try:
                    slack_value = float(metadata['slack'])
                    all_data.append({
                        'slack': slack_value,
                        'startpoint': metadata.get('startpoint', 'N/A'),
                        'endpoint': metadata.get('endpoint', 'N/A'),
                        'group': metadata.get('group', 'N/A'),
                        'hold_time_requirement': metadata.get('hold_time_requirement', 'N/A'),
                        'clock_skew': metadata.get('clock_skew', 'N/A'),
                        'document': results['documents'][i]
                    })
                except (ValueError, TypeError):
                    continue

        return all_data

    def _get_clock_skew_data(self) -> List[Dict]:
        """Get all clock skew data from the collection"""
        all_data = []
        results = self.collection.get()

        for i, metadata in enumerate(results['metadatas']):
            if metadata.get('clock_skew') != 'N/A':
                try:
                    clock_skew_value = float(metadata['clock_skew'])
                    all_data.append({
                        'clock_skew': clock_skew_value,
                        'startpoint': metadata.get('startpoint', 'N/A'),
                        'endpoint': metadata.get('endpoint', 'N/A'),
                        'group': metadata.get('group', 'N/A'),
                        'slack': metadata.get('slack', 'N/A'),
                        'document': results['documents'][i]
                    })
                except (ValueError, TypeError):
                    continue

        return all_data

    def _get_hold_time_data(self) -> List[Dict]:
        """Get all hold time requirement data from the collection"""
        all_data = []
        results = self.collection.get()

        for i, metadata in enumerate(results['metadatas']):
            if metadata.get('hold_time_requirement') != 'N/A':
                try:
                    hold_time_value = float(metadata['hold_time_requirement'])
                    all_data.append({
                        'hold_time_requirement': hold_time_value,
                        'startpoint': metadata.get('startpoint', 'N/A'),
                        'endpoint': metadata.get('endpoint', 'N/A'),
                        'group': metadata.get('group', 'N/A'),
                        'slack': metadata.get('slack', 'N/A'),
                        'document': results['documents'][i]
                    })
                except (ValueError, TypeError):
                    continue

        return all_data

    def _classify_question(self, question: str, data: List[Dict]) -> str:
        """Classify the question type using simple pattern matching"""
        question_lower = question.lower()

        if "worst" in question_lower and "slack" in question_lower:
            return "worst_slack"
        elif "best" in question_lower and "slack" in question_lower:
            return "best_slack"
        elif "total" in question_lower or "number" in question_lower or "how many" in question_lower:
            return "total_paths"
        elif "type" in question_lower:
            return "path_types"
        elif "group" in question_lower:
            return "clock_groups"
        elif "stat" in question_lower:
            return "slack_stats"
        elif "endpoint" in question_lower:
            return "endpoint_frequency"
        elif "hold" in question_lower and "worst" in question_lower:
            return "worst_hold_time_req"
        elif "hold" in question_lower and "best" in question_lower:
            return "best_hold_time_req"
        elif "hold" in question_lower and "requirement" in question_lower:
            return "hold_time_req"
        elif "skew" in question_lower:
            return "clock_skew"
        else:
            return "general"

    def query(self, question: str, top_k: int = 3) -> str:
        """Query the RAG system"""
        # Add to history
        self.history.append({'question': question, 'answer': ''})

        # Get all slack data for comprehensive analysis
        all_slack_data = self._get_all_slack_data()

        # Classify the question
        question_type = self._classify_question(question, all_slack_data)

        # Generate response based on question type
        if question_type == "worst_slack":
            if not all_slack_data:
                return "No slack data available."

            worst_slack = min(all_slack_data, key=lambda x: x['slack'])
            return f"The worst slack is {worst_slack['slack']}ns for the path from {worst_slack['startpoint']} to {worst_slack['endpoint']} in group {worst_slack['group']}."

        elif question_type == "best_slack":
            if not all_slack_data:
                return "No slack data available."

            best_slack = max(all_slack_data, key=lambda x: x['slack'])
            return f"The best slack is {best_slack['slack']}ns for the path from {best_slack['startpoint']} to {best_slack['endpoint']} in group {best_slack['group']}."

        elif question_type == "total_paths":
            total_paths = len(all_slack_data)
            return f"Total number of timing paths: {total_paths}"

        elif question_type == "slack_stats":
            if not all_slack_data:
                return "No slack data available."

            slack_values = [x['slack'] for x in all_slack_data]
            avg_slack = sum(slack_values) / len(slack_values)
            min_slack = min(slack_values)
            max_slack = max(slack_values)

            return f"Slack statistics:\n- Average: {avg_slack:.3f}ns\n- Minimum: {min_slack:.3f}ns\n- Maximum: {max_slack:.3f}ns\n- Total paths: {len(slack_values)}"

        elif question_type == "clock_skew":
            clock_skew_data = self._get_clock_skew_data()
            if not clock_skew_data:
                return "No clock skew data available."

            # Find the path with worst slack and get its clock skew
            worst_slack_path = min(all_slack_data, key=lambda x: x['slack'])
            for path in clock_skew_data:
                if (path['startpoint'] == worst_slack_path['startpoint'] and
                    path['endpoint'] == worst_slack_path['endpoint']):
                    return f"The clock skew for the path from {path['startpoint']} to {path['endpoint']} is {path['clock_skew']}ns."

            return f"Clock skew data available for {len(clock_skew_data)} paths. Clock skew values range from {min([x['clock_skew'] for x in clock_skew_data]):.3f}ns to {max([x['clock_skew'] for x in clock_skew_data]):.3f}ns."

        elif question_type == "hold_time_req":
            hold_time_data = self._get_hold_time_data()
            if not hold_time_data:
                return "No hold time requirement data available."

            # Find the path with worst slack and get its hold time requirement
            worst_slack_path = min(all_slack_data, key=lambda x: x['slack'])
            for path in hold_time_data:
                if (path['startpoint'] == worst_slack_path['startpoint'] and
                    path['endpoint'] == worst_slack_path['endpoint']):
                    return f"The hold time requirement for the path from {path['startpoint']} to {path['endpoint']} is {path['hold_time_requirement']}ns."

            return f"Hold time requirement data available for {len(hold_time_data)} paths. Hold time requirements range from {min([x['hold_time_requirement'] for x in hold_time_data]):.3f}ns to {max([x['hold_time_requirement'] for x in hold_time_data]):.3f}ns."

        elif question_type == "worst_hold_time_req":
            hold_time_data = self._get_hold_time_data()
            if not hold_time_data:
                return "No hold time requirement data available."

            worst_hold_time = min(hold_time_data, key=lambda x: x['hold_time_requirement'])
            return f"The worst hold time requirement is {worst_hold_time['hold_time_requirement']}ns for the path from {worst_hold_time['startpoint']} to {worst_hold_time['endpoint']} in group {worst_hold_time['group']}."

        elif question_type == "best_hold_time_req":
            hold_time_data = self._get_hold_time_data()
            if not hold_time_data:
                return "No hold time requirement data available."

            best_hold_time = max(hold_time_data, key=lambda x: x['hold_time_requirement'])
            return f"The best hold time requirement is {best_hold_time['hold_time_requirement']}ns for the path from {best_hold_time['startpoint']} to {best_hold_time['endpoint']} in group {best_hold_time['group']}."

        else:
            # For general questions, provide a simple response without using the LLM
            return "I can help you analyze timing data. Try asking about:\n- Worst/best slack values\n- Clock skew analysis\n- Hold time requirements\n- Total number of paths\n- Slack statistics"

def main():
    """Run an interactive question/answer session over the timing reports.

    Builds a ``LocalTimingRAG``, indexes ``./timing_reports/`` and then reads
    questions from stdin, printing each answer. The loop ends when the user
    types ``quit``, ``exit`` or ``q``, when stdin is closed, or when the
    user interrupts the prompt.
    """
    # Create RAG system
    rag = LocalTimingRAG()

    # Index timing reports
    rag.index_timing_reports('./timing_reports/')

    # Interactive query loop
    print("\n=== Timing RAG System Ready ===")
    print("Ask questions about your timing data. Type 'quit' to exit.")

    while True:
        try:
            question = input("\nQuestion: ").strip()
        except (EOFError, KeyboardInterrupt):
            # input() raises EOFError once stdin is exhausted, and Ctrl-C /
            # stopping the cell raises KeyboardInterrupt; treat both as a
            # request to leave the session rather than crashing.
            break

        if question.lower() in ('quit', 'exit', 'q'):
            break

        # Blank lines are ignored and the prompt is shown again.
        if question:
            answer = rag.query(question)
            print(f"Answer: {answer}")

# Start the interactive session only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Using device: cuda
Using in-memory database
=== Indexing timing reports ===
Found 1 JSON files to index
Indexing caravel.min-hkspi_clk-min_timing_full.json...
Successfully indexed caravel.min-hkspi_clk-min_timing_full.json
Indexing complete!

=== Timing RAG System Ready ===
Ask questions about your timing data. Type 'quit' to exit.

Question: what is the worst slack?
Answer: The worst slack is 0.3252ns for the path from chip_core/housekeeping/_6778_ to chip_core/housekeeping/_6778_ in group hkspi_clk.

Question: what is the clock skew for that slack?
Answer: The clock skew for the path from chip_core/housekeeping/_6778_ to chip_core/housekeeping/_6778_ is 0.4998999999999967ns.

Question: how much is the hold time requirement for that path?
Answer: The hold time requirement for the path from chip_core/housekeeping/_6778_ to chip_core/housekeeping/_6778_ is 0.1002ns.

Question: what is the next path slack?
Answer: I can help you analyze timing data. Try asking about:
- Worst/best slack v

# New Section