<a href="https://colab.research.google.com/github/venu72561-chinnam/Agentic-Vehicle-Diagnostics-Service-Intelligence-RAG-API-/blob/main/Agentic_Vehicle_Diagnostics_%26_Service_Intelligence.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip -q install faiss-cpu sentence-transformers transformers accelerate pypdf pandas numpy

import os, re, json, time, uuid
from typing import List, Dict, Any, Optional
from datetime import datetime

import numpy as np
import pandas as pd

import faiss
from sentence_transformers import SentenceTransformer
from pypdf import PdfReader

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.8/23.8 MB[0m [31m30.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m329.1/329.1 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Central settings for the notebook: chunking, retrieval depth, model names
# and the directory where the index and its metadata are written.
CONFIG = dict(
    project_name="vehicle-diagnostics-agentic-rag",
    chunk_size=450,
    chunk_overlap=80,
    top_k=5,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    gen_model="google/flan-t5-base",        # local LLM (no API key)
    artifacts_dir="artifacts_vda",          # vector index + metadata saved here
)

# Create the artifacts directory up front; an existing directory is fine.
os.makedirs(CONFIG["artifacts_dir"], exist_ok=True)

print("CONFIG:\n", json.dumps(CONFIG, indent=2))

CONFIG:
 {
  "project_name": "vehicle-diagnostics-agentic-rag",
  "chunk_size": 450,
  "chunk_overlap": 80,
  "top_k": 5,
  "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
  "gen_model": "google/flan-t5-base",
  "artifacts_dir": "artifacts_vda"
}


In [None]:
def _kb_doc(doc_id, title, text):
    """Build one built-in knowledge-base record tagged with the 'builtin_kb' source."""
    return {"doc_id": doc_id, "title": title, "text": text, "source": "builtin_kb"}


# Small hand-written knowledge base used to seed the vector index.
BUILTIN_KB_DOCS = [
    _kb_doc(
        "dtc_p0171",
        "DTC P0171 - System Too Lean (Bank 1)",
        "P0171 indicates the engine is running lean (too much air, too little fuel). "
        "Common causes include vacuum leaks, dirty/failed MAF sensor, low fuel pressure, "
        "clogged fuel injector, exhaust leaks upstream of O2 sensor, or faulty O2 sensor. "
        "Typical symptoms: rough idle, hesitation, poor fuel economy. "
        "Checks: inspect intake/vacuum hoses, smoke test, clean MAF, fuel pressure test, "
        "check O2 sensor readings and fuel trims.",
    ),
    _kb_doc(
        "dtc_p0300",
        "DTC P0300 - Random/Multiple Cylinder Misfire",
        "P0300 is random/multiple misfire. Causes: spark plugs/coils, vacuum leak, "
        "fuel delivery issues, injector problems, compression issues. "
        "Diagnosis: check misfire counters, inspect plugs/coils, verify fuel trims, "
        "perform compression/leak-down test if needed.",
    ),
    _kb_doc(
        "fuel_trim_basics",
        "Fuel Trim Basics",
        "Short-term fuel trim (STFT) and long-term fuel trim (LTFT) indicate how the ECU adjusts fueling. "
        "High positive trims suggest lean condition (adding fuel). "
        "A large positive LTFT (> +10% to +20%) often points to vacuum leaks, MAF errors, or low fuel pressure.",
    ),
    _kb_doc(
        "vacuum_leak_procedure",
        "Vacuum Leak Check Procedure",
        "To diagnose vacuum leaks: inspect all intake boots and vacuum hoses, check PCV system, "
        "use smoke test, observe fuel trims at idle vs higher RPM. "
        "Vacuum leaks often cause higher trims at idle that improve at higher RPM.",
    ),
    _kb_doc(
        "o2_sensor_notes",
        "O2 Sensor & Exhaust Leak Notes",
        "An exhaust leak upstream of the oxygen sensor can introduce air, causing a false lean reading. "
        "O2 sensors should switch normally; slow or stuck readings may indicate sensor issues. "
        "Cross-check with fuel trims and MAF readings.",
    ),
]

print("Built-in KB docs:", len(BUILTIN_KB_DOCS))

Built-in KB docs: 5


In [None]:
def chunk_text(text: str, chunk_size: int, overlap: int) -> List[str]:
    """Split *text* into overlapping character windows.

    Whitespace runs are collapsed to single spaces and the text is stripped
    before chunking.

    Args:
        text: Raw document text.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared between consecutive chunks. If it
            is greater than or equal to ``chunk_size`` the window still
            advances by one character per chunk.

    Returns:
        The chunks in document order, or an empty list when *text* contains
        only whitespace.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    text = re.sub(r"\s+", " ", text).strip()
    if not text:
        return []

    step = max(1, chunk_size - overlap)
    chunks = []
    start = 0
    while True:
        chunks.append(text[start:start + chunk_size])
        # Stop as soon as a window reaches the end of the text. Advancing
        # further would only emit a short tail that is already fully
        # contained in this chunk (a duplicate fragment in the index).
        if start + chunk_size >= len(text):
            break
        start += step
    return chunks

def build_chunks(docs: List[Dict[str, Any]], chunk_size: int, overlap: int):
    """Chunk every document and return parallel lists of chunk texts and metadata.

    Each document must provide "doc_id", "title", "source" and "text". The
    metadata entry at position ``i`` describes the chunk at position ``i`` and
    carries the chunk's index within its own document as "chunk_id".
    """
    chunks = []
    metas = []
    for doc in docs:
        pieces = chunk_text(doc["text"], chunk_size, overlap)
        chunks.extend(pieces)
        metas.extend(
            {
                "doc_id": doc["doc_id"],
                "title": doc["title"],
                "source": doc["source"],
                "chunk_id": position,
            }
            for position, _ in enumerate(pieces)
        )
    return chunks, metas

# `pdf_docs` is expected to come from an optional PDF-ingestion cell. No cell
# visible in this notebook defines it, so default to "no extra documents"
# instead of failing with NameError on a fresh runtime.
pdf_docs = globals().get("pdf_docs", [])

# Combine the built-in knowledge base with any ingested PDFs and chunk them.
ALL_DOCS = BUILTIN_KB_DOCS + pdf_docs
chunks, metas = build_chunks(ALL_DOCS, CONFIG["chunk_size"], CONFIG["chunk_overlap"])

print("Total docs:", len(ALL_DOCS))
print("Total chunks:", len(chunks))


# Embeddings + FAISS index
embedder = SentenceTransformer(CONFIG["embedding_model"])

def build_faiss_index(chunks: List[str]) -> tuple[faiss.IndexFlatIP, np.ndarray]:
    """Embed *chunks* and load them into a flat inner-product FAISS index.

    Args:
        chunks: Chunk texts to embed; must not be empty.

    Returns:
        A ``(index, embeddings)`` pair, where ``embeddings`` is the float32
        matrix that was added to the index (one row per chunk, in order).

    Raises:
        ValueError: If *chunks* is empty, because the embedding dimension
            cannot be read from an empty result.
    """
    if not chunks:
        raise ValueError("Cannot build a FAISS index from an empty chunk list.")

    emb = embedder.encode(chunks, normalize_embeddings=True, show_progress_bar=True)
    emb = np.asarray(emb, dtype="float32")
    dim = emb.shape[1]
    index = faiss.IndexFlatIP(dim)  # cosine via inner product on normalized vectors
    index.add(emb)
    return index, emb

# Embed all chunks and build the search index; `emb_matrix` keeps the embedding
# matrix that build_faiss_index returned alongside the index.
index, emb_matrix = build_faiss_index(chunks)
print("FAISS index built ✅ | vectors:", index.ntotal)


Total docs: 5
Total chunks: 6


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

Loading weights:   0%|          | 0/103 [00:00<?, ?it/s]

BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

FAISS index built ✅ | vectors: 6


In [None]:
# Locations of the persisted index, chunk metadata, chunk texts and config,
# all inside the configured artifacts directory.
INDEX_PATH, META_PATH, CHUNKS_PATH, CONFIG_PATH = (
    os.path.join(CONFIG["artifacts_dir"], filename)
    for filename in ("faiss.index", "metas.json", "chunks.json", "config.json")
)

def save_artifacts():
    """Persist the FAISS index plus the metadata, chunk texts and config as JSON."""
    faiss.write_index(index, INDEX_PATH)
    json_targets = (
        (META_PATH, metas),
        (CHUNKS_PATH, chunks),
        (CONFIG_PATH, CONFIG),
    )
    for path, payload in json_targets:
        with open(path, "w") as handle:
            json.dump(payload, handle, indent=2)
    print("Saved artifacts ✅ to:", CONFIG["artifacts_dir"])

def load_artifacts():
    """Reload the index, metadata and chunk texts from disk into the module globals.

    Raises:
        FileNotFoundError: If the index, metadata or chunks file is missing.
    """
    global index, metas, chunks
    required_files = (INDEX_PATH, META_PATH, CHUNKS_PATH)
    if not all(os.path.exists(path) for path in required_files):
        raise FileNotFoundError("Artifacts not found. Build index first or run save_artifacts().")

    index = faiss.read_index(INDEX_PATH)
    with open(META_PATH, "r") as handle:
        metas = json.load(handle)
    with open(CHUNKS_PATH, "r") as handle:
        chunks = json.load(handle)
    print("Loaded artifacts ✅ from:", CONFIG["artifacts_dir"], "| vectors:", index.ntotal)

# Persist the freshly built index, metadata, chunks and config to the artifacts directory.
save_artifacts()


Saved artifacts ✅ to: artifacts_vda


In [None]:
def retrieve(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """Return up to *top_k* indexed chunks for *query*, in the order FAISS returns them.

    Each hit is a dict with the similarity "score", the chunk "text" and the
    chunk's "meta" record.
    """
    query_vec = np.asarray(
        embedder.encode([query], normalize_embeddings=True),
        dtype="float32",
    )
    score_rows, id_rows = index.search(query_vec, top_k)
    # Negative ids are skipped: they do not refer to a stored chunk.
    return [
        {"score": float(score), "text": chunks[pos], "meta": metas[pos]}
        for score, pos in zip(score_rows[0], id_rows[0])
        if pos >= 0
    ]

# Smoke test: run one lean-condition query and list the top-3 hit titles with their scores.
print("\nRetriever test:")
hits = retrieve("P0171 lean rough idle high fuel trims vacuum leak", top_k=3)
for h in hits:
    print(f" - {h['meta']['title']} | score={h['score']:.3f}")



Retriever test:
 - DTC P0171 - System Too Lean (Bank 1) | score=0.676
 - Vacuum Leak Check Procedure | score=0.562
 - O2 Sensor & Exhaust Leak Notes | score=0.471


In [None]:
# Load the local generation model and its tokenizer (no API key required).
tok = AutoTokenizer.from_pretrained(CONFIG["gen_model"])
mdl = AutoModelForSeq2SeqLM.from_pretrained(CONFIG["gen_model"])

# The model is loaded as a seq2seq (encoder-decoder) model, so the matching
# pipeline task is "text2text-generation". The causal-LM "text-generation"
# task reports the model as unsupported and echoes the prompt in its output.
try:
    generator = pipeline("text2text-generation", model=mdl, tokenizer=tok, max_length=256)
except KeyError:
    # This transformers release does not register the text2text task;
    # keep the previous behaviour rather than failing the cell.
    generator = pipeline("text-generation", model=mdl, tokenizer=tok, max_length=256)

def rag_answer(user_input: str, retrieved: List[Dict[str, Any]]) -> str:
    """Generate a diagnostic answer grounded in the retrieved evidence.

    Args:
        user_input: The user's free-text problem description.
        retrieved: Hits as returned by ``retrieve``; each needs a "text" entry
            and a "meta" dict containing "title".

    Returns:
        Only the text generated by the model. The prompt itself is not
        included in the returned string.
    """
    evidence = "\n".join(f"- ({r['meta']['title']}) {r['text']}" for r in retrieved)

    prompt = f"""
You are an automotive diagnostic assistant.
Use ONLY the evidence below. If evidence is insufficient, say what is missing.

User input: {user_input}

Evidence:
{evidence}

Return in this format:
1) Likely root causes (ranked)
2) Quick checks (step-by-step)
3) Risk/urgency (Low/Medium/High) with reason
4) Suggested next action
"""
    # Run the seq2seq model directly instead of going through the
    # "text-generation" pipeline: that pipeline targets causal LMs and
    # returned the whole prompt as part of the "answer".
    inputs = tok(prompt, return_tensors="pt").to(mdl.device)
    output_ids = mdl.generate(**inputs, max_new_tokens=256, do_sample=False)
    return tok.decode(output_ids[0], skip_special_tokens=True)

Loading weights:   0%|          | 0/282 [00:00<?, ?it/s]

Passing `generation_config` together with generation-related arguments=({'max_length'}) is deprecated and will be removed in future versions. Please pass either a `generation_config` object OR all generation parameters explicitly, but not both.
The model 'T5ForConditionalGeneration' is not supported for text-generation. Supported models are ['PeftModelForCausalLM', 'AfmoeForCausalLM', 'ApertusForCausalLM', 'ArceeForCausalLM', 'AriaTextForCausalLM', 'BambaForCausalLM', 'BartForCausalLM', 'BertLMHeadModel', 'BertGenerationDecoder', 'BigBirdForCausalLM', 'BigBirdPegasusForCausalLM', 'BioGptForCausalLM', 'BitNetForCausalLM', 'BlenderbotForCausalLM', 'BlenderbotSmallForCausalLM', 'BloomForCausalLM', 'BltForCausalLM', 'CamembertForCausalLM', 'LlamaForCausalLM', 'CodeGenForCausalLM', 'CohereForCausalLM', 'Cohere2ForCausalLM', 'CpmAntForCausalLM', 'CTRLLMHeadModel', 'CwmForCausalLM', 'Data2VecTextForCausalLM', 'DbrxForCausalLM', 'DeepseekV2ForCausalLM', 'DeepseekV3ForCausalLM', 'DiffLlamaForCa

In [None]:
def diagnostic_agent(user_input: str) -> Dict[str, Any]:
    """Retrieve evidence for *user_input* and produce an answer with citations.

    Returns a dict with the agent name, the generated "answer" and an
    "evidence" list holding title, doc_id and score (rounded to 4 decimals)
    for every retrieved chunk.
    """
    hits = retrieve(user_input, top_k=CONFIG["top_k"])
    generated = rag_answer(user_input, hits)

    citations = []
    for hit in hits:
        meta = hit["meta"]
        citations.append({
            "title": meta["title"],
            "doc_id": meta["doc_id"],
            "score": round(hit["score"], 4),
        })

    return {"agent": "diagnostic_agent", "answer": generated, "evidence": citations}

def risk_agent(user_input: str, diagnostic_text: str) -> Dict[str, Any]:
    """Classify urgency as High/Medium/Low from keywords in the input and diagnosis.

    Day-1 heuristic; you can replace with an LLM or a classifier later.

    Keywords are matched case-insensitively and must begin at a word boundary,
    so inflections such as "stalls" or "overheating" still count while
    unrelated words that merely contain a keyword ("clean" -> "lean",
    "install" -> "stall") do not.

    Args:
        user_input: The user's problem description.
        diagnostic_text: The diagnostic answer text to scan as well.

    Returns:
        A dict with "agent", "risk" ("High", "Medium" or "Low") and "reason".
    """
    text = (user_input + " " + diagnostic_text).lower()

    high_keywords = ["stall", "no start", "overheat", "loss of power", "flashing", "severe misfire"]
    med_keywords = ["rough idle", "hesitation", "lean", "misfire", "high fuel trim", "hard start"]

    def mentions(keywords: List[str]) -> bool:
        # \b anchors only the START of each keyword; suffixes remain allowed.
        pattern = r"\b(?:" + "|".join(re.escape(k) for k in keywords) + ")"
        return re.search(pattern, text) is not None

    if mentions(high_keywords):
        risk = "High"
        reason = "Indicators suggest immediate drivability/safety risk."
    elif mentions(med_keywords):
        risk = "Medium"
        reason = "Drivability issue that can worsen and damage components over time."
    else:
        risk = "Low"
        reason = "No strong indicators of immediate failure; monitor and verify."

    return {"agent": "risk_agent", "risk": risk, "reason": reason}

def workflow_agent(user_input: str, risk: str) -> Dict[str, Any]:
    """Build a service-ticket payload for a workflow system.

    *risk* must be "High", "Medium" or "Low"; any other value raises KeyError.
    The summary is the first 160 characters of *user_input*.
    """
    priority_by_risk = {"High": "P1", "Medium": "P2", "Low": "P3"}
    short_id = uuid.uuid4().hex[:8].upper()
    created = datetime.now().isoformat()
    steps = [
        "Inspect intake boots, vacuum hoses, and PCV lines for leaks",
        "Run smoke test to confirm vacuum leak",
        "Clean/inspect MAF sensor; verify MAF readings",
        "Check fuel pressure and injector performance",
        "Inspect exhaust leak upstream of O2 sensor and verify O2 switching",
        "Re-check STFT/LTFT at idle and under load after repairs",
    ]

    ticket = {
        "ticket_id": "TKT-" + short_id,
        "created_at": created,
        "category": "Vehicle Diagnostics",
        "priority": priority_by_risk[risk],
        "summary": user_input[:160],
        "recommended_steps": steps,
        "notes": "Generated by workflow_agent with RAG evidence support.",
    }
    return {"agent": "workflow_agent", "ticket": ticket}

def run_agent_system(user_input: str) -> Dict[str, Any]:
    """Run the diagnostic, risk and workflow agents in sequence.

    The diagnostic answer feeds the risk agent, and the resulting risk level
    feeds the workflow agent. Returns the input together with all three
    agent outputs.
    """
    diagnosis = diagnostic_agent(user_input)
    assessment = risk_agent(user_input, diagnosis["answer"])
    ticketing = workflow_agent(user_input, assessment["risk"])

    combined = {"input": user_input}
    combined["diagnostic"] = diagnosis
    combined["risk"] = assessment
    combined["workflow"] = ticketing
    return combined


In [None]:
# End-to-end demo: run all three agents on a sample P0171 complaint and
# pretty-print the combined result as JSON.
example_input = "DTC P0171. Rough idle and hesitation. LTFT +18% at idle. What should I check first?"
result = run_agent_system(example_input)

print("\n================= AGENT SYSTEM OUTPUT =================")
print(json.dumps(result, indent=2))



The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Passing `generation_config` together with generation-related arguments=({'do_sample'}) is deprecated and will be removed in future versions. Please pass either a `generation_config` object OR all generation parameters explicitly, but not both.



{
  "input": "DTC P0171. Rough idle and hesitation. LTFT +18% at idle. What should I check first?",
  "diagnostic": {
    "agent": "diagnostic_agent",
    "answer": "\nYou are an automotive diagnostic assistant.\nUse ONLY the evidence below. If evidence is insufficient, say what is missing.\n\nUser input: DTC P0171. Rough idle and hesitation. LTFT +18% at idle. What should I check first?\n\nEvidence:\n- (DTC P0171 - System Too Lean (Bank 1)) P0171 indicates the engine is running lean (too much air, too little fuel). Common causes include vacuum leaks, dirty/failed MAF sensor, low fuel pressure, clogged fuel injector, exhaust leaks upstream of O2 sensor, or faulty O2 sensor. Typical symptoms: rough idle, hesitation, poor fuel economy. Checks: inspect intake/vacuum hoses, smoke test, clean MAF, fuel pressure test, check O2 sensor readings and fuel trims.\n- (DTC P0171 - System Too Lean (Bank 1)) e test, check O2 sensor readings and fuel trims.\n- (O2 Sensor & Exhaust Leak Notes) An exha

In [None]:
def quick_eval():
    """Print a small retrieval sanity check.

    For each query the top-3 hit titles are printed, and the case passes when
    at least one expected phrase occurs (case-insensitively) in any title.
    """
    cases = [
        ("P0171 lean code vacuum leak high LTFT", ["P0171", "Vacuum Leak", "Fuel Trim"]),
        ("P0300 misfire random multiple cylinder plugs coils", ["P0300", "Misfire"]),
    ]
    print("\n================= QUICK EVAL =================")
    for query, expected_phrases in cases:
        hit_titles = [hit["meta"]["title"] for hit in retrieve(query, top_k=3)]
        titles = " | ".join(hit_titles)
        lowered_titles = [title.lower() for title in hit_titles]
        ok = any(
            phrase.lower() in title
            for phrase in expected_phrases
            for title in lowered_titles
        )
        print(f"Query: {query}\nTop hits: {titles}\nPass: {ok}\n")

# Run the retrieval sanity check against the current index.
quick_eval()



Query: P0171 lean code vacuum leak high LTFT
Top hits: DTC P0171 - System Too Lean (Bank 1) | Vacuum Leak Check Procedure | O2 Sensor & Exhaust Leak Notes
Pass: True

Query: P0300 misfire random multiple cylinder plugs coils
Top hits: DTC P0300 - Random/Multiple Cylinder Misfire | DTC P0171 - System Too Lean (Bank 1) | DTC P0171 - System Too Lean (Bank 1)
Pass: True



In [None]:
# File name of the standalone FastAPI service generated by this cell.
FASTAPI_APP_PATH = "api_app.py"

# Source of the generated service, kept as a raw string and written out verbatim.
# At import time it loads the saved artifacts (config, FAISS index, metadata,
# chunks) and the models, then exposes GET /health and POST /diagnose.
# NOTE(review): ART_DIR inside the string is hard-coded to "artifacts_vda"; it has
# to match CONFIG["artifacts_dir"] for the app to find the saved artifacts.
FASTAPI_CODE = r'''
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Any, Dict, List
import json, os, re, uuid
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

ART_DIR = "artifacts_vda"
INDEX_PATH = os.path.join(ART_DIR, "faiss.index")
META_PATH = os.path.join(ART_DIR, "metas.json")
CHUNKS_PATH = os.path.join(ART_DIR, "chunks.json")
CONFIG_PATH = os.path.join(ART_DIR, "config.json")

app = FastAPI(title="Vehicle Diagnostics Agentic RAG API", version="1.0")

class DiagnoseRequest(BaseModel):
    user_input: str

def load_artifacts():
    with open(CONFIG_PATH, "r") as f:
        config = json.load(f)
    index = faiss.read_index(INDEX_PATH)
    with open(META_PATH, "r") as f:
        metas = json.load(f)
    with open(CHUNKS_PATH, "r") as f:
        chunks = json.load(f)
    return config, index, metas, chunks

CONFIG, INDEX, METAS, CHUNKS = load_artifacts()

embedder = SentenceTransformer(CONFIG["embedding_model"])
tok = AutoTokenizer.from_pretrained(CONFIG["gen_model"])
mdl = AutoModelForSeq2SeqLM.from_pretrained(CONFIG["gen_model"])
generator = pipeline("text2text-generation", model=mdl, tokenizer=tok, max_length=256)

def retrieve(query: str, top_k: int):
    qv = embedder.encode([query], normalize_embeddings=True)
    qv = np.asarray(qv, dtype="float32")
    scores, ids = INDEX.search(qv, top_k)
    results = []
    for score, i in zip(scores[0], ids[0]):
        if i < 0:
            continue
        results.append({
            "score": float(score),
            "text": CHUNKS[i],
            "meta": METAS[i]
        })
    return results

def rag_answer(user_input: str, retrieved: List[Dict[str, Any]]) -> str:
    evidence = "\n".join([f"- ({r['meta']['title']}) {r['text']}" for r in retrieved])
    prompt = f"""
You are an automotive diagnostic assistant.
Use ONLY the evidence below. If evidence is insufficient, say what is missing.

User input: {user_input}

Evidence:
{evidence}

Return in this format:
1) Likely root causes (ranked)
2) Quick checks (step-by-step)
3) Risk/urgency (Low/Medium/High) with reason
4) Suggested next action
"""
    return generator(prompt, do_sample=False)[0]["generated_text"]

def risk_agent(user_input: str, diagnostic_text: str) -> Dict[str, str]:
    text = (user_input + " " + diagnostic_text).lower()
    high_keywords = ["stall", "no start", "overheat", "loss of power", "flashing", "severe misfire"]
    med_keywords = ["rough idle", "hesitation", "lean", "misfire", "high fuel trim", "hard start"]
    if any(k in text for k in high_keywords):
        return {"risk": "High", "reason": "Indicators suggest immediate drivability/safety risk."}
    if any(k in text for k in med_keywords):
        return {"risk": "Medium", "reason": "Drivability issue that can worsen over time."}
    return {"risk": "Low", "reason": "No strong indicators of immediate failure; monitor and verify."}

@app.get("/health")
def health():
    return {"status": "ok", "vectors": INDEX.ntotal}

@app.post("/diagnose")
def diagnose(req: DiagnoseRequest):
    retrieved = retrieve(req.user_input, top_k=CONFIG["top_k"])
    answer = rag_answer(req.user_input, retrieved)
    risk = risk_agent(req.user_input, answer)
    ticket = {
        "ticket_id": f"TKT-{uuid.uuid4().hex[:8].upper()}",
        "category": "Vehicle Diagnostics",
        "priority": {"High":"P1","Medium":"P2","Low":"P3"}[risk["risk"]],
        "summary": req.user_input[:160],
    }
    return {
        "input": req.user_input,
        "answer": answer,
        "risk": risk,
        "evidence": [{"title": r["meta"]["title"], "doc_id": r["meta"]["doc_id"], "score": r["score"]} for r in retrieved],
        "ticket": ticket
    }
'''

# Write the service source to disk, overwriting any previous copy.
with open(FASTAPI_APP_PATH, "w") as f:
    f.write(FASTAPI_CODE)

# Print the commands needed to install the server dependencies and start the app.
print("\nWrote FastAPI app ✅ ->", FASTAPI_APP_PATH)
print("To run locally/Colab (advanced):")
print("  !pip -q install fastapi uvicorn")
print("  !uvicorn api_app:app --host 0.0.0.0 --port 8000")
print("Then open: /docs")


Wrote FastAPI app ✅ -> api_app.py
To run locally/Colab (advanced):
  !pip -q install fastapi uvicorn
  !uvicorn api_app:app --host 0.0.0.0 --port 8000
Then open: /docs
