# Elasticsearch Document Processing Pipeline

This notebook provides a complete pipeline for processing legal documents and storing them in Elasticsearch with:
- Text chunking and vector embeddings
- Named Entity Recognition (NER) annotations
- Full-text search capabilities
- Duplicate detection and removal

## Configuration and Setup

In [13]:
# Configuration
ELASTICSEARCH_HOST = "http://localhost:9201"
INDEX_NAME = "eu_legislation"
JSON_FOLDER = "./output"
CHUNK_SIZE = 500
CHUNK_OVERLAP = 100
VECTOR_DIMS = 768
MODEL_NAME = "Alibaba-NLP/gte-multilingual-base"

# Imports
from elasticsearch import Elasticsearch, helpers
import os
import json
import hashlib
import torch
import nltk
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from langchain_text_splitters import RecursiveCharacterTextSplitter
import logging

# Initialize Elasticsearch client
es = Elasticsearch(ELASTICSEARCH_HOST, verify_certs=False, request_timeout=60)

# Test connection
try:
    response = es.info()
    es_version = response["version"]["number"]
    print(f"Connected to Elasticsearch Server Version: {es_version}")
except Exception as e:
    print(f"Connection failed: {e}")

Connected to Elasticsearch Server Version: 8.13.3


## Utility Functions

In [15]:
def get_string_hash(input_string):
    """Generate SHA256 hash for a given string."""
    hash_object = hashlib.sha256()
    hash_object.update(input_string.encode("utf-8"))
    return hash_object.hexdigest()


def process_annotation(annotation, text, document_id):
    """Process a single annotation into the required format."""
    name = text[annotation["start"]:annotation["end"]]
    
    ann_object = {
        "mention": name,
        "start": annotation["start"],
        "end": annotation["end"],
        "id": annotation["id"],
        "type": annotation["type"],
    }
    
    # Handle linking information
    if ("linking" in annotation.get("features", {}) and 
        not annotation["features"]["linking"].get("is_nil", True)):
        
        linking = annotation["features"]["linking"]
        ann_object.update({
            "display_name": annotation["features"].get("title", name),
            "is_linked": True,
            "id_ER": linking.get("top_candidate", {}).get("url", "")
        })
    else:
        ann_object.update({
            "display_name": name,
            "is_linked": False,
            "id_ER": f"{document_id}_{name}"
        })
    
    return ann_object


def clean_document_data(file_object):
    """Clean and prepare document data for indexing."""
    # Remove unnecessary fields
    for key in ["annotation_sets", "annoation_sets", "features", "_id"]:
        if key in file_object:
            del file_object[key]
    
    # Ensure required fields exist
    if "metadata" not in file_object:
        file_object["metadata"] = []
    
    return file_object

In [16]:
# Quick document search and inspection
def search_documents(query="*", size=10, exclude_fields=None):
    """Search documents in the index with optional field exclusions."""
    if exclude_fields is None:
        exclude_fields = ["chunks", "annotations"]
    
    search_query = {
        "query": {"query_string": {"query": query}},
        "_source": {"excludes": exclude_fields},
        "size": size,
    }
    
    return es.search(index=INDEX_NAME, body=search_query)


def find_empty_annotation_documents():
    """Find documents with empty annotations field."""
    search_query = {
        "query": {"query_string": {"query": "*"}},
        "_source": {"excludes": ["chunks", "annotation_sets"]},
    }
    
    response = es.search(index=INDEX_NAME, body=search_query)
    empty_annotation_ids = []
    
    for hit in response["hits"]["hits"]:
        if hit["_source"].get("annotations") == []:
            name = hit["_source"].get("name", "(no name)")
            print(f"Document with empty 'annotations' field: {name}")
            empty_annotation_ids.append(hit["_source"]["id"])
    
    return empty_annotation_ids


# Example usage
sample_response = search_documents(size=1)
if sample_response["hits"]["hits"]:
    print("Sample document:")
    print(json.dumps(sample_response["hits"]["hits"][0]["_source"], indent=2))

Sample document:
{
  "text": "CHAPTER IV\nUNFAIR CONTRACTUAL TERMS RELATED TO DATA ACCESS AND USE BETWEEN ENTERPRISES\nArticle 13\nUnfair contractual terms unilaterally imposed on another enterprise\n1. A contractual term concerning access to and the use of data or liability and remedies for the breach or the termination of data related obligations, which has been unilaterally imposed by an enterprise on another enterprise, shall not be binding on the latter enterprise if it is unfair.\n2. A contractual term which reflects mandatory provisions of Union law, or provisions of Union law which would apply if the contractual terms did not regulate the matter, shall not be considered to be unfair.\n3. A contractual term is unfair if it is of such a nature that its use grossly deviates from good commercial practice in data access and use, contrary to good faith and fair dealing.\n4. In particular, a contractual term shall be unfair for the purposes of paragraph 3, if its object or effect is t

  return es.search(index=INDEX_NAME, body=search_query)


## Index Management

In [18]:
def get_index_settings():
    """Get the index settings with custom nested object limit."""
    return {
        "settings": {
            "index.mapping.nested_objects.limit": 20000
        },
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "name": {"type": "keyword"},
                "preview": {"type": "keyword"},
                "id": {"type": "keyword"},
                "metadata": {
                    "type": "nested",
                    "properties": {
                        "type": {"type": "keyword"},
                        "value": {"type": "keyword"}
                    }
                },
                "annotations": {
                    "type": "nested",
                    "properties": {
                        "mention": {"type": "keyword"},
                        "start": {"type": "integer"},
                        "end": {"type": "integer"},
                        "display_name": {"type": "keyword"},
                        "id": {"type": "integer"},
                        "type": {"type": "keyword"},
                        "is_linked": {"type": "boolean"},
                        "id_ER": {"type": "keyword"}
                    }
                },
                "chunks": {
                    "type": "nested",
                    "properties": {
                        "vectors": {
                            "type": "nested",
                            "properties": {
                                "predicted_value": {
                                    "type": "dense_vector",
                                    "index": True,
                                    "dims": VECTOR_DIMS,
                                    "similarity": "cosine",
                                },
                                "text": {"type": "text"},
                                "entities": {"type": "text"},
                            },
                        },
                    },
                }
            }
        }
    }


def recreate_index(index_name=INDEX_NAME, delete_existing=False):
    """Create or recreate the Elasticsearch index."""
    try:
        if delete_existing and es.indices.exists(index=index_name):
            es.indices.delete(index=index_name)
            print(f"Deleted existing index: {index_name}")
        
        if not es.indices.exists(index=index_name):
            index_settings = get_index_settings()
            response = es.indices.create(index=index_name, body=index_settings)
            print(f"Created index: {index_name}")
            return response
        else:
            print(f"Index {index_name} already exists")
            return None
            
    except Exception as e:
        print(f"Error managing index: {e}")
        return None


# Uncomment the line below to recreate the index
recreate_index(delete_existing=True)

Deleted existing index: eu_legislation
Created index: eu_legislation


  response = es.indices.create(index=index_name, body=index_settings)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'eu_legislation'})

## Data Processing Pipeline

In [19]:
def read_json_files(path, target_ids=None):
    """
    Read and process JSON annotation files from a directory.
    
    Args:
        path: Directory containing .json.annotated files
        target_ids: Optional set of document IDs to filter by
        
    Returns:
        List of processed document objects
    """
    json_files = [f for f in os.listdir(path) if f.endswith(".json")]
    data = []
    
    print(f"Processing {len(json_files)} JSON files from {path}")
    
    for json_file in tqdm(json_files, desc="Reading files"):
        try:
            with open(os.path.join(path, json_file), "r") as file:
                file_object = json.load(file)
                
                # Skip empty files
                if not file_object.get("text"):
                    print(f"Warning: Skipping empty file: {json_file}")
                    continue
                
                # Generate document ID
                file_object["id"] = get_string_hash(file_object["text"])
                
                # Filter by target IDs if provided
                if target_ids and file_object["id"] not in target_ids:
                    continue
                
                # Process annotations
                annotations = process_document_annotations(file_object)
                file_object["annotations"] = annotations
                
                # Clean up the document
                file_object = clean_document_data(file_object)
                
                data.append(file_object)
                
        except Exception as e:
            print(f"Error processing {json_file}: {e}")
            continue
    
    print(f"Successfully processed {len(data)} documents")
    return data


def process_document_annotations(file_object):
    """Extract and process annotations from a document."""
    text = file_object.get("text", "")
    annotations = []
    
    annotation_sets = file_object.get("annotation_sets", {})
    entities = annotation_sets.get("entities_", {})
    raw_annotations = entities.get("annotations", [])
    
    for annotation in raw_annotations:
        try:
            ann_object = process_annotation(annotation, text, file_object.get("id", ""))
            annotations.append(ann_object)
        except Exception as e:
            print(f"Warning: Error processing annotation: {e}")
            continue
    
    print(f"Processed {len(annotations)} annotations for document '{file_object.get('name', 'Unknown')}'")
    return annotations


def send_to_elasticsearch(data, index_name=INDEX_NAME, update_existing=True):
    """
    Send documents to Elasticsearch with optional duplicate handling.
    
    Args:
        data: List of document objects
        index_name: Target index name
        update_existing: Whether to update existing documents
    """
    print(f"Sending {len(data)} documents to Elasticsearch...")
    
    for item in tqdm(data, desc="Indexing documents"):
        try:
            if update_existing:
                # Remove existing documents with same ID
                search_query = {"query": {"term": {"id": item["id"]}}}
                search_response = es.search(index=index_name, body=search_query)
                
                for hit in search_response["hits"]["hits"]:
                    es.delete(index=index_name, id=hit["_id"])
            
            # Index the new document
            es.index(index=index_name, body=item)
            
        except Exception as e:
            print(f"Error indexing document {item.get('name', 'Unknown')}: {e}")
    
    print("Document indexing completed")

In [20]:
# Process documents - you can filter by specific document IDs if needed
# empty_annotation_ids = find_empty_annotation_documents()  # Uncomment to filter
target_ids = None  # or set to empty_annotation_ids to process only those documents

data = read_json_files(JSON_FOLDER, target_ids=target_ids)

Processing 48 JSON files from ./output


Reading files: 100%|██████████| 48/48 [00:00<00:00, 438.73it/s]

Processed 122 annotations for document 'DataAct_Chapter_IX'
Processed 158 annotations for document 'DataGovernanceAct_Chapter_II'
Processed 134 annotations for document 'DataAct_Chapter_VIII'
Processed 49 annotations for document 'AIAct_Chapter_XI'
Processed 631 annotations for document 'AIAct_Chapter_III'
Processed 183 annotations for document 'DataGovernanceAct_Chapter_III'
Processed 127 annotations for document 'AIAct_Chapter_I'
Processed 67 annotations for document 'GDPR_Chapter_IX'
Processed 88 annotations for document 'DataGovernanceAct_Chapter_I'
Processed 59 annotations for document 'DataGovernanceAct_Chapter_VI'
Processed 137 annotations for document 'DataGovernanceAct_Chapter_IV'
Processed 104 annotations for document 'DataAct_Chapter_XI'
Processed 204 annotations for document 'DataAct_Chapter_III'
Processed 664 annotations for document 'DataAct-intro'
Processed 501 annotations for document 'AIAct_Chapter_IX'
Processed 24 annotations for document 'GDPR_Chapter_X'
Processed 22




In [None]:
print(f"Loaded {len(data)} documents ready for processing")

In [21]:
send_to_elasticsearch(data)

Sending 48 documents to Elasticsearch...


  search_response = es.search(index=index_name, body=search_query)
  es.index(index=index_name, body=item)
Indexing documents: 100%|██████████| 48/48 [00:04<00:00,  9.87it/s]

Document indexing completed





## Text Chunking and Vector Embeddings

### Mapping for chunk vector search

In [None]:
# This mapping update is now included in the main index creation
# No need to run separately if using recreate_index() function
print("Vector mapping is included in the main index configuration")

In [22]:
# Initialize text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    length_function=len,
    is_separator_regex=False,
)

In [23]:
class DocumentChunker:
    """Handles document chunking and embedding generation."""
    
    def __init__(self, model_name=MODEL_NAME, device="mps"):
        """Initialize the chunker with sentence transformer model."""
        self.model_name = model_name
        self.device = device
        self._model = None
        self._initialize_model()
    
    def _initialize_model(self):
        """Lazy initialization of the sentence transformer model."""
        if self._model is None:
            print(f"Loading embedding model: {self.model_name}")
            self._model = SentenceTransformer(
                self.model_name, 
                trust_remote_code=True
            ).to(self.device)
            print("Model loaded successfully")
    
    def generate_chunks_with_embeddings(self, text):
        """
        Split text into chunks and generate embeddings for each chunk.
        
        Args:
            text: Input text to chunk and embed
            
        Returns:
            List of lists: [embedding, chunk_text, entities_placeholder]
        """
        try:
            # Split text into chunks
            chunks = text_splitter.split_text(text)
            
            if not chunks:
                print("Warning: No chunks generated from text")
                return []
            
            # Generate embeddings
            embeddings = self._model.encode(chunks, show_progress_bar=False)
            
            # Return as list of lists (mutable) instead of tuples (immutable)
            result = [[emb.tolist(), chunk, ""] for emb, chunk in zip(embeddings, chunks)]
            
            return result
            
        except Exception as e:
            print(f"Error in chunking/embedding: {e}")
            return []
    
    def get_entities_from_chunk(self, chunk_text, full_text, annotations):
        """
        Find entities from annotations that are present in the chunk text.
        
        Args:
            chunk_text: The text of the current chunk
            full_text: The full document text 
            annotations: List of annotation objects
        
        Returns:
            String of entity mentions found in the chunk
        """
        chunk_entities = []
        
        # Find the position of this chunk in the full text
        chunk_start_in_full = full_text.find(chunk_text)
        if chunk_start_in_full == -1:
            return ""
        
        chunk_end_in_full = chunk_start_in_full + len(chunk_text)
        
        # Check which annotations overlap with this chunk
        for annotation in annotations:
            ann_start = annotation.get("start", 0)
            ann_end = annotation.get("end", 0)
            
            # Check if annotation overlaps with chunk boundaries
            if ((ann_start >= chunk_start_in_full and ann_start < chunk_end_in_full) or 
                (ann_end > chunk_start_in_full and ann_end <= chunk_end_in_full) or 
                (ann_start <= chunk_start_in_full and ann_end >= chunk_end_in_full)):
                
                entity_mention = full_text[ann_start:ann_end]
                chunk_entities.append(entity_mention)
        
        return " ".join(chunk_entities)


# Initialize the chunker
chunker = DocumentChunker()

Loading embedding model: Alibaba-NLP/gte-multilingual-base


Some weights of the model checkpoint at Alibaba-NLP/gte-multilingual-base were not used when initializing NewModel: ['classifier.bias', 'classifier.weight']
- This IS expected if you are initializing NewModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing NewModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Model loaded successfully


In [24]:
def update_document_with_chunks(doc_id, doc_source, chunker_instance):
    """
    Update a document with chunks and their entities using existing annotations.
    
    Args:
        doc_id: Elasticsearch document ID
        doc_source: Full document source containing text and annotations
        chunker_instance: DocumentChunker instance
    """
    text = doc_source.get("text", "")
    existing_annotations = doc_source.get("annotations", [])
    
    # Generate chunks with embeddings
    chunks = chunker_instance.generate_chunks_with_embeddings(text)
    
    if not chunks:
        print(f"Warning: No chunks generated for document {doc_id}")
        return
    
    # Add entity information to each chunk
    for chunk in chunks:
        chunk_text = chunk[1]
        chunk_entities = chunker_instance.get_entities_from_chunk(
            chunk_text, text, existing_annotations
        )
        chunk[2] = chunk_entities  # Update entities placeholder
    
    # Prepare data for Elasticsearch update
    passages_body = [
        {
            "vectors": {
                "predicted_value": chunk[0],
                "entities": chunk[2], 
                "text": chunk[1]
            }
        } 
        for chunk in chunks
    ]
    
    # Update document in Elasticsearch
    try:
        data = {"doc": {"chunks": passages_body}}
        response = es.update(index=INDEX_NAME, id=doc_id, body=data)
        print(f"Updated document {doc_id} with {len(chunks)} chunks")
    except Exception as e:
        print(f"Error updating document {doc_id}: {e}")


def process_documents_for_chunking(index_name=INDEX_NAME, query=None, batch_size=10000):
    """
    Process documents from Elasticsearch to add chunks and embeddings.
    
    Args:
        index_name: Index to search
        query: Optional query to filter documents
        batch_size: Maximum documents to process
    """
    if query is None:
        query = {"match_all": {}}
    
    search_body = {"query": query}
    
    try:
        print("Searching for documents to process...")
        response = es.search(index=index_name, body=search_body, size=batch_size)
        documents = response["hits"]["hits"]
        
        print(f"Found {len(documents)} documents to process")
        
        # Suppress Elasticsearch transport logging for cleaner output
        logging.getLogger("elastic_transport").setLevel(logging.WARNING)
        
        # Process documents in reverse order (optional)
        documents.reverse()
        
        # Process each document
        for doc in tqdm(documents, desc="Adding chunks and embeddings"):
            doc_id = doc["_id"]
            doc_source = doc["_source"]
            update_document_with_chunks(doc_id, doc_source, chunker)
            
        print("Document processing completed!")
        
    except Exception as e:
        print(f"Error during document processing: {e}")


# Example usage:
process_documents_for_chunking()  # Process all documents
# process_documents_for_chunking(query={"terms": {"id": specific_ids}})  # Process specific documents

Searching for documents to process...


  response = es.search(index=index_name, body=search_body, size=batch_size)


Found 48 documents to process


  response = es.update(index=INDEX_NAME, id=doc_id, body=data)
Adding chunks and embeddings:   2%|▏         | 1/48 [01:44<1:21:57, 104.63s/it]

Updated document CzlguZkB6MMnAM0I3DcJ with 78 chunks


Adding chunks and embeddings:   4%|▍         | 2/48 [02:14<46:34, 60.75s/it]   

Updated document CjlguZkB6MMnAM0I2ze6 with 255 chunks


Adding chunks and embeddings:   6%|▋         | 3/48 [02:21<27:12, 36.27s/it]

Updated document CTlguZkB6MMnAM0I2zdQ with 168 chunks


Adding chunks and embeddings:   8%|▊         | 4/48 [02:24<16:56, 23.11s/it]

Updated document CDlguZkB6MMnAM0I2jfu with 29 chunks


Adding chunks and embeddings:  10%|█         | 5/48 [02:34<13:03, 18.21s/it]

Updated document BzlguZkB6MMnAM0I2jec with 120 chunks


Adding chunks and embeddings:  12%|█▎        | 6/48 [02:35<08:37, 12.32s/it]

Updated document BjlguZkB6MMnAM0I2jc8 with 6 chunks


Adding chunks and embeddings:  15%|█▍        | 7/48 [02:35<05:46,  8.46s/it]

Updated document BTlguZkB6MMnAM0I2Tfi with 12 chunks


Adding chunks and embeddings:  17%|█▋        | 8/48 [02:37<04:16,  6.41s/it]

Updated document BDlguZkB6MMnAM0I2TeU with 48 chunks


Adding chunks and embeddings:  19%|█▉        | 9/48 [02:38<02:57,  4.55s/it]

Updated document AzlguZkB6MMnAM0I2TdH with 3 chunks


Adding chunks and embeddings:  21%|██        | 10/48 [02:40<02:29,  3.92s/it]

Updated document AjlguZkB6MMnAM0I2Df7 with 68 chunks


Adding chunks and embeddings:  23%|██▎       | 11/48 [02:41<01:46,  2.89s/it]

Updated document ATlguZkB6MMnAM0I2Dd3 with 15 chunks


Adding chunks and embeddings:  25%|██▌       | 12/48 [02:42<01:24,  2.34s/it]

Updated document ADlguZkB6MMnAM0I2Dcq with 34 chunks


Adding chunks and embeddings:  27%|██▋       | 13/48 [02:45<01:30,  2.60s/it]

Updated document _zlguZkB6MMnAM0I1zbd with 49 chunks


Adding chunks and embeddings:  29%|██▉       | 14/48 [02:49<01:40,  2.97s/it]

Updated document _jlguZkB6MMnAM0I1zZ0 with 105 chunks


Adding chunks and embeddings:  31%|███▏      | 15/48 [02:50<01:22,  2.51s/it]

Updated document _TlguZkB6MMnAM0I1jb1 with 44 chunks


Adding chunks and embeddings:  33%|███▎      | 16/48 [02:53<01:25,  2.69s/it]

Updated document _DlguZkB6MMnAM0I1jao with 83 chunks


Adding chunks and embeddings:  35%|███▌      | 17/48 [02:54<01:05,  2.13s/it]

Updated document -zlguZkB6MMnAM0I1jZa with 16 chunks


Adding chunks and embeddings:  38%|███▊      | 18/48 [02:56<01:03,  2.10s/it]

Updated document -jlguZkB6MMnAM0I1Tb- with 37 chunks


Adding chunks and embeddings:  40%|███▉      | 19/48 [02:58<00:59,  2.06s/it]

Updated document -TlguZkB6MMnAM0I1Ta2 with 33 chunks


Adding chunks and embeddings:  42%|████▏     | 20/48 [03:02<01:08,  2.46s/it]

Updated document -DlguZkB6MMnAM0I1TYF with 113 chunks


Adding chunks and embeddings:  44%|████▍     | 21/48 [03:02<00:52,  1.93s/it]

Updated document 9zlguZkB6MMnAM0I1Das with 20 chunks


Adding chunks and embeddings:  46%|████▌     | 22/48 [03:05<00:54,  2.11s/it]

Updated document 9jlguZkB6MMnAM0I1DZq with 44 chunks


Adding chunks and embeddings:  48%|████▊     | 23/48 [03:05<00:41,  1.65s/it]

Updated document 9TlguZkB6MMnAM0I1DYl with 13 chunks


Adding chunks and embeddings:  50%|█████     | 24/48 [03:24<02:42,  6.79s/it]

Updated document 9DlguZkB6MMnAM0I0zbJ with 693 chunks


Adding chunks and embeddings:  52%|█████▏    | 25/48 [03:25<01:56,  5.05s/it]

Updated document 8zlguZkB6MMnAM0I0zZP with 15 chunks


Adding chunks and embeddings:  54%|█████▍    | 26/48 [03:25<01:19,  3.63s/it]

Updated document 8jlguZkB6MMnAM0I0zYM with 5 chunks


Adding chunks and embeddings:  56%|█████▋    | 27/48 [03:27<01:03,  3.03s/it]

Updated document 8TlguZkB6MMnAM0I0jbK with 46 chunks


Adding chunks and embeddings:  58%|█████▊    | 28/48 [03:28<00:46,  2.31s/it]

Updated document 8DlguZkB6MMnAM0I0jZ8 with 15 chunks


Adding chunks and embeddings:  62%|██████▎   | 30/48 [03:28<00:23,  1.30s/it]

Updated document 7zlguZkB6MMnAM0I0jYx with 9 chunks
Updated document 7jlguZkB6MMnAM0I0Tbj with 1 chunks


Adding chunks and embeddings:  65%|██████▍   | 31/48 [03:31<00:29,  1.71s/it]

Updated document 7TlguZkB6MMnAM0I0TaD with 66 chunks


Adding chunks and embeddings:  67%|██████▋   | 32/48 [03:40<01:00,  3.75s/it]

Updated document 7DlguZkB6MMnAM0I0TYS with 279 chunks


Adding chunks and embeddings:  69%|██████▉   | 33/48 [03:40<00:43,  2.87s/it]

Updated document 6zlguZkB6MMnAM0I0Da1 with 5 chunks


Adding chunks and embeddings:  71%|███████   | 34/48 [03:46<00:49,  3.54s/it]

Updated document 6jlguZkB6MMnAM0I0DZq with 190 chunks


Adding chunks and embeddings:  73%|███████▎  | 35/48 [03:57<01:15,  5.83s/it]

Updated document 6TlguZkB6MMnAM0Izzbo with 431 chunks


Adding chunks and embeddings:  75%|███████▌  | 36/48 [03:59<00:58,  4.90s/it]

Updated document 6DlguZkB6MMnAM0IzzZL with 36 chunks


Adding chunks and embeddings:  77%|███████▋  | 37/48 [04:01<00:42,  3.89s/it]

Updated document 5zlguZkB6MMnAM0Izjbt with 39 chunks


Adding chunks and embeddings:  79%|███████▉  | 38/48 [04:03<00:31,  3.20s/it]

Updated document 5jlguZkB6MMnAM0Izjad with 49 chunks


Adding chunks and embeddings:  81%|████████▏ | 39/48 [04:03<00:22,  2.49s/it]

Updated document 5TlguZkB6MMnAM0IzjZE with 22 chunks


Adding chunks and embeddings:  83%|████████▎ | 40/48 [04:04<00:16,  2.07s/it]

Updated document 5DlguZkB6MMnAM0IzTb1 with 30 chunks


Adding chunks and embeddings:  85%|████████▌ | 41/48 [04:05<00:11,  1.63s/it]

Updated document 4zlguZkB6MMnAM0IzTao with 18 chunks


Adding chunks and embeddings:  88%|████████▊ | 42/48 [04:08<00:12,  2.00s/it]

Updated document 4jlguZkB6MMnAM0IzTZM with 95 chunks


Adding chunks and embeddings:  90%|████████▉ | 43/48 [04:10<00:09,  1.87s/it]

Updated document 4TlguZkB6MMnAM0IzDb3 with 56 chunks


Adding chunks and embeddings:  92%|█████████▏| 44/48 [04:23<00:21,  5.41s/it]

Updated document 4DlguZkB6MMnAM0IzDaC with 557 chunks


Adding chunks and embeddings:  94%|█████████▍| 45/48 [04:24<00:12,  4.00s/it]

Updated document 3zlguZkB6MMnAM0Iyzai with 8 chunks


Adding chunks and embeddings:  96%|█████████▌| 46/48 [04:25<00:06,  3.22s/it]

Updated document 3jlguZkB6MMnAM0Iyjb_ with 48 chunks


Adding chunks and embeddings:  98%|█████████▊| 47/48 [04:27<00:02,  2.80s/it]

Updated document 3TlguZkB6MMnAM0IyjZ8 with 60 chunks


Adding chunks and embeddings: 100%|██████████| 48/48 [04:29<00:00,  5.61s/it]

Updated document 3DlguZkB6MMnAM0IyTb5 with 43 chunks
Document processing completed!





## Maintenance and Utility Operations

In [None]:
def remove_duplicates_by_id(client, index_name):
    """
    Remove duplicate documents based on the 'id' field, keeping only one copy.
    
    Args:
        client: Elasticsearch client
        index_name: Target index name
    """
    print("Scanning for duplicate documents...")
    
    # Step 1: Gather all document IDs grouped by 'id' field
    id_groups = {}
    for hit in scan(client, index=index_name, query={"_source": ["id"], "query": {"match_all": {}}}):
        doc_id = hit["_source"].get("id")
        es_id = hit["_id"]
        if not doc_id:
            continue
        id_groups.setdefault(doc_id, []).append(es_id)

    # Step 2: Prepare deletions - keep only one doc per unique 'id'
    actions = []
    duplicate_count = 0
    
    for doc_id, es_ids in id_groups.items():
        if len(es_ids) > 1:
            duplicate_count += len(es_ids) - 1
            # Keep the first, delete the rest
            duplicates = es_ids[1:]
            for dup_id in duplicates:
                actions.append({
                    "_op_type": "delete", 
                    "_index": index_name, 
                    "_id": dup_id
                })

    # Step 3: Bulk delete duplicates
    if actions:
        print(f"Deleting {duplicate_count} duplicate documents...")
        success, _ = bulk(client, actions, refresh=True)
        print(f"Deleted {success} duplicate documents")
    else:
        print("No duplicates found")


def copy_index_data(source_index, dest_index, client=es):
    """
    Copy all documents from source index to destination index.
    
    Args:
        source_index: Source index name
        dest_index: Destination index name
        client: Elasticsearch client
    """
    print(f"Copying data from '{source_index}' to '{dest_index}'...")
    
    # Copy all documents
    actions = []
    doc_count = 0
    
    for doc in scan(client, index=source_index, query={"query": {"match_all": {}}}):
        doc_body = doc["_source"]
        actions.append({
            "_op_type": "index",
            "_index": dest_index,
            "_id": doc.get("_id"),
            **doc_body
        })
        doc_count += 1

    print(f"Prepared {doc_count} documents for transfer")

    if actions:
        bulk(client, actions, request_timeout=300)
        print("Bulk transfer completed successfully")
    else:
        print("Warning: No documents found to transfer")


def recreate_annotations_from_raw(index_name, client=es):
    """
    Recreate annotations from raw annotation_sets data for all documents.
    Useful when annotation format needs to be updated.
    
    Args:
        index_name: Target index name
        client: Elasticsearch client
    """
    print(f"Recreating annotations for index '{index_name}'...")
    
    count = 0
    for doc in scan(client, index=index_name, query={"query": {"match_all": {}}}):
        doc_id = doc["_id"]
        source = doc["_source"]
        
        # Skip if no annotation_sets
        if "annotation_sets" not in source:
            continue
            
        try:
            new_annotations = process_document_annotations(source)
            client.update(
                index=index_name, 
                id=doc_id, 
                body={"doc": {"annotations": new_annotations}}
            )
            count += 1
            
            if count % 100 == 0:
                print(f"Processed {count} documents...")
                
        except Exception as e:
            print(f"Error updating doc {doc_id}: {e}")

    print(f"Total documents updated: {count}")


# Example usage (uncomment to use):
# remove_duplicates_by_id(es, INDEX_NAME)
# copy_index_data("source_index", "dest_index")
# recreate_annotations_from_raw(INDEX_NAME)

## Complete Workflow Example

Follow these steps to process your documents from start to finish:

In [None]:
# Complete Processing Workflow
# Follow these steps in order:

print("ELASTICSEARCH DOCUMENT PROCESSING WORKFLOW")
print("=" * 50)

# Step 1: Create/recreate the index
print("\n1. Setting up Elasticsearch index...")
# recreate_index(delete_existing=True)  # Uncomment to recreate index

# Step 2: Process and load documents
print("\n2. Processing JSON files...")
# data = read_json_files(JSON_FOLDER)
# print(f"Loaded {len(data)} documents")

# Step 3: Send documents to Elasticsearch
print("\n3. Indexing documents...")
# send_to_elasticsearch(data)

# Step 4: Add chunks and embeddings
print("\n4. Adding chunks and embeddings...")
# process_documents_for_chunking()

# Step 5: Clean up duplicates
print("\n5. Removing duplicates...")
# remove_duplicates_by_id(es, INDEX_NAME)

print("\nWorkflow template ready!")
print("Note: Uncomment the lines above to execute each step")