Imports and settings

In [None]:
import os

from dotenv import load_dotenv
from opensearchpy import OpenSearch

# --- Configuration ---
load_dotenv()
DOMAIN_NAME = os.getenv("DOMAIN_NAME")
AWS_REGION = os.getenv("AWS_REGION")
OPENSEARCH_PORT = os.getenv("OPENSEARCH_PORT")
INDEX_NAME = os.getenv("INDEX_NAME")
OPENSEARCH_HOST = f"{DOMAIN_NAME}.{AWS_REGION}.opensearch.localhost.localstack.cloud"
OPENSEARCH_USERNAME = os.getenv("OPENSEARCH_USERNAME")
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD")

OpenSearch Client Configuration

In [None]:
# For a local OpenSearch instance, typically you'd use HTTP and basic auth.
# If you are using AWS OpenSearch Service, you would need AWS4Auth.
# For simplicity with a local docker-compose setup, we'll use basic auth.
auth = (OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD)
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}],
    http_auth=auth,
    http_compress=True,  # enables gzip compression for request bodies
    use_ssl=True,  # Use SSL for local OpenSearch, usually self-signed
    verify_certs=False,  # Do not verify certs for local self-signed certs
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

Extract text from PDF

In [43]:
from document_search_airflow.extractor_pdf_with_text import extract_text_from_pdf

pdf_filename = "ai-04-00049.pdf"
pages = extract_text_from_pdf(pdf_filename)

Chunk text

In [44]:
from langchain_text_splitters import SentenceTransformersTokenTextSplitter
from sentence_transformers import SentenceTransformer

splitter = SentenceTransformersTokenTextSplitter(
    model_name="sentence-transformers/all-mpnet-base-v2",  # characters or ~512 tokens
    chunk_overlap=50,
    tokens_per_chunk=256,
)
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # 384‑D


extracted_data = []
for page_number, page_text in pages:
    chunks = splitter.split_text(page_text)
    embeddings = model.encode(chunks, batch_size=32, show_progress_bar=True)
    extracted_data.append((page_number, page_text, chunks, embeddings))


Batches: 100%|██████████| 1/1 [00:00<00:00,  7.57it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00,  2.71it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 34.32it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 42.57it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 42.67it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 38.41it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 37.31it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 40.29it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00,  3.43it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 25.35it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 48.93it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 41.39it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 39.28it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 37.68it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 38.60it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 33.11it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 38.13it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 37.71it/s]
Batches: 1

Embed chunks using local embedding model

In [None]:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # 384‑D
embeddings = model.encode(chunks, batch_size=32, show_progress_bar=True)

Batches: 100%|██████████| 1/1 [00:01<00:00,  1.42s/it]


Insert into OpenSearch Index

In [None]:
from opensearchpy import OpenSearch, helpers

bulk_actions = (
    {
        "_index": "rag_docs",
        "_id": f"{file_name}_{page}_{i}",
        "_source": {
            "chunk_text": chunk,
            "embedding": emb.tolist(),  # ndarray → list
            "file_name": file_name,
            "page": page,
            "chunk_index": i,
            "source_stage": "pdf_text",
        },
    }
    for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
)

helpers.bulk(client, bulk_actions, request_timeout=120)


In [None]:
from opensearchpy import helpers

bulk_actions = []
for page_number, page_text, chunks, embeddings in extracted_data:
    for chunk_index, (chunk_text, embedding) in enumerate(zip(chunks, embeddings)):
        doc = {
            "_index": "rag_docs",
            "_id": f"{file_hash}_{page_number}_{chunk_index}",
            "_source": {
                "file_name": file_name,
                "page": page_number,
                "chunk_index": chunk_index,
                "chunk_text": chunk_text,
                "embedding": embedding,
            },
        }
        bulk_actions.append(doc)
helpers.bulk(client, bulk_actions, request_timeout=120)