In [1]:
from haystack.components.converters import TextFileToDocument
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack_integrations.document_stores.weaviate.document_store import WeaviateDocumentStore
from haystack.components.writers import DocumentWriter
from haystack import Pipeline
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

  from .autonotebook import tqdm as notebook_tqdm


### Pseudocode

In [2]:
def index_wiki_data(category: str, filepath: str, category_pages_indexed: dict) -> int:
    """
    Design stub: indexes already chunked wiki data for all pages in a category and its subcategories.

    NOTE: this definition contains only this docstring (no executable body). A function with the same name is
    defined again in the "Code" section of this notebook and replaces this stub once that cell is run.

    Chunked data is available in the .metadata/chunk directory. The intermediate embeddings are stored in the
    .metadata/index/embeddings directory.

    A list of Haystack Document objects is created from the stored chunks and stored in three databases:
    - ElasticsearchDocumentStore: for full-text search (list of Document objects without embeddings is stored)
    - WeaviateDocumentStore: for vector search (list of Document objects enriched with embeddings is stored)
    - Neo4j: for graph search (Document objects are stored as Chunk type nodes, and Section, Page, Category type
      nodes are created to represent the structure of the data)

    Pseudocode:
    - get pages and subcategories from title_pathname map
    - create pages_filename_set and categories_dirname_set

    - for each page in pages:
        - if page is not already indexed (check in redis set):
        - if page not in pages_filename_set:
            - call get_chunks to get list of Document objects from chunks
            - store documents in ElasticsearchDocumentStore
              (DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP))
            - call get_embedded_documents to get list of Document objects with embeddings
            - store documents in WeaviateDocumentStore
              (DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP))
            - call create_graph to create graph representation of the data in Neo4j
              (ensure duplicate nodes and edges are not created)
            - add page to redis set

    - for each subcategory in categories:
        - if subcategory is not already indexed (check in redis set):
        - if subcategory not in categories_dirname_set:
            - call index_wiki_data recursively on subcategory
            - add subcategory to redis set

    NOTE(review): the implementation later in this notebook SKIPS entries whose filename/dirname is not in
    pages_filename_set / categories_dirname_set, i.e. the opposite of the "not in" conditions written above.
    Confirm which behaviour is intended.
    """

In [None]:
def create_category_graph(category: str, filepath: str) -> None:
    """
    Design stub: creates a graph representation of the category and its connections to subcategories and pages.

    The graph is meant to be created on top of the individual page hierarchy graphs already existing in Neo4j.
    This definition contains only this docstring; nothing is executed when it is called.

    Args:
        category: Title of the category to build the graph for.
        filepath: Directory of the category (presumably the one holding its title_pathname map -- TODO confirm).

    Pseudocode:
    - get pages and subcategories from title_pathname map

    - for each page in pages:
        - create the category-to-page relationship (create_category_to_page)

    - for each subcategory in categories:
        - create the category-to-subcategory relationship (create_category_to_subcategory)
        - call create_category_graph recursively on subcategory
    """

### Code

In [None]:
import os
import json
import redis
from typing import List, Dict
from pathlib import Path
from haystack import Document
from typing import Tuple
from haystack.document_stores.types import DuplicatePolicy

# Connect to Redis; the "indexed_pages" / "indexed_categories" sets stored there
# record what has already been indexed so repeated runs can skip it.
r = redis.Redis(host='localhost', port=6379, db=0)

# Embedder used to enrich documents with vectors before they are written to Weaviate.
embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")

# Vector store (Weaviate) and its writer.
w_store = WeaviateDocumentStore(url="http://localhost:8088")
# `policy` must be passed INSIDE the DocumentWriter call; with the parenthesis closed
# before it, the statement is a syntax error ("cannot assign to function call").
w_writer = DocumentWriter(document_store=w_store, policy=DuplicatePolicy.SKIP)

# Full-text store (Elasticsearch) and its writer.
e_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
e_writer = DocumentWriter(document_store=e_store, policy=DuplicatePolicy.SKIP)

def get_title_pathname_map(filepath: str) -> dict:
    """
    Read the title -> pathname map stored under ``filepath``.

    The map is loaded from ``<filepath>/.metadata/download/title_pathname.json``.

    Args:
        filepath: Directory that contains the ``.metadata`` folder.

    Returns:
        The parsed JSON content. When it is a dict (the expected case) it is guaranteed to contain the keys
        ``"pages"`` and ``"categories"``: if the file does not exist both map to empty dicts, and if the file
        omits one of them that key is filled with an empty dict, so callers can index both keys safely.
    """
    title_pathname_filepath = os.path.join(filepath, ".metadata/download/title_pathname.json")
    if not os.path.exists(title_pathname_filepath):
        return {"pages": {}, "categories": {}}

    # JSON files are UTF-8 by specification; do not depend on the platform's default encoding.
    with open(title_pathname_filepath, "r", encoding="utf-8") as file:
        title_pathname = json.load(file)

    # Fill in missing sections so that a partially written map does not raise KeyError downstream.
    if isinstance(title_pathname, dict):
        title_pathname.setdefault("pages", {})
        title_pathname.setdefault("categories", {})

    return title_pathname

def get_documents_and_page_hierarchy(filepath: str, page_title: str, page_filename: str) -> Tuple[List[Document], dict]:
    """
    Load the chunked documents and the hierarchy of one page.

    The data is read from ``<filepath>/.metadata/chunk/<page_filename>.json``.

    Args:
        filepath: Directory that contains the ``.metadata`` folder.
        page_title: Title of the page (currently not used by this function).
        page_filename: Name used to locate the page's chunk file.

    Returns:
        A ``(documents, hierarchy)`` tuple. ``documents`` is the list under the ``"documents"`` key converted
        to Haystack Document objects, and ``hierarchy`` is the value under the ``"hierarchy"`` key; each falls
        back to an empty list / dict when its key is absent. ``([], {})`` is returned when the chunk file does
        not exist or has no ``"splitter"`` key.
    """
    chunk_path = os.path.join(filepath, ".metadata/chunk", f"{page_filename}.json")
    if not os.path.exists(chunk_path):
        return [], {}

    with open(chunk_path, "r") as chunk_file:
        payload = json.load(chunk_file)

    # A chunk file without the "splitter" key is treated as having no usable content.
    if "splitter" not in payload:
        return [], {}

    # Stored chunks are plain dicts; turn them back into Haystack Document objects.
    page_documents = [Document.from_dict(entry) for entry in payload["documents"]] if "documents" in payload else []
    page_hierarchy = payload["hierarchy"] if "hierarchy" in payload else {}

    return page_documents, page_hierarchy


def store_documents_elasticsearch(documents: List[Document]) -> None:
    """
    Write ``documents`` to Elasticsearch through the module-level ``e_writer``.

    Args:
        documents: Haystack Document objects to write.
    """
    # The writer's result is not used; this function always returns None.
    e_writer.run(documents=documents)
    return None
    
def get_embedded_documents(documents: List[Document], filepath: str, page_filename: str) -> List[Document]:
    """
    Compute embeddings for ``documents`` with the module-level ``embedder`` and save the result to disk.

    The embedder output is saved to ``<filepath>/.metadata/index/embeddings/<page_filename>.json`` as a JSON
    object with the keys ``"documents"`` (each Document converted with ``to_dict()``) and ``"meta"``.

    Args:
        documents: Haystack Document objects to embed.
        filepath: Directory that contains the ``.metadata`` folder.
        page_filename: Name used for the saved JSON file.

    Returns:
        The ``"documents"`` list returned by the embedder.
    """
    embedded_documents = embedder.run(documents=documents)

    # Make sure the target directory exists; otherwise the first write raises FileNotFoundError.
    embeddings_dir = os.path.join(filepath, ".metadata/index/embeddings")
    os.makedirs(embeddings_dir, exist_ok=True)

    # Store the embedded documents
    embeddings_filepath = os.path.join(embeddings_dir, f"{page_filename}.json")
    embedded_docs_file_to_save = {
        "documents": [doc.to_dict() for doc in embedded_documents["documents"]],    # convert Haystack Document object to dict
        "meta": embedded_documents["meta"]
        }
    with open(embeddings_filepath, "w") as file:
        json.dump(embedded_docs_file_to_save, file)

    return embedded_documents["documents"]

def store_documents_weaviate(documents: List[Document]) -> None:
    """
    Write ``documents`` to Weaviate through the module-level ``w_writer``.

    Args:
        documents: Haystack Document objects to write.
    """
    # The writer's result is not used; this function always returns None.
    w_writer.run(documents=documents)
    return None
    

def create_graph(documents: List[dict]) -> None:
    """
    Placeholder for building the Neo4j graph representation of the data.

    Not implemented yet: the argument is ignored and nothing is written anywhere.

    Args:
        documents: Input for the graph. NOTE(review): the caller in this notebook passes the page
            hierarchy dict rather than a list of dicts -- confirm the intended type when implementing.
    """
    return None

def index_wiki_data(category: str, filepath: str, category_pages_indexed: Dict[str, int]) -> int:
    """
    Indexes already chunked wiki data for all pages in a category and its subcategories. Chunked data is available in
    the .metadata/chunk directory. The intermediate embeddings are stored in the .metadata/index/embeddings directory.

    List of Haystack Document objects is created from stored chunks and stored into three databases:
    - ElasticsearchDocumentStore: for full-text search (list of Document objects without embeddings is stored)
    - WeaviateDocumentStore: for vector search (list of Document objects enriched with embeddings is stored)
    - Neo4j: for graph search (list of Document objects are stored as Chunk type nodes and Section, Page, Category type nodes
    are created to represent the structure of the data)

    Pages and subcategories already recorded in the Redis sets "indexed_pages" / "indexed_categories" are skipped,
    as are pages without a matching ``*.html`` file and subcategories without a matching sub-directory in
    ``filepath``. Pages for which no chunked documents could be loaded are skipped WITHOUT being marked as
    indexed, so they are picked up again on a later run.

    Args:
        category: Title of the category being indexed; used as the key in ``category_pages_indexed``.
        filepath: Directory of the category (contains the pages, the sub-directories and ``.metadata``).
        category_pages_indexed: Mutated in place. For every category with at least one newly indexed page of
            its own, maps the category title to that number of pages (subcategory pages are not included).

    Returns:
        Number of pages indexed by this call, including pages indexed in recursively processed subcategories.
    """
    title_pathname = get_title_pathname_map(filepath)

    pages_filename_set = {file.name for file in Path(filepath).glob("*.html")}
    categories_dirname_set = {
        entry.name for entry in Path(filepath).iterdir() if entry.is_dir() and entry.name != ".metadata"
    }

    num_total_pages_indexed = 0

    pages = title_pathname["pages"]
    for page_title, page_filename in pages.items():
        if r.sismember("indexed_pages", page_title):
            continue
        if page_filename not in pages_filename_set:
            continue
        documents, hierarchy = get_documents_and_page_hierarchy(filepath, page_title, page_filename)
        if not documents:
            # Nothing was loaded (e.g. the page has not been chunked yet): do not mark it as indexed,
            # otherwise it would be skipped forever once its chunks become available.
            continue
        store_documents_elasticsearch(documents)
        # get_embedded_documents also needs the location where the embeddings are saved.
        embedded_documents = get_embedded_documents(documents, filepath, page_filename)
        store_documents_weaviate(embedded_documents)
        create_graph(hierarchy)
        r.sadd("indexed_pages", page_title)
        num_total_pages_indexed += 1

    # Recorded before recursing, so the entry counts only this category's own pages.
    if num_total_pages_indexed > 0:
        category_pages_indexed[category] = num_total_pages_indexed

    subcategories = title_pathname["categories"]
    for subcategory_title, subcategory_path in subcategories.items():
        if r.sismember("indexed_categories", subcategory_title):
            continue
        if subcategory_path not in categories_dirname_set:
            continue
        subcategory_path = os.path.join(filepath, subcategory_path)
        num_total_pages_indexed += index_wiki_data(subcategory_title, subcategory_path, category_pages_indexed)
        r.sadd("indexed_categories", subcategory_title)

    return num_total_pages_indexed