In [None]:
!pip -q install pillow pymupdf sentence-transformers faiss-cpu rank_bm25 easyocr rapidfuzz pint openai

In [None]:
import importlib.util
import json
import os
import re
import shutil
import subprocess
import sys
import textwrap
import traceback
from pathlib import Path

import cv2
import easyocr
import faiss
import fitz
import numpy as np
import pandas as pd
from pint import UnitRegistry
from rank_bm25 import BM25Okapi
from rapidfuzz import process
from sentence_transformers import SentenceTransformer

import nltk; nltk.download('punkt', quiet=True)

In [None]:
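# Google Colab: if OPENAI_API_KEY is not already set in the environment,
# uncomment the two lines below and fill in the key before running this cell.
# import os
# os.environ["OPENAI_API_KEY"] = ""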

import os
from openai import OpenAI

# Read the key from the environment; an unset or empty value aborts the cell
# before any client is constructed.
api_key = os.getenv("OPENAI_API_KEY")
if api_key:
    client = OpenAI(api_key=api_key)
else:
    raise ValueError("Missing OPENAI_API_KEY environment variable. Please set it before running.")

In [None]:
CSV_PATH = "/content/rule_functional_performance_qa.csv"
IMG_DIR = Path("/content/images")
RULE_PDF = "/content/FSAE_Rules_2024_V1.pdf"

DESIGNQA_REPO_URL = "https://github.com/anniedoris/design_qa.git"
DESIGNQA_REPO_DIR = Path("/content/design_qa")
DESIGNQA_IMG_SUBDIR = "dataset/rule_compliance/rule_functional_performance_qa/images"
CSV_URL = (
    "https://raw.githubusercontent.com/anniedoris/design_qa/main/dataset/rule_compliance/"
    "rule_functional_performance_qa/rule_functional_performance_qa.csv"
)

# --- Question images -------------------------------------------------------
# Populate the image directory whenever it is missing OR empty. Testing only for
# the directory's existence would make a single failed clone/copy permanent,
# because the empty directory left behind would suppress every later attempt.
if not (IMG_DIR.is_dir() and any(IMG_DIR.iterdir())):
    if not DESIGNQA_REPO_DIR.exists():
        # check=True raises on a failed clone instead of silently continuing without data.
        subprocess.run(
            ["git", "clone", "-q", DESIGNQA_REPO_URL, str(DESIGNQA_REPO_DIR)],
            check=True,
        )
    shutil.copytree(DESIGNQA_REPO_DIR / DESIGNQA_IMG_SUBDIR, IMG_DIR, dirs_exist_ok=True)

print("Images ready:", len(list(IMG_DIR.glob('*.png'))))

# --- Ground-truth CSV ------------------------------------------------------
csv_file = Path(CSV_PATH)
if not csv_file.exists() or csv_file.stat().st_size == 0:
    print("Downloading Functional Performance CSV from DesignQA GitHub...")
    try:
        subprocess.run(["wget", "-q", "-O", CSV_PATH, CSV_URL], check=True)
    except subprocess.CalledProcessError:
        # `wget -O` creates the output file before transferring anything, so a failed
        # download leaves an empty file that would pass the existence check below.
        csv_file.unlink(missing_ok=True)
        raise

assert Path(CSV_PATH).exists(), f"Functional Performance CSV still missing at {CSV_PATH}"

df = pd.read_csv(CSV_PATH)
print(f"CSV loaded: {len(df)} rows")

# --- Rules PDF -------------------------------------------------------------
# The PDF is not downloaded automatically: when absent, the Colab upload widget
# is used and the first uploaded file is moved to RULE_PDF.
if not Path(RULE_PDF).exists():
    from google.colab import files
    print("Upload your FSAE_Rules_2024_V1.pdf")
    uploaded = files.upload()
    if uploaded:
        up_name = list(uploaded.keys())[0]
        os.replace(up_name, RULE_PDF)
assert Path(RULE_PDF).exists(), "FSAE PDF missing."

In [None]:
def read_pdf_text(pdf_path):
    """Return the text of every page of a PDF as one normalised string.

    Pages are joined with newlines, then the text is cleaned so that later
    line-based parsing sees consistent input:

    * ``\\r\\n`` / ``\\r`` line endings become ``\\n``;
    * a word split by a hyphen at a line break is re-joined;
    * en/em dashes become a plain ``-``;
    * runs of spaces/tabs collapse to one space;
    * three or more consecutive newlines collapse to two.

    Args:
        pdf_path: Path of the PDF to open.

    Returns:
        The cleaned text of the whole document.
    """
    # The context manager closes the document (and its file handle) even when
    # extracting a page raises; the previous version never closed it.
    with fitz.open(pdf_path) as doc:
        pages = [page.get_text("text") for page in doc]

    full = "\n".join(pages)
    full = full.replace("\r\n", "\n").replace("\r", "\n")
    full = re.sub(r'(\w)-\n(\w)', r'\1\2', full)
    full = re.sub(r'–|—', '-', full)
    full = re.sub(r'[ \t]+', ' ', full)
    full = re.sub(r'\n{3,}', '\n\n', full)
    return full

# Whole-line patterns for page furniture: a "Formula SAE ... Page N of M" line and
# a "Version <number> <day> <word> <4-digit year>" line (both case-insensitive).
_HEADER_PATTERNS = (
    r'^\s*Formula SAE.*Page\s+\d+\s+of\s+\d+\s*$',
    r'^\s*Version\s+\d+(\.\d+)?\s+\d{1,2}\s+\w+\s+\d{4}\s*$',
)
HEADER_RES = [re.compile(pattern, re.IGNORECASE) for pattern in _HEADER_PATTERNS]

# A line ending in three dots (optionally space-separated) followed by a number.
TOC_LINE_RE = re.compile(r'.+\.\s?\.\s?\.\s+\d+$')

# A line starting with 1-4 capital letters, then a dash, then more text.
SECTION_BANNER_RE = re.compile(r'^[A-Z]{1,4}\s*-\s+.+$')

# A rule ID with an EV/T/F/GR prefix at the start of any line (multi-line mode).
RULE_HEAD_ANCHOR = re.compile(r'(?m)^(?:EV|T|F|GR)\.\d+(?:\.\d+)*[a-z]?\b')

def strip_noise(raw_text):
    """Remove page headers, table-of-contents lines and section banners.

    Each line is right-stripped and has non-breaking spaces replaced by normal
    spaces before being tested. Lines matching any pattern in ``HEADER_RES``,
    ``TOC_LINE_RE`` or ``SECTION_BANNER_RE`` are dropped; the remaining lines
    are re-joined with newlines.
    """
    def _is_noise(line):
        # Same tests, in the same order, as the three filters on the line.
        if any(rx.match(line) for rx in HEADER_RES):
            return True
        if TOC_LINE_RE.search(line):
            return True
        return bool(SECTION_BANNER_RE.match(line))

    tidied = (ln.rstrip().replace('\xa0', ' ') for ln in raw_text.splitlines())
    return "\n".join(line for line in tidied if not _is_noise(line))

def drop_until_first_rule(text):
    """Discard everything before the first line that starts with a rule ID.

    Returns ``text`` unchanged when ``RULE_HEAD_ANCHOR`` finds no match.
    """
    first_head = RULE_HEAD_ANCHOR.search(text)
    if first_head is None:
        return text
    return text[first_head.start():]

# A rule heading: an ID of 1-4 capitals, a dot and dotted numbers with an optional
# lowercase suffix (e.g. "GR.1.2", "T.7.3a"), optionally followed by a title.
RULE_HEAD_RE = re.compile(
    r'(?m)^(?P<rid>[A-Z]{1,4}\.\d+(?:\.\d+)*[a-z]?)(?:[ \t]+(?P<title>.+))?$'
)

def parse_rules_from_text(text):
    """Split cleaned rule-book text into ``(rule_id, rule_text)`` pairs.

    A stripped line matching ``RULE_HEAD_RE`` opens a new rule; every following
    line belongs to it until the next heading. Lines before the first heading
    are ignored. Each rule's text has all whitespace runs collapsed to single
    spaces and is guaranteed to start with its rule ID.

    Returns:
        A list of ``(rule_id, text)`` tuples in document order. The same ID may
        appear more than once if its heading pattern recurs in the text.
    """
    parsed = []
    open_rule = None  # (rule_id, collected_lines) for the rule being accumulated

    def _emit(rule_id, collected):
        body = "\n".join(collected).strip()
        if not body:
            return
        if not body.startswith(rule_id):
            body = f"{rule_id} " + body
        parsed.append((rule_id, re.sub(r'\s+', ' ', body)))

    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        heading = RULE_HEAD_RE.match(stripped)
        if heading is not None:
            if open_rule is not None:
                _emit(*open_rule)
            # The heading line itself is the first line of the rule's text.
            open_rule = (heading.group("rid").strip(), [stripped])
        elif open_rule is not None:
            open_rule[1].append(raw_line)

    if open_rule is not None:
        _emit(*open_rule)
    return parsed

# Extraction pipeline: raw PDF text -> noise removed -> text from the first rule
# heading onward -> (rule_id, text) pairs.
raw_text = read_pdf_text(RULE_PDF)
clean_text = drop_until_first_rule(strip_noise(raw_text))
rule_pairs = parse_rules_from_text(clean_text)

# Building a dict keeps only the last text seen for an ID that occurs several times.
rule_chunks = dict(rule_pairs)
print("Total rules parsed:", len(rule_chunks))

In [None]:
# Unit registry used for quantity parsing; `Q_` is this registry's Quantity
# constructor and is what `to_canonical` calls.
ureg = UnitRegistry()
Q_ = ureg.Quantity
def _clean_unit(u: str) -> str:
    return (u or "").replace("µ","u").replace("μ","u").replace("°","deg").replace("º","deg").strip()
def to_canonical(val_str, unit_str):
    """Convert a value/unit pair to base units.

    Args:
        val_str: The numeric value; it is stringified, stripped and parsed as a float.
        unit_str: The unit text; it is cleaned with ``_clean_unit`` first.

    Returns:
        ``(magnitude, unit_name)`` of the quantity in base units, or
        ``(None, None)`` when anything in the conversion raises.
    """
    try:
        number = float(str(val_str).strip())
        base = Q_(number, _clean_unit(unit_str)).to_base_units()
        return base.magnitude, str(base.units)
    except Exception:
        # Best effort by design: unparsable values or unknown units are reported as "no result".
        return None, None

_RID_ID_RE = re.compile(r"\b((?:EV|T|F|GR)\.\d+(?:\.\d+)*[a-z]?)\b", flags=re.I)

def _normalize_rule_id(raw):
    s = re.sub(r'\s+', '', str(raw))
    m = re.match(r'^(EV|T|F|GR)\.(\d+(?:\.\d+)*)([a-zA-Z]?)$', s, flags=re.I)
    if not m: return None
    return f"{m.group(1).upper()}.{m.group(2)}{m.group(3).lower()}"

def explicit_rule_from_question(question, rule_chunks):
    """Resolve the first rule ID mentioned in a question to a key of ``rule_chunks``.

    Resolution order:
      1. the normalised ID itself, if it is a key;
      2. for an ID ending in a lowercase letter, the same ID without that letter;
      3. the closest key by fuzzy match with a score cutoff of 85.

    Returns:
        The matching key, or ``None`` when the question contains no rule ID or
        nothing matches.
    """
    found = _RID_ID_RE.search(str(question) or "")
    rid = _normalize_rule_id(found.group(1)) if found else None
    if not rid:
        return None
    if rid in rule_chunks:
        return rid

    lettered = re.match(r'^(EV|T|F|GR)\.(\d+(?:\.\d+)*)([a-z])$', rid)
    if lettered is not None:
        section, numbers, _suffix = lettered.groups()
        parent = f"{section}.{numbers}"
        if parent in rule_chunks:
            return parent

    closest = process.extractOne(rid, list(rule_chunks.keys()), score_cutoff=85)
    if not closest:
        return None
    return closest[0]

In [None]:
# Parallel lists: position i in `rule_ids` and `rule_texts` refers to the same rule,
# and that position is the document ID used by both indexes below.
rule_ids = list(rule_chunks)
rule_texts = [f"{rid}: {txt}" for rid, txt in rule_chunks.items()]

# Dense index: normalised sentence embeddings searched by inner product.
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
dense_emb = embedder.encode(rule_texts, normalize_embeddings=True, convert_to_numpy=True)
faiss_index = faiss.IndexFlatIP(dense_emb.shape[1])
faiss_index.add(dense_emb)

# Sparse index: BM25 over whitespace-split tokens.
bm25 = BM25Okapi([doc.split() for doc in rule_texts])

def retrieve_rules_hybrid(query, top_k=4, n_candidates=30, rrf_k=60):
    """Retrieve the most relevant rules by fusing dense and BM25 rankings.

    Each retriever contributes ``1 / (rrf_k + rank)`` for every document it
    returns (rank starts at 0); the summed scores decide the final order.

    Args:
        query: Free-text question.
        top_k: Maximum number of rules to return. Values <= 0 return ``[]``.
        n_candidates: Candidates requested from each retriever (default 30).
        rrf_k: Rank-fusion damping constant (default 60).

    Returns:
        A list of ``(rule_id, rule_text)`` tuples, best first.
    """
    if top_k <= 0 or not rule_ids:
        return []

    # Never ask for more neighbours than there are documents.
    n_candidates = max(1, min(n_candidates, len(rule_ids)))

    qv = embedder.encode([query], normalize_embeddings=True, convert_to_numpy=True)
    _, neighbours = faiss_index.search(qv, n_candidates)
    # FAISS pads short result lists with -1. Those are "no result" markers, not
    # positions: used as an index, -1 would silently select the last rule.
    dense_rank = {
        int(doc_id): rank
        for rank, doc_id in enumerate(neighbours[0])
        if int(doc_id) >= 0
    }

    bm = bm25.get_top_n(query.split(), list(range(len(rule_texts))), n=n_candidates)
    bm25_rank = {doc_id: rank for rank, doc_id in enumerate(bm)}

    scores = {}
    for ranking in (dense_rank, bm25_rank):
        for doc_id, rank in ranking.items():
            scores[doc_id] = scores.get(doc_id, 0) + 1.0 / (rrf_k + rank)

    # sorted() is stable, so ties keep first-seen order (dense hits before BM25-only hits).
    best = sorted(scores, key=scores.get, reverse=True)[:top_k]
    return [(rule_ids[doc_id], rule_chunks[rule_ids[doc_id]]) for doc_id in best]

In [None]:
from pint import UnitRegistry
# NOTE(review): this rebinds `ureg`, which an earlier cell already created. `Q_`
# was taken from that earlier registry and is not rebound here, so `to_canonical`
# and `normalize_units_pint` end up using two different registries.
ureg = UnitRegistry()
# Registers `lbf` in terms of pound_force on this registry.
# NOTE(review): confirm this does not clash with a name the default registry already provides.
ureg.define('lbf = pound_force')
def normalize_units_pint(text):
    """Rewrite "<number> <unit>" occurrences in ``text`` into fixed target units.

    The quantity's dimension is tested in this order and the first match wins:
    length -> mm, pressure -> MPa, force -> N, torque -> Nm, mass -> kg,
    temperature -> °C. A quantity of any other dimension is re-emitted as
    ``"<float value> <unit>"``. If parsing or conversion raises, the matched
    text is left exactly as it was.
    """
    number_then_unit = re.compile(r'(\d+(?:\.\d+)?)\s*([a-zA-Z°]+)')

    def _rewrite(m):
        try:
            value = float(m.group(1))
            unit = m.group(2)
            qty = value * ureg(unit)
            if qty.check('[length]'):
                return f"{qty.to(ureg.mm).magnitude:.2f} mm"
            if qty.check('[pressure]'):
                return f"{qty.to(ureg.MPa).magnitude:.2f} MPa"
            if qty.check('[force]'):
                return f"{qty.to(ureg.N).magnitude:.2f} N"
            if qty.check('[torque]'):
                return f"{qty.to(ureg.N*ureg.meter).magnitude:.2f} Nm"
            if qty.check('[mass]'):
                return f"{qty.to(ureg.kg).magnitude:.2f} kg"
            if qty.check('[temperature]'):
                return f"{qty.to(ureg.degC).magnitude:.1f} °C"
            return f"{value} {unit}"
        except Exception:
            # Unknown unit words (or any conversion error) leave the text untouched.
            return m.group(0)

    return number_then_unit.sub(_rewrite, text)
# Single OCR reader (English, GPU disabled), created once here and reused by `ocr_image`.
_reader = easyocr.Reader(['en'], gpu=False)
def ocr_image(img_path: Path) -> str:
    """OCR an image and return its text as one cleaned-up line.

    The recognised fragments are joined with spaces, whitespace runs are
    collapsed, a digit followed by a (possibly split or upper-case) "mm" is
    rewritten as "<digit> mm", and the result is passed through
    ``normalize_units_pint``.

    Returns:
        The stripped text, or ``""`` when the image cannot be read.
    """
    image = cv2.imread(str(img_path))
    if image is not None:
        fragments = _reader.readtext(image, detail=0, paragraph=True)
        text = re.sub(r'\s+', ' ', " ".join(fragments))
        text = re.sub(r'(\d)\s*m\s*m\b', r'\1 mm', text, flags=re.I)
        return normalize_units_pint(text).strip()
    return ""

In [None]:
ANSWER_MODEL = "gpt-4o-mini"

def _parse_llm_reply(out):
    """Split a raw model reply into ``(reasoning, verdict)``.

    ``reasoning`` is the text after "Explanation:" up to (not including) the
    "Answer:" field or the end of the reply, with whitespace collapsed; it may
    span several lines. ``verdict`` is always one of "Yes", "No" or
    "INSUFFICIENT RULE EVIDENCE".
    """
    # re.S plus the lookahead lets the explanation run over line breaks while
    # stopping before "Answer:" even when both fields share one line.
    m_reason = re.search(r"Explanation:\s*(.*?)(?=\s*Answer:|\Z)", out, flags=re.S)
    m_final = re.search(r"Answer:\s*([A-Za-z ]+)", out)
    reasoning = re.sub(r"\s+", " ", m_reason.group(1)).strip() if m_reason else ""
    # Without an "Answer:" field, fall back to scanning the whole reply.
    final_ans = m_final.group(1).strip() if m_final else out

    fa = str(final_ans).strip().lower()
    if "insufficient" in fa:
        final_ans = "INSUFFICIENT RULE EVIDENCE"
    elif fa.startswith("y") or " yes " in f" {fa} ":
        final_ans = "Yes"
    elif fa.startswith("n") or " no " in f" {fa} ":
        final_ans = "No"
    else:
        final_ans = "INSUFFICIENT RULE EVIDENCE"
    return reasoning, final_ans

def call_answer_llm(question, ocr_text, rules_block):
    """Ask the answer model whether the design in the image complies with the rules.

    Args:
        question: The compliance question.
        ocr_text: Text recognised in the question's image.
        rules_block: The rule text(s) the model should judge against.

    Returns:
        ``(reasoning, final_ans)`` where ``final_ans`` is "Yes", "No" or
        "INSUFFICIENT RULE EVIDENCE" and ``reasoning`` is the model's
        explanation ("" if none could be extracted).
    """
    prompt = f"""You are a technical evaluator for FSAE.
Question: {question}
Image OCR: {ocr_text}

Rule(s):
{rules_block}

Task:
1) Compare OCR values against rule thresholds clearly.
2) Answer Yes/No, or INS.
3) Write explanation with explicit numeric comparison (e.g., "Measured 25 mm < required 30 mm → fails").


Respond ONLY in this exact format:
Explanation: <one or two sentences citing the compared values and rule threshold>
Answer: Yes/No/INSUFFICIENT RULE EVIDENCE
"""
    resp = client.responses.create(
        model=ANSWER_MODEL,
        input=[{"role":"user","content":prompt}],
        temperature=0
    )
    # Guard against a reply with no text so parsing never fails on None.
    out = (resp.output_text or "").strip()
    return _parse_llm_reply(out)

In [None]:
TOPK = 4

def build_rules_block(question: str) -> str:
    """Build the rule text handed to the answer model for one question.

    If the question names a rule that can be resolved, only that rule is used;
    otherwise the ``TOPK`` best hybrid-retrieval hits are used. Entries are
    formatted as ``"<rule_id>: <text>"`` and separated by a blank line.
    """
    cited = explicit_rule_from_question(question, rule_chunks)
    selected = (
        [(cited, rule_chunks[cited])]
        if cited
        else retrieve_rules_hybrid(question, top_k=TOPK)
    )
    return "\n\n".join(f"{rule_id}: {body}" for rule_id, body in selected)

# One prediction per CSV row: OCR the row's image, pick the rule text, ask the model.
# Results accumulate in `rows` so rows finished before an error remain inspectable.
rows = []
total = len(df)
for idx, r in enumerate(df.itertuples(index=False), start=1):
    img_name = str(r.image).strip()
    q = str(r.question)

    print(f"[{idx}/{total}] Processing {img_name} …")

    # Arguments are evaluated left to right: OCR first, then rule selection.
    expl, ans = call_answer_llm(q, ocr_image(IMG_DIR / img_name), build_rules_block(q))
    rows.append(dict(image=img_name, model_prediction=ans, reasoning=expl))

PRED = pd.DataFrame(rows)

In [None]:
import sys, subprocess, pandas as pd
from pathlib import Path

# Output directory for the evaluation CSVs (created if absent).
EVAL_DIR = Path("/content/results")
EVAL_DIR.mkdir(parents=True, exist_ok=True)

# The ground truth is the same CSV the questions were read from.
FP_GT_CSV = CSV_PATH

# Make the DesignQA repository importable (cloned only when not already present)
# and install the packages installed here for the evaluator (rouge, nltk).
# NOTE(review): sys.path gains one more identical entry each time this cell is re-run.
if not Path("/content/design_qa").exists():
    subprocess.run(["git","clone","-q","https://github.com/anniedoris/design_qa.git","/content/design_qa"], check=True)
sys.path.append("/content/design_qa")
subprocess.run([sys.executable,"-m","pip","install","-q","rouge","nltk"], check=True)
import nltk; nltk.download('punkt', quiet=True)

GT = pd.read_csv(FP_GT_CSV).copy()

# Force the listed text columns to str where present. Note that astype(str)
# turns missing values into the literal string "nan".
for col in ["image","answer","label","gt","expected","GroundTruth","question"]:
    if col in GT.columns:
        GT[col] = GT[col].astype(str)

# Same for the prediction frame; this modifies PRED in place.
for col in ["image","model_prediction","reasoning"]:
    if col in PRED.columns:
        PRED[col] = PRED[col].astype(str)
from pathlib import Path as _P
def _norm_png(x):
    s = "" if pd.isna(x) else str(x).strip()
    s = _P(s).name
    if not s.lower().endswith(".png"):
        s = f"{s}.png"
    return s.lower()

# Join key: the normalised image file name, computed on both frames (adds a column to each in place).
# NOTE(review): if the "image" column is absent, .get returns the "" default, which has no .apply.
GT["image_norm"]   = GT.get("image", "").apply(_norm_png)
PRED["image_norm"] = PRED.get("image", "").apply(_norm_png)
# Ensure the label column is called "ground_truth": the first alternative name found is renamed.
if "ground_truth" not in GT.columns:
    for c in ["answer","label","gt","expected","GroundTruth"]:
        if c in GT.columns:
            GT = GT.rename(columns={c:"ground_truth"})
            break
# Inner join keeps only rows whose image name appears on both sides.
# NOTE(review): if an image name repeats in either frame the join multiplies rows,
# and `missing` below can under-count or go negative — confirm names are unique.
J = GT.merge(PRED, on="image_norm", how="inner", suffixes=("_gt","_pred"))
missing = len(GT) - len(J)
if missing > 0:
    print(f"[WARN] {missing} GT rows had no matching prediction by image name.")
# Canonical label strings used by the normalisers below.
YES, NO, INS = "Yes", "No", "INSUFFICIENT RULE EVIDENCE"

def _norm_truth(s: str) -> str:
    t = (s or "").strip().lower()
    if t.startswith("y"): return YES
    if t.startswith("n"): return NO
    if "insufficient" in t: return INS
    return INS

def _norm_pred_to_yn_or_ins(s: str) -> str:
    t = (s or "").strip().lower()
    if t.startswith("y"): return YES
    if t.startswith("n"): return NO
    if "insufficient" in t: return INS
    return INS

def _compose_fp_pred(row) -> str:
    reasoning = str(row.get("reasoning","") or "").strip()
    yn = _norm_pred_to_yn_or_ins(str(row.get("model_prediction","") or ""))
    expl = reasoning if reasoning else yn
    return f"Explanation: {expl} Answer: {yn}"

# Placeholder written wherever a cell would otherwise be empty/NaN in the eval CSV.
SAFE_EMPTY = "__"

# CSV passed to the DesignQA evaluator: normalised ground truth, the prediction in
# "Explanation: ... Answer: ..." form, and an "explanation" column holding only the placeholder.
FP_EVAL = str(EVAL_DIR / "functional_performance_eval_official.csv")
df_eval = pd.DataFrame({
    "ground_truth": J["ground_truth"].apply(lambda x: str(x) if pd.notna(x) else SAFE_EMPTY).map(_norm_truth),
    "model_prediction": J.apply(_compose_fp_pred, axis=1).astype(str)
})
df_eval["explanation"] = SAFE_EMPTY
df_eval.to_csv(FP_EVAL, index=False, na_rep=SAFE_EMPTY)
print(f"[OK] Prepared official eval CSV → {FP_EVAL}")

# Second CSV with just the normalised ground-truth and prediction labels, for inspection.
# NOTE(review): unlike df_eval above, ground truth here is not guarded against NaN before _norm_truth.
FP_TWO_COL = str(EVAL_DIR / "functional_performance_for_full_eval.csv")
df_two = pd.DataFrame({
    "ground_truth": J["ground_truth"].map(_norm_truth),
    "model_prediction": J["model_prediction"].map(_norm_pred_to_yn_or_ins)
})
df_two.to_csv(FP_TWO_COL, index=False)
print(f"[OK] Two-column GT vs Prediction → {FP_TWO_COL}")
# Imported from the cloned DesignQA repository (its directory was appended to sys.path earlier).
from eval.metrics.metrics import eval_functional_performance_qa
# Only the first returned value is used; the rest are discarded.
# NOTE(review): presumably that first value is the macro accuracy — confirm against the evaluator.
acc_macro, *_ = eval_functional_performance_qa(FP_EVAL)
print(f"\nFunctional Performance — Accuracy (macro): {acc_macro:.3f}")
# Number of rows actually evaluated, i.e. GT rows that matched a prediction.
num_questions = len(J)
# Plain-text score summary.
with open("/content/functional_performance.txt", "w") as f:
    f.write("DesignQA Results\n")
    f.write("Subset: Functional_Performance\n")
    f.write(f"Num Questions: {num_questions}\n")
    f.write(f"ACC: {acc_macro:.6f}\n")
print("Score file → /content/functional_performance.txt")