# Index into Elasticsearch Example

In [None]:
from datetime import datetime, timezone
import hashlib
import json
import os

import eland as ed
import pandas as pd
from elasticsearch import Elasticsearch
from sqlalchemy import create_engine

In [None]:
# Elasticsearch client. The endpoint can be overridden with the
# ELASTICSEARCH_URL environment variable; it defaults to localhost:9200.
es = Elasticsearch(os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200"))

# Let's index some real data

In [None]:
# Connection string for the Postgres database holding the `papers` table.
# Read it from DATABASE_URL so real credentials stay out of the notebook;
# the fallback is the original placeholder URL.
database_url = os.environ.get(
    'DATABASE_URL',
    'postgresql://username:password@localhost:5432/default_database?gssencmode=disable',
)
engine = create_engine(database_url)
# Load the whole `papers` table into a pandas DataFrame.
df = pd.read_sql_table('papers', engine)

In [None]:
# Preview the papers table as loaded from Postgres.
df

In [None]:
# Authors arrive as one string such as "A. Smith and B. Jones". Split only on
# the standalone word "and" (surrounded by whitespace), so names that merely
# contain "and" (e.g. "Sandra", "Alexander") are not broken apart.
# Series.str.split treats a multi-character pattern as a regular expression.
df['authors'] = df['authors'].str.strip().str.split(r'\s+and\s+')


def _row_md5(row):
    """Return the MD5 hex digest of one dataframe row's values.

    The row is serialised with ``json.dumps`` (sorted keys) rather than
    ``str(row)``. The latter depends on pandas display options (long values
    are truncated with "...") and on the repr format, so it is not a stable
    basis for a hash. ``default=str`` covers non-JSON values such as
    timestamps or numpy scalars.
    """
    payload = json.dumps(row.to_dict(), sort_keys=True, default=str)
    return hashlib.md5(payload.encode('utf-8')).hexdigest()


# Content hash of each paper row; stored as the `id` field of indexed documents.
df['md5'] = df.apply(_row_md5, axis=1)
pdfs_dir = '../downloads'
# Keep only rows that reference a PDF file name. Whether that file actually
# exists on disk is checked separately (see the `path_exists` column).
df = df[df['pdf_name'].notna()]

In [None]:
# Show the full contents of every cell when displaying DataFrames.
# `None` means "no width limit"; the former `-1` sentinel was deprecated in
# pandas 1.0 and is rejected by newer pandas releases.
pd.set_option('display.max_colwidth', None)
# Flag which referenced PDFs are actually present in `pdfs_dir`.
df['path_exists'] = df['pdf_name'].apply(
    lambda name: os.path.exists(os.path.join(pdfs_dir, name))
)
df[['pdf_name', 'path_exists']]

Users might want to filter on paper title (the `name` column), journal, year, and authors, so let's include all of these fields in the Elasticsearch documents.

In [None]:
def create_document(row):
    """Build the Elasticsearch document body for one paper.

    Args:
        row: Mapping for one paper (e.g. a record from
            ``df.to_dict('records')``) with at least the keys ``authors``,
            ``year``, ``journal``, ``name`` and ``md5``.

    Returns:
        dict with the paper's filterable metadata plus an indexing
        timestamp. The indexing loop in this notebook adds a ``text`` field
        before sending it.
    """
    return {
        'authors': row['authors'],
        'year': row['year'],
        # Timezone-aware UTC: Elasticsearch assumes UTC for date strings
        # without an offset, so a naive local time would be silently shifted.
        'timestamp': datetime.now(timezone.utc),
        'journal': row['journal'],
        'title': row['name'],
        'id': row['md5'],
    }

# Target Elasticsearch index for the OCR text blocks.
index_name = "free-energy-principle"
# Directory containing the OCR output files to index.
path = "../data/ocr/"

In [None]:
# Start each run from an empty index (useful while testing): drop the index,
# tolerating 400/404 error responses, e.g. when it does not exist yet.
# NOTE(review): per-call `ignore=` is deprecated in elasticsearch-py 8.x in
# favour of `es.options(ignore_status=...)`; switch once the client version is pinned.
es.indices.delete(ignore=[400, 404], index=index_name)

In [None]:
# Index every OCR text block as its own document, tagged with its paper's metadata.
ocr_files = os.listdir(path)
n_files = len(ocr_files)
# Stem of each row's pdf_name (text before the first '.'), computed once
# rather than once per OCR text block.
pdf_stems = df['pdf_name'].str.split('.').str[0]

for file_number, file in enumerate(ocr_files, start=1):
    # OCR files are matched to papers on the part of the name before the
    # first '.', the same normalisation applied to `pdf_name` above.
    pdf_name = file.split('.')[0]
    df_row = df.loc[pdf_stems == pdf_name]
    if df_row.empty:
        # No metadata for this paper: skip the whole file, not just one block.
        print('No row found for file: ' + pdf_name)
        continue
    row_of_interest = df_row.to_dict('records')[0]

    with open(os.path.join(path, file), 'r', encoding='utf-8') as f:
        data = json.load(f)

    for page in data['pages']:
        for block in page['blocks']:
            doc = create_document(row_of_interest)
            doc['text'] = block['text']
            es.index(index=index_name, document=doc)
    print(f'Indexed file: {file_number} of {n_files}')

# Make the new documents visible to search before querying.
es.indices.refresh(index=index_name)

resp = es.search(index=index_name, query={"match_all": {}})
print(f"Got {resp['hits']['total']['value']} Hits:")

In [None]:
# Inspect the `hits` section of the match_all search response.
resp['hits']

In [None]:
# Re-open the freshly indexed data through eland, which exposes the index via a
# pandas-style DataFrame API. Note: this rebinds `df`; the pandas frame of
# paper metadata is no longer available under that name afterwards.
df = ed.DataFrame(es_client=es, es_index_pattern=index_name)

In [None]:
# Preview the indexed documents via the eland DataFrame.
df

# Index in bulk

In [None]:
# Reference implementation for bulk indexing:
# https://github.com/climatepolicyradar/navigator/blob/aa5c1de51d9ff29d7c8f3bd2d9a577cfcaf1f9ba/search-index/app/index.py

In [None]:
from elasticsearch.helpers import bulk, streaming_bulk

In [None]:
from typing import Iterable


class ElasticSearchSIndexer:
    """Methods for indexing paper documents into Elasticsearch.

    Args:
        es_client: Elasticsearch client used to create the index.
        index_name: Target index. It is created on construction; a 400
            response (e.g. the index already exists) is ignored.
    """

    # Fields copied into each document's ``_source`` (after ``timestamp``).
    _FIELDS = ('text', 'title', 'authors', 'journal', 'year')

    def __init__(self, es_client, index_name):
        self.es_client = es_client
        self.index_name = index_name
        self.es_client.indices.create(index=index_name, ignore=400)

    def _index_body(self, document=None) -> dict:
        """Return one bulk action that indexes ``document`` into this index.

        Args:
            document: Mapping with (a subset of) the keys in ``_FIELDS``;
                missing keys are stored as ``None``. When omitted, the values
                are read from same-named attributes on ``self`` (the original
                calling convention), which must then all be set.

        Returns:
            dict with ``_index`` and ``_source`` keys, suitable as an action
            for ``elasticsearch.helpers.bulk``. No ``_type`` is emitted:
            mapping types are deprecated in Elasticsearch 7 and removed in 8.
        """
        if document is None:
            document = {field: getattr(self, field) for field in self._FIELDS}
        # Timezone-aware UTC so Elasticsearch does not misread local time as UTC.
        source = {'timestamp': datetime.now(timezone.utc)}
        source.update({field: document.get(field) for field in self._FIELDS})
        return {'_index': self.index_name, '_source': source}

    def bulk_index(self, es: "Elasticsearch", actions: Iterable[dict]):
        """Bulk-load ``actions`` using ``elasticsearch.helpers.bulk``.

        Returns:
            The value returned by ``bulk`` (by default a
            ``(success_count, errors)`` tuple), so callers can check the outcome.
        """
        return bulk(es, actions)