Экспорт таблиц из Postgres в Elasticsearch
========================

Таблицы копируются как есть, без изменений в базе данных Postgres.


To access notebook via ssh: `$ ssh -N -L 8888:localhost:8888 {user}@{server_ip}`

In [1]:
import os
import csv
import json
import sys
import requests
from pprint import pprint
import psycopg2
import psycopg2.extras  
import ipywidgets as widgets
import time
import datetime
from elasticsearch import Elasticsearch

# from IPython.core.debugger import set_trace

In [2]:
print(os.getenv('RGDSN'))
print(os.getenv('RGPASS'))

host=134.0.107.93 port=5432 dbname=rgdb user=root password=rosgas2011 sslmode=disable
rosgas2011


In [3]:
RGUSER = 'admin'
RGPASS = os.getenv('RGPASS')
RGDSN = os.getenv('RGDSN')
# ELASTIC_ENDPOINT = "http://rg-corpus-caddy:8080/elasticsearch/"
ELASTIC_ENDPOINT = 'http://dockertest.rgwork.ru:9094/elasticsearch/' if os.path.exists('local-file.txt') else 'http://es01:9200/'
es = Elasticsearch(ELASTIC_ENDPOINT, http_auth=(RGUSER, RGPASS))



def save_batch(lines: list, elastic_endpoint:str, index_name:str):
    """saves batch of lines to database"""
    data = '\n'.join(lines)+'\n'
    r = requests.post(f'{elastic_endpoint}{index_name}/_bulk', 
                      headers = {'Content-Type': 'application/x-ndjson; charset=UTF-8'}, 
                      auth=(RGUSER,RGPASS),
                      data=data.encode('utf-8'))
    try:
        rjson=r.json()
        if rjson.get('errors') is not False:
            pprint(rjson)
    except:
        pprint(r)
        
    
def set_progress(p1,p2, val):
    if p1 is not None:
        p1.value = val
    if p2 is not None:
        p2.value = str(val)

def save_table_to_elastic(table_name: str, idname: str, elastic_endpoint:str, index_name:str,  max_number=0, batch_size=1000):
    """Копировать таблицу в elasticsearch, как есть, без изменений в базе данных Postgres.
    
    - table_name - name of postgres table
    - idname - выражение для поля postgres значение которого нужно сделать уникальным идентификатором записи Эластик
    - elastic_endpoint - конечная точка эластик
    - index_name - имя инедекса эластик
    - max_number - max number of records to save
    - batch_size  - number of records in a batch 
    """
    p1 = widgets.IntProgress(min=0, max=max_number) 
    p2 = widgets.Label()
    box = widgets.HBox([p1,p2])
    display(box)
    
    start = time.time()
    counter =0    # aka record id 
    lines =[]     # list of text lines to save
    conn = psycopg2.connect(RGDSN)
    try:
        with conn:
            with conn.cursor('servercursor') as curs:
                curs.execute(f"SELECT {idname}, row_to_json(r,FALSE)::text FROM {table_name} r LIMIT {max_number}")
                for record in curs:
                    if counter >= max_number: break
                    elastic_id = counter if record[0] is None else record[0]
                    lines.append('{"index" : {"_id" : "'+str(elastic_id)+'"}}')
                    lines.append(record[1])
                    counter += 1
                    if counter % batch_size ==0:
                        duration = (time.time()-start)/60
                        p1.value=counter; p2.value = f'{counter}/{max_number}. Время {duration:.2f} мин'
                        save_batch(lines, elastic_endpoint, index_name)
                        lines.clear()
                p1.value=counter; p2.value = f'{counter}/{max_number}. Время {duration:.2f} мин'
                save_batch(lines, elastic_endpoint, index_name)
                lines.clear()
       
    except Exception as ex:
        print(ex)
    finally:
        conn.close()    
    
def create_index(index_name:str, mapping_file:str):
    """ Создает индекс Эластик из маппинга в файле. 
    - index_name - имя индекса эластик
    - mapping_file_name -имя файла с маппингом в формате  JSON.
    """
    try:
        f = open(mapping_file, 'r') 
        json_data = json.load(f)
        f.close()
        pprint(es.indices.create(index=index_name, body=json_data))
    except Exception as ex:
        pprint(ex)



## Получить список индексов

In [39]:
print(es.cat.indices('art*', v=True))
# articles_indices = [ idx['index'] for idx in es.cat.indices(v=True, format='json') if idx['index'][:3] == 'art' ]


health status index          uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   articles_tfidf 68CuvhmpQh2jiV32Bp44tw   1   0          0            0       208b           208b
green  open   articles_k20b0 jJI7IwmfSmGGY7EtHdhZZA   1   0          0            0       208b           208b
green  open   articles       gsVg1W2SRZKzmOCxuS2wEw   1   0    1232876            0      9.6gb          9.6gb



## Создать индексы

In [35]:
create_index(index_name='articles_k20b0', mapping_file='mappings/articles-k20b0-mapping.json')

{'acknowledged': True, 'index': 'articles_k20b0', 'shards_acknowledged': True}


In [36]:
create_index(index_name='articles_tfidf', mapping_file='mappings/articles-tfidf-mapping.json')

{'acknowledged': True, 'index': 'articles_tfidf', 'shards_acknowledged': True}


## Удалить индексы
<div style="font-weight:bold; color:red;">осторожно</div>


In [33]:
es.indices.delete('articles_k20b0')

{'acknowledged': True}

In [None]:
es.indices.delete('articles_tfidf')

## Импорт таблиц в Эластик 

Перед тем как сделать импорт создайте индексы с помощью команд приведенных выше. 
В противном случае будут созданы индексы с маппингом по умолчанию,
что не всегда соответствует ожиданиям.

Действуйте внимательно, поскольку данные могут быть перезаписаны.



In [None]:
save_table_to_elastic('rubrics', 'id', ELASTIC_ENDPOINT, 'rubrics', max_number=2000 , batch_size=500)

In [None]:
save_table_to_elastic('rubrics_objects', "kind || '-' || rubric_id || '-' || object_id", ELASTIC_ENDPOINT, 'rubrics_objects', max_number=3500000 , batch_size=10000)

In [None]:
save_table_to_elastic('articles', 'obj_id', ELASTIC_ENDPOINT, 'articles', max_number=1250000 , batch_size=5000)

In [None]:
save_table_to_elastic('articles', 'obj_id', ELASTIC_ENDPOINT, 'articles_k20b0', max_number=1250000 , batch_size=5000)

In [None]:
save_table_to_elastic('articles', 'obj_id', ELASTIC_ENDPOINT, 'articles_tfidf', max_number=1250000 , batch_size=5000)