In [1]:
import asyncio
from aioes import Elasticsearch

In [2]:
from chunk import TimeDistance
from chunk import Chunker
from slack_data_loader import SlackLoader

In [3]:
DATH_TO_DATA = './data'

In [4]:
es = Elasticsearch(['localhost:9200'])

In [5]:
def gen_synonyms():
    """
    Generate some synonyms in a file. All words separated by comma are treated as equal
    """
    with open("help_data/synonyms.txt", "w") as syns:
        syns.write("xboost, эксгебуст, эксбуст, иксгебуст, xgboost\n")
        syns.write("пыха, пыху, пых, php\n")
        syns.write("lol, лол\n")
        syns.write("питон, python\n")

In [6]:
index_name = "ods-slack-index"
mapping_name = "thread"
index_body = {
    "settings": {
        "analysis": {
          "filter": {
            "russian_stop": {
              "type":       "stop",
              "stopwords":  "_russian_" 
            },
            "russian_stemmer": {
              "type":       "stemmer",
              "language":   "russian"
            },
            "synonyms_expand": {
              "type": "synonym", 
              # path to synonym file.
              # for ES to be able to read it, security policy should be set as described here:
              # https://stackoverflow.com/questions/35401917/reading-a-file-in-an-elasticsearch-plugin
              "synonyms_path": "/usr/share/config_data/synonyms.txt"
            }
          },
          "analyzer": {
            "russian_syn": {
              "tokenizer":  "standard",
              "filter": [
                "lowercase",
                "russian_stop",
                "russian_stemmer",
                "synonyms_expand"
              ]
            }
          }
        }
    },
    "mappings":{  
        mapping_name:{
          "properties":{
            "chanel": {"type": "keyword"},
            "title": {"type":"string", "analyzer":"russian_syn"},
            "messages" : {
                "properties":{
                    "text": {"type":"string", "analyzer":"russian_syn"},
                    "user_id": {"type": "keyword"},
                    "user_real_name": {"type":"string"},
                    "ts": {"type": "date"}
                }
            }
          }
        }
    }
}

async def create_index():
    ret = await es.indices.create(
        index=index_name,
        body=index_body
    )
    print(ret)


async def delete_index():
    ret = await es.delete(
        index=index_name
    )
    print(ret)

async def openclose():
    """
    Closing and opening index again reloads synomyms file
    """
    await es.indices.close(index=index_name)
    await es.indices.open(index=index_name)
    
async def populate_index(chanel, messages):
    await es.index(
        index=index_name,
        doc_type=mapping_name,
        body={
            "chanel": chanel,
            "title": messages[0]['text'],
            "messages": messages
        }
    )

In [7]:
loop = asyncio.get_event_loop()

# reload synonims without recreating the whole database
gen_synonyms()
loop.run_until_complete(openclose())

NotFoundError: TransportError(404, '{"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"ods-slack-index","index_uuid":"_na_","index":"ods-slack-index"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"ods-slack-index","index_uuid":"_na_","index":"ods-slack-index"},"status":404}')

In [10]:
def index_chanel(chanel = "nlp"):
    data = SlackLoader(DATH_TO_DATA, only_channels=[chanel])
    chunker = Chunker()
    groups  = chunker.get_groups(data)
    
    try:
        loop.run_until_complete(delete_index())
    except:
        print("warn. cant delete index")
    loop.run_until_complete(create_index())

    for g in groups:
        users = data.users
        for msg in g:
            msg['user_real_name'] = users[msg['user']]['name']
            if 'dt' in msg:
                del msg['dt']
            msg['ts'] = int(msg['ts'])
            msg['timestamp'] = str(msg['ts'])
            if "attachments" in msg:
                for attach in msg["attachments"]:
                    if 'ts' in attach:
                        attach['ts'] = float(attach['ts'])
        loop.run_until_complete(populate_index(chanel, g))

In [11]:
index_chanel("nlp")

warn. cant delete index
{'shards_acknowledged': True, 'acknowledged': True}
