## Валидация: выделение объектов из текста с помощью spacy

прогоняем наш синтетический датасет через модель на основе SpaCy и считаем метрики

In [16]:
import os
import json
import time
import random
import spacy
import re
import glob
import sys

from typing import List, Dict
from spacy.matcher import Matcher, DependencyMatcher
from dotenv import load_dotenv
from tqdm import tqdm

from collections import Counter, defaultdict
from openai import OpenAI

import networkx as nx
import matplotlib.pyplot as plt

lib_path = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(lib_path)

from library.final_metrics import evaluate_obj_attr_metrics, evaluate_ged_score


# Путь к папке с датасетом
DATASET_DIR = "../dataset/dataset_validation_spacial"
VAL_SPLIT = 1.0 # на всем тестовом куске

# можно ru_core_news_sm но она работает хуже
nlp = spacy.load("ru_core_news_lg")

## "Модельная" часть

### Функции предобработки и разбора


### Выделение объектов


сразу добавлено что если после существительного идет номер, например "ваза 1" и далее есть "ваза 2" - то это два разных объекта

In [17]:
pseudo_objects = {
    "справа", "слева", "рядом", "впереди", "сзади", "напротив", "внутри",
    "снаружи", "близко", "далеко", "около", "возле", "между", "под", "над"
}


def extract_objects(text):
    doc = nlp(text)
    objects = []
    added = set()
    skip_next = False

    for i, token in enumerate(doc):
        if skip_next:
            skip_next = False
            continue

        if token.pos_ == "NOUN" and token.lemma_.lower() not in pseudo_objects:
            # Случай: существительное + число ("ваза 1")
            if i + 1 < len(doc) and doc[i + 1].pos_ == "NUM":
                combined = token.lemma_.lower() + ' ' + doc[i + 1].text
                if combined not in added:
                    objects.append(combined)
                    added.add(combined)
                skip_next = True
            else:
                # Без номера — сохраняем по лемме (нормализованной форме)
                noun_lemma = token.lemma_.lower()
                if noun_lemma not in added:
                    objects.append(noun_lemma)
                    added.add(noun_lemma)

    return objects

### Выделение атрибутов

In [18]:
def extract_attributes(text, objects):
    doc = nlp(text)
    attr_map = defaultdict(set)

    # Собираем нормализованные ключи объектов для сопоставления
    normalized_objects = set()
    for obj in objects:
        parts = obj.split()
        if len(parts) == 2 and parts[1].isdigit():
            norm = nlp(parts[0])[0].lemma_ + ' ' + parts[1]
            normalized_objects.add(norm)
        else:
            norm = nlp(obj)[0].lemma_
            normalized_objects.add(norm)

    for i, token in enumerate(doc):
        if token.pos_ != "NOUN":
            continue

        # Определяем имя объекта
        object_key = None
        next_token = doc[i + 1] if i + 1 < len(doc) else None
        if next_token and next_token.pos_ == "NUM":
            object_key = token.lemma_.lower() + ' ' + next_token.text
        else:
            object_key = token.lemma_.lower()

        if object_key not in normalized_objects:
            continue

        # 1. Прямые прилагательные (amod) 
        # Для каждого объекта (token) ищем его потомков (children);
        # Если потомок — прилагательное (ADJ) и имеет зависимость amod (атрибут);
        # Смотрим, есть ли у прилагательного наречия (ADV) — например, "очень", "слишком";
        # Склеиваем модификаторы + прилагательное и добавляем это как признак объекта.
        for child in token.children:
            if child.pos_ == "ADJ" and child.dep_ == "amod":
                adv_mods = [adv.text.lower() for adv in child.children if adv.pos_ == "ADV"]
                phrase = " ".join(adv_mods + [child.text.lower()])
                attr_map[object_key].add(phrase)

        # 2. Причастия (acl) 
        # Этот блок обрабатывает причастия, которые выступают в роли определений к объекту 
        # то есть описывают действия или состояния, связанные с ним.
        #
        # Для каждого объекта (token) ищем его дочерние узлы (children);
        # Если один из них — глагол (VERB) в форме причастия (VerbForm=Part) и его зависимость acl;
        # Собираем модификаторы-причастия (очень, ещё, уже) и зависимые дополнения (трещинами, на полу, на вазу);
        # Формируем фразу и добавляем её как признак.
                
        for child in token.children:

            #if child.dep_ == "acl":
            #    print(f"\n[DEBUG] Объект: {token.text} ({object_key})")
            #    print(f"  Причастие: {child.text} ({child.dep_}, {child.pos_}, {child.morph})")
            #    for tok in child.children:
            #        print(f"    Дополнение: {tok.text} ({tok.dep_}, {tok.pos_}, {tok.morph}) -> предлог: {[c.text for c in tok.children if c.dep_ == 'case']}")

            
            
            if child.pos_ == "VERB" and "Part" in child.morph.get("VerbForm") and child.dep_ == "acl":
                adv_mods = [adv.text.lower() for adv in child.children if adv.pos_ == "ADV"]
                
                complements = []
                for tok in child.children:
                    if tok.dep_ in {"obl", "obj", "nmod", "iobj"} and tok.pos_ != "NUM":
                        prep = [c.text.lower() for c in tok.children if c.dep_ == "case"]  # предлог
                        if prep:
                            phrase = " ".join(prep + [tok.text.lower()])
                        else:
                            phrase = tok.text.lower()  # <- вот эта строка добавляет "трещинами"
                        complements.append(phrase)

                phrase = " ".join(adv_mods + [child.text.lower()] + complements)
                attr_map[object_key].add(phrase)                

        # 3. Субъект при прилагательном/причастии 
        # Этот блок находит признаки, выраженные через предикативную конструкцию, 
        # где объект (существительное) является подлежащим (nsubj) прилагательного или причастия, 
        # стоящего в роли сказуемого.
        # 
        # Ищем токен, который является подлежащим (dep_ == "nsubj")и зависит от прилагательного 
        # (ADJ) или причастия (VERB с VerbForm=Part)
        # Считаем это head — прилагательное или причастие, описывающее объект
        # Собираем наречия-модификаторы (например, очень, почти) и зависимые дополнения 
        # (например, на вазу, трещинами) и формируем полное описание признака и сохраняем.
        if token.dep_ == "nsubj" and (token.head.pos_ == "ADJ" or ("Part" in token.head.morph.get("VerbForm"))):
            head = token.head
            adv_mods = [adv.text.lower() for adv in head.children if adv.pos_ == "ADV"]
            #complements = [tok.text.lower() for tok in head.children if tok.dep_ in {"obl", "obj", "nmod"}]
            #complements = [tok.text.lower() for tok in head.children if tok.dep_ in {"obl", "obj", "nmod"} and tok.pos_ != "NUM"]
            
            complements = []
            for tok in head.children:
                if tok.dep_ in {"obl", "obj", "nmod"} and tok.pos_ != "NUM":
                    prep = [c.text.lower() for c in tok.children if c.dep_ == "case"]  # предлог
                    phrase = " ".join(prep + [tok.text.lower()])
                    complements.append(phrase)

            
            phrase = " ".join(adv_mods + [head.text.lower()] + complements)
            attr_map[object_key].add(phrase)

        # 4. Признак после союза (например: "пластмассовый стул 2 но белый")
        # Этот блок реализует эвристику для распознавания признаков, стоящих после объекта, но не связанных напрямую синтаксически из-за союзов, 
        # которые "разрывают" связь (например, но, а, зато).
        # 
        # Проверяет, что текущий токен — это существительное (NOUN), за которым идёт число (NUM) - 
        # мы имеем объект вида стул 2. Затем на расстоянии +2 токена проверяется, есть ли союз но, а, зато
        # И за союзом — прилагательное (ADJ), которое трактуется как дополнительный признак к объекту
        if (i + 3 < len(doc) and token.pos_ == "NOUN" and next_token and next_token.pos_ == "NUM"):
            possible_adj = doc[i + 3]
            conjunction = doc[i + 2]
            if (conjunction.text.lower() in {"но", "а", "зато"} and possible_adj.pos_ == "ADJ" ):
                object_key = token.lemma_.lower() + ' ' + next_token.text
                attr_map[object_key].add(possible_adj.text.lower())     
                
                
        # --- 5. Цепочка прилагательных перед существительным (включая ADV-модификаторы) ---
        if token.pos_ == "NOUN":
            j = i - 1
            collected = []
            while j >= 0:
                t = doc[j]
                if t.pos_ == "ADJ":
                    # ищем наречия, модифицирующие прилагательное
                    adv_mods = [adv.text.lower() for adv in t.children if adv.pos_ == "ADV"]
                    phrase = " ".join(adv_mods + [t.text.lower()])
                    collected.insert(0, phrase)
                elif t.pos_ == "ADV":
                    pass  # ADV может быть частью модификатора — обработается выше
                elif t.text.lower() in {"и", ","} or t.pos_ == "CCONJ" or t.pos_ == "PUNCT":
                    pass
                else:
                    break
                j -= 1
            if collected:
                attr_map[object_key].update(collected)        
                
        # 6. Признаки при существительном, выступающем подлежащим (nsubj), особенно если глагол не причастие
        if token.dep_ == "nsubj" and token.pos_ == "NOUN":
            # проверим, есть ли у подлежащего прилагательные (amod)
            for child in token.children:
                if child.dep_ == "amod" and child.pos_ == "ADJ":
                    adv_mods = [adv.text.lower() for adv in child.children if adv.pos_ == "ADV"]
                    phrase = " ".join(adv_mods + [child.text.lower()])
                    attr_map[object_key].add(phrase)                

        # 7. Признаки у вложенных объектов (например, "стол с пожелтевшими фотографиями") 
        for token in doc:
            if token.lemma_.lower() in normalized_objects:
                for child in token.children:
                    if child.dep_ == "amod" and child.pos_ in {"ADJ", "VERB"}:
                        adv_mods = [adv.text.lower() for adv in child.children if adv.pos_ == "ADV"]
                        phrase = " ".join(adv_mods + [child.text.lower()])
                        object_key = token.lemma_.lower()
                        attr_map[object_key].add(phrase)                    
                    
    return {k: sorted(v) for k, v in attr_map.items()}

### Выделение пространственных связей

In [19]:
def normalize_objects(objects):
    norms = {}
    for obj in objects:
        parts = obj.split()
        if len(parts) == 2 and parts[1].isdigit():
            norms[obj] = f"{nlp(parts[0])[0].lemma_} {parts[1]}"
        else:
            norms[obj] = nlp(obj)[0].lemma_
    return norms

def find_matching_object(token, object_norms):
    lemma = token.lemma_.lower()
    for orig, norm in object_norms.items():
        if lemma == norm:
            return orig
        if token.i + 1 < len(token.doc):
            next_tok = token.doc[token.i + 1]
            combined = f"{lemma} {next_tok.text}"
            if combined == norm:
                return orig
        if token.i - 1 >= 0:
            prev_tok = token.doc[token.i - 1]
            combined = f"{prev_tok.lemma_} {token.text}"
            if combined == norm:
                return orig
    return None

def collect_conj_appos_group(token):
    group = set()

    def dfs(tok):
        if tok in group:
            return
        group.add(tok)
        for child in tok.children:
            if child.dep_ in {"conj", "appos"}:
                dfs(child)
        if tok.dep_ in {"conj", "appos"}:
            dfs(tok.head)

    dfs(token)
    return sorted(group, key=lambda x: x.i)

def build_full_prep_phrase(prep_token, obj_token):
    parts = [prep_token.text.lower()]
    for child in obj_token.children:
        if child.dep_ == "case":
            parts.append(child.text.lower())
    return " ".join(parts)

def extract_spatial_relations(text, objects):
    doc = nlp(text)
    matcher = DependencyMatcher(nlp.vocab)
    norms = normalize_objects(objects)
    relations = set()

    pattern_verb = [
        {"RIGHT_ID": "verb", "RIGHT_ATTRS": {"POS": "VERB"}},
        {"LEFT_ID": "verb", "REL_OP": ">", "RIGHT_ID": "nsubj", "RIGHT_ATTRS": {"DEP": "nsubj"}},
        {"LEFT_ID": "verb", "REL_OP": ">", "RIGHT_ID": "obl", "RIGHT_ATTRS": {"DEP": {"IN": ["obl", "nmod"]}}},
        {"LEFT_ID": "obl", "REL_OP": ">", "RIGHT_ID": "prep", "RIGHT_ATTRS": {"DEP": {"IN": ["case", "flat", "fixed"]}}}
    ]

    pattern_nearby = [
        {"RIGHT_ID": "verb", "RIGHT_ATTRS": {"POS": "VERB"}},
        {"LEFT_ID": "verb", "REL_OP": ">", "RIGHT_ID": "prep_adv", "RIGHT_ATTRS": {"DEP": "advmod", "POS": "ADV"}},
        {"LEFT_ID": "prep_adv", "REL_OP": ">", "RIGHT_ID": "obj", "RIGHT_ATTRS": {"DEP": "obl"}},
        {"LEFT_ID": "obj", "REL_OP": ">", "RIGHT_ID": "prep", "RIGHT_ATTRS": {"DEP": "case"}},
        {"LEFT_ID": "verb", "REL_OP": ">", "RIGHT_ID": "subj", "RIGHT_ATTRS": {"DEP": "nsubj"}},
    ]

    matcher.add("SPATIAL_VERB", [pattern_verb])
    matcher.add("SPATIAL_NEARBY", [pattern_nearby])

    matches = matcher(doc)

    def build_full_prep(tok):
        parts = [tok.text.lower()]
        parts += sorted([child.text.lower() for child in tok.children if child.dep_ in {"fixed", "flat", "case"}], key=lambda x: x)
        return " ".join(parts)

    for match_id, tokens in matches:
        match_label = nlp.vocab.strings[match_id]

        if match_label == "SPATIAL_VERB":
            verb, nsubj, obl, prep = [doc[i] for i in tokens]
            prep_phrase = build_full_prep(prep)

            subj_group = collect_conj_appos_group(nsubj)
            obl_group = collect_conj_appos_group(obl)

            subj_objs = [find_matching_object(tok, norms) for tok in subj_group]
            obl_objs = [find_matching_object(tok, norms) for tok in obl_group]

            subj_objs = [obj for obj in subj_objs if obj]
            obl_objs = [obj for obj in obl_objs if obj]

            for subj_obj in subj_objs:
                for obl_obj in obl_objs:
                    if subj_obj != obl_obj:
                        relations.add((subj_obj, prep_phrase, obl_obj))

        elif match_label == "SPATIAL_NEARBY":
            verb, prep_adv, obj, prep, subj = [doc[i] for i in tokens]
            #prep_phrase = build_full_prep(prep_adv)
            prep_phrase = build_full_prep_phrase(prep_adv, obj)

            subj_group = collect_conj_appos_group(subj)
            subj_objs = [find_matching_object(tok, norms) for tok in subj_group]
            subj_objs = [obj for obj in subj_objs if obj]

            obl_obj = find_matching_object(obj, norms)

            if obl_obj:
                for subj_obj in subj_objs:
                    if subj_obj != obl_obj:
                        relations.add((subj_obj, prep_phrase, obl_obj))

    return list(relations)

## Итоговая сборка

текст -> сценический граф 

```
{
    "scene": {
        "location": "автосервис",
        "objects": [
            {"гаечный ключ": ["металлический"]},
            {"домкрат": ["металлический", "тяжелый", "прочный"]},
            {"аккумулятор": []}
        ],
        "relations": [
            ["аккумулятор", "рядом с", "гаечный ключ"],
            ["гаечный ключ", "на", "домкрат"]
        ]
    }
}
```

location мы не предсказыаем, поэтому оно "неизвестно"

In [20]:
def spacy_model(description, location="неизвестно"):
    # Извлечение предсказаний
    objects = extract_objects(description)
    attributes = extract_attributes(description, objects)
    relations = extract_spatial_relations(description, objects)      
    
    # Приведение предсказанных атрибутов к полному списку объектов
    completed_attrs = [{obj: attributes.get(obj, [])} for obj in objects]

    scene = dict()
    scene["location"] = location
    scene["objects"] = completed_attrs
    scene["relations"] = relations
    
    return {"scene": scene}

In [21]:
# Загружаем все jsonl-файлы из датасета
def load_dataset(path: str) -> List[Dict]:
    dataset = []
    for filename in glob.glob(os.path.join(path, "*.jsonl")):
        with open(filename, "r", encoding="utf-8") as f:
            for line in f:
                dataset.append(json.loads(line))
    return dataset

# Получаем .05 данных
def sample_validation_split(dataset: List[Dict], fraction: float = 0.05) -> List[Dict]:
    sample_size = max(1, int(len(dataset) * fraction))
    return random.sample(dataset, sample_size)

# Обработка + сбор метрик
def evaluate_on_validation_set(dataset: List[Dict]) -> Dict[str, float]:
    metrics_accumulator = defaultdict(list)
    
    for item in tqdm(dataset, ncols=80):
        src_text = item["description"]        
        if not src_text:
            print("Пустое или отсутствующее поле 'description' в элементе:")
            print(item)
            continue
        
        pred = spacy_model(src_text)
        label = {"scene": item["scene"]}
        
        # Оценка для базовых метрик
        metrics = evaluate_obj_attr_metrics(pred, label)
        for k, v in metrics.items():
            metrics_accumulator[k].append(v)

        # Оценка с точки зрения графа целиком
        metrics = evaluate_ged_score(pred, label)
        for k, v in metrics.items():
            metrics_accumulator[k].append(v)
            
            
    # Усреднение по всем примерам
    return {k: round(sum(vs) / len(vs), 4) for k, vs in metrics_accumulator.items()}


In [22]:
all_data = load_dataset(DATASET_DIR)
val_data = sample_validation_split(all_data, VAL_SPLIT)
final_metrics = evaluate_on_validation_set(val_data)

print("Validation results on", len(all_data), "samples:")
for k, v in final_metrics.items():
    print(f"{k}: {v}")


100%|█████████████████████████████████████████| 250/250 [07:48<00:00,  1.87s/it]

Validation results on 250 samples:
f1_objects: 0.9505
f1_attributes_macro: 0.6571
f1_attributes_weighted: 0.8502
f1_global_obj_attr_pairs: 1.0
f1_combined_simple: 0.8038
f1_combined_weighted: 0.9012
GED_score: 0.5398



