In [None]:
# Блок 1
!pip install -q transformers accelerate seqeval

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for seqeval (setup.py) ... [?25l[?25hdone


In [None]:
# Блок 2
from transformers import (AutoTokenizer, AutoModelForTokenClassification,
                          DataCollatorForTokenClassification, TrainingArguments, Trainer)
from seqeval.metrics import f1_score

In [None]:
# Блок 3
import pandas as pd, ast, re

train_df = pd.read_csv("train.csv", sep=";", quotechar='"', engine="python")
submission_df = pd.read_csv("submission.csv", sep=";", quotechar='"', engine="python")

In [None]:
# Блок 4
ALLOWED = {"TYPE","BRAND","VOLUME","PERCENT"}

def clean_ann(s: str):
    try:
        parsed = ast.literal_eval(s)
    except Exception:
        return []
    out=[]
    for x in parsed:
        if isinstance(x,(list,tuple)) and len(x)==3:
            s0,e0,t = x
            if t != "O":
                t = re.sub(r"^(B-|I-)", "", str(t))
                if t in ALLOWED:
                    try:
                        out.append([int(s0), int(e0), t])
                    except: pass
    return out

train_df["entities"] = train_df["annotation"].astype(str).apply(clean_ann)


In [None]:
# Блок 5
model_name = "cointegrated/rubert-tiny2"
tokenizer = AutoTokenizer.from_pretrained(model_name)

labels = ["O","B-TYPE","I-TYPE","B-BRAND","I-BRAND","B-VOLUME","I-VOLUME","B-PERCENT","I-PERCENT"]
label2id = {l:i for i,l in enumerate(labels)}
id2label = {i:l for l,i in label2id.items()}

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/401 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

In [95]:
# Блок 6
import torch
import numpy as np

class NERDataset(torch.utils.data.Dataset):
    def __init__(self, df, tokenizer, max_len=256):
        self.samples = df["sample"].tolist()
        self.entities = df["entities"].tolist()  # [(s0, e0, t), ...]
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        text = self.samples[idx]
        spans = self.entities[idx]

        # фильтруем и нормализуем интервалы
        ents = []
        L = len(text)
        for s0, e0, t in spans:
            s0 = int(s0); e0 = int(e0)
            if 0 <= s0 < e0 <= L:
                ents.append((s0, e0, str(t)))
        ents.sort(key=lambda x: (x[0], x[1]))

        enc = self.tokenizer(
            text,
            return_offsets_mapping=True,
            truncation=True,
            max_length=self.max_len
        )
        offsets = enc["offset_mapping"]

        labels_ids = []
        for (st, en) in offsets:
            if st == en:
                # спец‑токены уходим в игнор
                labels_ids.append(-100)
                continue

            lab = "O"
            # метим по пересечению диапазонов
            # пересечение есть, если max(st,s0) < min(en,e0)
            for s0, e0, t in ents:
                if max(st, s0) < min(en, e0):
                    lab = f"B-{t}" if st == s0 else f"I-{t}"
                    break

            labels_ids.append(label2id.get(lab, 0))

        return {
            "input_ids": torch.tensor(enc["input_ids"], dtype=torch.long),
            "attention_mask": torch.tensor(enc["attention_mask"], dtype=torch.long),
            "labels": torch.tensor(labels_ids, dtype=torch.long),
        }



# сплит 90/10
rng = np.random.default_rng(42)
perm = rng.permutation(len(train_df))
cut = int(0.9*len(train_df))
tr_idx, va_idx = perm[:cut], perm[cut:]
train_ds = NERDataset(train_df.iloc[tr_idx].reset_index(drop=True), tokenizer, max_len=256)
val_ds   = NERDataset(train_df.iloc[va_idx].reset_index(drop=True), tokenizer, max_len=256)

model = AutoModelForTokenClassification.from_pretrained(
    model_name, num_labels=len(labels), id2label=id2label, label2id=label2id
)
data_collator = DataCollatorForTokenClassification(tokenizer)

from seqeval.metrics import f1_score
from seqeval.scheme import IOB2

def compute_metrics(p):
    logits, labels_arr = p
    preds = np.argmax(logits, axis=-1)
    true_preds, true_labels = [], []
    for p_row, l_row in zip(preds, labels_arr):
        p_seq, l_seq = [], []
        for p_i, l_i in zip(p_row, l_row):
            if l_i == -100:
                continue
            p_seq.append(id2label[int(p_i)])
            l_seq.append(id2label[int(l_i)])
    # Важно: seqeval по умолчанию считает macro-F1 по BIO‑тегам; для мониторинга этого достаточно.
        true_preds.append(p_seq)
        true_labels.append(l_seq)
    return {"f1": f1_score(true_labels, true_preds, zero_division=0, scheme=IOB2)}

Some weights of BertForTokenClassification were not initialized from the model checkpoint at cointegrated/rubert-tiny2 and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Блок 7
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback
import numpy as np
from seqeval.metrics import f1_score

args = TrainingArguments(
    output_dir="ner_ckpt",
    learning_rate=3e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=8,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="steps",
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    warmup_ratio=0.1,
    gradient_accumulation_steps=1,
    report_to="none",
    seed=42,
)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)


trainer.train()
print(trainer.evaluate())

  trainer = Trainer(


Epoch,Training Loss,Validation Loss,F1
1,0.0536,0.049143,0.977886
2,0.043,0.033251,0.986298
3,0.0398,0.032495,0.98884
4,0.0117,0.031432,0.989648
5,0.0219,0.029916,0.990454
6,0.0124,0.032472,0.990669
7,0.0121,0.034045,0.990473


In [None]:
# Блок 10
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [97]:
# Блок 9
output_dir = "/content/drive/MyDrive/Colab Notebooks/ner_model_rubert3"

trainer.save_model(output_dir)                   # сохранит модель + config в output_dir
tokenizer.save_pretrained(output_dir)
trainer.save_state()                   # (опционально) состояние Trainer (optimizer/scheduler steps)
print("Best checkpoint:", trainer.state.best_model_checkpoint)

Best checkpoint: ner_ckpt/checkpoint-36894


In [None]:
# Блок 10
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [102]:
# Блок 11
load_path = "/content/drive/MyDrive/Colab Notebooks/ner_model_rubert2"

tokenizer = AutoTokenizer.from_pretrained(load_path)
model = AutoModelForTokenClassification.from_pretrained(load_path)

In [103]:
# =========================
# RUNTIME — ядро инференса
# =========================
import re
from typing import List, Tuple
import numpy as np
import torch

# --- Предусловия (в ноутбуке) ---
assert 'model' in globals() and 'tokenizer' in globals(), "Сначала загрузите model/tokenizer"

# --- Метки ---
if 'labels' not in globals():
    labels = ["O","B-TYPE","I-TYPE","B-BRAND","I-BRAND","B-VOLUME","I-VOLUME","B-PERCENT","I-PERCENT"]
label2id = {l:i for i,l in enumerate(labels)}
id2label  = {i:l for l,i in label2id.items()}

# --- CONFIG (единый источник правды)
CFG = {
    "use_margin_rule": True,
    "margin_delta": 0.06,  # дефолт для всех, если не указан поклассовый
    # ↓ новые дельты по базовым классам (подстрой: TYPE ↑, BRAND сред., V/P низкие)
    "margin_delta_per_class": {
        "TYPE": 0.08,
        "BRAND": 0.05,
        "VOLUME": 0.02,
        "PERCENT": 0.015,
        # "O": 0.06  # можно явно задать, иначе возьмётся общий margin_delta
    },

    "numeric_overrides": True,
    "trim_punct_on_spans": True,
    "word_majority": True,
    "word_inherit_prev": True,
    "majority_threshold": 0.58,
    "max_len": 256,
}

ALLOWED_BASE = {"TYPE","BRAND","VOLUME","PERCENT"}
PUNCT = set(";:,.!?()[]{}«»\"'—–-")

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.eval().to(DEVICE)

# --- Regex для чисел/объёмов/процентов
RE_PERCENT = re.compile(r'(?<!\d)(\d{1,3}(?:[.,]\d{1,2})?)\s*%', re.I)
RE_UNIT = r"(?:мл|л|литр(?:а|ов)?|г|гр|грамм(?:а|ов)?|кг|шт|уп|упак|бут|бутыл(?:ка|ки|ок)|табл|таб|капс|порц|пак)"
RE_UNIT_DOT = r"(?:мл\.|л\.|г\.|гр\.|шт\.|уп\.|таб\.|капс\.)"
RE_VOLUME1 = re.compile(rf'(?<!\d)\d+(?:[.,]\d+)?\s*(?:{RE_UNIT}|{RE_UNIT_DOT})(?!\w)', re.I)
RE_VOLUME2 = re.compile(rf'(?<!\d)\d+\s*[x×х]\s*\d+\s*(?:шт|уп|упак|таб|капс|пак)(?!\w)', re.I)

# --- Утилиты BIO/спанов
def _base(lab: str) -> str:
    return "O" if lab=="O" else (lab.split("-",1)[1] if "-" in lab else lab)

def _repair_bio_token_sequence(token_labels: List[str]) -> List[str]:
    out, prev = [], "O"
    for lab in token_labels:
        if lab == "O":
            out.append("O"); prev = "O"; continue
        if "-" not in lab: lab = f"B-{lab}"
        bio, typ = lab.split("-", 1)
        if bio == "I":
            ok = prev.startswith(("B-","I-")) and prev.split("-",1)[1] == typ
            lab = lab if ok else f"B-{typ}"
        out.append(lab); prev = lab
    return out

def _trim_punct(text: str, s: int, e: int) -> Tuple[int,int]:
    if not CFG["trim_punct_on_spans"]: return s, e
    while s < e and text[s] in PUNCT: s += 1
    while s < e and text[e-1] in PUNCT: e -= 1
    return s, e

def _compress_char_runs_base(char_labels: List[str]) -> List[Tuple[int,int,str]]:
    spans, i, n = [], 0, len(char_labels)
    while i < n:
        lab = char_labels[i]
        if lab == "O": i += 1; continue
        j = i + 1
        while j < n and char_labels[j] == lab: j += 1
        spans.append((i, j, lab))
        i = j
    return spans

def spans_to_charbase(text: str, base_spans: List[Tuple[int,int,str]]) -> List[str]:
    arr = ["O"] * len(text)
    for s, e, base in base_spans:
        s0, e0 = max(0,s), min(len(text), e)
        if s0 < e0:
            arr[s0:e0] = [base] * (e0 - s0)
    return arr

def _word_labels_from_charbase(text: str, char_labels: List[str]) -> List[Tuple[int,int,str]]:
    words = [(m.start(), m.end()) for m in re.finditer(r"\S+", text)]
    out, prev_base = [], "O"
    thr = CFG["majority_threshold"]
    for ws, we in words:
        cnt, non_o = {}, 0
        for i in range(ws, we):
            base = char_labels[i] if 0 <= i < len(char_labels) and char_labels[i] in ALLOWED_BASE else "O"
            cnt[base] = cnt.get(base, 0) + 1
            if base != "O": non_o += 1

        if CFG["word_majority"] and cnt:
            best_base, best_cnt = max(cnt.items(), key=lambda x: x[1])
            if best_base != "O" and best_cnt >= (we - ws) * thr:
                base = best_base
            else:
                if CFG["word_inherit_prev"] and non_o > 0 and prev_base != "O" and cnt.get(prev_base, 0) > 0:
                    base = prev_base
                else:
                    base = max(((b,c) for b,c in cnt.items() if b != "O"),
                               default=("O",0), key=lambda x: x[1])[0]
        else:
            base = max(((b,c) for b,c in cnt.items() if b != "O"),
                       default=("O",0), key=lambda x: x[1])[0]

        out.append((ws, we, base)); prev_base = base
    return out

def build_full_word_bio_from_wordlabels(word_labels: List[Tuple[int,int,str]]) -> List[Tuple[int,int,str]]:
    out, prev_base, started = [], "O", False
    for ws, we, base in word_labels:
        if base == "O":
            out.append((ws, we, "O")); prev_base, started = "O", False
        else:
            if prev_base == base and started:
                out.append((ws, we, f"I-{base}"))
            else:
                out.append((ws, we, f"B-{base}")); started = True
            prev_base = base
    return out

def _inject_numeric_overrides(text: str, char_labels: List[str]) -> None:
    if not CFG["numeric_overrides"]: return
    def mark(a: int, b: int, label: str):
        a = max(0, a); b = min(len(char_labels), b)
        if a < b: char_labels[a:b] = [label] * (b - a)
    for m in RE_PERCENT.finditer(text):  mark(*m.span(), "PERCENT")
    for m in RE_VOLUME1.finditer(text):  mark(*m.span(), "VOLUME")
    for m in RE_VOLUME2.finditer(text):  mark(*m.span(), "VOLUME")

def _apply_margin_rule_per_class(probs_row: np.ndarray) -> bool:
    """
    Возвращает True, если токен надо 'уронить' в O с учётом поклассового дельта.
    """
    i1 = int(np.argmax(probs_row))
    top1 = float(probs_row[i1])
    tmp = probs_row.copy(); tmp[i1] = -1.0
    top2 = float(tmp.max())

    lab = id2label[i1]              # например, 'B-TYPE', 'I-BRAND' или 'O'
    base = _base(lab)               # 'TYPE' | 'BRAND' | 'VOLUME' | 'PERCENT' | 'O'
    delta = CFG.get("margin_delta_per_class", {}).get(base, CFG.get("margin_delta", 0.06))
    return (top1 - top2) < delta

# --- Публичные функции ядра ---
def predict_char_base(text: str) -> List[str]:
    """Посимвольная лента базовых меток ('TYPE'/'BRAND'/'VOLUME'/'PERCENT'/'O')."""
    enc = tokenizer(
        text,
        return_offsets_mapping=True,
        return_overflowing_tokens=True,
        stride=64,
        truncation=True,
        max_length=CFG["max_len"],
        return_tensors="pt",
    )
    char_labels = ["O"] * len(text)
    n_chunks = int(enc["input_ids"].shape[0])

    for i in range(n_chunks):
        offsets = enc["offset_mapping"][i].tolist()
        inputs = {
            k: v[i:i+1].to(DEVICE)
            for k, v in enc.items()
            if k in ("input_ids", "attention_mask", "token_type_ids")
        }
        with torch.no_grad():
            logits = model(**inputs).logits[0]  # [seq, C]
        logprobs = torch.log_softmax(logits, dim=-1).cpu().numpy()
        probs    = np.exp(logprobs)

        keep = [(s,e) for (s,e) in offsets if not (s==0 and e==0)]
        if not keep: continue
        lp = np.array([lp for lp,(s,e) in zip(logprobs, offsets) if not (s==0 and e==0)])
        pr = np.array([pr for pr,(s,e) in zip(probs,    offsets) if not (s==0 and e==0)])

        path = lp.argmax(axis=1).tolist()
        if CFG["use_margin_rule"]:
          for t in range(len(path)):
            if _apply_margin_rule_per_class(pr[t]):
              path[t] = label2id["O"]

        tok_labels = _repair_bio_token_sequence([id2label[int(pid)] for pid in path])
        for lab, (s, e) in zip(tok_labels, keep):
            if s == e or lab == "O": continue
            base = _base(lab)
            if base == "O": continue
            s0, e0 = max(0, s), min(len(char_labels), e)
            if s0 < e0:
                char_labels[s0:e0] = [base] * (e0 - s0)

    _inject_numeric_overrides(text, char_labels)
    return char_labels

def predict_word_bio(text: str) -> List[Tuple[int,int,str]]:
    """Готовый результат для сервера: BIO по словам в виде [(start,end,'B-XXX'|'I-XXX'|'O'), ...]."""
    if not text or not text.strip():
        return []
    char_base = predict_char_base(text)
    base_spans = _compress_char_runs_base(char_base)
    if base_spans and CFG["trim_punct_on_spans"]:
        trimmed = []
        for s, e, base in base_spans:
            s2, e2 = _trim_punct(text, s, e)
            if s2 < e2:
                trimmed.append((s2, e2, base))
        base_spans = trimmed
    char_base2 = spans_to_charbase(text, base_spans)
    word_labels = _word_labels_from_charbase(text, char_base2)
    full_bio = build_full_word_bio_from_wordlabels(word_labels)

    # финальные assert'ы согласованности
    for s,e,lab in full_bio:
        assert 0 <= s < e <= len(text)
        assert lab == "O" or lab.startswith(("B-","I-"))
    return full_bio


In [104]:
# =========================
# BATCH — прогон по submission_df и запись submission_out.csv
# Опираться на функции из Блока A (predict_word_bio)
# =========================
import pandas as pd

assert 'submission_df' in globals(), "Нужно загрузить submission.csv в submission_df"

pred_rows: List[str] = []
for text in submission_df["sample"].tolist():
    full_bio = predict_word_bio(text)
    pred_rows.append(str([(int(s), int(e), str(lab)) for (s,e,lab) in full_bio]))

submission_out = submission_df.copy()
submission_out["annotation"] = pred_rows
submission_out.to_csv("submission_out.csv", sep=";", quotechar='"', index=False)
print("Готово: submission_out.csv сохранён.")
print(submission_out.head(5).to_string(index=False))


Готово: submission_out.csv сохранён.
                   sample                                                          annotation
        форма для выпечки                      [(0, 5, 'B-TYPE'), (6, 9, 'O'), (10, 17, 'O')]
              фарш свиной                               [(0, 4, 'B-TYPE'), (5, 11, 'I-TYPE')]
сок ананасовый без сахара [(0, 3, 'B-TYPE'), (4, 14, 'I-TYPE'), (15, 18, 'O'), (19, 25, 'O')]
                   еринги                                                  [(0, 6, 'B-TYPE')]
                  молооко                                                  [(0, 7, 'B-TYPE')]
