# OCR-пайплайн на основе VLM (Gigachat) для распознавания текста с белых маркерных доск и измерение метрик CER и WER

In [None]:
from dotenv import load_dotenv
load_dotenv()

import os, io, cv2, requests, numpy as np
from pathlib import Path
import pandas as pd
from gigachat import GigaChat
import json, re
from typing import Optional, Dict, Any, Tuple, List
import math
from pathlib import Path

API_ROOT = "https://gigachat.devices.sberbank.ru/api/v1"
VERIFY = False 

print('AUTHORIZATION_KEY:', bool(os.getenv('AUTHORIZATION_KEY')))

import warnings
from urllib3.exceptions import InsecureRequestWarning
warnings.filterwarnings('ignore', category=InsecureRequestWarning)

In [None]:
gc = GigaChat(credentials=os.getenv('AUTHORIZATION_KEY'), verify_ssl_certs=False, scope='GIGACHAT_API_PERS', model='GigaChat-2-Max')

# быстрая проверка: должен ответить «ok»
resp = gc.chat('Скажи ok одним словом')
print('SDK работает:', resp.choices[0].message.content)

# сохраним токен для REST
token = getattr(gc, 'token', None)
print('TOKEN:', bool(token))

In [None]:
def upload_image_to_gc_rest(gc, img_bgr) -> str:
    """Загрузка изображения через REST, с токеном из SDK"""
    token = getattr(gc, 'token', None)
    assert token, 'Не удалось получить токен из SDK-клиента (gc.token)'
    ok, buf = cv2.imencode('.png', img_bgr); assert ok, 'PNG encode failed'
    files = {'file': ('crop.png', io.BytesIO(buf.tobytes()), 'image/png')}
    data  = {'purpose': 'general'}

    r = requests.post(f"{API_ROOT}/files",
                      headers={"Authorization": f"Bearer {token}"},
                      files=files, data=data, timeout=60, verify=VERIFY)
    r.raise_for_status()
    return r.json()["id"]

OCR_PROMPT = "Верни ровно текст на картинке одной строкой, без комментариев и пояснений."

def chat_with_attachment_rest(gc, file_id: str, prompt: str, model_id: str) -> str:
    """OCR-запрос с attachments через REST"""
    token = getattr(gc, "token", None)
    assert token, 'Не удалось получить токен из SDK-клиента (gc.token)'
    payload = {
        'model': model_id,
        'messages': [{
            'role': 'user',
            'content': prompt,
            'attachments': [file_id]
        }],
        'temperature': 0.0,
        'top_p': 0.1
    }
    r = requests.post(f"{API_ROOT}/chat/completions",
                      headers={"Authorization": f"Bearer {token}",
                               "Content-Type": "application/json"},
                      json=payload, timeout=60, verify=VERIFY)
    r.raise_for_status()
    return r.json()["choices"][0]["message"]["content"].strip()

def gc_ocr_image(gc, img_bgr, model_id: str):
    """Комбо-функция: загрузка + распознавание"""
    fid = upload_image_to_gc_rest(gc, img_bgr)
    return chat_with_attachment_rest(gc, fid, OCR_PROMPT, model_id)

In [None]:
def preprocess_line(img_bgr: np.ndarray) -> np.ndarray:
    """Легкая предобработка для улучшения контраста и ликвидации шума"""
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) # цвет не нужен, переведем в серый
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) # выравниваем гистограмму = улучшаем контраст в неровно освещенных фрагментах
    gray = clahe.apply(gray)
    gray = cv2.GaussianBlur(gray, (3,3), 0) # легкое сглаживание
    _, bw = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # бинаризация
    return cv2.cvtColor(bw, cv2.COLOR_GRAY2BGR) # вернем обратно в трехканальный формат

def _edit_distance(a, b):
    """Алгоритм Левенштейна"""
    n, m = len(a), len(b)
    if n < m: a, b, n, m = b, a, m, n
    prev = list(range(m+1)); cur=[0]*(m+1)
    for i in range(1, n+1):
        cur[0]=i; ai=a[i-1]
        for j in range(1, m+1):
            cost = 0 if ai==b[j-1] else 1
            cur[j]=min(prev[j]+1, cur[j-1]+1, prev[j-1]+cost)
        prev, cur = cur, prev
    return prev[m]
    
def CER(ref, hyp): ref, hyp = ref or '', hyp or ''; return _edit_distance(list(ref), list(hyp)) / max(1,len(ref))
def WER(ref, hyp):
    ref_w=(ref or '').split(); hyp_w=(hyp or '').split()
    return _edit_distance(ref_w, hyp_w) / max(1,len(ref_w))

def read_lines_csv(csv_path: str, images_dir: str) -> pd.DataFrame:
    """Загрузка таблицы"""
    df = pd.read_csv(csv_path, sep=None, engine='python', encoding='utf-8-sig')
    def norm(cols):
        out=[]
        for c in cols:
            c = c.replace("\ufeff","").replace("\xa0"," ")
            c = " ".join(c.strip().split())
            out.append(c)
        return out
    df.columns = norm(df.columns)
    df = df.rename(columns={'Picture name':'picture_name', "text_line":"origin_text_line"})
    df['abs_path'] = df['picture_name'].apply(lambda p: str(Path(images_dir)/str(p)))
    return df[['picture_name','origin_text_line','abs_path']].reset_index(drop=True)

lines_dir = 'Text_dataset_lines_for_CER_WER'
lines_csv = 'dataset_text_lines_CER_WER.csv'
N_LINES = 5  # для теста
df_lines = read_lines_csv(lines_csv, lines_dir).head(N_LINES)
df_lines.head()

In [None]:
# доступные модели под токеном пользователя
models_resp = requests.get(f"{API_ROOT}/models",
                           headers={"Authorization": f"Bearer {token}"},
                           verify=VERIFY, timeout=30)
print('MODELS:', models_resp.status_code)
print(models_resp.text[:800])

MODEL_ID = 'GigaChat-2-Max'

preds=[]
for i, row in df_lines.iterrows():
    img = cv2.imread(row['abs_path'], cv2.IMREAD_COLOR)
    if img is None:
        preds.append('')
        print('не открылось:', row['abs_path'])
        continue
    img_p = preprocess_line(img)
    try:
        hyp = gc_ocr_image(gc, img_p, MODEL_ID)
    except requests.HTTPError as e:
        print('Ошибка на', row['abs_path'], ' ', e.response.status_code, e.response.text[:200])
        hyp = ''
    except Exception as e:
        print('Ошибка на', row['abs_path'], ' ', e)
        hyp = ''
    preds.append(hyp)

df_lines['pred_text_line'] = preds
df_lines['CER'] = [CER(str(r), str(h)) for r,h in zip(df_lines['origin_text_line'], df_lines['pred_text_line'])]
df_lines['WER'] = [WER(str(r), str(h)) for r,h in zip(df_lines['origin_text_line'], df_lines['pred_text_line'])]
print('CER mean:', df_lines["CER"].mean(), 'WER mean:', df_lines['WER'].mean())
df_lines.head()

In [None]:
# Корневая папка с изображениями схем
SCHEMES_DIR = Path('dataset_for_BR') 
OUT_DIR = Path('results_bord/vis')
OUT_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
def imread_any(path: Path):
    """Загрузка изображения"""
    data = np.fromfile(str(path), dtype=np.uint8)
    img  = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return img

def clip_point(x, y, W, H):
    # гарантируем что координата (x, y) лежит внутри изображения WxH
    return max(0, min(W-1, int(x))), max(0, min(H-1, int(y)))

def clip_box(x, y, w, h, W, H):
    """Аккуратно обрезаем рамку до границ изображения"""
    x0 = max(0, int(x)); y0 = max(0, int(y))
    x1 = min(W, x0 + int(w)); y1 = min(H, y0 + int(h))
    if x1 <= x0: x1 = min(W, x0 + 1)
    if y1 <= y0: y1 = min(H, y0 + 1)
    return x0, y0, x1, y1

def draw_rect(img, x, y, w, h, color=(255,0,0), thickness=2):
    """Рисуем прямоугольник"""
    H, W = img.shape[:2]
    x0,y0,x1,y1 = clip_box(x,y,w,h,W,H)
    cv2.rectangle(img, (x0,y0), (x1,y1), color, thickness)

def draw_arrow(img, xb, yb, xt, yt, color=(0,0,255), thickness=2, tip_len=0.25):
    """Рисуем стрелку"""
    H, W = img.shape[:2]
    x0,y0 = clip_point(xb, yb, W, H)
    x1,y1 = clip_point(xt, yt, W, H)
    if (x0==x1) and (y0==y1):
        return
    cv2.arrowedLine(img, (x0,y0), (x1,y1), color, thickness, line_type=cv2.LINE_AA, tipLength=tip_len)


In [None]:
DATA_DIR = Path(".")  # CSV лежат в корне проекта

# функции для чтения эталонных данных
def read_rectangles_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, sep=None, engine='python', encoding='utf-8-sig')
    df = df.rename(columns={
        'image_name': 'filename',
        'bbox_x': 'x',
        'bbox_y': 'y',
        'bbox_width': 'w',
        'bbox_height': 'h',
        'label_name': 'label'
    })

    # типы координат -> int
    for c in ['x','y','w','h','image_width','image_height']:
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0).astype(int)
    df['abs_path'] = df['filename'].apply(lambda f: str(SCHEMES_DIR/str(f)))

    # оставим только нужные для отрисовки
    return df[['filename','abs_path','x','y','w','h']].reset_index(drop=True)

def read_arrows_points_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, sep=None, engine='python', encoding='utf-8-sig')
    df = df.rename(columns={'image_name': 'filename'})
    for c in ['x_base','y_base','x_tip','y_tip']:
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0).astype(int)

    df['abs_path'] = df['filename'].apply(lambda f: str(SCHEMES_DIR/str(f)))
    return df[['filename','abs_path','x_base','y_base','x_tip','y_tip']].reset_index(drop=True)

def read_text_boxes_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, sep=None, engine='python', encoding='utf-8-sig')
    df = df.rename(columns={
        'image_name': 'filename',
        'bbox_x': 'x',
        'bbox_y': 'y',
        'bbox_width': 'w',
        'bbox_height': 'h',
        'label_name': 'label'
    })
    df['abs_path'] = df['filename'].apply(lambda f: str(SCHEMES_DIR/str(f)))
    return df[['filename','abs_path','x','y','w','h']].reset_index(drop=True)

In [None]:
rects_csv = Path('rectangles.csv')
arrows_csv = Path('arrows.csv')
text_csv = Path('text.csv')

df_rects = read_rectangles_csv(rects_csv)
df_arrows = read_arrows_points_csv(arrows_csv)
df_text = read_text_boxes_csv(text_csv) 

In [None]:
# Соберём список всех уникальных изображений из прямоугольников/стрелок
all_files = sorted(set(df_rects['abs_path']).union(set(df_arrows['abs_path'])))

saved = 0
for img_path in all_files:
    img = imread_any(Path(img_path))
    if img is None:
        print("Не удалось прочитать:", img_path)
        continue

    # прямоугольники - синие рамки
    rrows = df_rects[df_rects['abs_path'] == img_path]
    for r in rrows.itertuples(index=False):
        draw_rect(img, r.x, r.y, r.w, r.h, color=(255,0,0), thickness=2)

    # стрелки - красные (по двум точкам)
    arows = df_arrows[df_arrows['abs_path'] == img_path]
    for r in arows.itertuples(index=False):
        draw_arrow(img, r.x_base, r.y_base, r.x_tip, r.y_tip, color=(0,0,255), thickness=2, tip_len=0.25)

    # сохраняем
    out_path = OUT_DIR / Path(img_path).name
    ok, buf = cv2.imencode('.jpg', img, [int(cv2.IMWRITE_JPEG_QUALITY), 92])
    if ok:
        out_path.write_bytes(buf.tobytes())
        saved += 1

print(f"Готово. Сохранено визуализаций: {saved} в {OUT_DIR}")

In [None]:
API_ROOT = "https://gigachat.devices.sberbank.ru/api/v1"
VERIFY = False

MODEL_ID = "GigaChat-2-Max"

DETECT_PROMPT = (
"""Ты — детектор схем. Верни ТОЛЬКО JSON без пояснений и без форматирования Markdown.
Требуется найти:
- Прямоугольники (рамки блоков): для каждого x,y,w,h (x,y — верхний левый угол).
- Стрелки: для каждой (x_base,y_base) — начало, (x_tip,y_tip) — конец.

Строгий JSON-ответ в формате:
{
  "image_width": <int>,
  "image_height": <int>,
  "rectangles": [
    {"x": <int>, "y": <int>, "w": <int>, "h": <int>}, ...
  ],
  "arrows": [
    {"x_base": <int>, "y_base": <int>, "x_tip": <int>, "y_tip": <int>}, ...
  ]
}

Правила:
- Координаты целые, в пикселях, в пределах изображения.
- Если объектов нет, верни пустые списки.
- Никаких комментариев. Только валидный JSON.
"""
)

def _upload_image_via_rest(gc, img_bgr: np.ndarray) -> str:
    token = getattr(gc, "token", None)
    assert token, "Нет токена у SDK-клиента (gc.token)"
    ok, buf = cv2.imencode(".png", img_bgr); assert ok, "PNG encode failed"
    files = {"file": ("img.png", io.BytesIO(buf.tobytes()), "image/png")}
    r = requests.post(f"{API_ROOT}/files",
                      headers={"Authorization": f"Bearer {token}"},
                      files=files, data={"purpose":"general"},
                      timeout=60, verify=VERIFY)
    r.raise_for_status()
    return r.json()["id"]

def _post_chat_with_attachment(gc, file_id: str, prompt: str, model_id: str) -> str:
    token = getattr(gc, "token", None)
    payload = {
        "model": model_id,
        "messages": [{"role": "user", "content": prompt, "attachments": [file_id]}],
        "temperature": 0.0,
        "top_p": 0.1,
    }
    r = requests.post(f"{API_ROOT}/chat/completions",
                      headers={"Authorization": f"Bearer {token}", "Content-Type":"application/json"},
                      json=payload, timeout=120, verify=VERIFY)
    r.raise_for_status()
    return r.json()["choices"][0]["message"]["content"]

def _force_json(s: str) -> dict:
    # 1) пробуем как есть
    try:
        return json.loads(s)
    except Exception:
        pass
    # 2) вытащим «самый наружный» JSON-блок
    m = re.search(r'\{.*\}', s, flags=re.S)
    if m:
        try:
            return json.loads(m.group(0))
        except Exception:
            pass
    raise ValueError("Модель вернула невалидный JSON")

def detect_layout_with_gc(gc, img_bgr: np.ndarray, model_id: str = MODEL_ID) -> dict:
    """
    Возвращает dict:
    {
      "image_width": int, "image_height": int,
      "rectangles": [{"x","y","w","h"}, ...],
      "arrows": [{"x_base","y_base","x_tip","y_tip"}, ...]
    }
    """
    fid = _upload_image_via_rest(gc, img_bgr)
    raw = _post_chat_with_attachment(gc, fid, DETECT_PROMPT, model_id)
    data = _force_json(raw)

    H, W = img_bgr.shape[:2]
    # нормализация и клип координат
    iw = int(data.get("image_width", W))
    ih = int(data.get("image_height", H))
    rects = []
    for r in data.get("rectangles", []) or []:
        x = max(0, min(int(r.get("x", 0)), W-1))
        y = max(0, min(int(r.get("y", 0)), H-1))
        w = max(1, min(int(r.get("w", 1)), W - x))
        h = max(1, min(int(r.get("h", 1)), H - y))
        rects.append({"x":x,"y":y,"w":w,"h":h})
    arrows = []
    for a in data.get("arrows", []) or []:
        xb = max(0, min(int(a.get("x_base", 0)), W-1))
        yb = max(0, min(int(a.get("y_base", 0)), H-1))
        xt = max(0, min(int(a.get("x_tip", 0)),  W-1))
        yt = max(0, min(int(a.get("y_tip", 0)),  H-1))
        arrows.append({"x_base":xb,"y_base":yb,"x_tip":xt,"y_tip":yt})
    return {
        "image_width": iw, "image_height": ih,
        "rectangles": rects,
        "arrows": arrows
    }

In [None]:
from tqdm import tqdm

# соберём список картинок в папке
img_paths = sorted([p for p in SCHEMES_DIR.iterdir() if p.suffix.lower() in {'.jpg','.jpeg','.png'}])
print('Всего изображений:', len(img_paths))

det_rect_rows = []
det_arrow_rows = []

for p in tqdm(img_paths):
    img = imread_any(p)
    if img is None:
        print('Не открылся:', p)
        continue
    try:
        det = detect_layout_with_gc(gc, img, MODEL_ID)
    except Exception as e:
        print('Ошибка детекции на', p.name, ' ', e)
        continue

    # прямоугольники
    for r in det['rectangles']:
        det_rect_rows.append({
            'image_name': p.name,
            'image_width': det['image_width'],
            'image_height': det['image_height'],
            'bbox_x': r['x'], 'bbox_y': r['y'], 'bbox_width': r['w'], 'bbox_height': r['h']
        })
    # стрелки
    for a in det['arrows']:
        det_arrow_rows.append({
            'image_name': p.name,
            'image_width': det['image_width'],
            'image_height': det['image_height'],
            'x_base': a['x_base'], 'y_base': a['y_base'], 'x_tip': a['x_tip'], 'y_tip': a['y_tip']
        })

det_rects = pd.DataFrame(det_rect_rows)
det_arrows = pd.DataFrame(det_arrow_rows)

Path('results_bord').mkdir(parents=True, exist_ok=True)
det_rects.to_csv('results_bord/detected_rectangles.csv', index=False, encoding='utf-8-sig')
det_arrows.to_csv('results_bord/detected_arrows.csv',    index=False, encoding='utf-8-sig')

print('Сохранено:',
      '\n rectangles -> results_bord/detected_rectangles.csv (', len(det_rects), 'строк )',
      '\n arrows -> results_bord/detected_arrows.csv (', len(det_arrows), 'строк )')

In [None]:
# какие изображения присутствуют в детектах
all_names = sorted(set(det_rects.get('image_name', [])).union(set(det_arrows.get('image_name', []))))

saved = 0
for name in all_names:
    img_path = SCHEMES_DIR / name
    img = imread_any(img_path)
    if img is None:
        print("Не открыть:", img_path)
        continue

    # прямоугольники (из детекции) - зелёные
    rows_r = det_rects[det_rects['image_name'] == name]
    for r in rows_r.itertuples(index=False):
        draw_rect(img, r.bbox_x, r.bbox_y, r.bbox_width, r.bbox_height, color=(0,255,0), thickness=3)

    # стрелки (из детекции) - фиолетовые
    rows_a = det_arrows[det_arrows['image_name'] == name]
    for a in rows_a.itertuples(index=False):
        draw_arrow(img, a.x_base, a.y_base, a.x_tip, a.y_tip, color=(255,0,255), thickness=3, tip_len=0.25)

    # сохранить
    out_path = OUT_DIR / name
    ok, buf = cv2.imencode('.jpg', img, [int(cv2.IMWRITE_JPEG_QUALITY), 92])
    if ok:
        out_path.write_bytes(buf.tobytes()); saved += 1

print(f"Готово. Сохранено визуализаций: {saved} → {OUT_DIR}")

# быстрый предпросмотр нескольких
samples = sorted(OUT_DIR.glob('*.jpg'))[:2]
for p in samples:
    vis = imread_any(p)
    if vis is None: 
        continue
    plt.figure(figsize=(10,6))
    plt.imshow(cv2.cvtColor(vis, cv2.COLOR_BGR2RGB))
    plt.axis('off')
    plt.title(p.name)
    plt.show()

In [None]:
# пороги для метрик
IOU_THR = 0.50   # минимальный IoU для того, чтобы рамка считалась совпавшей
ARW_THR = 0.03   # порог нормированной ошибки концов стрелки (в долях диагонали изображения)

GT_RECTS_CSV = Path('rectangles.csv')
GT_ARROWS_CSV = Path('arrows.csv')
DET_RECTS_CSV = Path('results_bord/detected_rectangles.csv')
DET_ARROWS_CSV = Path('results_bord/detected_arrows.csv')

In [None]:
# нормализуем данные
def norm_rects(df: pd.DataFrame, is_det: bool) -> pd.DataFrame:
    # переименовываем возможные варианты колонок в стандартные
    df = df.rename(columns={
        'image_name': 'image_name',
        'filename': 'image_name',
        'bbox_x': 'x', 'bbox_y': 'y', 'bbox_width': 'w', 'bbox_height': 'h',
        'x': 'x', 'y': 'y', 'w': 'w', 'h': 'h',
        'image_width': 'W', 'image_height': 'H', 'width': 'W', 'height': 'H'
    })
    keep = ['image_name', 'x', 'y', 'w', 'h', 'W', 'H']
    missing = [c for c in keep if c not in df.columns]
    if missing:
        raise ValueError(f'Нет нужных колонок {missing}. Есть: {list(df.columns)}')
    # приводим все числовые колонки к int
    for c in keep[1:]:
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0).astype(int)
    # возвращаем только нужные колонки
    return df[keep].copy()

def norm_arrows(df: pd.DataFrame, is_det: bool) -> pd.DataFrame:
    df = df.rename(columns={
        'image_name': 'image_name', 'filename': 'image_name',
        'x_base': 'x_base', 'y_base': 'y_base', 'x_tip': 'x_tip', 'y_tip': 'y_tip',
        'image_width': 'W', 'image_height': 'H', 'width': 'W', 'height': 'H'
    })
    keep = ['image_name', 'x_base', 'y_base', 'x_tip', 'y_tip', 'W', 'H']
    missing = [c for c in keep if c not in df.columns]
    if missing:
        raise ValueError(f'Нет нужных колонок {missing}. Есть: {list(df.columns)}')
    for c in keep[1:]:
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0).astype(int)
    return df[keep].copy()

In [None]:
def iou(boxA, boxB):
    """Вычисление IoU между двумя рамками"""
    # каждая рамка — (x, y, w, h)
    ax0, ay0 = boxA[0], boxA[1]
    ax1, ay1 = ax0 + boxA[2], ay0 + boxA[3]
    bx0, by0 = boxB[0], boxB[1]
    bx1, by1 = bx0 + boxB[2], by0 + boxB[3]
    # находим пересечение
    inter_x0, inter_y0 = max(ax0, bx0), max(ay0, by0)
    inter_x1, inter_y1 = min(ax1, bx1), min(ay1, by1)
    iw, ih = max(0, inter_x1 - inter_x0), max(0, inter_y1 - inter_y0)
    inter = iw * ih
    # площадь объединения
    areaA = boxA[2]*boxA[3]
    areaB = boxB[2]*boxB[3]
    union = areaA + areaB - inter
    return inter / union if union > 0 else 0.0

In [None]:
def greedy_match_rects(gt_boxes, det_boxes, thr=0.5):
    """Сопоставление рамок"""
    used_det = set()
    matches = []
    for gi, g in enumerate(gt_boxes):
        best_i, best_j = 0.0, None
        for dj, d in enumerate(det_boxes):
            if dj in used_det:
                continue
            i = iou(g, d)
            if i > best_i:
                best_i, best_j = i, dj
        if best_j is not None and best_i >= thr:
            matches.append((gi, best_j))
            used_det.add(best_j)
    # определяем пропущенные (FN) и лишние (FP)
    FN = [i for i in range(len(gt_boxes))  if i not in [m[0] for m in matches]]
    FP = [j for j in range(len(det_boxes)) if j not in [m[1] for m in matches]]
    return matches, FN, FP

In [None]:
def arrow_end_error_norm(a_gt, a_det, diag):
    """Ошибка конца стрелок"""
    xgb, ygb, xgt, ygt = a_gt
    xdb, ydb, xdt, ydt = a_det
    eb = math.hypot(xgb - xdb, ygb - ydb) / diag
    et = math.hypot(xgt - xdt, ygt - ydt) / diag
    return (eb + et) / 2.0  # средняя ошибка base+tip, нормированная на диагональ

def greedy_match_arrows(gt_ar, det_ar, W, H, thr_norm=0.03):
    """Соповтсавлние стрелок"""
    diag = max(1.0, math.hypot(W, H))  # диагональ изображения для нормировки
    used_det = set()
    matches = []
    for gi, g in enumerate(gt_ar):
        best_e, best_j = 1e9, None
        for dj, d in enumerate(det_ar):
            if dj in used_det:
                continue
            e = arrow_end_error_norm(g, d, diag)
            if e < best_e:
                best_e, best_j = e, dj
        if best_j is not None and best_e <= thr_norm:
            matches.append((gi, best_j))
            used_det.add(best_j)
    FN = [i for i in range(len(gt_ar))  if i not in [m[0] for m in matches]]
    FP = [j for j in range(len(det_ar)) if j not in [m[1] for m in matches]]
    return matches, FN, FP

In [None]:
# основная функция оценки
def evaluate_simple_rects_arrows(
    gt_rects_df: pd.DataFrame,
    det_rects_df: pd.DataFrame,
    gt_arrows_df: pd.DataFrame,
    det_arrows_df: pd.DataFrame,
    iou_thr: float = IOU_THR,
    arrow_thr: float = ARW_THR
):
    # нормализуем входные таблицы
    gtr = norm_rects(gt_rects_df, is_det=False)
    dtr = norm_rects(det_rects_df, is_det=True)
    gta = norm_arrows(gt_arrows_df, is_det=False)
    dta = norm_arrows(det_arrows_df, is_det=True)

    # общий список всех имён изображений
    names = sorted(set(gtr['image_name']).union(set(gta['image_name']))
                   .union(set(dtr['image_name'])).union(set(dta['image_name'])))

    # инициализация счётчиков
    total_rect_tp = total_rect_fp = total_rect_fn = 0
    total_arw_tp  = total_arw_fp  = total_arw_fn  = 0

    per_image = []  # таблица метрик по каждому изображению
    # проходим по всем изображениям
    for name in names:
        # фильтруем рамки и стрелки для конкретного изображения
        gtr_i = gtr[gtr['image_name']==name].reset_index(drop=True)
        dtr_i = dtr[dtr['image_name']==name].reset_index(drop=True)
        gta_i = gta[gta['image_name']==name].reset_index(drop=True)
        dta_i = dta[dta['image_name']==name].reset_index(drop=True)

        # берём размеры кадра
        def pick_wh(df_list):
            for df in df_list:
                if len(df):
                    return int(df['W'].iloc[0]), int(df['H'].iloc[0])
            return 0, 0
        W, H = pick_wh([dtr_i, gtr_i, dta_i, gta_i])

        gt_boxes  = [(r.x, r.y, r.w, r.h) for r in gtr_i.itertuples(index=False)]
        det_boxes = [(r.x, r.y, r.w, r.h) for r in dtr_i.itertuples(index=False)]
        m_rects, fn_rects, fp_rects = greedy_match_rects(gt_boxes, det_boxes, thr=iou_thr)
        tp_r, fp_r, fn_r = len(m_rects), len(fp_rects), len(fn_rects)
        total_rect_tp += tp_r; total_rect_fp += fp_r; total_rect_fn += fn_r

        gt_arr  = [(r.x_base, r.y_base, r.x_tip, r.y_tip) for r in gta_i.itertuples(index=False)]
        det_arr = [(r.x_base, r.y_base, r.x_tip, r.y_tip) for r in dta_i.itertuples(index=False)]
        m_arw, fn_arw, fp_arw = greedy_match_arrows(gt_arr, det_arr, W, H, thr_norm=arrow_thr)
        tp_a, fp_a, fn_a = len(m_arw), len(fp_arw), len(fn_arw)
        total_arw_tp += tp_a; total_arw_fp += fp_a; total_arw_fn += fn_a

        # вычисляем precision / recall / f1 для конкретного изображения
        prec_r = tp_r / max(1, tp_r + fp_r)
        rec_r  = tp_r / max(1, tp_r + fn_r)
        f1_r   = 2 * prec_r * rec_r / max(1e-9, prec_r + rec_r)

        prec_a = tp_a / max(1, tp_a + fp_a)
        rec_a  = tp_a / max(1, tp_a + fn_a)
        f1_a   = 2 * prec_a * rec_a / max(1e-9, prec_a + rec_a)

        per_image.append({
            'image_name': name, 'W': W, 'H': H,
            'rect_tp': tp_r, 'rect_fp': fp_r, 'rect_fn': fn_r,
            'rect_precision': prec_r, 'rect_recall': rec_r, 'rect_f1': f1_r,
            'arrow_tp': tp_a, 'arrow_fp': fp_a, 'arrow_fn': fn_a,
            'arrow_precision': prec_a, 'arrow_recall': rec_a, 'arrow_f1': f1_a
        })

    # агрегируем результаты по всему датасету (микро-усреднение)
    rect_prec = total_rect_tp / max(1, total_rect_tp + total_rect_fp)
    rect_rec  = total_rect_tp / max(1, total_rect_tp + total_rect_fn)
    rect_f1   = 2 * rect_prec * rect_rec / max(1e-9, rect_prec + rect_rec)

    arw_prec  = total_arw_tp / max(1, total_arw_tp + total_arw_fp)
    arw_rec   = total_arw_tp / max(1, total_arw_tp + total_arw_fn)
    arw_f1    = 2 * arw_prec * arw_rec / max(1e-9, arw_prec + arw_rec)

    summary = {
        'IOU_THR': IOU_THR,
        'ARROW_THR_diag_frac': ARW_THR,
        'rect_precision': rect_prec, 'rect_recall': rect_rec, 'rect_f1': rect_f1,
        'rect_TP': total_rect_tp, 'rect_FP': total_rect_fp, 'rect_FN': total_rect_fn,
        'arrow_precision': arw_prec, 'arrow_recall': arw_rec, 'arrow_f1': arw_f1,
        'arrow_TP': total_arw_tp, 'arrow_FP': total_arw_fp, 'arrow_FN': total_arw_fn,
    }

    return summary, pd.DataFrame(per_image)

In [None]:
# загружаем CSV и считаем метрики 
gt_rects_df   = pd.read_csv(GT_RECTS_CSV,   sep=None, engine='python', encoding='utf-8-sig')
gt_arrows_df  = pd.read_csv(GT_ARROWS_CSV,  sep=None, engine='python', encoding='utf-8-sig')
det_rects_df  = pd.read_csv(DET_RECTS_CSV,  encoding='utf-8-sig')
det_arrows_df = pd.read_csv(DET_ARROWS_CSV, encoding='utf-8-sig')

summary, per_img = evaluate_simple_rects_arrows(
    gt_rects_df, det_rects_df, gt_arrows_df, det_arrows_df,
    iou_thr=IOU_THR, arrow_thr=ARW_THR
)

print('Простые метрики детекции рамок и стрелок')
for k,v in summary.items():
    if isinstance(v, float):
        print(f'{k:26s}: {v:.4f}')
    else:
        print(f'{k:26s}: {v}')

# сохраняем таблицы с метриками
out_dir = Path('results_bord')
out_dir.mkdir(parents=True, exist_ok=True)
per_img.to_csv(out_dir/'metrics_simple_per_image.csv', index=False, encoding='utf-8-sig')
pd.DataFrame([summary]).to_csv(out_dir/'metrics_simple_summary.csv', index=False, encoding='utf-8-sig')

print('\nСохранено:')
print(' - results_bord/metrics_simple_per_image.csv')
print(' - results_bord/metrics_simple_summary.csv')