-
Notifications
You must be signed in to change notification settings - Fork 0
/
matcher.py
78 lines (68 loc) · 1.93 KB
/
matcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import json
import requests
import ahocorasick # type: ignore
from typing import Optional
from followthemoney import model
from followthemoney.types import registry
from normality import ascii_text
from normality.util import Categories
from normality.scripts import is_latin
from normality.cleaning import category_replace
from normality.constants import WS
# Line-delimited JSON export of entities; build_automaton() reads it one
# JSON object per line.
URL = 'https://data.opensanctions.org/datasets/latest/sanctions/entities.ftm.json'

# Replacement table handed to normality's ``category_replace``, keyed by
# Unicode general category code. NOTE(review): per normality's documentation a
# ``None`` value drops the character and ``WS`` substitutes whitespace, while
# categories not listed here are left untouched -- confirm against the
# installed normality version.
NORM_FORM: Categories = {
    # Other: control, format, surrogate, private use, unassigned
    "Cc": None,
    "Cf": None,
    "Cs": None,
    "Co": None,
    "Cn": None,
    # Modifier letters
    "Lm": None,
    # Marks: nonspacing, spacing combining, enclosing
    "Mn": None,
    "Mc": WS,
    "Me": None,
    # Other numbers (e.g. superscripts, fractions)
    "No": None,
    # Separators: space, line, paragraph
    "Zs": None,
    "Zl": None,
    "Zp": None,
    # Punctuation: connector, dash, open, close, initial/final quote, other
    "Pc": None,
    "Pd": None,
    "Ps": None,
    "Pe": None,
    "Pi": None,
    "Pf": None,
    "Po": None,
    # Symbols: math, currency, modifier, other
    "Sm": None,
    "Sc": None,
    "Sk": None,
    "So": None,
}
def norm_text(text: Optional[str]) -> Optional[str]:
    """Normalise text into the compact upper-case form used for matching.

    The input is transliterated with ``ascii_text``, filtered through
    ``category_replace`` using the ``NORM_FORM`` table, and upper-cased.

    Args:
        text: Raw text to normalise, or ``None``.

    Returns:
        The upper-cased normalised string, or ``None`` when ``text`` is
        ``None``, when normalisation yields ``None``, or when the result has
        two characters or fewer.
    """
    # Handle missing input here rather than relying on how the third-party
    # helpers treat ``None``.
    if text is None:
        return None
    # Deliberately not named ``ascii`` so the builtin is not shadowed.
    normalized = ascii_text(text)
    normalized = category_replace(normalized, NORM_FORM)
    # Results of length <= 2 are rejected (presumably too ambiguous to be a
    # useful search key).
    if normalized is None or len(normalized) <= 2:
        return None
    return normalized.upper()
def build_automaton() -> ahocorasick.Automaton:
    """Download the entity export at ``URL`` and index its names for matching.

    Each line of the response is parsed as a JSON entity. Entities whose
    schema is not a ``LegalEntity`` are skipped. For the remaining ones, every
    matchable name value that passes ``is_latin`` is normalised with
    ``norm_text`` and added to the automaton with the entity id as its value.

    NOTE(review): when two entities normalise to the same name, the later
    ``add_word`` call presumably replaces the earlier id -- confirm against
    the pyahocorasick documentation if all ids are needed.

    Returns:
        The automaton after ``make_automaton()`` has been called on it.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server responds with an HTTP error status.
    """
    automaton = ahocorasick.Automaton()
    # (connect, read) timeouts so a stalled server cannot hang the build; the
    # read timeout applies between received chunks, not to the whole download.
    # The context manager releases the streamed connection even on error.
    with requests.get(URL, stream=True, timeout=(10, 60)) as res:
        res.raise_for_status()
        for line in res.iter_lines():
            if not line:
                # iter_lines() may yield empty keep-alive lines, which are
                # not valid JSON documents.
                continue
            proxy = model.get_proxy(json.loads(line))
            if not proxy.schema.is_a('LegalEntity'):
                continue
            candidates = (
                norm_text(name)
                for name in proxy.get_type_values(registry.name, matchable=True)
                if is_latin(name)
            )
            # A set de-duplicates names that collapse to the same normal form.
            tokens = {tok for tok in candidates if tok is not None}
            for tok in tokens:
                automaton.add_word(tok, proxy.id)
    automaton.make_automaton()
    return automaton
if __name__ == '__main__':
    # Manual smoke test: build the automaton from the live export and print
    # every match the automaton yields for a sample sentence.
    sample = 'My name is Vladimir Putin, I am the President of Russia'
    matcher = build_automaton()
    normalized = norm_text(sample)
    for hit in matcher.iter(normalized):
        print(hit)