## Table Question Answering

环境与依赖

In [1]:
import re
import pandas as pd
from difflib import SequenceMatcher
from mindspore import context
from mindnlp.transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline

  setattr(self, word, getattr(machar, word).flat[0])
  return self._float_to_str(self.smallest_subnormal)
  setattr(self, word, getattr(machar, word).flat[0])
  return self._float_to_str(self.smallest_subnormal)
                                                       mindspore.device_context.ascend.op_precision.op_precision_mode(),
                                                       mindspore.device_context.ascend.op_precision.matmul_allow_hf32(),
                                                       mindspore.device_context.ascend.op_precision.conv_allow_hf32(),
                                                       mindspore.device_context.ascend.op_tuning.op_compile() instead.
Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache
Loading model cost 2.127 seconds.
Prefix dict has been built successfully.


加载模型与 QA 管线

In [2]:
# Checkpoint identifier shared by the tokenizer and the QA model.
MODEL = "distilbert-base-uncased-distilled-squad"

# Load the tokenizer first, then the question-answering model, from the same
# checkpoint so their vocabularies agree.
tok = AutoTokenizer.from_pretrained(MODEL)
mdl = AutoModelForQuestionAnswering.from_pretrained(MODEL)

# Extractive QA pipeline; it is called once per context chunk by the helpers
# defined later in this file.
qa = pipeline(
    "question-answering",
    tokenizer=tok,
    model=mdl,
    handle_impossible_answer=True,       # allow an "impossible answer" result
    max_answer_len=8,                    # keep extracted answers short
    clean_up_tokenization_spaces=False,
)



通用工具函数

In [3]:
# Similarity score between two strings.
def ngram_sim(a: str, b: str) -> float:
    """Return the case-insensitive ``SequenceMatcher`` ratio of *a* and *b*.

    Both arguments are passed through ``str()`` before comparison, so
    non-string values are accepted as well.  Despite the name, the score is
    ``difflib.SequenceMatcher.ratio()`` rather than an n-gram overlap.
    """
    left = str(a).lower()
    right = str(b).lower()
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
    
# Decide whether a column should be treated as numeric.
def is_numeric_series(s: pd.Series) -> bool:
    """Return True when at least 70% of the values in *s* parse as numbers.

    Values are converted to strings and the characters ``,`` and ``_`` are
    removed before parsing, so entries such as ``"36,542"`` count as numeric.
    An empty series is reported as non-numeric.

    The result is a plain Python ``bool`` (not ``numpy.bool_``), matching the
    annotated return type.
    """
    cleaned = (
        s.astype(str)
        .str.replace(",", "", regex=False)   # literal replacement, not a regex
        .str.replace("_", "", regex=False)
    )
    parsed = pd.to_numeric(cleaned, errors="coerce")
    if parsed.empty:
        # The mean of an empty series is NaN; make the outcome explicit.
        return False
    return bool(parsed.notna().mean() >= 0.7)
    
# Convert a column into numbers that can be used in computations.
def to_numeric(series: pd.Series) -> pd.Series:
    """Parse *series* into numeric values, returning NaN where parsing fails.

    Processing steps:
      * values are converted to strings and surrounding whitespace is
        stripped, so that a trailing blank does not hide a magnitude suffix;
      * the characters ``,`` and ``_`` are removed;
      * a trailing ``k`` / ``m`` / ``b`` (any case, optionally preceded by
        whitespace) is rewritten to ``e3`` / ``e6`` / ``e9``;
      * the result is handed to ``pd.to_numeric(..., errors="coerce")``.
    """
    text = (
        series.astype(str)
        .str.strip()
        .str.replace(",", "", regex=False)
        .str.replace("_", "", regex=False)
    )
    # Applied in this order, exactly one suffix (if any) is rewritten.
    suffix_rules = (
        (r"(?i)\s*k$", "e3"),
        (r"(?i)\s*m$", "e6"),
        (r"(?i)\s*b$", "e9"),
    )
    for pattern, exponent in suffix_rules:
        text = text.str.replace(pattern, exponent, regex=True)
    return pd.to_numeric(text, errors="coerce")

# Choose the column that names the entity described by each row.
def pick_entity_col(df: pd.DataFrame):
    """Return the non-numeric column with the highest share of distinct values.

    Columns classified as numeric by ``is_numeric_series`` are ignored.  When
    several columns tie, the earliest one wins; when every column is numeric,
    the first column of *df* is returned.
    """
    text_columns = [name for name in df.columns if not is_numeric_series(df[name])]
    if not text_columns:
        return df.columns[0]

    row_count = max(1, len(df))

    def distinct_share(name):
        return df[name].nunique() / row_count

    # max() keeps the first of several equally good candidates.
    return max(text_columns, key=distinct_share)

# Among all numeric columns, find the one most related to the question.
def best_numeric_col(question: str, df: pd.DataFrame):
    """Return the numeric column whose name is most similar to *question*.

    Similarity is measured with ``ngram_sim`` between the whole question and
    the column name.  Returns ``None`` when *df* has no numeric column; ties
    go to the earliest column.
    """
    winner = None
    winner_score = None
    for name in df.columns:
        if not is_numeric_series(df[name]):
            continue
        score = ngram_sim(question, name)
        if winner_score is None or score > winner_score:
            winner, winner_score = name, score
    return winner

# Measurement questions need a value extracted from the table, whereas count
# questions count rows; this predicate identifies measurement questions so
# that COUNT is not applied to them by mistake.
def looks_like_measurement_question(question: str, df: pd.DataFrame) -> bool:
    """Return True when *question* asks for the value of a numeric column.

    Two conditions must both hold:
      * the question contains one of the words ``has`` / ``have`` / ``does``
        (matched on the lower-cased text) or one of the phrases
        ``是多少`` / ``有多少``;
      * the lower-cased name of some numeric column occurs in the lower-cased
        question.
    """
    lowered = question.lower()

    asks_for_value = (
        re.search(r"\b(has|have|does)\b", lowered) is not None
        or re.search(r"(是多少|有多少)", question) is not None
    )
    if not asks_for_value:
        return False

    return any(
        str(name).lower() in lowered and is_numeric_series(df[name])
        for name in df.columns
    )

表格序列化、文本分块与逐块问答

In [4]:
# Turn the table into lines of plain text.
def serialize_table(df: pd.DataFrame) -> str:
    """Serialize *df* into newline-separated English sentences.

    For every row the output contains:
      * one summary line listing ``"<column> is <value>"`` for all columns,
        followed by ``"The <entity column> <entity> has these attributes."``;
      * for every non-numeric, non-entity column with a non-empty cell, four
        paraphrased sentences linking the entity to the cell value;
      * when the cell splits into several items (on ``, / ; | 、`` or the
        words ``and`` / ``与`` / ``和`` surrounded by whitespace), the same
        four sentences for each item, skipping plain numbers and items longer
        than 40 characters.

    The entity column is chosen by ``pick_entity_col``.
    """
    cols = list(df.columns)
    ent_col = pick_entity_col(df)

    delim_re = re.compile(
        r"[,/;|、]|(?:\s+and\s+)|(?:\s+与\s+)|(?:\s+和\s+)",
        flags=re.IGNORECASE,
    )
    plain_number_re = re.compile(r"\d+(\.\d+)?")

    # Whether a column is numeric does not depend on the row, so decide it
    # once here instead of once per row.
    text_cols = [
        c for c in cols
        if c != ent_col and not is_numeric_series(df[c])
    ]

    def paraphrases(ent, col, value):
        """Return the four sentences relating *ent* to *value* in *col*."""
        return [
            f"The {ent} has {col} {value}.",
            f"{ent} includes {value} in {col}.",
            f"{ent} uses {value}.",
            f"{value} is the {col} of {ent}.",
        ]

    lines = []
    for _, row in df.iterrows():
        ent = str(row[ent_col])

        # Basic attribute sentence covering every column.
        pieces = [f"{c} is {row[c]}" for c in cols]
        lines.append(". ".join(pieces) + f". The {ent_col} {ent} has these attributes.")

        # Extra paraphrases for every non-numeric column.
        for c in text_cols:
            cell = str(row[c]).strip()
            if not cell:
                continue

            # The cell taken as a single value.
            lines.extend(paraphrases(ent, c, cell))

            # The cell taken as a list of values.
            items = [t.strip() for t in delim_re.split(cell) if t and t.strip()]
            if len(items) > 1:
                for item in items:
                    if plain_number_re.fullmatch(item) or len(item) > 40:
                        continue
                    lines.extend(paraphrases(ent, c, item))

    return "\n".join(lines)


# Split the text into chunks so each can be queried separately and no key
# line gets truncated.
def chunk_by_lines(text: str, max_chars: int = 900):
    """Group the lines of *text* into chunks of roughly *max_chars* characters.

    Lines are never split.  Each line is charged ``len(line) + 1`` characters;
    a new chunk is started when adding the next line would push the running
    total above *max_chars*.  A single line longer than the limit therefore
    forms a chunk of its own.  Returns a list of strings (empty for empty
    input), each chunk being its lines joined with a newline.
    """
    chunks = []
    pending = []
    used = 0
    for line in text.splitlines():
        cost = len(line) + 1
        if pending and used + cost > max_chars:
            chunks.append("\n".join(pending))
            pending = []
            used = 0
        pending.append(line)
        used += cost
    if pending:
        chunks.append("\n".join(pending))
    return chunks

# Run question answering on every context chunk and keep the result with the
# highest score as the final answer.
def qa_best(question: str, context_text: str):
    """Return the highest-scoring QA result over all chunks of *context_text*.

    The context is split with ``chunk_by_lines(max_chars=900)`` and the
    module-level ``qa`` pipeline is called once per chunk.  When the pipeline
    returns a list, its first element is used.  The chosen result gets an
    extra ``"_chunk_id"`` key holding the index of its chunk.  Returns
    ``None`` when there are no chunks; on equal scores the earliest chunk
    wins.
    """
    top = None
    top_score = None
    for chunk_id, chunk in enumerate(chunk_by_lines(context_text, max_chars=900)):
        result = qa(question=question, context=chunk)
        if isinstance(result, list):
            result = result[0]
        result["_chunk_id"] = chunk_id

        score = float(result.get("score", 0.0))
        if top is None or score > top_score:
            top, top_score = result, score
    return top

触发词+尝试聚合

In [5]:
# Regular expressions that signal an aggregation intent in a question.
# English patterns are meant for the lower-cased question text.
_TRIGGERS_EN = {
    "SUM_EN":   r"\b(total|sum)\b",
    "COUNT_EN": r"\b(how many|count)\b",
    "MAX_EN":   r"\b(max|maximum|largest|highest|most)\b",
    "MIN_EN":   r"\b(min|minimum|smallest|lowest|least)\b",
}

# Chinese patterns are meant for the question text as written.
_TRIGGERS_ZH = {
    "SUM_ZH":   r"(合计|总计|总数)",
    "COUNT_ZH": r"(多少|几(个|项|台|人|名)?)",
    "MAX_ZH":   r"(最多|最大|最高)",
    "MIN_ZH":   r"(最少|最小|最低)",
}

# Combined lookup table keyed by "<OPERATION>_<LANGUAGE>".
TRIGGERS = {**_TRIGGERS_EN, **_TRIGGERS_ZH}

In [6]:
def _arg_extreme(question: str, df: pd.DataFrame, *, largest: bool):
    """Return the ARGMAX/ARGMIN answer dict, or None when no value is usable."""
    score_col = best_numeric_col(question, df)
    if score_col is None:
        # No numeric column detected: fall back to the first column.
        score_col = df.columns[0]

    # Re-index by position so the winning label can be used with ``iloc``
    # whatever index *df* carries, then drop unparseable cells.
    vals = to_numeric(df[score_col]).reset_index(drop=True).dropna()
    if vals.empty:
        return None

    pos = int(vals.idxmax() if largest else vals.idxmin())
    ent_col = pick_entity_col(df)
    return {"mode": "AGG", "op": "ARGMAX" if largest else "ARGMIN",
            "answer": str(df.iloc[pos][ent_col]),
            "score_col": score_col, "entity_col": ent_col}


def try_aggregate(question: str, df: pd.DataFrame):
    """Answer *question* by direct computation on *df* when it asks for an aggregate.

    Trigger patterns from ``TRIGGERS`` are checked in this order:

    1. MAX, then MIN: returns the entity whose value in the best-matching
       numeric column is largest / smallest (``op`` is ``"ARGMAX"`` /
       ``"ARGMIN"``).
    2. SUM: returns the sum of the best-matching numeric column
       (``op`` ``"SUM"``).  Whole-number totals are printed without a decimal
       point; fractional totals keep their fraction.
    3. COUNT: skipped for measurement-style questions; otherwise counts the
       rows containing an extracted keyword (``"COUNT_KW"``) or, without a
       keyword, all rows (``"COUNT_ALL"``).

    Returns a dict with at least ``"mode"``, ``"op"`` and ``"answer"``, or
    ``None`` when no aggregate applies (including a MAX/MIN question whose
    score column holds no parseable number), so the caller can fall back to
    extractive QA.
    """
    q = question.lower()

    # MAX / MIN
    if re.search(TRIGGERS["MAX_EN"], q) or re.search(TRIGGERS["MAX_ZH"], question):
        return _arg_extreme(question, df, largest=True)

    if re.search(TRIGGERS["MIN_EN"], q) or re.search(TRIGGERS["MIN_ZH"], question):
        return _arg_extreme(question, df, largest=False)

    # SUM
    if re.search(TRIGGERS["SUM_EN"], q) or re.search(TRIGGERS["SUM_ZH"], question):
        col = best_numeric_col(question, df)
        if col is None:
            col = df.columns[0]
        vals = to_numeric(df[col]).dropna()
        if vals.empty:
            answer = "0"
        else:
            total = vals.sum()
            if float(total).is_integer():
                answer = str(int(total))
            else:
                # Keep the fractional part instead of truncating it.
                answer = format(float(total), ".12g")
        return {"mode": "AGG", "op": "SUM", "answer": answer, "column": col}

    # COUNT (measurement-style questions are left to extractive QA)
    if re.search(TRIGGERS["COUNT_EN"], q) or re.search(TRIGGERS["COUNT_ZH"], question):
        if looks_like_measurement_question(question, df):
            return None

        # Keyword after "use ...", or after "语言/language 为|是|= ...".
        m = (
            re.search(r"use\s+([a-z0-9\.\-\+\s]+)", q)
            or re.search(r"(?:语言|language)\s*(?:为|是|=)\s*([a-z0-9\.\-\+]+)",
                         question, flags=re.IGNORECASE)
        )
        kw = m.group(1).strip().lower() if m else None
        if kw:
            # A row matches when the keyword occurs anywhere in its cells.
            cnt = sum(
                1
                for _, row in df.iterrows()
                if kw in " ".join(map(str, row.values)).lower()
            )
            return {"mode": "AGG", "op": "COUNT_KW", "answer": str(cnt), "keyword": kw}
        return {"mode": "AGG", "op": "COUNT_ALL", "answer": str(len(df))}

    return None

表格问答主入口

In [7]:
# Questions that can be aggregated (MAX/MIN/SUM/COUNT) are computed directly;
# everything else is answered by serializing the table to text and running
# extractive QA chunk by chunk.
def answer_table_question(question: str, df: pd.DataFrame):
    """Answer *question* about the table *df*.

    Returns the dict produced by ``try_aggregate`` when it applies.
    Otherwise returns ``{"mode": "QA", "answer": ..., "score": ...}`` built
    from the best extractive result, or ``{"mode": "QA", "answer": ""}`` when
    no answer was produced.
    """
    aggregated = try_aggregate(question, df)
    if aggregated is not None:
        return aggregated

    best = qa_best(question, serialize_table(df))
    if not best or "answer" not in best:
        return {"mode": "QA", "answer": ""}
    return {"mode": "QA", "answer": best["answer"], "score": best.get("score", 0.0)}

示例数据与问答

In [8]:
if __name__ == "__main__":
    # Small demo table; numeric columns are stored as strings on purpose
    # (note the thousands separators in "Stars").
    table_df = pd.DataFrame({
        "Repository": ["Transformers", "Datasets", "Tokenizers"],
        "Stars": ["36,542", "4,512", "3,934"],
        "Contributors": ["651", "77", "34"],
        "Programming language": ["Python", "Python", "Rust, Python and NodeJS"]
    })

    # A mix of aggregate questions and plain look-ups, in English and Chinese.
    questions = [
        "Which repository has the most stars?",
        "How many stars does the Transformers repository have?",
        "Which repository uses Rust?",
        "What is the programming language of Datasets?",
        "What is the total number of contributors?",
        "How many repositories use Python?",
        "最少贡献者的是哪个仓库？",
        "Stars 最多的是哪个仓库？",
    ]

    for q in questions:
        ans = answer_table_question(q, table_df)
        print(f"\nQ: {q}\nA: {ans['answer']}")


Q: Which repository has the most stars?
A: Transformers

Q: How many stars does the Transformers repository have?
A: 36,542

Q: Which repository uses Rust?
A: Tokenizers

Q: What is the programming language of Datasets?
A: Python

Q: What is the total number of contributors?
A: 762

Q: How many repositories use Python?
A: 3

Q: 最少贡献者的是哪个仓库？
A: Tokenizers

Q: Stars 最多的是哪个仓库？
A: Transformers
