In [36]:
from bs4 import BeautifulSoup
import re

def WikiPageChunks(html_str: str) -> list:
    soup = BeautifulSoup(html_str, 'html.parser')  # Parse the HTML content

    chunks = []

    def clean_text(text):
        cleaned_text = text.replace('\n', ' ').replace('\xa0', ' ')  # Replace newlines and non-breaking spaces with regular spaces
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text)  # Replace multiple spaces with a single space
        cleaned_text = re.sub(r'(\d)\s+(\d)', r'\1\2', cleaned_text)  # Remove spaces between digits
        cleaned_text = cleaned_text.strip()  # Remove leading and trailing spaces
        return cleaned_text

    html_soup = soup.body or soup  # Use the body of the HTML if it exists, otherwise use the whole soup
    nested = ['ul', 'ol', 'dl', 'li', 'dt', 'dd']  # Tags that represent nested lists
    for tag in html_soup.find_all(recursive=False):  # Iterate over top-level tags in the HTML
        if tag.name == 'p':
            chunks.append(clean_text(tag.get_text(separator=' ')))  # Clean and add paragraph text to chunks
        elif tag.name == 'link':
            continue  # Skip link tags
        elif tag.name in nested:
            list_items = tag.find_all('li')  # Find all list items
            for li in list_items:
                chunks.append(clean_text(li.get_text(separator=' ')))  # Clean and add each list item text to chunks
        else:
            chunks.append(str(tag))  # Add other tags as strings
    
    return chunks

In [37]:
from typing import List, Dict, Any
import uuid
from haystack import Document
from haystack import component

class Chunk:
    def __init__(self, id: str, next: str = None):
        self.id = id
        
class Section:
    def __init__(self, name: str, type: str):
        self.name = name
        self.type = type
        self.chunks = []
        self.sections = []
        
class Page:
    def __init__(self, title: str):
        self.title = title
        self.sections = []
        self.chunks = []
        
@component
class WikiPageChunker:
    """
    A component that splits the content of Wikipedia pages into chunks.
    - The document content is expected to be in HTML format fetched via wikipediaapi and
    which has been run through TextFileToDocument converter.
    - Each chunk is a paragraph, list, or table in the Wikipedia page.
    - Each chunk is stored as a separate document with text in 'content' field
    - Each chunk also stores title, h2, h3 etc in meta field.
    - Custom component also creates a hierarchical structure of the chunks based on title, h2, h3 etc.
    """
    @component.output_types(documents=List[Document], hierarchy=dict)
    def run(self, documents: List[Document]):
        chunks = []
        hierarchy = {}
        
        for doc in documents:
            page_title = doc.meta["file_path"].replace(".html", "")
            page = Page(page_title)
            
            html_content = doc.content
            page_chunks = WikiPageChunks(html_content)
            i = 0
            current_h2 = ""
            current_h3 = ""
            current_h4 = ""
            current_section = None
            current_sub_section = None
            current_sub_sub_section = None

            for chunk in page_chunks:
                if chunk == "":
                    continue
                if chunk.startswith("<h2>"):
                    current_h2 = chunk[4:-5]  # Extract text between <h2> and </h2>
                    current_h3 = ""  # Reset h3 when a new h2 is found
                    current_h4 = ""  # Reset h4 when a new h2 is found
                    current_section = Section(current_h2, "h2")
                    page.sections.append(current_section)
                    current_sub_section = None
                    current_sub_sub_section = None
                elif chunk.startswith("<h3>"):
                    current_h3 = chunk[4:-5]  # Extract text between <h3> and </h3>
                    current_h4 = ""  # Reset h4 when a new h3 is found
                    if current_section:
                        current_sub_section = Section(current_h3, "h3")
                        current_section.sections.append(current_sub_section)
                        current_sub_sub_section = None
                elif chunk.startswith("<h4>"):
                    current_h4 = chunk[4:-5]  # Extract text between <h4> and </h4>
                    if current_sub_section:
                        current_sub_sub_section = Section(current_h4, "h4")
                        current_sub_section.sections.append(current_sub_sub_section)
                else:
                    meta = {
                        "file_path": doc.meta["file_path"],
                        "source_id": doc.id,
                        "split_id": i,
                        "title": "Dinosaurs"
                    }
                    if current_h2:
                        meta["h2"] = current_h2
                    if current_h3:
                        meta["h3"] = current_h3
                    if current_h4:
                        meta["h4"] = current_h4

                    chunk_obj = Chunk(str(uuid.uuid4()))
                    chunks.append(
                        Document(
                            id=chunk_obj.id,
                            content=chunk,
                            meta=meta
                        )
                    )
                    if current_sub_sub_section:
                        current_sub_sub_section.chunks.append(chunk_obj)
                    elif current_sub_section:
                        current_sub_section.chunks.append(chunk_obj)
                    elif current_section:
                        current_section.chunks.append(chunk_obj)
                    else:
                        page.chunks.append(chunk_obj)
                    i += 1
            
            hierarchy[page_title] = self.page_to_dict(page)
        
        return {"documents": chunks, "hierarchy": hierarchy}

    def page_to_dict(self, page: Page) -> Dict[str, Any]:
        return {
            "title": page.title,
            "sections": [self.section_to_dict(section) for section in page.sections],
            "chunks": [self.chunk_to_dict(chunk) for chunk in page.chunks],
        }

    def section_to_dict(self, section: Section) -> Dict[str, Any]:
        return {
            "name": section.name,
            "type": section.type,
            "chunks": [self.chunk_to_dict(chunk) for chunk in section.chunks],
            "sections": [self.section_to_dict(sub_section) for sub_section in section.sections],
        }

    def chunk_to_dict(self, chunk: Chunk) -> Dict[str, Any]:
        return {
            "id": chunk.id,
        }

In [38]:
from neo4j import GraphDatabase

class Neo4jGraphCreator:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def create_graph(self, page_dict):
        with self.driver.session() as session:
            # Create Page node with UUID
            page_uuid = str(uuid.uuid4())
            page_query = """
            MERGE (p:Page {title: $title})
            ON CREATE SET p.uuid = $uuid
            RETURN p
            """
            session.run(page_query, title=page_dict['title'], uuid=page_uuid)

            # Create sections and chunks
            self.create_sections_and_chunks(session, page_uuid, page_dict['sections'])
            self.create_chunks(session, page_uuid, page_dict['chunks'])

    def create_sections_and_chunks(self, session, parent_uuid, sections):
        for section in sections:
            section_uuid = str(uuid.uuid4())
            section_labels = f":Section:{section['type']}"
            
            # Query to find either Page or Section as parent
            section_query = f"""
            MATCH (parent {{uuid: $parent_uuid}})
            MERGE (s{section_labels} {{name: $name}})
            ON CREATE SET s.uuid = $uuid
            MERGE (parent)-[:HAS_SECTION]->(s)
            RETURN s
            """
            session.run(section_query, parent_uuid=parent_uuid, name=section['name'], uuid=section_uuid)

            # Recursively create sub-sections and chunks
            self.create_sections_and_chunks(session, section_uuid, section['sections'])
            self.create_chunks(session, section_uuid, section['chunks'])

    def create_chunks(self, session, parent_uuid, chunks):
        first_chunk_created = False
        
        for i, chunk in enumerate(chunks):
            # Use chunk['id'] as the UUID
            chunk_uuid = chunk['id']
            
            chunk_query = """
            MATCH (parent {uuid: $parent_uuid})
            MERGE (c:Chunk {uuid: $uuid})
            MERGE (parent)-[:HAS_CHUNK]->(c)
            RETURN c
            """
            session.run(chunk_query, parent_uuid=parent_uuid, uuid=chunk_uuid)

            # Create the FIRST_CHUNK relationship if first_chunk not yet created
            if not first_chunk_created and i == 0:
                first_chunk_query = """
                MATCH (parent {uuid: $parent_uuid}), (c:Chunk {uuid: $chunk_uuid})
                MERGE (parent)-[:FIRST_CHUNK]->(c)
                RETURN parent, c
                """
                session.run(first_chunk_query, parent_uuid=parent_uuid, chunk_uuid=chunk['id'])
                first_chunk_created = True

        # Create NEXT relationships between chunks once all chunks are created
        for i, chunk in enumerate(chunks):
            # Set the NEXT relationship
            if i < len(chunks) - 1:
                next_chunk_query = """
                MATCH (c1:Chunk {uuid: $uuid1}), (c2:Chunk {uuid: $uuid2})
                MERGE (c1)-[:NEXT]->(c2)
                RETURN c1, c2
                """
                session.run(next_chunk_query, uuid1=chunk['id'], uuid2=chunks[i + 1]['id'])

In [39]:
@component
class GraphCreatorComponent:
    """
    A component that creates a graph in Neo4j from the hierarchical structure of Wikipedia page chunks.
    """
    def run(self, hierarchy: dict):
        neo4j_creator = Neo4jGraphCreator("bolt://localhost:7687", "neo4j", "neo4jpass")
        neo4j_creator.create_graph(hierarchy["Dinosaur"])
        neo4j_creator.close()
        return {}

In [40]:
from pathlib import Path
from haystack.components.converters import TextFileToDocument
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack_integrations.document_stores.weaviate.document_store import WeaviateDocumentStore
from haystack.components.writers import DocumentWriter
from haystack import Pipeline
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore


converter = TextFileToDocument()
splitter = WikiPageChunker()
embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
w_store = WeaviateDocumentStore(url="http://localhost:8088")
w_writer = DocumentWriter(document_store=w_store)
e_store = ElasticsearchDocumentStore(hosts= "http://localhost:9200")
e_writer = DocumentWriter(document_store=e_store)
graph_creator = GraphCreatorComponent()

indexing_pipeline = Pipeline()

indexing_pipeline.add_component("converter", converter)
indexing_pipeline.add_component("splitter", splitter)
indexing_pipeline.add_component("embedder", embedder)
indexing_pipeline.add_component("w_writer", w_writer)
indexing_pipeline.add_component("e_writer", e_writer)
indexing_pipeline.add_component("graph_creator", graph_creator)

indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter.documents", "embedder.documents")
indexing_pipeline.connect("embedder", "w_writer")
indexing_pipeline.connect("splitter.documents", "e_writer.documents")
indexing_pipeline.connect("splitter.hierarchy", "graph_creator.hierarchy")



<haystack.core.pipeline.pipeline.Pipeline object at 0x7281bf55d100>
🚅 Components
  - converter: TextFileToDocument
  - splitter: WikiPageChunker
  - embedder: OpenAIDocumentEmbedder
  - w_writer: DocumentWriter
  - e_writer: DocumentWriter
  - graph_creator: GraphCreatorComponent
🛤️ Connections
  - converter.documents -> splitter.documents (List[Document])
  - splitter.documents -> embedder.documents (List[Document])
  - splitter.documents -> e_writer.documents (List[Document])
  - splitter.hierarchy -> graph_creator.hierarchy (dict)
  - embedder.documents -> w_writer.documents (List[Document])

In [41]:
w_store.count_documents()

0

In [42]:
e_store.count_documents()

0

In [44]:
indexing_pipeline.run(data={"converter": {"sources": [Path("Dinosaur.html")]}})

  timestamp = datetime.utcnow().replace(tzinfo=tzutc())
Calculating embeddings: 100%|██████████| 9/9 [00:07<00:00,  1.17it/s]


{'embedder': {'meta': {'model': 'text-embedding-3-small',
   'usage': {'prompt_tokens': 20252, 'total_tokens': 20252}}},
 'e_writer': {'documents_written': 258},
 'w_writer': {'documents_written': 258}}

In [48]:
e_store.filter_documents(filters = {"field": "id", "operator": "==", "value": "bceced15-011d-4c78-9be4-168e32244697"})

[Document(id=bceced15-011d-4c78-9be4-168e32244697, content: 'Scientists will probably never be certain of the largest and smallest dinosaurs to have ever existed...', meta: {'file_path': 'Dinosaur.html', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 201, 'title': 'Dinosaurs', 'h2': 'Paleobiology', 'h3': 'Size', 'h4': 'Largest and smallest'}, score: 0.0, embedding: vector of size 1536)]

w_store cannot be filter queried without defining a schema it seems. e_store works fine when wanting to query.

In [50]:
e_store.filter_documents(filters = {"field": "id", "operator": "==", "value": "b47767a8-8844-4073-b16f-bf6be076525d"})

[Document(id=b47767a8-8844-4073-b16f-bf6be076525d, content: 'Large meat-eating dinosaurs had a complex system of air sacs similar to those found in modern birds,...', meta: {'file_path': 'Dinosaur.html', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 231, 'title': 'Dinosaurs', 'h2': 'Origin of birds', 'h3': 'Soft anatomy'}, score: 0.0, embedding: vector of size 1536)]

Indexing successful!

***Note: Use Elasticsearch to fetch document by id. Weaviate schema hasn't been defined during document store init, therefore filtering does not work there.***

In [51]:
e_store.filter_documents(filters = {"id": "75b8c578-e366-483d-8a1c-3e32a93dcc36"})




[Document(id=75b8c578-e366-483d-8a1c-3e32a93dcc36, content: 'Dinosaurs diverged from their archosaur ancestors during the Middle to Late Triassic epochs, roughly...', meta: {'file_path': 'Dinosaur.html', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 39, 'title': 'Dinosaurs', 'h2': 'Evolutionary history', 'h3': 'Origins and early evolution'}, score: 0.0, embedding: vector of size 1536)]

Above query is deprecated, use earlier filter

Below is a more complex query example:

In [52]:
e_store.filter_documents(filters={
    "operator": "AND",
    "conditions": [
        {"field": "meta.title", "operator": "==", "value": "Dinosaurs"},
        {"field": "meta.h2", "operator": "==", "value": "Paleobiology"},
        {"field": "meta.h3", "operator": "==", "value": "Size"},
        {"field": "meta.h4", "operator": "==", "value": "Largest and smallest"}
    ]
})

[Document(id=bceced15-011d-4c78-9be4-168e32244697, content: 'Scientists will probably never be certain of the largest and smallest dinosaurs to have ever existed...', meta: {'file_path': 'Dinosaur.html', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 201, 'title': 'Dinosaurs', 'h2': 'Paleobiology', 'h3': 'Size', 'h4': 'Largest and smallest'}, score: 0.0, embedding: vector of size 1536),
 Document(id=7a844fcb-32a5-4dbf-bfd7-097d78f5a299, content: 'The tallest and heaviest dinosaur known from good skeletons is Giraffatitan brancai (previously clas...', meta: {'file_path': 'Dinosaur.html', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 202, 'title': 'Dinosaurs', 'h2': 'Paleobiology', 'h3': 'Size', 'h4': 'Largest and smallest'}, score: 0.0, embedding: vector of size 1536),
 Document(id=5c19df63-6a0e-4d11-bfa5-825bbcbbc2fc, content: 'There were larger dinosaurs, but knowledge of them is based entirel

***
Note:
- Most operations that can be done with Neo4j graph can be done using above filtering mechanism as well. Example - fetch all chunks of a given subsection deeply nested in a given title.
- Exception being - find the previous and next chunk given a certain chunk id. Even this can be achieved with just Elasticsearch filter query, but would be much more natural with Neo4j.
- In future, if more relations need to be added, for example - linking a chunk in one title with a chunk in another title, this can be done much more naturally in Neo4j.
***

More sanity check:

In [53]:
from haystack import Document
from haystack_integrations.components.retrievers.weaviate.embedding_retriever import WeaviateEmbeddingRetriever
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever


question = "What are heavy, quadrapedal thyreophorans?"

e_retriever = ElasticsearchBM25Retriever(document_store=e_store)
w_retriever = WeaviateEmbeddingRetriever(document_store=w_store, top_k=1)

document = Document(content=question)
result = embedder.run(documents=[document])
embedded_document = result["documents"][0]
embedding = embedded_document.embedding

Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  2.41it/s]


In [54]:
retrieval_result = w_retriever.run(query_embedding=embedding, top_k=1)
print(retrieval_result)
retrieved_documents = retrieval_result["documents"]
for doc in retrieved_documents:
    print(doc.content)

{'documents': [Document(id=246e0812-17d0-49c6-8253-e69279ba95bf, content: '†Eurypoda (heavy, quadrupedal thyreophorans)', meta: {'h3': 'Taxonomy', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 59.0, 'file_path': 'Dinosaur.html', 'title': 'Dinosaurs', 'h4': None, 'h2': 'Classification'}, score: 0.857631266117096, embedding: vector of size 1536)]}
†Eurypoda (heavy, quadrupedal thyreophorans)


In [55]:
elastic_fetched = e_retriever.run(query=question, top_k=1)

elastic_fetched

{'documents': [Document(id=246e0812-17d0-49c6-8253-e69279ba95bf, content: '†Eurypoda (heavy, quadrupedal thyreophorans)', meta: {'file_path': 'Dinosaur.html', 'source_id': '93000a3fb02b99d2d115cd4042256d2f5db2a0ff3928927ca14465276534a75e', 'split_id': 59, 'title': 'Dinosaurs', 'h2': 'Classification', 'h3': 'Taxonomy'}, score: 19.630394, embedding: vector of size 1536)]}

Sanity check: Neo4j also has this chunk under h3: Taxonomy. Check. Content cannot be compared as Neo4j is not storing text content.