## Ingest PDF or Office documents using Tika, LangChain and Elasticsearch

First, we import the necessary libraries and read the connection settings from the environment variables defined earlier.

In [None]:
import os
import hashlib
import fnmatch
import tqdm
from tika import parser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from ssl import create_default_context
from elasticsearch import Elasticsearch, helpers

In [None]:
es_url = os.getenv('ES_URL')
es_user = os.getenv('ES_USER')
es_password = os.getenv('ES_PASSWORD')
es_cacert = os.getenv('ES_CACERT')
es_index_name = os.getenv('ES_INDEX_NAME')
docs_dir = os.getenv('DOCS_DIR')
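
Because `os.getenv` returns `None` for a variable that is not set, a missing setting would otherwise only surface later as a connection or path error. The following is a minimal sketch of an up-front check, assuming the same variable names as above; the helper `missing_env_vars` is hypothetical and not part of the original notebook.

```python
import os

# Names of the environment variables read in the cell above
REQUIRED_VARS = ["ES_URL", "ES_USER", "ES_PASSWORD", "ES_CACERT", "ES_INDEX_NAME", "DOCS_DIR"]

def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]

missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Running this before creating the client makes it clear which setting needs to be exported.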

## Define helper functions

Next, we define utility functions that we will use later to initialize the Elasticsearch client and to generate the list of document dictionaries to ingest into an Elasticsearch index.

In [None]:
def init_elasticsearch(es_cacert, es_url, es_user, es_password):
    context = create_default_context(cafile=es_cacert)
    client = Elasticsearch(
        es_url,
        basic_auth=(es_user, es_password),
        ssl_context=context,
        request_timeout=480
    )
    return client

def generate_documents(docs_dir, es_index_name, text_splitter):
    doc_count = 0
    documents = []
    for filename in os.listdir(docs_dir):
        f = os.path.join(docs_dir, filename)
        # Skip subdirectories and anything else that is not a regular file
        if not os.path.isfile(f):
            continue
        print("Processing " + f)
        doc_count += 1
        # Tika reads the file from its path, so we do not need to open it ourselves
        parsed_document = parser.from_file(f)
        # 'content' may be None when Tika extracts no text from the file
        document_text = parsed_document.get('content') or ''
        chunks = text_splitter.split_text(document_text)
        for chunk_id, chunk_text in enumerate(chunks):
            source_dict = dict()
            document_dict = {"_index": es_index_name}
            source_dict['doc_id'] = doc_count
            source_dict['chunk_id'] = chunk_id
            source_dict['title'] = str(filename)
            source_dict['text'] = chunk_text
            # The document ID is the MD5 hash of the chunk text
            document_dict['_id'] = hashlib.md5(chunk_text.encode('utf-8')).hexdigest()
            document_dict['_source'] = source_dict
            documents.append(document_dict)
    return documents

def yield_docs(documents):
    for doc in documents:
        yield doc
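
To make the shape of each bulk action concrete, the sketch below builds the same kind of dictionaries for a list of already-split chunks, without Tika or Elasticsearch. The helper `build_actions`, the index name `my-index`, and the sample chunks are hypothetical and used only for illustration.

```python
import hashlib

def build_actions(index_name, doc_id, title, chunks):
    """Build one bulk action per chunk, mirroring the fields used in generate_documents."""
    actions = []
    for chunk_id, chunk_text in enumerate(chunks):
        actions.append({
            "_index": index_name,
            # The ID is the MD5 of the chunk text, so identical chunks share an ID
            "_id": hashlib.md5(chunk_text.encode("utf-8")).hexdigest(),
            "_source": {
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "title": title,
                "text": chunk_text,
            },
        })
    return actions

# Made-up chunks; the first and third are deliberately identical
actions = build_actions("my-index", 1, "report.pdf", ["first chunk", "second chunk", "first chunk"])
print(len(actions), len({a["_id"] for a in actions}))
```

Because the `_id` is derived from the chunk text alone, a chunk that repeats within or across files overwrites the earlier copy in the index. This deduplicates identical text, but only the last copy's `doc_id`, `chunk_id`, and `title` are kept.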

## Using LangChain text splitting for chunking

We use the LangChain library to split the document text into smaller chunks so that each chunk fits within the model's context window. For example, ELSER in Elasticsearch has a 512-token limit, so every chunk must stay within that size for the model to produce an embedding that covers the whole chunk.

You can change the values of `chunk_size` and `chunk_overlap` in the code below to suit the downstream model you are working with, or try other text splitters. You can read more in the [LangChain documentation](https://python.langchain.com/docs/modules/data_connection/document_transformers/).

In [None]:
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=512,
    chunk_overlap=124,
    is_separator_regex=False,
    disallowed_special=()
)
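
To illustrate how `chunk_size` and `chunk_overlap` interact, here is a simplified sliding-window sketch over a list of tokens. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter breaks text on separators and then merges the pieces), and `split_with_overlap` is a hypothetical helper used only to show the effect of the two parameters.

```python
def split_with_overlap(tokens, chunk_size, chunk_overlap):
    """Split a token list into windows of at most chunk_size that share chunk_overlap tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    # Each new window starts this many tokens after the previous one
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        # Stop once the current window reaches the end of the input
        if start + chunk_size >= len(tokens):
            break
    return chunks

tokens = list(range(10))  # stand-in for real token IDs
chunks = split_with_overlap(tokens, chunk_size=4, chunk_overlap=1)
print(chunks)
```

In this simplified model, `chunk_size=512` and `chunk_overlap=124` would start a new window every 388 tokens, so text near a chunk boundary appears in both neighbouring chunks.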

## Index into Elasticsearch

We now use the helper functions to initialize an Elasticsearch client, split the documents into chunks with the splitter defined above, and ingest the chunks into an index.

In [None]:
client = init_elasticsearch(es_cacert, es_url, es_user, es_password)
documents = generate_documents(docs_dir, es_index_name, text_splitter)
number_of_docs = len(documents)

In [None]:
print("Indexing documents...")

progress = tqdm.tqdm(unit="docs", total=number_of_docs)
successes = 0

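try:
    for ok, action in helpers.streaming_bulk(
        client=client,
        chunk_size=50,
        actions=yield_docs(documents),
    ):
        # streaming_bulk yields one result per document, so advance the bar by one
        progress.update(1)
        successes += ok
except helpers.BulkIndexError as e:
    # BulkIndexError carries the per-document failures in its errors attribute
    print(e.errors)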

print("Indexed %d/%d documents" % (successes, number_of_docs))
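
The `chunk_size=50` argument controls how many actions are grouped into one bulk request, while results are still reported per document. The sketch below shows that grouping in rough form, using a hypothetical `batches` helper and made-up actions. The real helper also limits each request by size in bytes, which this sketch ignores.

```python
def batches(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# 120 made-up actions grouped into requests of up to 50
sample_actions = [{"_id": str(i)} for i in range(120)]
batch_sizes = [len(batch) for batch in batches(sample_actions, 50)]
print(batch_sizes)
```

Larger batches mean fewer round trips to the cluster, at the cost of bigger requests. Whether 50 is a good value depends on the size of your chunks.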