In [8]:
import math, unicodedata, torch
from typing import List, Tuple, Optional, Union, Dict, Any, Iterable, Callable
from transformers import AutoTokenizer, AutoModelForCausalLM
import json, csv
import os
import re, html
try:
    import markdown as mdlib
    from bs4 import BeautifulSoup
    _HAVE_MD_STACK = True
except Exception:
    _HAVE_MD_STACK = False

# -------- utils --------
def md_to_plain_fallback(s: str, strip_code_blocks: bool = True) -> str:
    """
    fallback อย่างง่าย: ตัด markdown เบื้องต้นด้วย regex
    """
    text = s
    if strip_code_blocks:
        text = re.sub(r"```.*?```", "", text, flags=re.DOTALL)
        text = re.sub(r"`[^`]*`", "", text)
    else:
        # เก็บ code blocks ไว้แต่ลอก tag อื่น ๆ ออก
        pass
    text = re.sub(r"\!\[[^\]]*\]\([^)]*\)", "", text)      # images
    text = re.sub(r"\[[^\]]*\]\([^)]*\)", r"\1", text)     # links -> anchor text
    text = re.sub(r"[#*_>~-]+", " ", text)                 # md symbols
    text = re.sub(r"\s+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def is_markdown_path(path: str) -> bool:
    ext = os.path.splitext(path)[1].lower()
    return ext in (".md", ".markdown")

def md_to_plain_external(s: str, strip_code_blocks: bool = True) -> str:
    """
    ใช้ markdown → HTML แล้วใช้ BeautifulSoup ดึงข้อความล้วน
    - strip_code_blocks=True จะลบ <pre>/<code> ออก
    - False จะคงโค้ดบล็อกไว้ โดยห่อด้วย ``` และ inline code ด้วย backticks
    """
    if not _HAVE_MD_STACK:
        return md_to_plain_fallback(s, strip_code_blocks)

    html_doc = mdlib.markdown(
        s,
        extensions=[
            "extra",          # รวม tables, abbr, etc.
            "fenced_code",
            "sane_lists",
            "codehilite",
            "toc",
            "smarty",
        ],
        output_format="html5",
    )
    soup = BeautifulSoup(html_doc, "html.parser")

    # จัดการรูปภาพ: ใช้ alt เป็นข้อความ
    for img in soup.find_all("img"):
        alt = img.get("alt") or ""
        img.replace_with(alt)

    if strip_code_blocks:
        # ลบ code blocks ทั้งหมด
        for tag in soup.find_all(["pre", "code"]):
            tag.decompose()
    else:
        # เก็บ code blocks: แทน <pre> ด้วย ``` ... ```
        for pre in soup.find_all("pre"):
            code_text = pre.get_text("\n")
            pre.replace_with("\n```\n" + code_text + "\n```\n")
        # inline code: <code>...</code> → `...`
        for code in soup.find_all("code"):
            if code.parent and code.parent.name != "pre":
                code.replace_with("`" + code.get_text() + "`")

    # ลิงก์: ใช้ข้อความลิงก์ (ตัว soup.get_text จะเก็บ text ของ <a> ให้อยู่แล้ว)
    text = soup.get_text("\n")

    # ทำความสะอาดเล็กน้อย
    text = html.unescape(text)
    text = re.sub(r"[ \t]+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def detect_file_format(path: str) -> str:
    ext = os.path.splitext(path)[1].lower()
    if ext in (".md", ".markdown"):
        return "md"
    if ext == ".csv":
        return "csv"
    if ext in (".jsonl", ".ndjson"):
        return "jsonl"
    return "text"  # txt/อื่น ๆ

def autodetect_text_field(keys: List[str]) -> Optional[str]:
    # เดาจากคีย์ที่พบบ่อย
    candidates = ["ข้อความ", "text", "Text", "message", "content", "utterance"]
    for c in candidates:
        if c in keys:
            return c
    return None

def read_jsonl_texts(
    path: str,
    field: Optional[str] = None,
    max_rows: Optional[int] = None,
    skip_empty: bool = True,
    encoding: str = "utf-8"
) -> Tuple[List[str], Dict[str, Any]]:
    texts: List[str] = []
    num_lines = 0
    used_field = field
    with open(path, "r", encoding=encoding) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                continue
            if used_field is None:
                used_field = autodetect_text_field(list(obj.keys()))
            val = obj.get(used_field) if used_field else None
            if val is None:
                # ถ้าไม่มีฟิลด์ชัดเจน ลองเลือกฟิลด์แรกที่เป็นสตริง
                for k, v in obj.items():
                    if isinstance(v, str):
                        val = v
                        used_field = k
                        break
            if isinstance(val, str):
                s = val.strip()
                if (not skip_empty) or s:
                    texts.append(s)
                    num_lines += 1
            if max_rows is not None and num_lines >= max_rows:
                break
    meta = {"detected_text_field": used_field, "num_rows": num_lines}
    return texts, meta

def read_csv_texts(
    path: str,
    text_col: Optional[str] = None,
    sep: Optional[str] = None,
    encoding: str = "utf-8-sig",
    max_rows: Optional[int] = None,
    skip_empty: bool = True,
) -> Tuple[List[str], Dict[str, Any]]:
    """
    อ่าน CSV แบบไม่พึ่ง pandas (ถ้าอยากใช้ pandas ก็แทนส่วนนี้ได้)
    - เดา delimiter ถ้า sep=None
    - เดาชื่อคอลัมน์ถ้า text_col=None
    """
    texts: List[str] = []
    used_sep = sep
    used_col = text_col
    num_rows = 0

    with open(path, "r", encoding=encoding, newline="") as f:
        sample = f.read(4096)
        f.seek(0)
        if used_sep is None:
            try:
                used_sep = csv.Sniffer().sniff(sample).delimiter
            except Exception:
                used_sep = ","  # fallback
        reader = csv.DictReader(f, delimiter=used_sep)
        if reader.fieldnames is None:
            return [], {"detected_text_col": None, "num_rows": 0, "delimiter": used_sep}

        if used_col is None:
            used_col = autodetect_text_field(reader.fieldnames)

        for row in reader:
            val = row.get(used_col) if used_col else None
            if isinstance(val, str):
                s = val.strip()
                if (not skip_empty) or s:
                    texts.append(s)
                    num_rows += 1
            if max_rows is not None and num_rows >= max_rows:
                break

    meta = {"detected_text_col": used_col, "num_rows": num_rows, "delimiter": used_sep}
    return texts, meta

def sanitize_text(s: str) -> str:
    if s is None: return ""
    s = s.replace("\u00A0", " ").replace("\u200B", "").replace("\u200C", "").replace("\u200D", "")
    return unicodedata.normalize("NFKC", s)

def get_all_special_ids(tokenizer) -> set:
    """รวบรวม special token IDs ทั้งหมด"""
    ids = set()
    for tid in [tokenizer.eos_token_id, tokenizer.bos_token_id, tokenizer.pad_token_id]:
        if tid is not None:
            ids.add(tid)
    # เพิ่มจาก all_special_ids ถ้ามี
    if hasattr(tokenizer, 'all_special_ids') and tokenizer.all_special_ids:
        ids.update(tokenizer.all_special_ids)
    return ids

def mask_special_tokens(labels: torch.Tensor, special_ids: set) -> torch.Tensor:
    """Mask special tokens ให้เป็น -100"""
    if not special_ids:
        return labels
    masked = labels.clone()
    for tid in special_ids:
        masked[masked == tid] = -100
    return masked

def supports_chat_template(tokenizer) -> bool:
    """ตรวจสอบว่า tokenizer รองรับ chat template หรือไม่"""
    return hasattr(tokenizer, 'chat_template') and tokenizer.chat_template is not None

def batch_iterator(items: List[Any], batch_size: int) -> Iterable[List[Any]]:
    """แบ่งรายการเป็น batches"""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

def encode_with_step(
    tokenizer: AutoTokenizer,
    texts: List[str],
    max_length: int,
    step_tokens: int,
    use_chat_template: bool,
    eval_role: str = "assistant",
    assistant_user_prompt: Optional[str] = None,
    verbose: bool = False,
    chunk_head: int = 3,
    chunk_tail: int = 1,
) -> List[List[Dict[str, torch.Tensor]]]:
    encoded_per_line = []

    for idx, text in enumerate(texts):
        if len(text.strip()) == 0:
            encoded_per_line.append([])
            continue

        # Apply chat template (ตามเดิม)
        if use_chat_template:
            if eval_role == "assistant":
                user_msg = assistant_user_prompt or "โปรดพิมพ์ข้อความต่อไปนี้ซ้ำตามตัวอักษรโดยไม่แก้ไข"
                conv = [
                    {"role": "user", "content": user_msg},
                    {"role": "assistant", "content": text}
                ]
            else:
                conv = [{"role": "user", "content": text}]
            try:
                rendered = tokenizer.apply_chat_template(conv, add_generation_prompt=False, tokenize=False)
            except:
                rendered = text
        else:
            rendered = text

        if verbose:
            print(f"  Text {idx+1}: {len(text)} chars -> {len(rendered)} chars (after template)")

        # เข้ารหัสเต็มก่อน
        full_enc = tokenizer(rendered, add_special_tokens=True, return_tensors="pt")
        total_tokens = full_enc["input_ids"].size(1)
        if verbose:
            print(f"  Total tokens: {total_tokens}")

        if total_tokens <= max_length:
            chunks = [{
                "input_ids": full_enc["input_ids"],
                "attention_mask": full_enc["attention_mask"],
            }]
            if verbose:
                print(f"  No sliding needed: 1 chunk of {total_tokens} tokens")
        else:
            if verbose:
                print(f"  Using manual sliding window...")
            full_enc_list = tokenizer(rendered, add_special_tokens=True)
            input_ids_list = full_enc_list["input_ids"]

            chunks = []
            debug_chunks = []  # (tok_count, start_idx, end_idx)
            start_idx = 0
            while start_idx < len(input_ids_list):
                end_idx = min(start_idx + max_length, len(input_ids_list))
                chunk_input_ids = input_ids_list[start_idx:end_idx]
                chunk_attention = [1] * len(chunk_input_ids)
                chunk = {
                    "input_ids": torch.tensor([chunk_input_ids], dtype=torch.long),
                    "attention_mask": torch.tensor([chunk_attention], dtype=torch.long),
                }
                chunks.append(chunk)
                debug_chunks.append((len(chunk_input_ids), start_idx, end_idx))
                if end_idx >= len(input_ids_list):
                    break
                start_idx += step_tokens

            if verbose:
                total_chunks = len(debug_chunks)
                print(f"  Total chunks: {total_chunks}")
                # พิมพ์เฉพาะตัวอย่างหัว/ท้าย
                if total_chunks <= (chunk_head + chunk_tail):
                    for i, (tok, st, ed) in enumerate(debug_chunks, 1):
                        print(f"    Chunk {i}: {tok} tokens (pos {st}:{ed})")
                else:
                    # head
                    for i, (tok, st, ed) in enumerate(debug_chunks[:chunk_head], 1):
                        print(f"    Chunk {i}: {tok} tokens (pos {st}:{ed})")
                    # ellipsis
                    skipped = total_chunks - (chunk_head + chunk_tail)
                    print(f"    ... {skipped} more chunks ...")
                    # tail
                    start_num = total_chunks - chunk_tail + 1
                    for j, (tok, st, ed) in enumerate(debug_chunks[-chunk_tail:], start_num):
                        print(f"    Chunk {j}: {tok} tokens (pos {st}:{ed})")
        encoded_per_line.append(chunks)
    return encoded_per_line

@torch.no_grad()
def compute_ppl_per_line(
    model_name: str,
    texts: List[str],
    batch_size: int = 4,
    max_length: int = 1024,
    step_tokens: Optional[int] = None,
    use_chat_template: Optional[bool] = None,
    eval_role: str = "assistant",
    assistant_user_prompt: Optional[str] = None,
    verbose: bool = False,
) -> Tuple[List[float], List[int]]:
    """
    คำนวณ PPL สำหรับแต่ละบรรทัด/ข้อความ
    คืน: (ppl_list, token_counts_list)
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Load model with offline support
    is_local_path = os.path.exists(model_name) and os.path.isdir(model_name)
    load_kwargs = {"use_fast": True}
    model_kwargs = {
        "torch_dtype": (torch.bfloat16 if torch.cuda.is_available() else torch.float32),
        "device_map": "auto" if torch.cuda.is_available() else None,
    }

    if is_local_path:
        load_kwargs["local_files_only"] = True
        model_kwargs["local_files_only"] = True
        if verbose:
            print(f"Loading local model from: {model_name}")

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name, **load_kwargs)
        model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs).eval()
    except OSError as e:
        if "couldn't connect" in str(e) or "Network is unreachable" in str(e):
            if verbose:
                print("Network unavailable, trying offline mode...")
            load_kwargs["local_files_only"] = True
            model_kwargs["local_files_only"] = True
            tokenizer = AutoTokenizer.from_pretrained(model_name, **load_kwargs)
            model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs).eval()
        else:
            raise e

    # Determine chat template usage
    template_supported = supports_chat_template(tokenizer)
    if use_chat_template is None:
        model_name_lower = model_name.lower()
        likely_instruct = any(keyword in model_name_lower for keyword in [
            'instruct', 'chat', 'it', 'sft', 'alpaca', 'vicuna', 'llama-2-chat', 'llama-3-instruct', 'gemma'
        ])
        will_use_template = template_supported and likely_instruct
    else:
        will_use_template = use_chat_template and template_supported

    if verbose:
        print(f"Model: {model_name}")
        print(f"Chat template supported: {template_supported}")
        print(f"Will use chat template: {will_use_template}")
        print(f"Processing {len(texts)} texts...")

    # Get special tokens
    special_ids = get_all_special_ids(tokenizer)

    # Sanitize texts
    clean_texts = [sanitize_text(t) for t in texts]

    if step_tokens is None or step_tokens < 1:
        # เผื่อผู้ใช้ลืมคำนวณ step มาก่อน: ใช้ค่า default = 25% ของ max_length
        step_tokens = max(1, max_length // 4)

    # Encode with sliding window
    encoded_per_line = encode_with_step(
        tokenizer, clean_texts, max_length, step_tokens, will_use_template,
        eval_role, assistant_user_prompt,
        verbose=verbose,
        chunk_head=3,
        chunk_tail=1,
    )

    ppl_list: List[float] = []
    tok_list: List[int] = []

    loss_fct = torch.nn.CrossEntropyLoss(reduction="none")

    # Process each text (line)
    for line_idx, chunks in enumerate(encoded_per_line):
        if not chunks:
            ppl_list.append(float('inf'))
            tok_list.append(0)
            continue

        sum_nll = 0.0
        sum_tokens = 0

        # Process chunks in batches
        for chunk_batch in batch_iterator(chunks, batch_size):
            # Combine into single batch
            input_ids = torch.cat([c["input_ids"] for c in chunk_batch], dim=0).to(device)
            attention_mask = torch.cat([c["attention_mask"] for c in chunk_batch], dim=0).to(device)

            # Prepare labels
            labels = input_ids.clone()
            labels = mask_special_tokens(labels, special_ids)

            # Forward pass
            with torch.autocast(device_type="cuda", dtype=torch.bfloat16, enabled=torch.cuda.is_available()):
                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                logits = outputs.logits

            # Shift for causal LM
            shift_logits = logits[:, :-1, :].contiguous()
            shift_labels = labels[:, 1:].contiguous()
            shift_mask = ((shift_labels != -100) & (attention_mask[:, 1:] == 1)).to(torch.float32)

            # Calculate loss
            vocab_size = shift_logits.size(-1)
            token_loss = loss_fct(shift_logits.view(-1, vocab_size), shift_labels.view(-1))
            token_loss = token_loss.view(shift_labels.size(0), shift_labels.size(1))

            # Apply mask and sum
            masked_loss = token_loss * shift_mask
            nll_per_seq = masked_loss.sum(dim=1)
            tokens_per_seq = shift_mask.sum(dim=1)

            sum_nll += float(nll_per_seq.sum().item())
            sum_tokens += int(tokens_per_seq.sum().item())

        # Calculate PPL for this line
        if sum_tokens > 0:
            ppl = math.exp(sum_nll / sum_tokens)
        else:
            ppl = float('inf')

        ppl_list.append(ppl)
        tok_list.append(sum_tokens)

        if verbose and (line_idx + 1) % 10 == 0:
            print(f"Processed {line_idx + 1}/{len(texts)} texts")

    return ppl_list, tok_list

@torch.no_grad()
def compute_ppl(
    model_name: str,
    text_or_path: Union[str, List[str]],
    context_length: int = 1024,
    overlap_ratio: Optional[float] = 0.25,   # กำหนด overlap เป็นสัดส่วน
    overlap: Optional[int] = None,           # หรือกำหนดเป็นจำนวนโทเค็นตรง ๆ
    batch_size: int = 4,
    use_chat_template: Optional[bool] = None,
    eval_role: str = "assistant",
    assistant_user_prompt: Optional[str] = None,
    verbose: bool = False,
    is_file: bool = True,
    md_handling: str = "auto",
    md_strip_code_blocks: bool = True,
    file_format: str = "auto",          # "auto" | "text" | "md" | "csv" | "jsonl"
    csv_text_col: Optional[str] = None,
    csv_sep: Optional[str] = None,
    csv_encoding: str = "utf-8-sig",
    jsonl_text_field: Optional[str] = None,
    jsonl_encoding: str = "utf-8",
    max_rows: Optional[int] = None,
    skip_empty: bool = True,
):
    """
    Main function - รองรับทั้งไฟล์เดียวและรายการข้อความ
    """

    def clamp01(x: float) -> float:
        return 0.0 if x < 0.0 else (1.0 if x > 1.0 else x)

    # Prepare texts
    if isinstance(text_or_path, list):
        texts = text_or_path
        if verbose:
            print(f"Processing {len(texts)} texts from list")
    elif is_file:
        if verbose:
            print(f"Reading file: {text_or_path}")
        if not os.path.exists(text_or_path):
            raise FileNotFoundError(f"File not found: {text_or_path}")

        # ตัดสินใจชนิดไฟล์
        fmt = file_format
        if fmt == "auto":
            fmt = detect_file_format(text_or_path)

        if fmt == "csv":
            # อ่าน CSV เป็นรายการสตริง
            texts, meta = read_csv_texts(
                text_or_path,
                text_col=csv_text_col,
                sep=csv_sep,
                encoding=csv_encoding,
                max_rows=max_rows,
                skip_empty=skip_empty,
            )
            if verbose:
                print(f"CSV loaded: rows={meta.get('num_rows', 0)} | "
                      f"col={meta.get('detected_text_col')} | sep={meta.get('delimiter')}")
        elif fmt == "jsonl":
            # อ่าน JSONL เป็นรายการสตริง
            texts, meta = read_jsonl_texts(
                text_or_path,
                field=jsonl_text_field,
                max_rows=max_rows,
                skip_empty=skip_empty,
                encoding=jsonl_encoding,
            )
            if verbose:
                print(f"JSONL loaded: rows={meta.get('num_rows', 0)} | "
                      f"field={meta.get('detected_text_field')}")
        else:
            # อ่านเป็นสตริงเดียว (text/md)
            with open(text_or_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Markdown?
            will_md = (
                (md_handling == "force") or
                (md_handling == "auto" and (fmt == "md"))
            )
            if will_md:
                if verbose:
                    print(f"Detected Markdown → converting to plain text using "
                          f"{'markdown+bs4' if _HAVE_MD_STACK else 'regex fallback'} ...")
                    before_len = len(content)
                content = md_to_plain_external(content, strip_code_blocks=md_strip_code_blocks)
                if verbose:
                    print(f"Markdown converted: {before_len} → {len(content)} chars")

            if verbose:
                print(f"File size: {len(content)} characters")

            content = content.strip()
            texts = [content] if content else []

        if verbose:
            print(f"Prepared {len(texts)} text(s) for processing")
            if texts and len(texts) == 1:
                print(f"Text length: {len(texts[0])} characters")
            elif texts and len(texts) > 1:
                avg_len = sum(len(t) for t in texts) / len(texts)
                print(f"Average text length: {avg_len:.1f} characters")
    else:
        texts = [str(text_or_path)]
        if verbose:
            print(f"Processing direct text input: {len(texts[0])} characters")

    if not texts:
        return {
            "context_length": context_length,
            "overlap_tokens": 0 if overlap is None else int(max(0, min(overlap, context_length - 1))),
            "overlap_ratio": 0.0 if overlap_ratio is None else clamp01(float(overlap_ratio)),
            "num_texts": 0,
            "PPL_micro": None,
            "PPL_macro": None,
            "tokens_evaluated": 0,
            "used_chat_template": False,
        }

    # ---- Compute overlap → step_tokens ----
    if overlap is not None:
        overlap_tokens = max(0, min(int(overlap), context_length - 1))
        effective_overlap_ratio = overlap_tokens / context_length
    else:
        r = 0.0 if overlap_ratio is None else clamp01(float(overlap_ratio))
        overlap_tokens = int(round(context_length * r))
        if overlap_tokens >= context_length:
            overlap_tokens = context_length - 1
        effective_overlap_ratio = overlap_tokens / context_length

    step_tokens = max(1, context_length - overlap_tokens)

    if verbose:
        print(f"Context length: {context_length}, Overlap: {overlap_tokens} "
              f"({effective_overlap_ratio:.2%}), Step: {step_tokens}")

    # ---- Calculate PPL per line ----
    ppl_list, tok_list = compute_ppl_per_line(
        model_name=model_name,
        texts=texts,
        batch_size=batch_size,
        max_length=context_length,
        step_tokens=step_tokens,
        use_chat_template=use_chat_template,
        eval_role=eval_role,
        assistant_user_prompt=assistant_user_prompt,
        verbose=verbose,
    )

    if verbose:
        print(f"Token counts per text: {tok_list}")
        print(f"PPL per text: {[f'{p:.3f}' for p in ppl_list]}")

    # Calculate aggregate metrics
    valid_ppls = [p for p in ppl_list if not math.isinf(p)]
    total_tokens = sum(tok_list)

    if not valid_ppls:
        ppl_micro = ppl_macro = None
    else:
        # Macro: average of individual PPLs
        ppl_macro = sum(valid_ppls) / len(valid_ppls)

        # Micro: weighted by tokens (approximate)
        if total_tokens > 0:
            weighted_sum = sum(ppl * tok for ppl, tok in zip(ppl_list, tok_list) if not math.isinf(ppl))
            ppl_micro = weighted_sum / total_tokens
        else:
            ppl_micro = ppl_macro

    result = {
        "context_length": context_length,
        "overlap_tokens": overlap_tokens,
        "overlap_ratio": effective_overlap_ratio,
        "num_texts": len(texts),
        "PPL_micro": ppl_micro,
        "PPL_macro": ppl_macro,
        "tokens_evaluated": total_tokens,
        "used_chat_template": use_chat_template,
    }

    # Add per-text results if multiple texts
    if len(texts) > 1:
        result["per_text_ppls"] = ppl_list
        result["per_text_tokens"] = tok_list

    return result


In [9]:
sample_md_content = """# โครงการตัวอย่าง: ประเมิน PPL จาก Markdown

> บทนำ: เอกสารนี้เขียนด้วย *Markdown* เพื่อทดสอบท่อ PPL

## หัวข้อย่อย
- รายการที่หนึ่ง: ข้อความภาษาไทยผสมอังกฤษ (Thai + English)
- รายการที่สอง: inline code เช่น `tokenize(text)` และลิงก์ [UN](https://www.un.org)

ตารางตัวอย่าง (จะถูกแปลงเป็นข้อความล้วน):
| คอลัมน์ | ค่า |
|---------|-----|
| a       | 10  |
| b       | 20  |

รูปภาพ (จะใช้ alt text): ![โลโก้โครงการ](logo.png)

โค้ดบล็อก (fenced code):
```python
def greet(name):
    print(f"สวัสดี {name}")
greet("โลก")
"""
with open(test_file_md, "w", encoding="utf-8") as f:
    f.write(sample_md_content)

In [80]:
# สร้างไฟล์ทดสอบ
sample_content = """ไอซีอีเอสซีอาร์ (ICESCR) เป็นสนธิสัญญาพหุภาคีซึ่งผ่านมติสมัชชาใหญ่แห่งสหประชาชาติเมื่อวันที่ 16 ธันวาคม ค.ศ. 1966 และมีผลใช้บังคับตั้งแต่วันที่ 3 มกราคม ค.ศ. 1976 เป็นต้นมา กติกาฯ ผูกมัดภาคีให้ทำงานเพื่อมุ่งสู่การให้สิทธิทางเศรษฐกิจ สังคม และการเมือง (อีเอสซีอาร์) แก่ปัจเจกบุคคล รวมถึงสิทธิแรงงานและสิทธิในสุขภาพอนามัย สิทธิในการศึกษา ตลอดจนสิทธิในมาตรฐานการครองชีพที่พอเพียง ณ เดือนกรกฎาคม ค.ศ. 2011 กติกาฯ มีภาคี 160 ประเทศ และยังมีอีก 6 ประเทศที่ได้ลงนามแล้วแต่ยังไม่ได้ให้สัตยาบัน กติกาฯ เป็นส่วนหนึ่งของตราสารสิทธิมนุษยชนระหว่างประเทศร่วมกับปฏิญญาสากลว่าด้วยสิทธิมนุษยชน (ยูดีเอชอาร์) และกติการะหว่างประเทศว่าด้วยสิทธิพลเมืองและสิทธิทางการเมือง (ไอซีซีพีอาร์) และรวมถึงพิธีสารเลือกรับที่หนึ่งและที่สองของกติการะหว่างประเทศว่าด้วยสิทธิพลเมืองและสิทธิทางการเมืองด้วย"""

with open(test_file, "w", encoding="utf-8") as f:
    f.write(sample_content)

In [None]:
# ตั้งค่าโมเดลและข้อมูลทดสอบ
model_path = "/model/gemma-3-270m-it"
base_model_path = "/model/gemma-3-270m"  # base model (ถ้ามี)
test_file = "test.txt"
test_file_md = "test_md.txt"
test_texts = [
    "สวัสดีครับ ผมชื่อจอห์น",
    "The quick brown fox jumps over the lazy dog",
    "ประเทศไทยมีความหลากหลายทางวัฒนธรรม", 
    "การประเมิน perplexity ช่วยวัดความสามารถของโมเดล"
]

In [None]:
# ==========================================
# 1. Auto-detect (แนะนำ) - ระบบจะเลือกให้เอง
# ==========================================
print("1. AUTO-DETECT MODE")
print("-" * 50)

result1 = compute_ppl(
    model_name=model_path,
    text_or_path=test_texts,
    context_length=32000, #กำหนดตามความเหมาะสม
    overlap_ratio=0.25, #อัตราส่วนการเลื่อนของ sliding window กำหนดตามความเหมาะสม 
    use_chat_template=None,  #None = auto-detect
    is_file=False, #บอกว่าไม่ใช่ไฟล์
    verbose=True #เพิ่มการพิมพ์ log
)

print(f"PPL_micro: {result1.get('PPL_micro', 'N/A'):.4f}")
print(f"PPL_macro: {result1.get('PPL_macro', 'N/A'):.4f}")  
print(f"Total tokens: {result1.get('tokens_evaluated', 0)}")

1. AUTO-DETECT MODE
--------------------------------------------------
Reading file: test_md.txt
File size: 520 characters
Prepared 1 text(s) for processing
Text length: 519 characters
Context length: 32000, Overlap: 8000 (25.00%), Step: 24000
Loading local model from: C:\Users\Mynew\Downloads\gpt-2
Model: C:\Users\Mynew\Downloads\gpt-2
Chat template supported: False
Will use chat template: False
Processing 1 texts...
  Text 1: 520 chars -> 520 chars (after template)
  Total tokens: 227
  No sliding needed: 1 chunk of 227 tokens
Token counts per text: [226]
PPL per text: ['17712.323']
PPL_micro: 17712.3233
PPL_macro: 17712.3233
Total tokens: 226


In [None]:
# ==========================================
# 2. บังคับใช้เป็น base model (ไม่ใช้ template)
# ==========================================
print(f"\n2. BASE MODEL MODE (No Chat Template)")
print("-" * 50)

result2 = compute_ppl(
    model_name=base_model_path,  # หรือใช้ base_model_path ถ้ามี
    text_or_path=test_texts,
    context_length=32000, #กำหนดตามความเหมาะสม
    overlap_ratio=0.25, #อัตราส่วนการเลื่อนของ sliding window กำหนดตามความเหมาะสม 
    use_chat_template=False,  # บังคับไม่ใช้ template
    is_file=False,
    verbose=False
)

print(f"PPL_micro: {result2.get('PPL_micro', 'N/A'):.4f}")
print(f"PPL_macro: {result2.get('PPL_macro', 'N/A'):.4f}")
print(f"Total tokens: {result2.get('tokens_evaluated', 0)}")


2. BASE MODEL MODE (No Chat Template)
--------------------------------------------------
PPL_micro: 78.5556
PPL_macro: 67.0417
Total tokens: 37


In [None]:
# ==========================================
# 3. ใช้ข้อความตรงๆ (ไม่ใช่ไฟล์)
# ==========================================
print(f"\n3. DIRECT TEXT INPUT")
print("-" * 50)

single_text = "นี่คือการทดสอบ PPL กับข้อความภาษาไทยที่ป้อนเข้าโดยตรง ไม่ผ่านไฟล์"

result3 = compute_ppl(
    model_name=model_path,
    text_or_path=single_text,
    context_length=32000, #กำหนดตามความเหมาะสม
    overlap_ratio=0.25, #อัตราส่วนการเลื่อนของ sliding window กำหนดตามความเหมาะสม 
    is_file=False,  #บอกว่าไม่ใช่ไฟล์
    verbose=True
)

print(f"Text: {single_text}")
print(f"PPL_micro: {result3.get('PPL_micro', 'N/A'):.4f}")
print(f"PPL_macro: {result3.get('PPL_macro', 'N/A'):.4f}")
print(f"Total tokens: {result3.get('tokens_evaluated', 0)}")


3. DIRECT TEXT INPUT
--------------------------------------------------
Processing direct text input: 65 characters
Context length: 512, Stride: 409
Loading local model from: /model/gemma-3-270m-it
Model: /model/gemma-3-270m-it
Chat template supported: True
Will use chat template: True
Processing 1 texts...
  Text 1: 65 chars -> 188 chars (after template)
  Total tokens: 50
  No sliding needed: 1 chunk of 50 tokens
Token counts per text: [48]
PPL per text: ['618.523']
Text: นี่คือการทดสอบ PPL กับข้อความภาษาไทยที่ป้อนเข้าโดยตรง ไม่ผ่านไฟล์
PPL_micro: 618.5234
PPL_macro: 618.5234
Total tokens: 48


In [None]:
# ==========================================
# 4. ใช้ไฟล์ (txt, markdown, jsonl, csv)
# ==========================================
print(f"\n4. FILE INPUT")
print("-" * 50)

result4 = compute_ppl(
    model_name=model_path,
    text_or_path=test_file,
    context_length=32000,
    overlap_ratio=0.25,
    is_file=True,  # บอกว่าเป็นไฟล์
    verbose=True
)

print(f"File: {test_file}")
print(f"PPL_micro: {result4.get('PPL_micro', 'N/A'):.4f}")
print(f"PPL_macro: {result4.get('PPL_macro', 'N/A'):.4f}")
print(f"Total tokens: {result4.get('tokens_evaluated', 0)}")


4. FILE INPUT
--------------------------------------------------
Reading file: test.txt
File size: 770 characters
Prepared 1 text(s) for processing
Text length: 770 characters
Context length: 2048, Stride: 1638
Loading local model from: /model/gemma-3-270m-it
Model: /model/gemma-3-270m-it
Chat template supported: True
Will use chat template: True
Processing 1 texts...
  Text 1: 771 chars -> 894 chars (after template)
  Total tokens: 333
  No sliding needed: 1 chunk of 333 tokens
Token counts per text: [331]
PPL per text: ['36.650']
File: test.txt
PPL_micro: 36.6495
PPL_macro: 36.6495
Total tokens: 331
