In [1]:
%config IPCompleter.greedy=True
import re
import json
from elasticsearch import Elasticsearch
from pymystem3 import Mystem
from sklearn.feature_extraction.text import CountVectorizer
import requests

In [2]:
def json_read(filename):
    """Load and return the JSON document stored in ``filename``.

    The file is decoded as UTF-8 explicitly, so the result does not depend on
    the platform's default encoding (the data handled here contains Cyrillic).
    """
    with open(filename, 'r', encoding='utf-8') as inf:
        return json.load(inf)

def json_dump(obj, filename, ea=False, indent=4):
    """Serialize ``obj`` as JSON into ``filename``.

    Args:
        obj: JSON-serializable object.
        filename: path of the file to (over)write.
        ea: forwarded to ``json.dump`` as ``ensure_ascii``.
        indent: forwarded to ``json.dump`` as ``indent``.

    The file is written as UTF-8 explicitly: with ``ea=False`` non-ASCII
    characters are emitted verbatim, which a non-UTF-8 default encoding may
    not be able to represent.
    """
    with open(filename, 'w', encoding='utf-8') as ouf:
        json.dump(obj, ouf, ensure_ascii=ea, indent=indent)

### Connect to Elasticsearch

Note that you must have an Elasticsearch index of Wikidata entities. To create the index, you may use [this] notebook.

In [3]:
# Client for the local Elasticsearch node that hosts the Wikidata entity index.
es = Elasticsearch([dict(host='localhost', port=9200, timeout=360, maxsize=25)])

### Load linker classes

In [4]:
class Morpher:
    """Text preprocessing and morphological analysis helper.

    Wraps a ``Mystem`` analyzer and a ``\\w+`` tokenizer, and keeps the set of
    part-of-speech tags that ``approve_tag`` rejects.
    """

    def __init__(self):
        # Part-of-speech tags rejected by approve_tag().
        self.stop_pos_tags = {
            'ADVPRO',
            'APRO',
            'CONJ',
            'INTJ',
            'PART',
            'PR',
            'SPRO',
            'V',
            'ADV',
        }
        self.mystem = Mystem()
        # Raw string: '\w' inside a plain string literal is an invalid escape
        # sequence (deprecated, and a SyntaxWarning on recent Python versions).
        self.tokenizer = CountVectorizer(lowercase=False, token_pattern=r'\w+').build_analyzer()

    def preprocess(self, text):
        """Tokenize ``text`` and return the tokens joined by single spaces."""
        return ' '.join(self.tokenizer(text))

    def analyze(self, text):
        """Return the Mystem analysis of ``text``."""
        return self.mystem.analyze(text)

    def approve_tag(self, tag):
        """Return ``False`` if ``tag`` is one of the stop tags, ``True`` otherwise."""
        return tag not in self.stop_pos_tags

In [5]:
class Query:
    """Derives Elasticsearch query clauses over the ``label`` field from a text.

    On construction the text is preprocessed and analyzed by ``morpher`` and
    the following attributes are filled in:

    * ``analysis`` - analysis entries whose ``'text'`` is not pure whitespace;
    * ``filtered_tokens`` - lemmas kept for the full-text query;
    * ``token_ngrams`` - space-joined lemma n-grams;
    * ``capital_pairs`` - space-joined lemmas of adjacent capitalized tokens;
    * ``capital_singles`` - lemmas of capitalized tokens longer than 2 chars.

    Each analysis entry is a dict with a ``'text'`` key and, optionally, an
    ``'analysis'`` list whose first item provides ``'lex'`` and ``'gr'``.
    """

    def __init__(self, text, morpher):
        self.text = text
        self.morpher = morpher
        self.save_analysis()
        self.save_filtered_tokens()
        self.save_token_ngrams()
        self.save_capital_pairs()
        self.save_capital_singles()

    def save_analysis(self):
        """Analyze the preprocessed text and keep the non-whitespace entries."""
        text = self.morpher.preprocess(self.text)
        self.analysis = [
            entry
            for entry in self.morpher.analyze(text)
            if not re.fullmatch(r'\s*', entry['text'])
        ]
        return self.analysis

    def _get_lemma_from_analysis(self, entry):
        """Return the first analysis variant's lemma, or the raw text if absent."""
        if 'analysis' not in entry or \
           not entry['analysis'] or \
           'lex' not in entry['analysis'][0]:
            return entry['text']
        return entry['analysis'][0]['lex']

    @staticmethod
    def _is_capitalized(entry, min_length=2):
        """Tell whether the entry text starts uppercase and has >= ``min_length`` chars."""
        text = entry['text']
        return text[0].isupper() and len(text) >= min_length

    def _keeps_token(self, entry):
        """Decide whether ``entry`` contributes a lemma to ``filtered_tokens``."""
        if 'analysis' not in entry:
            return False
        # Entries with an empty analysis list are always kept.
        if not entry['analysis']:
            return True
        # Capitalized tokens are kept regardless of their part of speech.
        if self._is_capitalized(entry):
            return True
        # The tag is the part of 'gr' before the first ',' or '='.
        pos_tag = entry['analysis'][0]['gr'].split(',', 1)[0].split('=', 1)[0]
        return self.morpher.approve_tag(pos_tag)

    def save_filtered_tokens(self):
        """Store and return the lemmas of the tokens accepted by ``_keeps_token``."""
        self.filtered_tokens = [
            self._get_lemma_from_analysis(entry)
            for entry in self.analysis
            if self._keeps_token(entry)
        ]
        return self.filtered_tokens

    def save_token_ngrams(self, n=3):
        """Store and return every run of ``n`` consecutive lemmas, space-joined.

        Texts with fewer than ``n`` tokens yield an empty list.
        """
        lemmas = [self._get_lemma_from_analysis(entry) for entry in self.analysis]
        self.token_ngrams = [
            ' '.join(lemmas[i:(i + n)])
            for i in range(len(lemmas) - (n - 1))
        ]
        return self.token_ngrams

    def save_capital_pairs(self):
        """Store and return lemma pairs of adjacent tokens that are both capitalized."""
        self.capital_pairs = [
            ' '.join(self._get_lemma_from_analysis(entry) for entry in pair)
            for pair in zip(self.analysis, self.analysis[1:])
            if all(self._is_capitalized(entry) for entry in pair)
        ]
        return self.capital_pairs

    def save_capital_singles(self):
        """Store and return lemmas of capitalized tokens longer than two characters."""
        self.capital_singles = [
            self._get_lemma_from_analysis(entry)
            for entry in self.analysis
            if self._is_capitalized(entry, min_length=3)
        ]
        return self.capital_singles

    def build_match_query(self, query, fuzziness='AUTO'):
        """Build a fuzzy ``match`` clause on ``label`` for ``query``."""
        return {
            'match': {
                'label': {
                    'query': query,
                    "fuzziness": fuzziness,
                    "prefix_length": 1,
                    'fuzzy_transpositions': False
                }
            }
        }

    def build_phrase_query(self, query):
        """Build a ``match_phrase`` clause on ``label`` for ``query``."""
        return {
            'match_phrase': {
                'label': query
            }
        }

    def get_phrase_queries(self):
        """Return phrase clauses for every token n-gram, then every capital pair."""
        phrases = list(self.token_ngrams) + list(self.capital_pairs)
        return [self.build_phrase_query(phrase) for phrase in phrases]

    def get_fulltext_queries(self):
        """Return a one-element list with a fuzzy clause over all filtered tokens."""
        return [self.build_match_query(' '.join(self.filtered_tokens))]

    def get_single_capital_queries(self):
        """Return one phrase clause per capitalized single token."""
        return [self.build_phrase_query(single) for single in self.capital_singles]

    def build_es_query(self, queries):
        """Wrap ``queries`` as the ``should`` part of a ``bool`` query body."""
        return {
            'query': {
                'bool': {
                    'should': queries
                }
            }
        }

In [6]:
class QueryQuestion(Query):
    """Query variant that leaves the first token of the text out of the
    filtered tokens, the capital pairs and the capital singles."""

    def save_filtered_tokens(self):
        """Collect lemmas of the accepted tokens, starting from the second one."""
        def is_kept(entry):
            if 'analysis' not in entry:
                return False
            variants = entry['analysis']
            if not variants:
                return True
            word = entry['text']
            if word[0].isupper() and len(word) > 1:
                return True
            tag = variants[0]['gr'].split(',', 1)[0].split('=', 1)[0]
            return self.morpher.approve_tag(tag)

        self.filtered_tokens = [
            self._get_lemma_from_analysis(entry)
            for entry in self.analysis[1:]
            if is_kept(entry)
        ]

    def save_capital_pairs(self):
        """Collect lemma pairs of adjacent capitalized tokens after the first token."""
        def starts_upper(entry):
            word = entry['text']
            return word[0].isupper() and len(word) > 1

        tail = self.analysis[1:]
        self.capital_pairs = [
            ' '.join([self._get_lemma_from_analysis(left),
                      self._get_lemma_from_analysis(right)])
            for left, right in zip(tail, tail[1:])
            if starts_upper(left) and starts_upper(right)
        ]
        return self.capital_pairs

    def save_capital_singles(self):
        """Collect lemmas of capitalized tokens (3+ chars) after the first token."""
        singles = []
        for entry in self.analysis[1:]:
            word = entry['text']
            if not (word[0].isupper() and len(word) > 2):
                continue
            singles.append(self._get_lemma_from_analysis(entry))
        self.capital_singles = singles
        return self.capital_singles

In [7]:
class QueryAnswer(Query):
    """Query variant whose n-grams start with the whole text as one phrase."""

    def save_token_ngrams(self, n=2):
        """Store and return the full lemma phrase followed by all lemma ``n``-grams.

        The first element is every lemma of the text joined by spaces; the
        rest are the runs of ``n`` consecutive lemmas. When the text has
        exactly ``n`` tokens the full phrase therefore appears twice.
        The list is returned, as the overridden ``Query.save_token_ngrams`` does.
        """
        lemmas = [self._get_lemma_from_analysis(entry) for entry in self.analysis]
        self.token_ngrams = [' '.join(lemmas)]
        self.token_ngrams.extend(
            ' '.join(lemmas[i:(i + n)])
            for i in range(len(lemmas) - (n - 1))
        )
        return self.token_ngrams

In [8]:
class Matcher:
    """Runs Elasticsearch queries and enriches the hits with Wikidata metadata.

    Each match is the hit's ``_source`` dict extended with ``score``, the
    Russian ``name``/``description`` fetched from the Wikidata API, and the
    ``ruwiki`` page and ``views`` count read from local JSON files.
    """

    WIKIDATA_API_URL = 'https://www.wikidata.org/w/api.php'
    # wbgetentities accepts a limited number of ids per request
    # (50 for regular clients), so ids are sent in batches.
    WIKIDATA_MAX_IDS = 50
    # Seconds to wait for the Wikidata API before giving up.
    REQUEST_TIMEOUT = 30

    def __init__(self, es_instance):
        self.es = es_instance
        self.wiki_links = json_read('wiki_links.json')
        self.pageviews = json_read('pageviews_by_entity.json')

    def get_names_and_descriptions(self, qids):
        """Fetch Russian labels and descriptions for ``qids`` from Wikidata.

        Args:
            qids: iterable of entity ids.

        Returns:
            ``{qid: {'name': ..., 'description': ...}}``; a value is ``None``
            when the response carries no Russian label/description for that
            entity. An empty input yields ``{}`` without any request.
        """
        qids = list(qids)
        result = {}
        for start in range(0, len(qids), self.WIKIDATA_MAX_IDS):
            batch = qids[start:start + self.WIKIDATA_MAX_IDS]
            resp = requests.get(
                self.WIKIDATA_API_URL,
                params={
                    'action': 'wbgetentities',
                    'format': 'json',
                    'ids': '|'.join(batch),
                    'languages': 'ru',
                    'props': 'labels|descriptions',
                },
                timeout=self.REQUEST_TIMEOUT,
            ).json()
            entities = resp.get('entities', {})
            for qid in batch:
                cur_data = entities.get(qid, {})
                result[qid] = {
                    'name': cur_data.get('labels', {}).get('ru', {}).get('value'),
                    'description': cur_data.get('descriptions', {}).get('ru', {}).get('value'),
                }
        return result

    # Wikipedia pages and page views come from the local JSON files loaded in
    # __init__ rather than from live Wikidata sitelinks / pageviews requests.
    def get_wikipedia_pages(self, qids):
        """Return ``{qid: {'ruwiki': page or None, 'views': count or 0}}``."""
        return {
            qid: {
                'ruwiki': self.wiki_links.get(qid),
                'views': self.pageviews.get(qid, 0),
            }
            for qid in qids
        }

    def _apply_ranking(self, matches, relative_rate=0.9):
        """Order matches by score, re-sorting near-equal scores by page views.

        Matches are sorted by descending ``score`` and cut into consecutive
        groups: a group starts at its best match and takes every following
        match scoring at least ``relative_rate`` times that best score. Each
        group is then sorted by descending ``views``.
        """
        if len(matches) == 0:
            return matches

        ranked = sorted(matches, key=lambda m: m['score'], reverse=True)

        group_start = 0
        while group_start < len(ranked):
            threshold = relative_rate * ranked[group_start]['score']
            group_end = group_start + 1
            while group_end < len(ranked) and ranked[group_end]['score'] >= threshold:
                group_end += 1
            ranked[group_start:group_end] = sorted(
                ranked[group_start:group_end], key=lambda m: m['views'], reverse=True
            )
            group_start = group_end

        return ranked

    def get_query_matches(self, query, n_matches=30):
        """Run ``query`` against the ``all_entities`` index and return ranked matches.

        Args:
            query: Elasticsearch request body.
            n_matches: maximum number of hits requested.

        Returns:
            List of match dicts, one per distinct ``qid`` (the first hit of a
            qid wins), ordered by ``_apply_ranking``.
        """
        # run query and collect results (on the injected client, not a global)
        es_result = self.es.search(index='all_entities', body=query, size=n_matches)['hits']
        matches_dict = {}
        for entry in es_result['hits']:
            qid = entry['_source']['qid']
            if qid not in matches_dict:
                matches_dict[qid] = entry['_source']
                matches_dict[qid]['score'] = entry['_score']

        # obtain label and description for each qid
        names_and_descriptions = self.get_names_and_descriptions(matches_dict.keys())
        for qid, nds in names_and_descriptions.items():
            matches_dict[qid].update(nds)

        # obtain wikipedia page and popularity for each qid
        wikipedia_pages = self.get_wikipedia_pages(matches_dict.keys())
        for qid, wps in wikipedia_pages.items():
            matches_dict[qid].update(wps)

        # rank matched entities
        return self._apply_ranking(list(matches_dict.values()))

In [9]:
class Suggester:
    """Collects candidate entities for a text from three kinds of queries.

    Candidates come from phrase queries, from one query per capitalized single
    token, and from a full-text query, in that order of precedence.
    Subclasses pick the ``Query`` class and the per-query hit limits.
    """

    def __init__(self, matcher, morpher):
        self.matcher = matcher
        self.morpher = morpher

    def _build_query(self, text):
        """Create the query object that analyzes ``text``."""
        return Query(text=text, morpher=self.morpher)

    def _get_matches(self, q, query_type):
        """Run the queries of ``query_type`` built by ``q``.

        Returns:
            A list of matches for ``'fulltext'`` and ``'phrase'``; a list of
            match lists (one per capitalized token) for ``'single'``.

        Raises:
            ValueError: if ``query_type`` is none of the three known types.
        """
        if query_type == 'fulltext':
            return self.matcher.get_query_matches(q.build_es_query(q.get_fulltext_queries()))
        if query_type == 'phrase':
            return self.matcher.get_query_matches(q.build_es_query(q.get_phrase_queries()))
        if query_type == 'single':
            single_queries = q.get_single_capital_queries()
            return [self.matcher.get_query_matches(q.build_es_query(single_query))
                    for single_query in single_queries]
        raise ValueError(f'Unknown query type: {query_type!r}')

    def _select_matches(self, q, min_total=8, f=5, p=4, s=1):
        """Merge phrase, single-token and full-text matches without duplicates.

        Args:
            q: query object produced by ``_build_query``.
            min_total: full-text matches keep being added until at least this
                many distinct entities are selected overall...
            f: ...and at least this many full-text matches were added.
            p: maximum number of phrase matches kept.
            s: accepted for interface compatibility; currently unused.

        The defaults let ``get_suggestions`` work on this base class as well;
        subclasses override them.

        Returns:
            Phrase matches, then single-token matches sorted by views, then
            full-text matches sorted by score. Every match gets a ``'source'``
            key naming the query kind that produced it.
        """
        matches_qids = set()

        phrase_matches = self._get_matches(q, query_type='phrase')[:p]
        for match in phrase_matches:
            matches_qids.add(match['qid'])
            match['source'] = 'phrase'

        # Only the top hit of each single-token query is a candidate.
        single_matches_result = []
        for single_match in self._get_matches(q, query_type='single'):
            if not single_match:
                continue
            top_match = single_match[0]
            if top_match['qid'] not in matches_qids:
                matches_qids.add(top_match['qid'])
                top_match['source'] = 'single'
                single_matches_result.append(top_match)
        single_matches_result.sort(key=lambda m: m['views'], reverse=True)

        fulltext_matches_result = []
        for match in self._get_matches(q, query_type='fulltext'):
            if match['qid'] not in matches_qids:
                matches_qids.add(match['qid'])
                match['source'] = 'fulltext'
                fulltext_matches_result.append(match)
            if len(matches_qids) >= min_total and len(fulltext_matches_result) >= f:
                break
        fulltext_matches_result.sort(key=lambda m: m['score'], reverse=True)

        return phrase_matches + single_matches_result + fulltext_matches_result

    def get_suggestions(self, text, answer=''):
        """Return ``{'text', 'answer', 'matches'}`` with the candidates for ``text``."""
        q = self._build_query(text)

        matches = self._select_matches(q)
        return {
            'text': text,
            'answer': answer,
            'matches': matches
        }

    @staticmethod
    def pretty_print(entry):
        """Format a ``get_suggestions`` result as human-readable text.

        Each match is rendered as its name (or ``*no name*``), its description
        in parentheses when present, and a link to its Wikidata page.
        """
        lines = []

        matches = entry['matches']
        query = entry['text']

        lines.append(f'Query: {query}\n')

        for match in matches:
            lines.append(match['name'] if match['name'] else '*no name*')

            if match['description']:
                desc = match['description']
                lines.append(f'({desc})')

            qid = match['qid']
            lines.append(f'Link: https://www.wikidata.org/wiki/{qid}\n')

        return '\n'.join(lines)

In [10]:
class SuggesterQuestion(Suggester):
    """Suggester for question texts: builds ``QueryQuestion`` objects and
    requests 20 full-text, 8 phrase and 10 single-token hits per query."""

    def _build_query(self, text):
        return QueryQuestion(morpher=self.morpher, text=text)

    def _get_matches(self, q, query_type):
        """Run the queries of ``query_type``; unknown types yield ``None``."""
        run = self.matcher.get_query_matches
        if query_type == 'single':
            capital_queries = q.get_single_capital_queries()
            return [run(q.build_es_query(capital_query), n_matches=10)
                    for capital_query in capital_queries]
        if query_type == 'phrase':
            return run(q.build_es_query(q.get_phrase_queries()), n_matches=8)
        if query_type == 'fulltext':
            return run(q.build_es_query(q.get_fulltext_queries()), n_matches=20)
        return None

    def _select_matches(self, q, min_total=8, f=5, p=4, s=1):
        """Delegate to the base selection with question-specific defaults."""
        return super()._select_matches(q, min_total=min_total, f=f, p=p, s=s)

In [11]:
class SuggesterAnswer(Suggester):
    """Suggester for answer texts: builds ``QueryAnswer`` objects and
    requests 5 full-text, 10 phrase and 5 single-token hits per query."""

    def _build_query(self, text):
        return QueryAnswer(morpher=self.morpher, text=text)

    def _get_matches(self, q, query_type):
        """Run the queries of ``query_type``; unknown types yield ``None``."""
        run = self.matcher.get_query_matches
        if query_type == 'single':
            capital_queries = q.get_single_capital_queries()
            return [run(q.build_es_query(capital_query), n_matches=5)
                    for capital_query in capital_queries]
        if query_type == 'phrase':
            return run(q.build_es_query(q.get_phrase_queries()), n_matches=10)
        if query_type == 'fulltext':
            return run(q.build_es_query(q.get_fulltext_queries()), n_matches=5)
        return None

    def _select_matches(self, q, min_total=5, f=3, p=4, s=1):
        """Delegate to the base selection with answer-specific defaults."""
        return super()._select_matches(q, min_total=min_total, f=f, p=p, s=s)

### Run linker

The simplest way to prepare linkers for questions and answers is as follows:

In [12]:
# The morphological helper and the matcher are built once and shared by both linkers.
mrph = Morpher()
mtc = Matcher(es_instance=es)

# One linker for question texts, one for answer texts.
suggester_question = SuggesterQuestion(mtc, mrph)
suggester_answer = SuggesterAnswer(mtc, mrph)

After that, you can get the list of candidate entities:

In [13]:
# Candidate entities for a sample question and for its answer.
suggestions_question = suggester_question.get_suggestions(
    text='В каком американском штате находится Большой Каньон?'
)
suggestions_answer = suggester_answer.get_suggestions(text='Аризона')

The resulting suggestions are JSON-like dictionaries:

In [15]:
# Display the raw suggestions dict for the question as the cell output.
suggestions_question

{'text': 'В каком американском штате находится Большой Каньон?',
 'answer': '',
 'matches': [{'qid': 'Q118841',
   'label': 'большой каньон',
   'score': 17.875689,
   'name': 'Большой каньон',
   'description': 'каньон, прорезанный рекой Колорадо в плато Колорадо, штат Аризона, США',
   'ruwiki': 'Большой_каньон',
   'views': 4756,
   'source': 'phrase'},
  {'qid': 'Q3848004',
   'label': 'большой каньон',
   'score': 17.875689,
   'name': 'Большой каньон',
   'description': None,
   'ruwiki': 'Большой_каньон_(Крым)',
   'views': 391,
   'source': 'phrase'},
  {'qid': 'Q1542548',
   'label': 'большой каньон',
   'score': 17.875689,
   'name': 'Большой каньон',
   'description': 'фильм 1991 года',
   'ruwiki': 'Большой_каньон_(фильм)',
   'views': 173,
   'source': 'phrase'},
  {'qid': 'Q32508401',
   'label': 'большой каньон',
   'score': 17.875689,
   'name': 'Большой Каньон',
   'description': None,
   'ruwiki': None,
   'views': 0,
   'source': 'phrase'},
  {'qid': 'Q11442',
   'la

You can also print them in a more human-readable form:

In [16]:
# Render both suggestion sets as plain text, question first.
print(
    Suggester.pretty_print(suggestions_question),
    Suggester.pretty_print(suggestions_answer),
    sep='\n',
)

Query: В каком американском штате находится Большой Каньон?

Большой каньон
(каньон, прорезанный рекой Колорадо в плато Колорадо, штат Аризона, США)
Link: https://www.wikidata.org/wiki/Q118841

Большой каньон
Link: https://www.wikidata.org/wiki/Q3848004

Большой каньон
(фильм 1991 года)
Link: https://www.wikidata.org/wiki/Q1542548

Большой Каньон
Link: https://www.wikidata.org/wiki/Q32508401

велосипед
(двухколёсное транспортное средство, приводимое в движение посредством педалей)
Link: https://www.wikidata.org/wiki/Q11442

каньон
Link: https://www.wikidata.org/wiki/Q150784

Большой каньон Крыма
Link: https://www.wikidata.org/wiki/Q16627582

Гора Большой Каньон
Link: https://www.wikidata.org/wiki/Q31626940

США
(федеративное государство в Северной Америке)
Link: https://www.wikidata.org/wiki/Q30

Каньоны
Link: https://www.wikidata.org/wiki/Q3014145

государство
(суверенная территориальная организация)
Link: https://www.wikidata.org/wiki/Q7275

Query: Аризона

Аризона
(штат в США)
Link: