## Initialization

In [1]:
%pip install ipykernel ipywidgets tqdm haystack-ai cohere-haystack qdrant-haystack fastembed-haystack hayhooks elasticsearch tqdm unstructured

Note: you may need to restart the kernel to use updated packages.


In [1]:
import importlib
importlib.invalidate_caches()

In [4]:
from qdrant_client import QdrantClient
collection_name = "1_gemeente_cohere"
qdrant_client = QdrantClient(host="localhost", port=6333)
doc_count = qdrant_client.count(collection_name).count
print(f"Total documents in collection '{collection_name}': {doc_count}.")

document_ids = []
offset = None
limit = 10000  # Adjust this based on your system's capabilities


while True:
    scroll_result = qdrant_client.scroll(
        collection_name=collection_name,
        limit=limit,
        offset=offset,
        with_payload=['source_id'],
        with_vectors=False
    )
    
    batch, next_offset = scroll_result
    
    if not batch:
        break
        
    for doc in batch:
        print("payload: ", doc.payload)
        break
    break
    document_ids.extend(doc.payload for doc in batch)
    
    if next_offset is None:
        break
    
    offset = next_offset

# print(f"Retrieved {len(document_ids)} document IDs.")
# document_ids[0]


Total documents in collection '1_gemeente_cohere': 225642.
{}
Retrieved 0 document IDs.


IndexError: list index out of range

'0000207c-5f13-581b-8718-21f3d07f4875'

In [1]:
from haystack_integrations.document_stores.qdrant import QdrantDocumentStore

from haystack_integrations.components.embedders.fastembed import FastembedSparseDocumentEmbedder
import os
os.environ["COHERE_API_KEY"] = "RU9eGeOrKo0jD2Z6kAqOJAw2RpOmF4jGgO9ZAGQT"

document_store = QdrantDocumentStore(
    host="localhost",
    port=6333,
    index="1_gemeente_cohere",
    embedding_dim=384,
    use_sparse_embeddings=True    
)




## Indexing

In [2]:
from haystack import Document, Pipeline
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter
from haystack_integrations.components.embedders.cohere import CohereDocumentEmbedder
from haystack.utils import Secret
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from elasticsearch.exceptions import NotFoundError
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
import multiprocessing
from multiprocessing import Pool
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor  # Use ProcessPoolExecutor for multiprocessing
from tqdm import tqdm
import time
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.preprocessors import NLTKDocumentSplitter
import contextlib  # For suppressing stdout and stderr
import logging  # For adjusting logging levels
from math import ceil
import sys
import re
from lxml import etree
from unstructured.partition.html import partition_html
from haystack_integrations.components.embedders.fastembed import FastembedSparseDocumentEmbedder
from unstructured.cleaners.core import (
    clean,
    clean_non_ascii_chars,
    replace_unicode_quotes,
    group_broken_paragraphs
)
# Suppress specific FutureWarning from transformers library
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="transformers.utils.generic")
# os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Suppress logging messages from the 'unstructured' library
logging.getLogger('unstructured').setLevel(logging.ERROR)
logging.getLogger('lxml').setLevel(logging.ERROR)
# Constants
NUM_WORKERS = 16
BATCH_SIZE = 1000  # Number of documents to process from Elasticsearch in each batch
SLEEP_TIME = 0     # Set sleep time to zero since GPUs are underutilized
DENSE_BATCH_SIZE = 256   # Increased batch size for dense embeddings per GPU

es = Elasticsearch("http://localhost:9200")
query_overijssel = {
    "query": {
        "bool": {
            "must": [
                {"exists": {"field": "description"}}
            ],
            "minimum_should_match": 1,
            "should": [
                {"match_phrase": {"location": "GM0193"}},
                {"match_phrase": {"location": "GM0141"}},
                {"match_phrase": {"location": "GM0166"}},
                {"match_phrase": {"location": "GM1774"}},
                {"match_phrase": {"location": "GM0183"}},
                {"match_phrase": {"location": "GM0173"}},
                {"match_phrase": {"location": "GM0150"}},
                {"match_phrase": {"location": "GM1700"}},
                {"match_phrase": {"location": "GM0164"}},
                {"match_phrase": {"location": "GM0153"}},
                {"match_phrase": {"location": "GM0148"}},
                {"match_phrase": {"location": "GM1708"}},
                {"match_phrase": {"location": "GM0168"}},
                {"match_phrase": {"location": "GM0160"}},
                {"match_phrase": {"location": "GM0189"}},
                {"match_phrase": {"location": "GM0177"}},
                {"match_phrase": {"location": "GM1742"}},
                {"match_phrase": {"location": "GM0180"}},
                {"match_phrase": {"location": "GM1896"}},
                {"match_phrase": {"location": "GM0175"}},
                {"match_phrase": {"location": "GM1735"}},
                {"match_phrase": {"location": "GM0147"}},
                {"match_phrase": {"location": "GM0163"}},
                {"match_phrase": {"location": "GM0158"}},
                {"match_phrase": {"location": "GM1773"}}
            ]
        }
    }
}
query = query_overijssel
index_name = "jodal_documents7"
total_docs = es.count(index=index_name, body=query)['count']

from haystack import Document

document_store = ElasticsearchDocumentStore(hosts = "http://localhost:9200")

# Constants
NUM_WORKERS = 16
BATCH_SIZE = 1000  # Number of documents to process from Elasticsearch in each batch
SLEEP_TIME = 0     # Set sleep time to zero since GPUs are underutilized
DENSE_BATCH_SIZE = 256   # Increased batch size for dense embeddings per GPU
NER_BATCH_SIZE = 512    # Increased batch size for NER per GPU

# Initialize NVML for GPU temperature monitoring (optional, can be removed if not needed)
# pynvml.nvmlInit()

def format_time(seconds):
    """Format time in seconds to HH:MM:SS."""
    hours = int(seconds) // 3600
    minutes = (int(seconds) % 3600) // 60
    seconds = int(seconds) % 60
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

@contextlib.contextmanager
def suppress_stdout_stderr():
    """Suppress stdout and stderr."""
    with open(os.devnull, 'w') as devnull:
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        sys.stdout = devnull
        sys.stderr = devnull
        try:
            yield
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

def remove_processing_instructions(html_text):
    """Remove processing instructions from HTML content using lxml."""
    try:
        with suppress_stdout_stderr():
            parser = etree.HTMLParser(remove_pis=True)
            tree = etree.fromstring(html_text.encode('utf-8'), parser)
            return etree.tostring(tree, encoding='unicode', method='html')
    except Exception:
        # If parsing fails, return the original text
        # Error is handled silently
        return html_text

def process_document(doc):
    error_count = 0  # Initialize error counter
    try:
        # Extract the 'description' field
        description = doc.get('_source', {}).get('description', '')
        try:
            description = remove_processing_instructions(description)
        except Exception:
            # If parsing fails, keep the original description
            error_count += 1

        # Process 'description' field using unstructured.io
        try:
            with suppress_stdout_stderr():
                elements = partition_html(
                    text=description,
                    chunking_strategy='by_title',
                    combine_text_under_n_chars=512,
                    max_characters=1028,
                    new_after_n_chars=786,
                    overlap=128
                )
        except Exception:
            # Error in partition_html
            error_count += 1
            return None, error_count  # Return None and error count

        docs = []
        chunk_count = len(elements)

        for i, element in enumerate(elements):
            try:
                # Skip elements that are processing instructions
                if hasattr(element, 'element') and isinstance(element.element, etree._ProcessingInstruction):
                    continue

                if hasattr(element, 'text') and element.text:
                    cleaned_text = clean(
                        element.text,
                        extra_whitespace=True,
                        dashes=True,
                        bullets=True,
                        trailing_punctuation=False,
                        lowercase=False
                    )
                    cleaned_text = clean_non_ascii_chars(cleaned_text)
                    cleaned_text = replace_unicode_quotes(cleaned_text)
                    cleaned_text = group_broken_paragraphs(cleaned_text)

                    title = doc["_source"].get("title", "")
                    location_name = doc["_source"].get("location_name", "")
                    cleaned_text = f'Titel: {title} \n Gemeente: {location_name} \n Tekst: \n\n {cleaned_text}'

                    docs.append({
                        "text": cleaned_text,
                        "es_id": doc["_id"],
                        "title": title,
                        "location": doc["_source"].get("location", ""),
                        "location_name": location_name,
                        "modified": doc["_source"].get("modified", ""),
                        "published": doc["_source"].get("published", ""),
                        "source": doc["_source"].get("source", ""),
                        "type": doc["_source"].get("type", ""),
                        "identifier": doc["_source"].get("identifier", ""),
                        "url": doc["_source"].get("url", ""),
                        "chunk_index": i,
                        "chunk_count": chunk_count
                    })
                else:
                    # Skip elements without text
                    continue
            except Exception:
                # Error processing element
                error_count += 1
                continue

        return (docs if docs else None), error_count
    except Exception:
        # Log exception and return None
        error_count += 1
        return None, error_count

def batch_iterator(iterator, batch_size):
    """Yield batches of documents from an iterator."""
    batch = []
    for item in iterator:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="word", split_length=200))
indexing_pipeline.add_component("embedder", CohereDocumentEmbedder(model="embed-multilingual-light-v3.0"))
indexing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")

indexing_pipeline.run({"splitter": {"documents": documents}})



KeyboardInterrupt: 

## Rag Pipieline

In [6]:
from haystack import Pipeline
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack_integrations.components.embedders.cohere import CohereTextEmbedder
from haystack_integrations.components.generators.cohere import CohereGenerator
from haystack_integrations.components.rankers.cohere import CohereRanker
from haystack.components.preprocessors import DocumentCleaner
from haystack_integrations.components.retrievers.qdrant import QdrantHybridRetriever
from haystack_integrations.components.embedders.fastembed import FastembedSparseTextEmbedder	
import logging
from haystack import component
from haystack import Document
from typing import List, Dict, Any
from pprint import pprint
import os

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# os.environ["COHERE_API_KEY"] = "RU9eGeOrKo0jD2Z6kAqOJAw2RpOmF4jGgO9ZAGQT" # OSF Trial API Key
os.environ["COHERE_API_KEY"] = "leBUANLdJzox27RHfrolRkiCzWIEMmyeBTeTKmsE" # OSF Production API Key

template = """
You will be provided with Dutch language docunents. Create informative answer in Dutch for a given question based solely on the provided documents. Use citations to support your answer.

\nDocuments:
{% for document in documents %}
    {{ document.content }}
{% endfor %}

\nQuestion: {{question}};
\nAnswer (include citations):
"""

# @component
# class ContentNormalizer:
#     @component.output_types(documents=List[Document])
#     def run(self, documents: List[Document]) -> Dict[str, Any]:
#         normalized_docs = []
#         for doc in documents:
#             normalized_docs.append(Document(id=doc.id, content=doc.meta['text'], meta=doc.meta, score=doc.score))
#         return {"documents": normalized_docs}

def create_rag_pipeline(document_store):
    rag_pipe = Pipeline("rag_pipe")

    rag_pipe.add_component("dense_text_embedder", CohereTextEmbedder(model="embed-multilingual-light-v3.0"))
    rag_pipe.add_component("sparse_text_embedder", FastembedSparseTextEmbedder(model="Qdrant/bm25"))
    rag_pipe.add_component("hybrid_retriever", QdrantHybridRetriever(document_store=document_store, top_k=50))
    rag_pipe.add_component("reranker", CohereRanker(model="rerank-multilingual-v3.0", top_k=20))
    rag_pipe.add_component("prompt_builder", PromptBuilder(template=template))
    rag_pipe.add_component("llm", CohereGenerator(model="command-r-plus"))
    # rag_pipe.add_component("content_normalizer", ContentNormalizer())

    rag_pipe.connect("sparse_text_embedder.sparse_embedding", "hybrid_retriever.query_sparse_embedding")
    rag_pipe.connect("dense_text_embedder.embedding", "hybrid_retriever.query_embedding")
    rag_pipe.connect("hybrid_retriever.documents", "reranker.documents")
    # rag_pipe.connect("hybrid_retriever.documents", "content_normalizer.documents")
    # rag_pipe.connect("content_normalizer.documents", "reranker.documents")
    rag_pipe.connect("reranker", "prompt_builder")
    rag_pipe.connect("prompt_builder", "llm")

    return rag_pipe

def main():
    question = "Klimaatbeleid Almelo"
    logger.info(f"Vraag: {question}")

    rag_pipe = create_rag_pipeline(document_store)

    result = rag_pipe.run({
        "dense_text_embedder": {"text": question},
        "sparse_text_embedder": {"text": question},
        "reranker": {"query": question},
        "prompt_builder": {"question": question}
    })
    
    # dump pipe
    with open("hayhooks/rag_pipe.yaml", "w") as file:
        rag_pipe.dump(file)
    
    pprint(result['llm'])

if __name__ == "__main__":
    main()

INFO:__main__:Vraag: Klimaatbeleid Almelo
DEBUG:haystack.core.pipeline.base:Adding component 'dense_text_embedder' (<haystack_integrations.components.embedders.cohere.text_embedder.CohereTextEmbedder object at 0x72b821d6b010>

Inputs:
  - text: str
Outputs:
  - embedding: List[float]
  - meta: Dict[str, Any])
DEBUG:haystack.core.pipeline.base:Adding component 'sparse_text_embedder' (<haystack_integrations.components.embedders.fastembed.fastembed_sparse_text_embedder.FastembedSparseTextEmbedder object at 0x72b8222d9c90>

Inputs:
  - text: str
Outputs:
  - sparse_embedding: SparseEmbedding)
DEBUG:haystack.core.pipeline.base:Adding component 'hybrid_retriever' (<haystack_integrations.components.retrievers.qdrant.retriever.QdrantHybridRetriever object at 0x72b8222da9d0>

Inputs:
  - query_embedding: List[float]
  - query_sparse_embedding: SparseEmbedding
  - filters: Union[Dict[str, Any], Filter]
  - top_k: Optional[int]
  - return_embedding: Optional[bool]
  - score_threshold: Optional[fl

{'meta': [{'citations': None,
           'documents': None,
           'finish_reason': 'COMPLETE',
           'index': 0,
           'model': 'command-r-plus',
           'usage': {'completion_tokens': 493.0, 'prompt_tokens': 6304.0}}],
 'replies': ['Het klimaatbeleid van Almelo richt zich op het verminderen van '
             'de CO2-uitstoot, het opwekken van duurzame energie en het '
             'voorbereid zijn op de effecten van klimaatverandering, zoals '
             'wateroverlast en hittestress. [1,2,3,4,5,6,7,8,9] De gemeente '
             'streeft ernaar om in 2030 55% minder CO2 uit te stoten en om in '
             '2050 een robuuste energieneutrale stad te zijn. [10] Dit wordt '
             'bereikt door het ontwikkelen en uitvoeren van '
             'duurzaamheidsbeleid, het stimuleren van duurzame initiatieven '
             'van derden en het treffen van klimaatmaatregelen. '
             '[1,3,4,5,6,7,11] \n'
             '\n'
             'De gemeente Almelo wer