In [17]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Sequence, NamedTuple

import csv

# ===================== Exceptions =====================

class UmlsError(Exception):
    """Root of the exception hierarchy used by the UMLS readers."""


class SchemaNotFound(UmlsError):
    """Signals that no schema is available for a requested file."""


class ColumnMismatch(UmlsError):
    """Signals that a row's field count differs from what the schema expects."""


# ===================== Schema =====================

@dataclass(frozen=True)
class UmlsSchema:
    """Column layout of the RRF files.

    ``by_file`` maps an RRF filename to its column names, in file order.
    """
    by_file: Dict[str, List[str]]

    def files(self) -> List[str]:
        """Return every filename known to the schema, sorted ascending."""
        names = list(self.by_file)
        names.sort()
        return names

    def columns(self, rrf_filename: str) -> List[str]:
        """Return the ordered column names registered for ``rrf_filename``.

        Raises:
            SchemaNotFound: if the filename is not registered or its column
                list is empty.
        """
        found = self.by_file.get(rrf_filename)
        if found:
            return found
        raise SchemaNotFound(f"Không thấy schema cho {rrf_filename}.")

    def __len__(self) -> int:
        """Return the number of files described by the schema."""
        return len(self.by_file)


# ===================== Low-level RRF utils =====================

def _read_rrf_lines(path: Path) -> Iterator[List[str]]:
    """Yield the fields of each non-empty line of a '|'-delimited RRF file.

    Every RRF line ends with a '|', so splitting produces an empty final
    field; that trailing empty field is dropped so the field count lines up
    with the schema. Lines that parse to no fields at all are skipped.
    """
    with path.open("r", encoding="utf-8", newline="") as handle:
        for fields in csv.reader(handle, delimiter="|", quoting=csv.QUOTE_NONE):
            if fields:
                # Strip only a single trailing empty field (the terminator).
                yield fields[:-1] if fields[-1] == "" else fields


def _ensure_file(path: Path) -> Path:
    """Return ``path`` unchanged if it exists on disk.

    Raises:
        FileNotFoundError: if nothing exists at ``path``.
    """
    if path.exists():
        return path
    raise FileNotFoundError(f"Không tìm thấy file: {path}")


# ===================== Load schema =====================

def load_schema_from_mrfiles(meta_dir: Path) -> UmlsSchema:
    """Build the schema from ``MRFILES.RRF`` inside ``meta_dir``.

    MRFILES layout: FIL | DES | FMT | CLS | RWS | BTS.
    FIL (field 0) is the filename and FMT (field 2) is the comma-separated,
    ordered list of that file's column names.

    Raises:
        FileNotFoundError: if ``MRFILES.RRF`` does not exist.
        ColumnMismatch: if a row has fewer than three fields.
        UmlsError: if no file entry could be read at all.
    """
    mrfiles_path = _ensure_file(meta_dir / "MRFILES.RRF")
    layouts: Dict[str, List[str]] = {}

    for record in _read_rrf_lines(mrfiles_path):
        # Only fields 0 (FIL) and 2 (FMT) are read, so three fields suffice.
        if len(record) < 3:
            raise ColumnMismatch(f"MRFILES.RRF: dòng không đủ cột: {record}")

        file_name, column_spec = record[0].strip(), record[2].strip()
        if file_name:
            trimmed = (part.strip() for part in column_spec.split(","))
            # Drop empty names; an empty FMT therefore yields an empty list.
            layouts[file_name] = [name for name in trimmed if name]

    if layouts:
        return UmlsSchema(by_file=layouts)
    raise UmlsError("MRFILES.RRF không trả về schema nào.")


def optional_validate_with_mrcols(meta_dir: Path, schema: UmlsSchema) -> None:
    """Optionally cross-check schema column names against ``MRCOLS.RRF``.

    MRCOLS layout: COL | DES | REF | MIN | AV | MAX | FIL | DTY.
    Only the presence of each (file, column) pair is checked, never the
    column order. Does nothing when ``MRCOLS.RRF`` is absent. Mismatches are
    printed as warnings (at most 20 lines plus a summary), never raised.
    """
    mrcols_path = meta_dir / "MRCOLS.RRF"
    if not mrcols_path.exists():
        return

    # (FIL, COL) pairs; rows with fewer than 7 fields are skipped silently.
    candidate_pairs = (
        (record[6].strip(), record[0].strip())
        for record in _read_rrf_lines(mrcols_path)
        if len(record) >= 7
    )
    known_pairs = {pair for pair in candidate_pairs if all(pair)}

    problems: List[str] = [
        f"{file_name}: cột '{column}' không thấy trong MRCOLS.RRF"
        for file_name, columns in schema.by_file.items()
        for column in columns
        if (file_name, column) not in known_pairs
    ]
    if not problems:
        return

    # Warn instead of raising: the original notes that some releases
    # contain small discrepancies.
    print("⚠️  Cảnh báo đối chiếu MRCOLS:")
    for message in problems[:20]:
        print("   -", message)
    remaining = len(problems) - 20
    if remaining > 0:
        print(f"   ... (+{remaining} dòng nữa)")


# ===================== RRF Reader =====================

class RRFReader:
    """Schema-driven reader for the ``*.RRF`` files of a META directory.

    Column names and their order come from the supplied schema object.

    - iter_rows(): yields each row as a list of strings
    - iter_dicts(): yields each row as a column -> value dict
    - iter_namedtuples(): yields each row as a named tuple
    - head(): first matching rows as a list of dicts
    - to_dataframe(): pandas.DataFrame (requires pandas)
    - describe_file(): prints the column list and a short preview
    """

    def __init__(self, meta_dir: Path, schema: UmlsSchema, *, strict_columns: bool = True):
        """Store the data directory, the schema and the strictness flag.

        Args:
            meta_dir: directory containing the RRF files.
            schema: object whose ``columns(filename)`` gives the column names.
            strict_columns: when true, ``iter_rows`` raises on any row whose
                field count differs from the schema.
        """
        self.meta_dir = meta_dir
        self.schema = schema
        self.strict_columns = strict_columns

    def _path(self, rrf_filename: str) -> Path:
        """Resolve ``rrf_filename`` inside ``meta_dir`` and check it exists."""
        return _ensure_file(self.meta_dir / rrf_filename)

    @staticmethod
    def _fit(row: List[str], width: int) -> List[str]:
        """Return ``row`` padded with "" or truncated to exactly ``width`` fields."""
        if len(row) == width:
            return row
        # A negative repeat count yields [], so this also handles long rows.
        return row[:width] + [""] * (width - len(row))

    @staticmethod
    def _take(items: Iterator[Dict[str, str]], limit: Optional[int]) -> Iterator[Dict[str, str]]:
        """Yield at most ``limit`` items; ``None`` means no limit, <= 0 means none."""
        if limit is None:
            yield from items
            return
        if limit <= 0:
            return
        for count, item in enumerate(items, start=1):
            yield item
            # Stop right after the last wanted item so no extra row is read.
            if count >= limit:
                return

    def iter_rows(self, rrf_filename: str) -> Iterator[List[str]]:
        """Yield the raw field list of every row in ``rrf_filename``.

        Raises:
            SchemaNotFound: if the schema has no columns for the file.
            FileNotFoundError: if the file does not exist.
            ColumnMismatch: in strict mode, when a row's field count differs
                from the schema.
        """
        expected = len(self.schema.columns(rrf_filename))
        for row in _read_rrf_lines(self._path(rrf_filename)):
            if self.strict_columns and expected and len(row) != expected:
                raise ColumnMismatch(
                    f"{rrf_filename}: số cột thực tế {len(row)} ≠ kỳ vọng {expected}. "
                    f"Ví dụ row: {row[:min(10, len(row))]}..."
                )
            yield row

    def iter_dicts(
        self,
        rrf_filename: str,
        select: Optional[Sequence[str]] = None,
        where: Optional[Callable[[Dict[str, str]], bool]] = None,
    ) -> Iterator[Dict[str, str]]:
        """Yield each row as a dict, optionally projected and filtered.

        Args:
            rrf_filename: file to read.
            select: column names to keep, in this order; ``None`` or empty
                keeps every schema column.
            where: predicate applied to the dict that would be yielded. With
                ``select`` it therefore only sees the selected columns.

        In non-strict mode, missing trailing fields are filled with "" and
        surplus fields are ignored, with or without ``select``.

        Raises:
            SchemaNotFound: if ``select`` names a column not in the schema.
        """
        cols = self.schema.columns(rrf_filename)
        width = len(cols)

        if select:
            unknown = [c for c in select if c not in cols]
            if unknown:
                raise SchemaNotFound(f"Cột không tồn tại trong {rrf_filename}: {unknown}")
            picks = [(c, cols.index(c)) for c in select]
        else:
            picks = [(c, i) for i, c in enumerate(cols)]

        for row in self.iter_rows(rrf_filename):
            fitted = self._fit(row, width)
            d = {c: fitted[i] for c, i in picks}
            if where is None or where(d):
                yield d

    def iter_namedtuples(self, rrf_filename: str) -> Iterator[NamedTuple]:
        """Yield each row as a named tuple whose fields are the schema columns.

        The tuple class is named ``<file stem>Row``. In non-strict mode rows
        are padded with "" or truncated to the schema width first.
        """
        cols = self.schema.columns(rrf_filename)
        TupleCls = NamedTuple(f"{Path(rrf_filename).stem}Row", [(c, str) for c in cols])
        width = len(cols)
        for row in self.iter_rows(rrf_filename):
            yield TupleCls(*self._fit(row, width))

    def head(
        self,
        rrf_filename: str,
        n: int = 5,
        select: Optional[Sequence[str]] = None,
        where: Optional[Callable[[Dict[str, str]], bool]] = None,
    ) -> List[Dict[str, str]]:
        """Return the first ``n`` dicts produced by ``iter_dicts``.

        ``n <= 0`` returns an empty list without reading the file.
        """
        return list(self._take(self.iter_dicts(rrf_filename, select=select, where=where), n))

    def to_dataframe(
        self,
        rrf_filename: str,
        select: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
        dtype: Optional[Dict[str, str]] = None,
        where: Optional[Callable[[Dict[str, str]], bool]] = None,
    ):
        """Load rows of ``rrf_filename`` into a ``pandas.DataFrame``.

        Args:
            rrf_filename: file to read.
            select: columns to keep; ``None`` or empty keeps every column.
            limit: maximum number of rows; ``None`` reads all, <= 0 reads none.
            dtype: optional column -> dtype mapping; entries for columns not
                in the frame are dropped, and conversion uses
                ``errors="ignore"``.
            where: row predicate, as in ``iter_dicts``.

        Raises:
            RuntimeError: if pandas cannot be imported.
        """
        try:
            import pandas as pd  # type: ignore
        except Exception as exc:
            raise RuntimeError("Cần cài pandas để dùng to_dataframe(). `pip install pandas`") from exc

        cols = self.schema.columns(rrf_filename)
        # Match iter_dicts, which treats an empty select as "all columns".
        if not select:
            select = cols

        data: List[Dict[str, str]] = list(
            self._take(self.iter_dicts(rrf_filename, select=select, where=where), limit)
        )

        df = pd.DataFrame(data, columns=list(select))
        if dtype:
            safe = {k: v for k, v in dtype.items() if k in df.columns}
            if safe:
                df = df.astype(safe, errors="ignore")
        return df

    def describe_file(
        self,
        rrf_filename: str,
        n: int = 5,
        select: Optional[Sequence[str]] = None,
        where: Optional[Callable[[Dict[str, str]], bool]] = None,
    ) -> None:
        """Print the column count, the column names and the first ``n`` rows."""
        cols = self.schema.columns(rrf_filename)
        print(f"{rrf_filename}: {len(cols)} columns")
        print(", ".join(cols))
        print("-- head --")
        for row in self.head(rrf_filename, n=n, select=select, where=where):
            print(row)
        print()


# ===================== Example usage =====================

if __name__ == "__main__":
    # Point this at the local UMLS META directory.
    META = Path(r"./data/2025AA/META")

    # 1) Build the schema from MRFILES, then optionally cross-check MRCOLS.
    schema = load_schema_from_mrfiles(META)
    optional_validate_with_mrcols(META, schema)

    # 2) Quick overview of the first 10 files.
    print("== UMLS META Schema ==")
    for f in schema.files()[:10]:
        column_count = len(schema.columns(f))
        print(f"{f}: {column_count} columns")
    print()

    # 3) Create the reader.
    reader = RRFReader(META, schema, strict_columns=True)

    # 4) Demo: MRCONSO, a few columns, filtered to LAT == ENG and ISPREF == Y.
    conso_columns = ["CUI", "LAT", "ISPREF", "SAB", "TTY", "STR"]

    def _is_preferred_english(d):
        return d.get("LAT") == "ENG" and d.get("ISPREF") == "Y"

    try:
        reader.describe_file(
            "MRCONSO.RRF",
            n=5,
            select=conso_columns,
            where=_is_preferred_english,
        )
    except Exception as err:
        print(f"⚠️ {err}")

    # 5) Demo: MRSTY, load 10 rows into a DataFrame (needs pandas).
    try:
        df = reader.to_dataframe("MRSTY.RRF", limit=10)
        print(df.head(3))
    except Exception as err:
        print(f"⚠️ {err}")


== UMLS META Schema ==
AMBIGLUI.RRF: 2 columns
AMBIGSUI.RRF: 2 columns
CHANGE/DELETEDCUI.RRF: 2 columns
CHANGE/DELETEDLUI.RRF: 2 columns
CHANGE/DELETEDSUI.RRF: 3 columns
CHANGE/MERGEDCUI.RRF: 2 columns
CHANGE/MERGEDLUI.RRF: 2 columns
MRAUI.RRF: 9 columns
MRCOLS.RRF: 8 columns
MRCONSO.RRF: 18 columns

MRCONSO.RRF: 18 columns
CUI, LAT, TS, LUI, STT, SUI, ISPREF, AUI, SAUI, SCUI, SDUI, SAB, TTY, CODE, STR, SRL, SUPPRESS, CVF
-- head --
{'CUI': 'C0000005', 'LAT': 'ENG', 'ISPREF': 'Y', 'SAB': 'MSH', 'TTY': 'PEP', 'STR': '(131)I-Macroaggregated Albumin'}
{'CUI': 'C0000005', 'LAT': 'ENG', 'ISPREF': 'Y', 'SAB': 'MSH', 'TTY': 'ET', 'STR': '(131)I-MAA'}
{'CUI': 'C0000039', 'LAT': 'ENG', 'ISPREF': 'Y', 'SAB': 'MTH', 'TTY': 'PN', 'STR': '1,2-dipalmitoylphosphatidylcholine'}
{'CUI': 'C0000039', 'LAT': 'ENG', 'ISPREF': 'Y', 'SAB': 'MSH', 'TTY': 'MH', 'STR': '1,2-Dipalmitoylphosphatidylcholine'}
{'CUI': 'C0000039', 'LAT': 'ENG', 'ISPREF': 'Y', 'SAB': 'MSH', 'TTY': 'PM', 'STR': '1,2 Dipalmitoylphospha

In [20]:
### List all unique SAB values in MRCONSO.RRF

# Collect every non-empty SAB value, reading only that column.
unique_sabs = {
    record["SAB"]
    for record in reader.iter_dicts("MRCONSO.RRF", select=["SAB"])
    if record.get("SAB")
}

print("Unique SAB values in MRCONSO.RRF:")
for sab in sorted(unique_sabs):
    print(sab)

Unique SAB values in MRCONSO.RRF:
AIR
AOD
AOT
ATC
CCS
CCSR_ICD10CM
CCSR_ICD10PCS
CDCREC
CHV
COSTAR
CSP
CST
CVX
DRUGBANK
DXP
FMA
GO
HCPCS
HGNC
HL7V2.5
HL7V3.0
HPO
ICD10PCS
ICD9CM
ICPC
ICPCBAQ
ICPCDAN
ICPCDUT
ICPCFIN
ICPCFRE
ICPCGER
ICPCHEB
ICPCHUN
ICPCITA
ICPCNOR
ICPCPOR
ICPCSPA
ICPCSWE
LCH
LCH_NW
LNC
LNC-DE-AT
LNC-DE-DE
LNC-EL-GR
LNC-ES-AR
LNC-ES-ES
LNC-ES-MX
LNC-ET-EE
LNC-FR-BE
LNC-FR-CA
LNC-FR-FR
LNC-IT-IT
LNC-KO-KR
LNC-NL-NL
LNC-PL-PL
LNC-PT-BR
LNC-RU-RU
LNC-TR-TR
LNC-UK-UA
LNC-ZH-CN
MCM
MED-RT
MEDLINEPLUS
MEDLINEPLUS_SPA
MSH
MSHGER
MTH
MTHCMSFRF
MTHICD9
MTHMST
MTHMSTFRE
MTHMSTITA
MTHSPL
MVX
NCBI
NCI
OMIM
PDQ
QMR
RAM
RXNORM
SOP
SPN
SRC
TKMT
USP
USPMG
UWDA
VANDF
