In [1]:
%config IPCompleter.greedy=True
import re
import json
from collections import defaultdict
from tqdm import tqdm_notebook as tqdm
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from sklearn.feature_extraction.text import CountVectorizer
import requests
from time import time
import time

import base64
import xmltodict



In [2]:
# Shared client for the Elasticsearch node at localhost:9200. The host entry also
# carries timeout=360 (presumably seconds - confirm) and maxsize=25.
es = Elasticsearch([{'host': 'localhost', 'port': 9200, 'timeout': 360, 'maxsize': 25}])

# Index with Snowball

In [4]:
# In-memory store of the parsed documents, keyed by docID.
documents_by_id = {}
# Start from a clean 'myandex' index. ignore_unavailable=True lets the delete
# succeed when the index does not exist yet (e.g. the first run against a fresh
# cluster), so the cell survives Restart & Run All.
es.indices.delete(index='myandex', ignore_unavailable=True)
es.indices.create(index='myandex')

{'acknowledged': True, 'index': 'myandex', 'shards_acknowledged': True}

In [5]:
def processFile(i):
    """Load the documents of one `byweb.<i>.xml` dump into `documents_by_id`.

    The file is read as cp1251 and parsed with xmltodict. For every document the
    base64-encoded URL and content are decoded (cp1251) and stored under the
    document's docID as {'url': ..., 'content': ...}.

    A document that fails to decode is reported and skipped. Nothing is stored
    for it, so no empty record reaches the indexing step.

    Args:
        i: Number of the dump file; used to build the file name.
    """
    prefix = '../byweb_for_course/byweb.'
    suffix = '.xml'
    filename = prefix + str(i) + suffix
    with open(filename, 'rb') as f:
        decoded = f.read().decode('cp1251')
    xmldict = xmltodict.parse(decoded)
    docs = xmldict['romip:dataset']['document']
    # xmltodict returns a bare mapping instead of a list when the dataset holds
    # exactly one <document>; normalise so the loop always sees documents.
    if isinstance(docs, dict):
        docs = [docs]
    for doc in tqdm(docs):
        try:
            docID = doc['docID']
            url = base64.b64decode(doc['docURL']).decode('cp1251')
            content = base64.b64decode(doc['content']['#text']).decode('cp1251')
        except Exception as e:
            print(e)
            continue
        # Store the record only once every field decoded successfully.
        documents_by_id[docID] = {'url': url, 'content': content}

In [6]:
# Parse dump files number 0 through 9 into documents_by_id.
for i in range(10):
    processFile(i)































In [7]:
# Index definition for the stemmed index: 'content' is analysed with a custom
# analyzer (HTML stripping, ё -> е mapping, lowercasing, Russian and English
# Snowball stemming); 'url' is a plain text field.
settings_final = {
    'mappings': {
        'properties': {
            'url': {'type': 'text'},
            'content': {'type': 'text', 'analyzer': 'my_custom_analyzer'},
        },
    },
    'settings': {
        'analysis': {
            'analyzer': {
                'my_custom_analyzer': {
                    'type': 'custom',
                    'tokenizer': 'standard',
                    'char_filter': ['html_strip', 'yont'],
                    # 'asciifolding' was commented out of the chain and stays disabled.
                    'filter': ['lowercase', 'russian_snow', 'english_snow'],
                },
            },
            'char_filter': {
                'yont': {'type': 'mapping', 'mappings': ['ё => е', 'Ё => Е']},
            },
            'filter': {
                # Declared with an empty stop list; not referenced by my_custom_analyzer.
                'stop_words': {'type': 'stop', 'stopwords': []},
                'russian_snow': {'type': 'snowball', 'language': 'russian'},
                'english_snow': {'type': 'snowball', 'language': 'english'},
            },
        },
    },
}

In [8]:
def recreate_index():
    """Drop the 'myandex' index (if present) and create it with `settings_final`."""
    # ignore_unavailable=True keeps the delete from failing when the index is
    # missing, so the function also works on a cluster without 'myandex'.
    es.indices.delete(index='myandex', ignore_unavailable=True)
    es.indices.create(index='myandex', body=settings_final)

In [9]:
# Rebuild 'myandex' so that it is created with settings_final.
recreate_index()

In [10]:
def check_analyzer(analyzer, text):
    """Run `text` through an analyzer of the 'myandex' index and return its tokens.

    Args:
        analyzer: Dict with the analyze-request fields, e.g. {'analyzer': name}.
            It is not modified.
        text: Text to analyse.

    Returns:
        List of token strings, in the order Elasticsearch returned them.
    """
    # Build a new body instead of writing 'text' into the caller's dict.
    body = dict(analyzer, text=text)
    response = es.indices.analyze(index='myandex', body=body)
    return [token_info['token'] for token_info in response['tokens']]

In [11]:
# Smoke test of the custom analyzer on a sample mixing an HTML tag, English and
# Russian words, and a word starting with 'Ё'.
analyzer = {
    'analyzer': 'my_custom_analyzer'
}

check_analyzer(analyzer, '<meta http-equiv="Content-Type" content="text/html; charset=windows-1251"> bla bla русский countable текст Ёшкин кот')

['bla', 'bla', 'русск', 'countabl', 'текст', 'ешкин', 'кот']

In [12]:
def create_es_action(index, doc_id, document):
    """Build one bulk action that stores `document` under `doc_id` in `index`."""
    action = dict(_index=index, _id=doc_id, _source=document)
    return action

In [13]:
def es_actions_generator(index='myandex'):
    """Yield one bulk action per entry of `documents_by_id`, with a progress bar.

    Args:
        index: Name of the index the actions target. Defaults to 'myandex', so
            existing calls without arguments behave as before.

    Yields:
        Action dicts produced by `create_es_action`.
    """
    for doc_id, doc in tqdm(documents_by_id.items()):
        yield create_es_action(index, doc_id, doc)

In [14]:
# Bulk-index every parsed document into 'myandex' and time the run.
start = time.time()
for ok, result in parallel_bulk(es, es_actions_generator(), queue_size=4, thread_count=4, chunk_size=1000):
    if not ok:
        print(result)
end = time.time()
elapsed = end - start
# strftime has no fractional-seconds directive for a struct_time ('%l' is not
# one), so the milliseconds are taken from the elapsed value directly.
hms = time.strftime('%H:%M:%S', time.gmtime(elapsed))
millis = int(elapsed % 1 * 1000)
print(f"Time on index creation: {hms}.{millis:03d}")
print(f"In seconds: {elapsed}")


Time on index creation: 00:04:56.12
In seconds: 296.1822123527527


In [15]:
def search(query, *args, index='myandex'):
    """Run `query` and print a summary of the top hits.

    Args:
        query: Request body passed to `es.search`.
        *args: Names of `_source` fields to print for every listed hit.
        index: Index to search (keyword-only). Defaults to 'myandex'.
    """
    # size=20 asks for more hits than the default page of 10;
    # pretty_print_result decides how many of them are actually listed.
    pretty_print_result(es.search(index=index, body=query, size=20), args)

def raw_search(query, index='myandex'):
    """Run `query` and return the top hits as (doc id, score) pairs.

    Args:
        query: Request body passed to `es.search`.
        index: Index to search. Defaults to 'myandex'.

    Returns:
        List of (`_id`, `_score`) tuples for up to 20 hits, in result order.
    """
    hits = es.search(index=index, body=query, size=20)['hits']['hits']
    return [(hit['_id'], hit['_score']) for hit in hits]
    
def pretty_print_result(search_result, fields=(), max_hits=6):
    """Print the total hit count and a short listing of the leading hits.

    Args:
        search_result: Response of `es.search`; `['hits']['total']['value']`
            and `['hits']['hits']` are read from it.
        fields: Names of `_source` fields to print under each listed hit.
            The default is an immutable empty tuple rather than a shared list.
        max_hits: Number of leading hits to list (6 by default).
    """
    res = search_result['hits']
    print(f'Total documents: {res["total"]["value"]}')
    for hit in res['hits'][:max_hits]:
        print(f'Doc {hit["_id"]}, score is {hit["_score"]}')
        for field in fields:
            print(f'{field}: {hit["_source"][field]}')
                  
def get_doc_by_id(doc_id):
    """Fetch document `doc_id` from the 'myandex' index and return its `_source`."""
    response = es.get(index='myandex', id=doc_id)
    return response['_source']

In [16]:
def get_query(query):
    """Wrap `query` text into a bool/should match query on the 'content' field."""
    match_clause = {'match': {'content': query}}
    return {'query': {'bool': {'should': match_clause}}}

# Query made only of an HTML tag, to see how markup is treated at search time
# (the 'content' analyzer strips HTML via html_strip).
q = get_query('<meta http-equiv="Content-Type" content="text/html; charset=windows-1251">')
search(q)
raw_search(q)

Total documents: 0


[]

In [17]:
def print_index_size(index):
    """Print the primary store size reported for `index`, divided by 2**30."""
    stats = es.indices.stats(index)
    size_in_bytes = stats['_all']['primaries']['store']['size_in_bytes']
    print(f"Size of index: {size_in_bytes / 2 ** 30} GB")

In [18]:
# Report the on-disk size of the stemmed index.
print_index_size('myandex')

Size of index: 3.8551786467432976 GB


In [19]:
def load_queries_and_relevance():
    """Load relevance judgements and query texts for tasks with a 'vital' document.

    Reads '../relevant_table_2009.xml' for the per-task document judgements and
    '../web2008_adhoc.xml' for the query texts.

    Returns:
        defaultdict mapping task id -> dict of {document id: relevance label}.
        Only tasks with at least one 'vital' judgement are kept. For tasks also
        found in the query file, the same dict carries the query under the
        'querytext' key.
    """
    def as_list(node):
        # xmltodict collapses a one-element sequence into a bare mapping.
        return node if isinstance(node, list) else [node]

    relevance = defaultdict(dict)
    filename = '../relevant_table_2009.xml'
    with open(filename, 'rb') as f:
        xmldict = xmltodict.parse(f.read())
    for task in tqdm(as_list(xmldict['taskDocumentMatrix']['task'])):
        task_rel = {}
        has_vital = False
        for doc in as_list(task['document']):
            if doc['@relevance'] == 'vital':
                has_vital = True
            task_rel[doc['@id']] = doc['@relevance']
        if has_vital:
            relevance[task['@id']] = task_rel
    filename = '../web2008_adhoc.xml'
    with open(filename, 'rb') as f:
        xmldict = xmltodict.parse(f.read())
    for task in tqdm(as_list(xmldict['task-set']['task'])):
        # Membership test first: indexing the defaultdict would create entries
        # for tasks that have no vital judgement.
        if task['@id'] in relevance:
            relevance[task['@id']]['querytext'] = task['querytext']
    return relevance

In [20]:
# Global relevance table read by measure_performance().
relevance = load_queries_and_relevance()







In [23]:
import numpy as np


def get_number_of_correct_out_of_k(results, task_relevance, k):
    """Count how many of the first `k` results are labelled 'vital'.

    `results` holds sequences whose first item is a document id;
    `task_relevance` maps document ids to relevance labels.
    """
    top_k = results[:k]
    return sum(task_relevance.get(hit[0]) == 'vital' for hit in top_k)

def measure_performance():
    """Evaluate `raw_search` on every task of the global `relevance` table.

    For each task its query text is searched and the hits labelled 'vital' are
    counted. The averages over all tasks of p@20, r@20, p@R(q) and map@20 are
    printed.

    Returns:
        numpy array of the per-task p@R(q) values, in `relevance` iteration
        order (empty when there are no tasks).
    """
    Q = len(relevance)
    if Q == 0:
        # Nothing to average; dividing the sums by Q would raise ZeroDivisionError.
        print("No tasks to evaluate")
        return np.array([])
    pq = 0
    rq = 0
    prq = 0
    mapq = 0
    tasks_prq = []
    for task_relevance in relevance.values():
        results = raw_search(get_query(task_relevance['querytext']))
        sk = get_number_of_correct_out_of_k(results, task_relevance, 20)
        pq += sk / 20
        # R(q): number of documents judged 'vital' for this task.
        relevant_size = sum(1 for value in task_relevance.values() if value == 'vital')
        rq += sk / relevant_size
        # NOTE(review): only as many hits as raw_search returns are available,
        # so p@R(q) is capped when relevant_size exceeds that - confirm intended.
        current_prq = get_number_of_correct_out_of_k(results, task_relevance, relevant_size) / relevant_size
        prq += current_prq
        tasks_prq.append(current_prq)
        # NOTE(review): this is the plain mean of p@k over k = 1..20, not
        # precision averaged at relevant ranks only - confirm intended definition.
        mapk = sum(get_number_of_correct_out_of_k(results, task_relevance, k) / k
                   for k in range(1, 21))
        mapk /= 20
        mapq += mapk
    print(f"p@20: {pq / Q}")
    print(f"r@20: {rq / Q}")
    print(f"p@R(q): {prq / Q}")
    print(f"map@20(q): {mapq / Q}")

    return np.array(tasks_prq)

In [24]:
# Per-task p@R(q) values from this evaluation run; kept for the comparison at the end.
tasks_prq = measure_performance()

p@20: 0.3839393939393943
r@20: 0.251080754422327
p@R(q): 0.21404717616325877
map@20(q): 0.4353305444924268


In [25]:
# Edge case: empty query text.
q = get_query('')
search(q)
raw_search(q)

Total documents: 0


[]

In [26]:
# Sample Russian query; print the 'url' field of each listed hit.
q = get_query('я пошел')
search(q, 'url')

Total documents: 10000
Doc 772427, score is 11.434296
url: http://wow.tut.by/send/9082/
Doc 1285482, score is 11.06866
url: http://www.bac.tut.by/index.php?tnt=0&tem=&ac=qwe1&id=31929
Doc 1420091, score is 10.830326
url: http://www.deti.by/lib/forkids/tales/slavic/russian-folk/two-in-bag.html
Doc 60845, score is 10.64367
url: http://pritchi.castle.by/ras-8-18.html
Doc 500784, score is 10.526928
url: http://www.sng.by/forum/index.php?showtopic=393
Doc 251864, score is 10.456835
url: http://rogdestvo.by/rus/biblia/_vetchij/07_sudie/sudie_14.php


# Index without Snowball

In [27]:
# Start from a clean 'myandex_raw' index. The tolerant delete makes the cell
# re-runnable: create no longer fails when the index already exists, and the
# delete does not fail when it is missing.
es.indices.delete(index='myandex_raw', ignore_unavailable=True)
es.indices.create(index='myandex_raw')

{'acknowledged': True, 'index': 'myandex_raw', 'shards_acknowledged': True}

In [28]:
# Index definition for the unstemmed index: 'content' is only HTML-stripped,
# tokenised and lowercased; no custom char filters or token filters are declared.
settings_final_raw = {
    'mappings': {
        'properties': {
            'url': {'type': 'text'},
            'content': {'type': 'text', 'analyzer': 'my_custom_analyzer'},
        },
    },
    'settings': {
        'analysis': {
            'analyzer': {
                'my_custom_analyzer': {
                    'type': 'custom',
                    'tokenizer': 'standard',
                    'char_filter': ['html_strip'],
                    'filter': ['lowercase'],
                },
            },
            'char_filter': {},
            'filter': {},
        },
    },
}

In [29]:
def recreate_index_raw():
    """Drop the 'myandex_raw' index (if present) and create it with `settings_final_raw`."""
    # ignore_unavailable=True keeps the delete from failing when the index is missing.
    es.indices.delete(index='myandex_raw', ignore_unavailable=True)
    es.indices.create(index='myandex_raw', body=settings_final_raw)

In [30]:
# Rebuild 'myandex_raw' so that it is created with settings_final_raw.
recreate_index_raw()

In [31]:
def es_actions_generator_raw():
    """Yield one 'myandex_raw' bulk action per entry of `documents_by_id`."""
    progress = tqdm(documents_by_id.items())
    for doc_id, doc in progress:
        action = create_es_action('myandex_raw', doc_id, doc)
        yield action

In [32]:
# Bulk-index every parsed document into 'myandex_raw' and time the run.
# NOTE(review): a recorded run of this cell ended in a 429
# circuit_breaking_exception ("Data too large"); the bulk parameters are left
# as they are so the timing stays comparable with the 'myandex' run.
start = time.time()
for ok, result in parallel_bulk(es, es_actions_generator_raw(), queue_size=4, thread_count=4, chunk_size=1000):
    if not ok:
        print(result)
end = time.time()
elapsed = end - start
# strftime has no fractional-seconds directive for a struct_time ('%l' is not
# one), so the milliseconds are taken from the elapsed value directly.
hms = time.strftime('%H:%M:%S', time.gmtime(elapsed))
millis = int(elapsed % 1 * 1000)
print(f"Time on index creation: {hms}.{millis:03d}")
print(f"In seconds: {elapsed}")




TransportError: TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [1056103520/1007.1mb], which is larger than the limit of [986061209/940.3mb], real usage: [926998432/884mb], new bytes reserved: [129105088/123.1mb], usages [request=0/0b, fielddata=0/0b, in_flight_requests=433895952/413.7mb, accounting=1261593/1.2mb]')

In [42]:
def search(query, *args, index='myandex_raw'):
    """Run `query` and print a summary of the top hits.

    This definition replaces the earlier `search`, which targeted 'myandex'.

    Args:
        query: Request body passed to `es.search`.
        *args: Names of `_source` fields to print for every listed hit.
        index: Index to search (keyword-only). Defaults to 'myandex_raw'.
    """
    # size=20 asks for more hits than the default page of 10;
    # pretty_print_result decides how many of them are actually listed.
    pretty_print_result(es.search(index=index, body=query, size=20), args)

def raw_search(query, index='myandex_raw'):
    """Run `query` and return the top hits as (doc id, score) pairs.

    This definition replaces the earlier `raw_search`, which targeted
    'myandex'; callers that look the name up at call time (such as
    `measure_performance`) use this version once the cell has run.

    Args:
        query: Request body passed to `es.search`.
        index: Index to search. Defaults to 'myandex_raw'.

    Returns:
        List of (`_id`, `_score`) tuples for up to 20 hits, in result order.
    """
    hits = es.search(index=index, body=query, size=20)['hits']['hits']
    return [(hit['_id'], hit['_score']) for hit in hits]

In [34]:
# Report the on-disk size of the unstemmed index.
print_index_size('myandex_raw')

Size of index: 4.733313490636647 GB


In [35]:
def load_queries_and_relevance_raw():
    """Load the relevance table and query texts for evaluating the raw index.

    The judgements and queries come from the same two XML files whichever
    index is evaluated, and this function was a line-for-line copy of
    `load_queries_and_relevance`, so it delegates to it.

    Returns:
        The mapping produced by `load_queries_and_relevance`.
    """
    return load_queries_and_relevance()

In [36]:
# Rebinds the global relevance table that measure_performance() reads.
relevance = load_queries_and_relevance_raw()







In [43]:
# Per-task p@R(q) values from this second evaluation run.
tasks_prq_raw = measure_performance()

p@20: 0.3348484848484848
r@20: 0.21605664427146334
p@R(q): 0.18301908439672362
map@20(q): 0.39893436948606287


# Comparing the Snowball and raw indexes

In [45]:
# Show the tasks whose p@R(q) differs most between the two evaluation runs.
diffs_cnt = 5
prq_diffs = np.abs(tasks_prq - tasks_prq_raw)
sorted_idx = np.argsort(prq_diffs)
relevance_keys = relevance.keys()
# Positions in tasks_prq follow the iteration order of `relevance`, so the
# keys are materialised once for positional lookup.
task_ids = list(relevance_keys)
report_line = 'Query: %s. p@R(q) with snowball index: %f. p@R(q) with raw index: %f.'

# argsort is ascending, so the largest differences sit at the end.
for i in sorted_idx[-diffs_cnt:]:
    task_relevance = relevance[task_ids[i]]
    query = task_relevance['querytext']
    print(report_line % (query, tasks_prq[i], tasks_prq_raw[i]))

Query: в контакте. p@R(q) with snowball index: 0.000000. p@R(q) with raw index: 0.500000.
Query: Риэлт Дудаево. p@R(q) with snowball index: 0.000000. p@R(q) with raw index: 0.500000.
Query: УРАЛЬСКАЯ ПЛИТКА. p@R(q) with snowball index: 1.000000. p@R(q) with raw index: 0.055556.
Query: контакт. p@R(q) with snowball index: 1.000000. p@R(q) with raw index: 0.000000.
Query: Стоматология донецк. p@R(q) with snowball index: 1.000000. p@R(q) with raw index: 0.000000.
