### Загрузка данных

In [1]:
import re
from conllu import parse
from conllu import models
from itertools import product
from collections import defaultdict
from typing import Iterable

In [34]:
from math import inf

In [2]:
conllu_path = 'en_ewt-ud-train.conllu'

In [3]:
with open(conllu_path, 'r', encoding='utf-8') as f:
    conllufile = f.read()
sentences = parse(conllufile)

In [12]:
some_sentence = sentences[58]

In [42]:
some_sentence.metadata['text']

'"They are targetting ambulances", "American snipers are shooting children and pregnant women", and "They are using cluster bombs against civilians" is all you get to hear from him.'

## Проверка на узлы

In [20]:
def token_match_node(token: models.Token, node_pattern: dict) -> bool:
    "Проверяет соответствует ли данный токен заданному паттерну"
    
    for feat in node_pattern:
        if feat in token.keys():
            if not re.match(node_pattern[feat], token[feat], re.I):
                return False
        elif token['feats']:
            if feat in token['feats']:
                if not re.match(node_pattern[feat], token['feats'][feat]): 
                    return False
            elif feat == "exclude":
                for ef in node_pattern[feat]:
                    if ef in token['feats']:
                        return False
        else:
            return False
    return True

In [23]:
def search_suitable_tokens(token_list: models.TokenList, node_pattern: dict) -> list:
    "Возвращает список из всех токенов, подходящих под заданный паттерн"
    
    suitable_tokens = []
    for token in token_list:
        if token_match_node(token, node_pattern) and isinstance(token['id'], int):
            suitable_tokens.append(token['id'] - 1)
    return suitable_tokens

In [24]:
def find_all_nodes(nodes: dict, sentence: models.TokenList) -> dict:
    """Возвращает словарь {node_name: [possible_tokens]}
    или пустой словарь, если не все ноды найдены"""
    
    nodes_tokens = {}
    for node in nodes:
        sutable_tokens = search_suitable_tokens(sentence, nodes[node])
        if sutable_tokens:
            nodes_tokens[node] = sutable_tokens
        else:
            return {}
    return nodes_tokens

__Пример:__

In [None]:
node_pattern = {
    'N': 
}

## Проверка на ограничения

### Relations

In [29]:
def all_deprels(token_list: models.TokenList) -> defaultdict:
    "Cоздает словарь вида {'relation': (head, dependent)} из всех отношений в предложении"
    
    deprels = defaultdict(list)
    for t in token_list:
        if isinstance(t['head'], int) and isinstance(t['id'], int):
            deprels[t['deprel']].append((t['head'] - 1, t['id'] - 1))
    return deprels

In [30]:
def pattern_relations(rel_pattern: str, sent_rels: defaultdict):
    """Возвращает все названия отношений в предложении, 
    которые описываются заданной регуляркой"""
    
    rels = []
    for rel in sent_rels:
        if re.search(rel_pattern, rel):
            rels.append(rel)
    return rels

In [31]:
def tokens_with_rel(rel_name: str, sent_rels: defaultdict, possible_pairs: Iterable[tuple]) -> set: 
    "Возвращает множество из пар токенов, между которыми отношение rel_name"
    
    if not rel_name in sent_rels:
        return False
    else:
        return set(sent_rels[rel_name]).intersection(possible_pairs)

In [32]:
def relpattern_tokens(possible_pairs: Iterable[tuple], sentence:models.TokenList, rel_pattern: str) -> set:
    "Возвращает все пары токенов, которые попадают под заданный паттерн rel_pattern"
    
    sent_rels = all_deprels(sentence)
    all_suitable_rels = set()
    for rel in pattern_relations(rel_pattern, sent_rels):
        all_suitable_rels = all_suitable_rels | tokens_with_rel(rel, sent_rels, possible_pairs)
    return all_suitable_rels

### Linear distance

In [39]:
def linear_distance(possible_tokens_pairs: Iterable[tuple], lindist: tuple) -> set:
    "Возвращает только те пары токенов, между которыми заданное расстояние"
    
    suitable_tokens = set()
    for pair in possible_tokens_pairs:
        dist = pair[1] - pair[0]
        if dist >= lindist[0] and dist <= lindist[1]:
            suitable_tokens.add(pair)
    return suitable_tokens

### Совпадение/Несовпадение значений признаков

In [40]:
def pair_match_fconstraint(token_pair: tuple, sentence: models.TokenList, c_pattern: dict) -> bool:
    """Проверяет соответствует ли пара токенов ограничениям на признаки"""
    
    t1_feats = sentence[token_pair[0]]['feats']
    t2_feats = sentence[token_pair[1]]['feats']
    if t1_feats and t2_feats:
        for c in c_pattern:
            for f in c_pattern[c]:
                if (f in t1_feats) and (f in t2_feats): 
                    if c == 'intersec':
                        if t1_feats[f] != t2_feats[f]:
                            return False
                    elif c == 'disjoint':
                        if t1_feats[f] == t2_feats[f]:
                            return False
                else:
                    return False
        return True
    else:
        return False

In [41]:
def feature_constraint(possible_token_pairs: Iterable[tuple], sentence: models.TokenList, constr_pattern: dict) -> set:
    """Ищет среди пар токенов такие, которые соответствуют ограничениям на признаки"""
    
    suitable_pairs = set()
    for pair in possible_token_pairs:
        if pair_match_fconstraint(pair, sentence, constr_pattern):
            suitable_pairs.add(pair)
    return suitable_pairs

### Tree distance

### Сопоставление со всеми ограничениями

In [55]:
def match_constraints(nodes_constraints: dict, nodes_tokens: dict, sentence: models.TokenList) -> bool:
    """Проверяет предложение на все ограничения"""
    
    for np in nodes_constraints:
        suitable_pairs = list(product(nodes_tokens[np[0]], nodes_tokens[np[1]])) #всевозможные комбинации токенов для нодов
        for constraint in nodes_constraints[np]:
            
            if constraint == 'deprels':
                suitable_pairs = relpattern_tokens(suitable_pairs, sentence, nodes_constraints[np][constraint])
            elif constraint == 'lindist':
                suitable_pairs = linear_distance(suitable_pairs, nodes_constraints[np][constraint])
            elif constraint == 'fconstraint':
                suitable_pairs = feature_constraint(suitable_pairs, sentence, nodes_constraints[np][constraint])
                
            if not suitable_pairs:
                return False 
            else:
                # удаляем токены, которые не подошли
                nodes_tokens[np[0]] = [p[0] for p in suitable_pairs]
                nodes_tokens[np[1]] = [p[1] for p in suitable_pairs]
    return True

In [44]:
def filter_sentence(sentence: models.TokenList, nodes_pattern: dict, constraints: dict) -> bool:
    "Возвращает True, если предложение соответствует заданному паттерну"
    
    found_nodes = find_all_nodes(nodes_pattern, sentence)
    if not found_nodes:
        return False
    else:
        if not match_constraints(constraints, found_nodes, sentence): 
            return False
        else: 
            return True 

## Примеры

In [52]:
# примеры запросов
patterns_names = ['passive with by Agent', 'all the', '2+amod', 'SOmatchnumber']
node_patterns = [
    {
        'V': {},
        'S': {},
        'BY': {'lemma': '^by$'},
        'N': {},
    },
    {
        'A': {'lemma': '^all$'},
        'T': {'lemma': '^the$'},
    },
    {
        'N': {},
        'M': {'upos': 'ADJ'},
    },
    {
        'S': {},
        'V': {},
        'O': {}
    },
]
constraints = [
    {
        ('V', 'S'): {'deprels': '^aux:pass$'},
        ('V', 'N'): {'deprels': '^obl$'},
        ('N', 'BY'): {'deprels': '^case$'},
    },
    {
        ('A', 'T'): {'lindist': (1, 1)}
    },
    {
        ('N', 'M'): {'deprels': '^amod$', 'lindist': (-inf, -2)}
    },
    {
        ('V', 'S'): {'deprels': '^.subj$'},
        ('V', 'O'): {'deprels': '^obj$'},
        ('S', 'O'): {'fconstraint': {'intersec': 'Number'}}
    }
]
tasks = [(patterns_names[i], node_patterns[i], constraints[i]) for i in range(len(patterns_names))]

In [59]:
def probing_dict(class_label: str, sentences: Iterable[models.TokenList],
                nodes_pattern: dict, constraints: dict) -> dict:
    """Составляет словарь {название класса: список предложений}"""
    
    pd = defaultdict(list)
    for sentence in sentences:
        if filter_sentence(sentence, nodes_pattern, constraints):
            pd[class_label].append(sentence.metadata['text'])
    return pd

In [58]:
task = tasks[0]
probing_dict(task[0], sentences, task[1], task[2])

# Проблема
Кратко: я умею последовательно проверять на отношения между парами нодов, но не могу проверять на __одновременные__ отношения между всеми нодами. 

Например, есть предложение _"Маша купила овощи, а мальчики купили самовар"_. Мы хотим отфильтровать предложения по совпадению числа у подлежащего и сказуемого. Для этого будем использовать следующий паттерн:

In [60]:
tasks[3]

('SOmatchnumber',
 {'S': {}, 'V': {}, 'O': {}},
 {('V', 'S'): {'deprels': '^.subj$'},
  ('V', 'O'): {'deprels': '^obj$'},
  ('S', 'O'): {'fconstraint': {'intersec': 'Number'}}})

Для начала делаем __проверку на узлы__. После нее получим следующие спиcки: 

In [61]:
import numpy as np

In [84]:
found_nodes = {
    'S': list(np.arange(7)),
    'V': list(np.arange(7)),
    'O': list(np.arange(7))
}
found_nodes

{'S': [0, 1, 2, 3, 4, 5, 6],
 'V': [0, 1, 2, 3, 4, 5, 6],
 'O': [0, 1, 2, 3, 4, 5, 6]}

Потом идем проверять на ограничения. Для этого составляем декартово произведение для первой пары.

In [85]:
%%time
vs = list(product(found_nodes['V'], found_nodes['S']))

Wall time: 0 ns


Проверяем на наличие nsubj между S и V. relpattern_tokens вернет следующее:

In [86]:
nsubj = set([(1, 0), (5, 1)])
nsubj

{(1, 0), (5, 1)}

А также удалит из списков токенов S и V те, которые не подошли 

In [87]:
found_nodes['V'] = [p[0] for p in nsubj]
found_nodes['S'] = [p[1] for p in nsubj]
found_nodes

{'S': [0, 1], 'V': [1, 5], 'O': [0, 1, 2, 3, 4, 5, 6]}

Повторяем для obj и S.Number=O.Number

In [88]:
vo = list(product(found_nodes['V'], found_nodes['O']))
obj = set([(1, 2), (5, 6)])
found_nodes['V'] = [p[0] for p in obj]
found_nodes['O'] = [p[1] for p in obj]
found_nodes

{'S': [0, 1], 'V': [1, 5], 'O': [2, 6]}

In [89]:
so = list(product(found_nodes['S'], found_nodes['O']))
match_number = set([(0, 6), (4, 2)])
found_nodes['S'] = [p[0] for p in match_number]
found_nodes['O'] = [p[1] for p in match_number]
found_nodes

{'S': [4, 0], 'V': [1, 5], 'O': [2, 6]}

И как бы получается, что через мой фильтр это предложение прошло (ни разу не возвращалось пустое множество из пар), но при этом, это предложение не подходит. То есть надо как-то проверять одновременные отношения между токенами.

Можно сделать:
- делать произведение не списков пары нодов, а списков всех нодов (но это будет очень длинные списки, особенно если на сами ноды не наложены морфлогические ограничения и ими могут быть любые токены из предложения) &emsp;product(found_nodes['V'], found_nodes['O'], found_nodes['S'])
- после моей проверки (когда уже в списках сильно меньше токенов) делать декартово произведение и снова проверять на те же условия (но опять проходиться по циклу, которых у меня и так много...) &emsp;[(4, 1, 2), (4, 1, 6), ..., (0, 5, 6)]
- какой-то более умный способ (мне кажется что-то должно быть) 
- вообще поменять всю идею... 