# OpenSearch Index Creation and Document Ingestion

This notebook demonstrates how to:
1. Create an OpenSearch index with proper mappings for hybrid search
2. Process PDF documents into chunks
3. Generate embeddings for the chunks
4. Ingest the chunks with their embeddings into OpenSearch

All functions are defined directly in this notebook, allowing you to modify them and experiment with different approaches.


In [1]:
# Import necessary libraries
import json
import os
import re
import sys
from typing import Dict, Any, List

from PyPDF2 import PdfReader
from opensearchpy import OpenSearch, helpers
from sentence_transformers import SentenceTransformer





In [2]:
# Set up Python path to access project modules
sys.path.insert(0, "..")

%load_ext autoreload
%autoreload 2


In [3]:
# Define constants
# You can modify these values to experiment with different settings

# OpenSearch connection settings
OPENSEARCH_HOST = "localhost"  # OpenSearch host
OPENSEARCH_PORT = 9200  # OpenSearch port
OPENSEARCH_INDEX = "documents"  # Index name for document storage

# Embedding settings
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"  # Model for generating embeddings
EMBEDDING_DIMENSION = 384  # Embedding dimension for the model
ASSYMETRIC_EMBEDDING = False  # Whether to use asymmetric embeddings

# Chunking settings
TEXT_CHUNK_SIZE = 500  # Number of whitespace-separated words per chunk
TEXT_CHUNK_OVERLAP = 100  # Number of words shared between consecutive chunks

print("Constants defined. You can modify these values to experiment with different settings.")


Constants defined. You can modify these values to experiment with different settings.


In [4]:
# Utility functions for text processing

def clean_text(text: str) -> str:
    """
    Cleans OCR-extracted text by removing unnecessary newlines, hyphens, and correcting common OCR errors.

    Args:
        text (str): The text to clean.

    Returns:
        str: The cleaned text.
    """
    # Remove hyphens at line breaks (e.g., 'exam-\nple' -> 'example')
    text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)

    # Replace newlines within sentences with spaces
    text = re.sub(r"(?<!\n)\n(?!\n)", " ", text)

    # Replace multiple newlines with a single newline
    text = re.sub(r"\n+", "\n", text)

    # Remove excessive whitespace
    text = re.sub(r"[ \t]+", " ", text)

    return text.strip()


def chunk_text(text: str, chunk_size: int, overlap: int = 100) -> List[str]:
    """
    Splits text into chunks with a specified overlap.

    Args:
        text (str): The text to split.
        chunk_size (int): The number of whitespace-separated words in each chunk.
        overlap (int): The number of words shared between consecutive chunks.
            Must be smaller than chunk_size.

    Returns:
        List[str]: A list of text chunks.
    """
    # Guard against settings that would keep the window from moving forward
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")

    # Clean the text before chunking
    text = clean_text(text)

    # Tokenize the text into words (a simple split on spaces, not model tokens)
    tokens = text.split(" ")

    chunks = []
    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunks.append(" ".join(tokens[start:end]))
        start = end - overlap  # Next chunk starts 'overlap' words before this one ends

    return chunks

print("Utility functions defined. You can modify these functions to experiment with different text processing techniques.")


Utility functions defined. You can modify these functions to experiment with different text processing techniques.
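
To see how the overlapping window in `chunk_text` behaves, here is a minimal sketch that applies the same word-based windowing to a toy input. The helper name `sliding_window_chunks` is illustrative and not part of the notebook, and the check on `overlap` is an assumption added here so that the window always advances.

```python
from typing import List


def sliding_window_chunks(words: List[str], chunk_size: int, overlap: int) -> List[List[str]]:
    """Split a list of words into overlapping windows (illustrative helper)."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    stride = chunk_size - overlap  # each window starts this many words after the previous one
    return [words[start:start + chunk_size] for start in range(0, len(words), stride)]


words = [f"w{i}" for i in range(10)]
windows = sliding_window_chunks(words, chunk_size=4, overlap=1)
for window in windows:
    print(" ".join(window))
```

With 10 words, a chunk size of 4 and an overlap of 1, the windows start at words 0, 3, 6 and 9. The last window can be a short tail that the previous window already covers, which is worth keeping in mind when counting chunks.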


In [5]:
# Embedding functions

def get_embedding_model():
    """
    Loads and returns the sentence transformer embedding model.
    
    Returns:
        SentenceTransformer: The loaded embedding model.
    """
    print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
    model = SentenceTransformer(EMBEDDING_MODEL_NAME)
    return model


def generate_embeddings(texts: List[str]):
    """
    Generates embeddings for a list of text chunks.
    
    Args:
        texts (List[str]): List of text chunks to embed.
        
    Returns:
        numpy.ndarray: Array of shape (len(texts), EMBEDDING_DIMENSION), one row per text.
    """
    model = get_embedding_model()
    
    # If using asymmetric embeddings, prefix each text with "passage: "
    if ASSYMETRIC_EMBEDDING:
        texts = [f"passage: {text}" for text in texts]
        
    # Generate embeddings
    embeddings = model.encode(texts)
    return embeddings

print("Embedding functions defined. You can modify these functions to experiment with different embedding techniques.")


Embedding functions defined. You can modify these functions to experiment with different embedding techniques.
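
Note that `generate_embeddings` calls `get_embedding_model()` each time, so the model is loaded again on every call. One possible way to avoid that is to cache the loader. The sketch below uses a stand-in `FakeModel` class (hypothetical, so the example runs without downloading a model); in this notebook the cached function would return `SentenceTransformer(EMBEDDING_MODEL_NAME)` instead.

```python
from functools import lru_cache

load_count = 0  # counts how often the (stand-in) model is actually constructed


class FakeModel:
    """Hypothetical stand-in for SentenceTransformer, used only for this demo."""

    def encode(self, texts):
        # Return one dummy 3-dimensional vector per input text
        return [[float(len(text)), 0.0, 0.0] for text in texts]


@lru_cache(maxsize=1)
def get_cached_model() -> FakeModel:
    """Build the model once; later calls return the same cached instance."""
    global load_count
    load_count += 1
    return FakeModel()


first = get_cached_model().encode(["ocean warming"])
second = get_cached_model().encode(["sea level rise"])
print(f"Model constructed {load_count} time(s)")
```

Both calls share one model instance, so the construction cost is paid only once per kernel session.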


## 1. Connect to OpenSearch and Create Index

Now that we have all our utility functions defined, let's connect to OpenSearch and create an index with the right mappings for hybrid search.

The index configuration includes three main components:
1. **Text Field (`text`)**: Used for full-text search with the BM25 algorithm
2. **Vector Field (`embedding`)**: Used for semantic search with k-NN
3. **Metadata Field (`document_name`)**: Used for filtering and organizing documents

Make sure you have OpenSearch running locally (typically in a Docker container).


In [6]:
# Create an OpenSearch client
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}],
    http_compress=True,
    timeout=30,
    max_retries=3,
    retry_on_timeout=True,
)

# Check connection
try:
    info = client.info()
    print(f"Successfully connected to OpenSearch {info['version']['number']}")
except Exception as e:
    print(f"Failed to connect to OpenSearch: {e}")
    print(f"Make sure OpenSearch is running on {OPENSEARCH_HOST}:{OPENSEARCH_PORT}")
    raise



Successfully connected to OpenSearch 2.19.2


In [None]:
# Define the index configuration
def create_index_config() -> Dict[str, Any]:
    """
    Creates the index configuration with mappings for text, embeddings, and metadata.
    
    Returns:
        Dict[str, Any]: The index configuration.
    
    Note:
        No default ingest pipeline is configured, so embeddings are generated
        manually with generate_embeddings() before indexing:
        text → SentenceTransformer → embeddings → bulk index text + embeddings

    """
    config = {
        "settings": {
            "index": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "knn": True  # Enables k-NN search for vector fields
            }
        },
        "mappings": {
            "properties": {
                "text": {
                    "type": "text"  # Full-text (BM25) search; independent of the knn setting
                },
                "embedding": {
                    "type": "knn_vector",
                    "dimension": EMBEDDING_DIMENSION,  # Match your embedding model's dimension
                    "method": {
                        "engine": "faiss",
                        "space_type": "l2",
                        "name": "hnsw",
                        "parameters": {}
                    }
                },
                "document_name": {
                    "type": "keyword"  # For exact match on document names
                }
            }
        }
    }
    return config

# Get the index configuration
index_config = create_index_config()
print("\nIndex Configuration:")
print(json.dumps(index_config, indent=2))



Index Configuration:
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "knn": true
    }
  },
  "mappings": {
    "properties": {
      "text": {
        "type": "text"
      },
      "embedding": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "engine": "faiss",
          "space_type": "l2",
          "name": "hnsw",
          "parameters": {}
        }
      },
      "document_name": {
        "type": "keyword"
      }
    }
  }
}
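
With `space_type` set to `l2`, neighbors are ranked by Euclidean distance. As described in the OpenSearch k-NN documentation (worth confirming for your version), the distance `d` is the squared Euclidean distance and the relevance score is `1 / (1 + d)`, so closer vectors score higher. Here is a minimal sketch of that conversion, using made-up 3-dimensional vectors in place of real 384-dimensional embeddings; `l2_score` is an illustrative name.

```python
from typing import Sequence


def l2_score(a: Sequence[float], b: Sequence[float]) -> float:
    """Convert squared Euclidean distance into a score in (0, 1]."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    squared_distance = sum((x - y) ** 2 for x, y in zip(a, b))
    return 1.0 / (1.0 + squared_distance)


query = [1.0, 0.0, 0.0]
print(l2_score(query, [1.0, 0.0, 0.0]))  # identical vectors, distance 0
print(l2_score(query, [0.0, 1.0, 0.0]))  # squared distance of 2
```

The dimension check mirrors why `EMBEDDING_DIMENSION` must match the model: vectors of a different length cannot be compared against the indexed ones.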


In [8]:
# Create the index if it doesn't exist
if not client.indices.exists(index=OPENSEARCH_INDEX):
    response = client.indices.create(index=OPENSEARCH_INDEX, body=index_config)
    print(f"\nCreated index {OPENSEARCH_INDEX} with response: {response}")
else:
    print(f"\nIndex {OPENSEARCH_INDEX} already exists")


Index documents already exists


In [9]:
from opensearchpy.exceptions import NotFoundError
pipeline_name = "nlp-search-pipeline"

try:
    result = client.transport.perform_request(
        "GET",
        f"/_search/pipeline/{pipeline_name}"
    )
    print(f"\n✅ Search pipeline '{pipeline_name}' exists.")
    print(result)
except NotFoundError:
    print(f"\n⚠️ Search pipeline '{pipeline_name}' does NOT exist.")
except Exception as e:
    print(f"\n🚨 Error: {e}")



✅ Search pipeline 'nlp-search-pipeline' exists.
{'nlp-search-pipeline': {'description': 'Post processor for hybrid search', 'phase_results_processors': [{'normalization-processor': {'normalization': {'technique': 'min_max'}, 'combination': {'technique': 'arithmetic_mean', 'parameters': {'weights': [0.3, 0.7]}}}}]}}
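
The pipeline shown above applies `min_max` normalization followed by an `arithmetic_mean` combination with weights `[0.3, 0.7]`. The weights apply to the sub-queries of a hybrid query in order, so with a keyword clause first and a k-NN clause second, keyword scores are weighted 0.3 and vector scores 0.7. The following is a simplified sketch of that arithmetic on made-up scores. The function names are illustrative, and the sketch ignores edge-case handling in the real processor (for example, it simply treats a document missing from one sub-query as scoring 0 there).

```python
from typing import Dict, List


def min_max(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale one sub-query's scores to the range [0, 1]."""
    low, high = min(scores.values()), max(scores.values())
    if high == low:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - low) / (high - low) for doc_id, s in scores.items()}


def combine(sub_query_scores: List[Dict[str, float]], weights: List[float]) -> Dict[str, float]:
    """Weighted arithmetic mean of the normalized scores for each document."""
    normalized = [min_max(scores) for scores in sub_query_scores]
    doc_ids = set().union(*normalized)
    total_weight = sum(weights)
    return {
        doc_id: sum(w * n.get(doc_id, 0.0) for w, n in zip(weights, normalized)) / total_weight
        for doc_id in doc_ids
    }


bm25_scores = {"climate_3": 4.0, "climate_7": 2.0, "climate_9": 1.0}     # made-up values
knn_scores = {"climate_3": 0.40, "climate_7": 0.80, "climate_12": 0.60}  # made-up values

combined = combine([bm25_scores, knn_scores], weights=[0.3, 0.7])
for doc_id, score in sorted(combined.items(), key=lambda item: item[1], reverse=True):
    print(f"{doc_id}: {score:.3f}")
```

In this toy example `climate_7` ranks first: its top vector score carries more weight than the top keyword score of `climate_3`.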


## 2. Process PDF Document

Now let's process a PDF document to extract its content:
1. Read the PDF and extract the text
2. Clean and chunk the text into smaller segments
3. Generate embeddings for each chunk

You can replace the PDF path with your own document if you want to experiment with different content.


In [10]:
import os

cwd = os.getcwd()
print(cwd)

/Users/Shared/Projects/build_your_local_RAG_system/notebooks


In [11]:
# Read and process the PDF
pdf_path = "climate.pdf"  # Path relative to notebook directory

# Read the PDF file

reader = PdfReader(pdf_path)
# Fall back to an empty string in case a page yields no extractable text
text = "".join([(page.extract_text() or "") for page in reader.pages])
print(f"Extracted {len(text)} characters from {pdf_path}")

# Show a sample of the extracted text
print("\nSample of extracted text:")
print(text[:500] + "...")

# Clean the text
cleaned_text = clean_text(text)
print(f"\nText cleaned. Length: {len(cleaned_text)} characters")

# Chunk the text
chunks = chunk_text(cleaned_text, chunk_size=TEXT_CHUNK_SIZE, overlap=TEXT_CHUNK_OVERLAP)
print(f"Split text into {len(chunks)} chunks")

# Display a sample chunk
print("\nSample chunk:")
print(chunks[0])


Extracted 94936 characters from climate.pdf

Sample of extracted text:
3
1This Summary for Policymakers should be cited as:
IPCC, 2013: Summary for Policymakers. In: Climate Change 2013: The Physical Science Basis.  Contribution of 
Working Group I to the Fifth Assessment Report of the Intergovernmental Panel on Climate Change  [Stocker, 
T.F ., D. Qin, G.-K. Plattner, M. Tignor, S.K. Allen, J. Boschung, A. Nauels, Y . Xia, V. Bex and P .M. Midgley (eds.)]. 
Cambridge University Press, Cambridge, United Kingdom and New York, NY , USA.Summary  
for Policymakers SPM
...

Text cleaned. Length: 94017 characters
Split text into 38 chunks

Sample chunk:
3 1This Summary for Policymakers should be cited as: IPCC, 2013: Summary for Policymakers. In: Climate Change 2013: The Physical Science Basis. Contribution of Working Group I to the Fifth Assessment Report of the Intergovernmental Panel on Climate Change [Stocker, T.F ., D. Qin, G.-K. Plattner, M. Tignor, S.K. Allen, J. Boschung, A. Nauels, 
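
The drop from 94936 to 94017 characters comes from the cleaning step. Its effect is easier to see on a tiny input, so the sketch below repeats the same four regular expressions from `clean_text` on a made-up string. The name `clean_text_demo` is illustrative; it exists only so the example runs on its own.

```python
import re


def clean_text_demo(text: str) -> str:
    """Same four substitutions as clean_text, repeated here to be self-contained."""
    text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)  # rejoin words hyphenated across lines
    text = re.sub(r"(?<!\n)\n(?!\n)", " ", text)    # single newline becomes a space
    text = re.sub(r"\n+", "\n", text)               # collapse repeated newlines
    text = re.sub(r"[ \t]+", " ", text)             # collapse runs of spaces and tabs
    return text.strip()


raw = "exam-\nple of a line\nbreak.\n\nNew   paragraph"
print(repr(clean_text_demo(raw)))
```

The hyphenated word is rejoined, the single line break becomes a space, and the paragraph break survives as one newline.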

In [None]:
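# Pipeline: PDF → extract text → clean → chunk → generate embeddings → prepare documents → ready for bulk indexing
# Use the file name without directory or extension as the document name
pdf_file_name = os.path.splitext(os.path.basename(pdf_path))[0]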


# Generate embeddings for the chunks
print("Generating embeddings for chunks. This might take a moment...")
embeddings = generate_embeddings(chunks)
print(f"Generated {len(embeddings)} embeddings")
print(f"Embedding shape: {embeddings[0].shape}")

# Display a sample embedding (the full vector is printed)
print("\nSample embedding:")
print(embeddings[0])

# Prepare documents for indexing
documents_to_index = [
    {
        "doc_id": f"{pdf_file_name}_{i}",
        "text": chunk,
        "embedding": embedding,
        "document_name": pdf_file_name,
    }
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
]

print(f"\nPrepared {len(documents_to_index)} documents for indexing")


Generating embeddings for chunks. This might take a moment...
Loading embedding model: sentence-transformers/all-MiniLM-L6-v2
Generated 38 embeddings
Embedding shape: (384,)

Sample embedding:
[-4.45852056e-02  1.83011238e-02  1.03055820e-01  1.06724672e-01
  6.92621693e-02  3.94778289e-02 -7.02718794e-02  1.85092222e-02
 -2.64169718e-03  8.67921561e-02 -2.22836696e-02 -5.14434054e-02
  3.07141598e-02  3.79240252e-02  2.55169477e-02  6.16954565e-02
 -8.45513791e-02 -9.24728811e-02 -1.43486066e-02  5.17309010e-02
  1.11270929e-03  2.32094023e-02 -2.65127607e-02  6.72897100e-02
  5.76641364e-03  2.66086427e-04 -9.05089825e-02  5.80190942e-02
 -6.24796152e-02  1.27771646e-01 -5.01407199e-02  7.86757767e-02
 -4.40925732e-02 -4.25539874e-02  4.44621220e-02 -1.00494716e-02
  1.28308190e-02 -2.88113523e-02 -7.05305263e-02  2.85778772e-02
  2.66953278e-02  5.05793653e-03 -2.76468452e-02 -4.56769541e-02
  1.51983555e-02  5.19505404e-02  5.31637706e-02 -3.04289646e-02
 -1.58158168e-01  3.9543110

## 3. Ingest Documents into OpenSearch

Now that we've processed the document and generated embeddings, let's ingest them into OpenSearch. We'll:

1. Format each document with its text, embedding, and metadata
2. Use the bulk API to efficiently insert all documents
3. Verify that the documents were properly indexed

This creates searchable content in the OpenSearch index that we can later query using hybrid search.


In [13]:
documents_to_index

[{'doc_id': 'climate_0',
  'text': '3 1This Summary for Policymakers should be cited as: IPCC, 2013: Summary for Policymakers. In: Climate Change 2013: The Physical Science Basis. Contribution of Working Group I to the Fifth Assessment Report of the Intergovernmental Panel on Climate Change [Stocker, T.F ., D. Qin, G.-K. Plattner, M. Tignor, S.K. Allen, J. Boschung, A. Nauels, Y . Xia, V. Bex and P .M. Midgley (eds.)]. Cambridge University Press, Cambridge, United Kingdom and New York, NY , USA.Summary for Policymakers SPM Drafting Authors: Lisa V. Alexander (Australia), Simon K. Allen (Switzerland/New Zealand), Nathaniel L. Bindoff (Australia), François-Marie Bréon (France), John A. Church (Australia), Ulrich Cubasch (Germany), Seita Emori (Japan), Piers Forster (UK), Pierre Friedlingstein (UK/Belgium), Nathan Gillett (Canada), Jonathan M. Gregory (UK), Dennis L. Hartmann (USA), Eystein Jansen (Norway), Ben Kirtman (USA), Reto Knutti (Switzerland), Krishna Kumar Kanikicharla (India), 

In [None]:
# Prepare bulk actions for OpenSearch
# - Converts documents to the OpenSearch bulk format (_index, _id, _source)
# - Converts numpy arrays to lists, since embeddings must be sent as JSON arrays
#
# Index mapping                         ←→ Document field
# "text": {"type": "text"}              ←→ "text": prefixed_text
# "embedding": {"type": "knn_vector"}   ←→ "embedding": [0.1, 0.2, ...]
# "document_name": {"type": "keyword"}  ←→ "document_name": "climate"
actions = []
for doc in documents_to_index:
    # Handle asymmetric embedding if enabled
    if ASSYMETRIC_EMBEDDING:
        prefixed_text = f"passage: {doc['text']}"
    else:
        prefixed_text = doc['text']
    
    # Create an action for this document
    action = {
        "_index": OPENSEARCH_INDEX,
        "_id": doc["doc_id"],
        "_source": {
            "text": prefixed_text,
            "embedding": doc["embedding"].tolist(),  # Convert numpy array to list
            "document_name": doc["document_name"],
        },
    }
    actions.append(action)



In [None]:
# Perform bulk indexing
print(f"Indexing {len(actions)} documents into OpenSearch...")
try:
    # Bulk-insert all prepared documents into the index;
    # raise_on_error=True raises an exception if any document fails
    success, errors = helpers.bulk(client, actions, raise_on_error=True)
    if errors:
        print(f"Indexed {success} documents with {len(errors)} errors")
        print(f"First error: {errors[0]}")
    else:
        print(f"Successfully indexed {success} documents")
except Exception as e:
    print(f"Error during bulk indexing: {e}")
  

Indexing 38 documents into OpenSearch...
Successfully indexed 38 documents
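
`helpers.bulk` accepts any iterable of actions, so for larger document sets the actions can be produced lazily instead of building a full list first. Here is a minimal sketch, assuming documents shaped like `documents_to_index`. The function name `iter_bulk_actions` is illustrative, and plain tuples and lists stand in for numpy arrays so the example runs without extra dependencies.

```python
from typing import Any, Dict, Iterable, Iterator


def iter_bulk_actions(docs: Iterable[Dict[str, Any]], index_name: str) -> Iterator[Dict[str, Any]]:
    """Yield one bulk action per document, converting embeddings to plain lists."""
    for doc in docs:
        embedding = doc["embedding"]
        # numpy arrays expose .tolist(); other sequences are copied into a list
        vector = embedding.tolist() if hasattr(embedding, "tolist") else list(embedding)
        yield {
            "_index": index_name,
            "_id": doc["doc_id"],  # stable IDs: re-running overwrites instead of duplicating
            "_source": {
                "text": doc["text"],
                "embedding": vector,
                "document_name": doc["document_name"],
            },
        }


sample_docs = [
    {"doc_id": "climate_0", "text": "first chunk", "embedding": (0.1, 0.2), "document_name": "climate"},
    {"doc_id": "climate_1", "text": "second chunk", "embedding": [0.3, 0.4], "document_name": "climate"},
]
sample_actions = list(iter_bulk_actions(sample_docs, "documents"))
print(sample_actions[0]["_id"], sample_actions[0]["_source"]["embedding"])
```

In this notebook the generator could be passed directly, for example `helpers.bulk(client, iter_bulk_actions(documents_to_index, OPENSEARCH_INDEX))`.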


In [16]:
# Verify the documents are indexed
response = client.count(index=OPENSEARCH_INDEX)
print(f"Total documents in index: {response['count']}")

# Get one document to verify content
if response['count'] > 0:
    sample = client.search(
        index=OPENSEARCH_INDEX, 
        body={
            "size": 1,
            "_source": {"excludes": ["embedding"]},  # Exclude embeddings as they're large
            "query": {"match_all": {}}
        }
    )
    print("\nSample document from index:")
    print(json.dumps(sample['hits']['hits'][0]['_source'], indent=2))

Total documents in index: 38

Sample document from index:
{
  "document_name": "climate",
  "text": "3 1This Summary for Policymakers should be cited as: IPCC, 2013: Summary for Policymakers. In: Climate Change 2013: The Physical Science Basis. Contribution of Working Group I to the Fifth Assessment Report of the Intergovernmental Panel on Climate Change [Stocker, T.F ., D. Qin, G.-K. Plattner, M. Tignor, S.K. Allen, J. Boschung, A. Nauels, Y . Xia, V. Bex and P .M. Midgley (eds.)]. Cambridge University Press, Cambridge, United Kingdom and New York, NY , USA.Summary for Policymakers SPM Drafting Authors: Lisa V. Alexander (Australia), Simon K. Allen (Switzerland/New Zealand), Nathaniel L. Bindoff (Australia), Fran\u00e7ois-Marie Br\u00e9on (France), John A. Church (Australia), Ulrich Cubasch (Germany), Seita Emori (Japan), Piers Forster (UK), Pierre Friedlingstein (UK/Belgium), Nathan Gillett (Canada), Jonathan M. Gregory (UK), Dennis L. Hartmann (USA), Eystein Jansen (Norway), Ben Kir

In [17]:
# Search with keyword matching

query = {
    "size": 2,
    "_source": {"excludes": ["embedding"]},
    "query": {
        "match": {
            "text": "climate"
        }
    }
}
results = client.search(index=OPENSEARCH_INDEX, body=query)
for hit in results['hits']['hits']:
    print(json.dumps(hit['_source'], indent=2))

{
  "document_name": "climate",
  "text": "high confidence ), extremely unlikely less than 1\u00b0C ( high confidence ), and very unlikely greater than 6\u00b0C ( medium confidence )16. The lower temperature limit of the assessed likely range is thus less than the 2\u00b0C in the AR4, but the upper limit is the same. This assessment reflects improved understanding, the extended temperature record in the atmosphere and ocean, and new estimates of radiative forcing. {TS TFE.6, Figure 1; Box 12.2} \u2022 The rate and magnitude of global climate change is determined by radiative forcing, climate feedbacks and the storage of energy by the climate system. Estimates of these quantities for recent decades are consistent with the assessed likely range of the equilibrium climate sensitivity to within assessed uncertainties, providing strong evidence for our understanding of anthropogenic climate change. {Box 12.2, Box 13.1} \u2022 The transient climate response quantifies the response of the cli

In [107]:
query_text = "How is the ocean warming?"

# Generate embedding
query_embedding = generate_embeddings([query_text])[0].tolist()

# Set top_k
top_k = 3

query_body = {
    "_source": {"excludes": ["embedding"]},  # Exclude embeddings as they're large
    "query": {
        "hybrid": {
            "queries": [
                {"match": {"text": {"query": query_text}}},
                {
                    "knn": {
                        "embedding": {
                            "vector": query_embedding,
                            "k": top_k
                        }
                    }
                }
            ]
        }
    },
    "size": top_k
}

response = client.search(
    index=OPENSEARCH_INDEX, body=query_body, search_pipeline="nlp-search-pipeline"
)

# Print the results
print(f"\nTop {top_k} results for query: '{query_text}'\n")
for i, hit in enumerate(response['hits']['hits'], 1):
    print(f"Result {i}:")
    print(json.dumps(hit['_source'], indent=2))
    print("-" * 60)

Loading embedding model: sentence-transformers/all-MiniLM-L6-v2


Result 1:
{
  "document_name": "climate",
  "text": "sea level. There is low confidence in region-specific projections of storminess and associated storm surges. m SREX assessed it to be very likely that mean sea level rise will contribute to future upward trends in extreme coastal high water levels.SPMSummary for Policymakers8B.2 Ocean Ocean warming dominates the increase in energy stored in the climate system, accounting for more than 90% of the energy accumulated between 1971 and 2010 ( high confidence ). It is virtually certain that the upper ocean (0\u2212700 m) warmed from 1971 to 2010 (see Figure SPM.3), and it likely warmed between the 1870s and 1971. {3.2, Box 3.1} \u2022 On a global scale, the ocean warming is largest near the surface, and the upper 75 m warmed by 0.11 [0.09 to 0.13] \u00b0C per decade over the period 1971 to 2010. Since AR4, instrumental biases in upper-ocean temperature records have been iden

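The hybrid query body used above can be wrapped in a small helper so that the same structure is reused for different questions. Here is a sketch, assuming the same field names (`text`, `embedding`) as the index mapping. `build_hybrid_query` is an illustrative name, and the short vector is a placeholder for a real 384-dimensional embedding.

```python
import json
from typing import Any, Dict, List


def build_hybrid_query(query_text: str, query_embedding: List[float], top_k: int = 3) -> Dict[str, Any]:
    """Build a hybrid query body: keyword clause first, k-NN clause second."""
    return {
        "size": top_k,
        "_source": {"excludes": ["embedding"]},  # keep large vectors out of the response
        "query": {
            "hybrid": {
                "queries": [
                    {"match": {"text": {"query": query_text}}},
                    {"knn": {"embedding": {"vector": query_embedding, "k": top_k}}},
                ]
            }
        },
    }


body = build_hybrid_query("ocean warming", [0.1, 0.2, 0.3], top_k=2)
print(json.dumps(body, indent=2))
```

Keeping the keyword clause first and the k-NN clause second matters, because the pipeline's combination weights are matched to the sub-queries by position.
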
## Conclusion

Congratulations! You've successfully:
1. Created an OpenSearch index with proper mappings for hybrid search
2. Processed a PDF document and split it into chunks
3. Generated embeddings for each chunk
4. Ingested the chunks with their embeddings into OpenSearch

All the code is defined directly in this notebook, so you can experiment with different:
- Text cleaning and chunking strategies
- Embedding models and parameters
- OpenSearch index configurations

In the next notebook, you'll learn how to perform hybrid search on this indexed content and generate responses using LLMs.
