# Load dataset from NarrativeQA

We use the NarrativeQA dataset for question generation and RAG evaluation.

In [None]:
# Notebook setup: project helpers (DatasetLoader, VectorDatabase, Embedder,
# DataProcessor, ...) come from the local Utils module via a star import.
import pandas as pd
from Utils import *
# NOTE(review): `data_loader` is not used by any cell visible in this notebook.
data_loader = DatasetLoader()
from datasets import load_dataset
import os

# Shared clients: the vector database, the embedder, and a DataProcessor wired
# to both (used further down to process the .txt files into a collection).
vector_DB = VectorDatabase()
embedder = Embedder()
data_processor = DataProcessor(embedder=embedder, vectordatabase=vector_DB)

In [None]:
# Show which collections currently exist in the vector database (cell output).
vector_DB.list_collections()

In [None]:
def extract_narrativeqa_text(split='train', dataset=None):
    """Build a document table and a question/answer table from NarrativeQA.

    Args:
        split: Dataset split passed to ``load_dataset`` (e.g. ``"train"``).
            Ignored when ``dataset`` is given.
        dataset: Optional pre-loaded iterable of NarrativeQA-style examples
            (dicts with ``document``, ``question`` and ``answers`` keys).
            When ``None`` (the default), ``deepmind/narrativeqa`` is loaded.

    Returns:
        ``(df_doc, df_qa)`` where

        * ``df_doc`` has columns ``summary``, ``document`` and ``metadata``
          (``"<kind>\\<title>"``), with one row per example whose summary AND
          document text were both not seen before;
        * ``df_qa`` has one row per example with columns ``questions``,
          ``ground_truths`` (all reference answers joined by ``", "``) and
          empty ``answers`` / ``context`` placeholders.
    """
    if dataset is None:
        dataset = load_dataset("deepmind/narrativeqa", split=split)

    # Many QA examples share the same story, so documents are de-duplicated.
    seen_summaries = set()
    seen_documents = set()
    summaries, documents, metadata = [], [], []

    questions = []
    ground_truths = []

    for example in dataset:
        doc = example['document']
        summary = doc['summary']['text']
        document = doc['text']

        # Only add a document row when both its summary and its full text are new.
        if summary not in seen_summaries and document not in seen_documents:
            seen_summaries.add(summary)
            seen_documents.add(document)
            summaries.append(summary)
            documents.append(document)
            metadata.append(doc['kind'] + "\\" + doc['summary']['title'])

        # Every example contributes a QA row, even when its document was a duplicate.
        questions.append(example['question']['text'])
        # Join the reference answers without leaving a trailing ", " separator.
        ground_truths.append(", ".join(answer['text'] for answer in example['answers']))

    # Averages are over the unique documents kept above.
    num_docs = len(summaries)
    avg_summary_chars = sum(map(len, summaries)) / num_docs if num_docs else 0
    avg_document_chars = sum(map(len, documents)) / num_docs if num_docs else 0

    df_doc = pd.DataFrame({
        'summary': summaries,
        'document': documents,
        'metadata': metadata,
    })

    df_qa = pd.DataFrame({
        'questions': questions,
        'ground_truths': ground_truths,
        # Placeholders filled later by the RAG pipeline.
        'answers': [''] * len(questions),
        'context': [''] * len(questions),
    })

    print(f'唯一文檔數量: {num_docs}')
    print(f'問答對數量: {len(df_qa)}')
    print(f'平均摘要長度: {avg_summary_chars:.2f} 字符')
    print(f'平均文檔長度: {avg_document_chars:.2f} 字符')

    return df_doc, df_qa

In [None]:
# Uncomment the next three lines to regenerate the full parquet caches from the hub.
# df_doc, df_qa = extract_narrativeqa_text(split="train")
# df_doc.to_parquet(".parquet/narrative_qa_doc_full.parquet")
# df_qa.to_parquet(".parquet/narrative_qa_qa_full.parquet")
df_doc = pd.read_parquet(".parquet/narrative_qa_doc_full.parquet")
df_qa = pd.read_parquet(".parquet/narrative_qa_qa_full.parquet")

# Fixed random_state keeps the 1% document sample reproducible across runs.
df_doc_sample = df_doc.sample(frac=0.01, random_state=42)
# NOTE(review): df_qa has no document/metadata column, so a QA sample drawn this
# way is independent of df_doc_sample -- most sampled questions would refer to
# documents that never get indexed. The QA sample parquet read in the
# "Test Modular RAG" section is only written by the commented-out line below.
# df_qa_sample = df_qa.sample(frac=0.05, random_state=42)
df_doc_sample.to_parquet(".parquet/narrative_qa_doc_sample_11.parquet")
# df_qa_sample.to_parquet(".parquet/narrative_qa_qa_sample_11.parquet")
# df_doc_sample

In [None]:
# Reload the saved 1% document sample so the cells below can run without re-sampling.
df_doc_sample = pd.read_parquet(".parquet/narrative_qa_doc_sample_11.parquet")



# Transform the DataFrame into .txt files

In [None]:

import re
from bs4 import BeautifulSoup
import unicodedata

def preprocess_content(content: str) -> str:
    """Clean raw (possibly HTML) document text before it is written to disk.

    Steps: strip HTML tags, apply NFKC normalization, drop URLs, map curly
    double quotes to ``"``, remove characters outside a small whitelist,
    collapse runs of spaces/tabs, de-duplicate repeated punctuation, and add
    a space after punctuation that is glued to the following word. Newlines
    are preserved so paragraph boundaries survive.

    NOTE(review): apostrophes are not in the whitelist, so contractions such
    as "don't" become "dont"; add "'" to the character class if undesired.

    Args:
        content: Raw document text; may contain HTML markup.

    Returns:
        The cleaned text with leading/trailing whitespace removed.
    """
    text = BeautifulSoup(content, 'html.parser').get_text()

    # Unify to NFKC normalization form (folds full-width forms, ligatures, ...).
    text = unicodedata.normalize('NFKC', text)

    # Remove URLs.
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)

    # Unify curly double quotes BEFORE the whitelist filter: NFKC leaves them
    # untouched, so otherwise the filter below would delete them.
    text = text.replace('\u201c', '"').replace('\u201d', '"')

    # Remove special characters, keeping word chars, whitespace and some punctuation.
    text = re.sub(r'[^\w\s.,!?;:()"-]', '', text)

    # Collapse runs of spaces/tabs (after the filter, which can leave gaps) but keep newlines.
    text = re.sub(r'[ \t]+', ' ', text).strip()

    # Collapse repeated punctuation, e.g. "!!!" -> "!", "..." -> ".".
    text = re.sub(r'([.,!?;:])\1+', r'\1', text)

    # Add a space after punctuation that runs straight into the next token.
    # Digits are excluded so "3.14", "1,000" and "10:30" stay intact; closing
    # quotes/parentheses stay attached; existing whitespace (incl. newlines) is kept.
    text = re.sub(r'([.,!?;:])(?=[^\s\d")])', r'\1 ', text)

    return text.strip()

def write_text_to_files_by_metadata(df, output_dir=".txt/", append=False):
    """Write each row's cleaned ``document`` into a .txt file named after its ``metadata``.

    Rows that share the same (sanitized) metadata go into the same file, each
    document followed by a blank line.

    Args:
        df: DataFrame with at least ``document`` and ``metadata`` columns.
        output_dir: Directory for the ``.txt`` files; created if missing.
        append: When False (default) each target file is truncated before
            writing, so re-running the cell does not duplicate content. Set to
            True to append to files left by earlier runs (previous behaviour).

    Raises:
        ValueError: If ``df`` lacks the ``document`` or ``metadata`` column.
    """
    if not all(col in df.columns for col in ['document', 'metadata']):
        raise ValueError("Dataframe must contain 'document' and 'metadata' columns")

    os.makedirs(output_dir, exist_ok=True)

    # Group cleaned documents by file stem first, so each file is opened exactly
    # once and at most one handle is open at any time.
    documents_by_name = {}
    for metadata, document in zip(df['metadata'], df['document']):
        # Double quotes are dropped; whitespace and characters that are illegal in
        # file names on common platforms become underscores.
        name = re.sub(r'[\s/\\:*?<>|]', '_', metadata.replace('"', ''))
        documents_by_name.setdefault(name, []).append(preprocess_content(document))

    mode = 'a' if append else 'w'
    for name, docs in documents_by_name.items():
        with open(os.path.join(output_dir, f"{name}.txt"), mode, encoding='utf-8') as fh:
            # Two newlines after every document for separation.
            fh.writelines(doc + "\n\n" for doc in docs)

    print(f"Files created: {', '.join(f'{name}.txt' for name in documents_by_name)}")

In [None]:
# Clean the sampled documents and write them to .txt files grouped by metadata.
write_text_to_files_by_metadata(df_doc_sample)


# Embed the .txt files into Milvus (GPU)

In [None]:
# Process the files under .txt/ into the "narrative_qa_full_gpu" collection.
# NOTE(review): the two positional True flags are interpreted by
# DataProcessor.directory_files_process in Utils; passing them by keyword would
# make this call self-documenting.
data_processor.directory_files_process("narrative_qa_full_gpu", ".txt/", True, True)

# Import GraphRAG output into Neo4j

In [None]:
# Client for the knowledge-graph database (Neo4j, per this section's heading).
knowledge_DB = KnowledgeGraphDatabase()

In [None]:
# Load GraphRAG output artifacts into the knowledge-graph database.
# NOTE(review): the path pins one specific GraphRAG run (timestamped directory);
# update it whenever a new index is built.
knowledge_DB.transform_graph_rag_to_neo4j(datapath="../graph_rag_sample/output/20240906-153334/artifacts")

In [None]:
# Smoke-test global retrieval. The meaning of the argument 0 is defined in
# Utils.Retriever (presumably a community level -- confirm).
retriever = Retriever()
retriever.global_retrieve(0)

# Test Modular RAG

In [None]:
import pandas as pd

# Quick evaluation slice: the questions and reference answers at rows 5-9 of the
# sampled QA table.
# NOTE(review): this parquet is only produced by the commented-out QA-sampling
# line in the sampling cell near the top of the notebook.
rag_evaluation_dataset = pd.read_parquet(".parquet/narrative_qa_qa_sample_11.parquet")
dataset_queries = rag_evaluation_dataset["questions"].iloc[5:10].tolist()
print(dataset_queries)
answer = rag_evaluation_dataset["ground_truths"].iloc[5:10].tolist()
print(answer)


In [1]:
from Module import *
from Config.output_pydantic import *
from langchain_core.runnables.config import RunnableConfig
from IPython.display import Image


# Run the modular hybrid RAG workflow over `dataset_queries` (selected in the
# evaluation-slice cell) and print every streamed chunk.
# NOTE(review): a recursion_limit of 1,000,000 effectively removes LangGraph's
# protection against runaway loops; consider a much smaller bound.
config = RunnableConfig(recursion_limit=1000000)
workflow = WorkFlowModularHybridRAG()

# `stream` returns a lazy iterator: the graph runs as the loop below consumes it,
# and once exhausted `results` cannot be iterated again or indexed.
results = workflow.graph.stream({
    "dataset_queries": dataset_queries,
    "specific_collection": "narrative_qa_full_gpu",
}, config=config)

for result in results:
    print(result)


# Render the compiled graph as a Mermaid diagram (cell output).
Image(workflow.graph.get_graph().draw_mermaid_png())



In [None]:
# NOTE(review): `results` is the stream iterator from the workflow cell above,
# already exhausted by its print loop; a generator is not subscriptable, so this
# line raises TypeError. Capture the final state instead (e.g. with
# `workflow.graph.invoke(...)`) and index that.
results["all_results"]

In [None]:
# Show the reference answers (ground truths) for the evaluated queries.
answer

In [None]:
# Run the unit-function-test variant of the workflow on a single user query.
from Module import *
from Config.output_pydantic import *
workflow = WorkFlowModularHybridRAG_Unit_Function_Test()

# `stream` returns a lazy iterator; the graph executes as the loop below consumes it.
results = workflow.graph.stream({
    "specific_collection": "narrative_qa_full_gpu",
    "user_query": "What is the main topic of this dataset?",
})


# NOTE(review): `Image` is only needed by the commented-out diagram line.
from IPython.display import Image

# Image(workflow.graph.get_graph().draw_mermaid_png())
for result in results:
    print(result)




In [None]:
import operator
from typing import Annotated, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.constants import Send
from langgraph.graph import END, StateGraph, START

# Pick the pydantic flavour that matches the installed langchain-core:
# - langchain-core >= 0.3 works with pydantic v2 models;
# - langchain-core >= 0.2,<0.3 expects the pydantic.v1 compatibility layer.
from langchain_core import __version__ as core_version
from packaging import version

core_version = version.parse(core_version)
if (core_version.major, core_version.minor) >= (0, 3):
    from pydantic import BaseModel, Field
else:
    from pydantic.v1 import BaseModel, Field

# Prompt templates, filled with str.format by the graph nodes below.
subjects_prompt = "Generate a comma separated list of between 2 and 5 examples related to: {topic}."
joke_prompt = "Generate a joke about {subject}"
best_joke_prompt = (
    "Below are a bunch of jokes about {topic}. Select the best one! "
    "Return the ID of the best one.\n"
    "\n"
    "{jokes}"
)


# Structured-output schemas for the LLM calls in the nodes below.
# NOTE: documented with `#` comments instead of class docstrings on purpose --
# `with_structured_output` may forward a model's docstring to the LLM as the
# schema description, which would change the prompt.
class Subjects(BaseModel):
    # Subjects generated for the user's topic.
    subjects: list[str]


class Joke(BaseModel):
    # A single generated joke.
    joke: str


class BestJoke(BaseModel):
    # 0-based index into the list of jokes; the Field description is part of the schema.
    id: int = Field(description="Index of the best joke, starting with 0", ge=0)


# Chat model shared by every node of the joke graph.
model = ChatOpenAI(model="gpt-4o-mini")

# Graph components: the state schemas used by the nodes of the graph.


# Overall state of the main graph: the user-supplied `topic`, the generated
# `subjects`, one joke per subject, and the finally selected joke.
# `jokes` is annotated with `operator.add` so that the lists returned by the
# parallel `generate_joke` nodes are concatenated instead of overwritten --
# this is the "reduce" half of the map-reduce.
OverallState = TypedDict(
    "OverallState",
    {
        "topic": str,
        "subjects": list,
        "jokes": Annotated[list, operator.add],
        "best_selected_joke": str,
    },
)


# State received by each mapped `generate_joke` node: a single subject.
JokeState = TypedDict("JokeState", {"subject": str})


# Map input: ask the model for a handful of subjects related to the topic.
def generate_topics(state: OverallState):
    """Return ``{"subjects": [...]}`` generated from ``state["topic"]``."""
    structured_model = model.with_structured_output(Subjects)
    parsed = structured_model.invoke(subjects_prompt.format(topic=state["topic"]))
    return {"subjects": parsed.subjects}


# Map step: write one joke for a single subject.
def generate_joke(state: JokeState):
    """Return ``{"jokes": [joke]}``; the one-element list is merged via ``operator.add``."""
    joke_model = model.with_structured_output(Joke)
    parsed = joke_model.invoke(joke_prompt.format(subject=state["subject"]))
    return {"jokes": [parsed.joke]}


# Conditional edge that fans out ("maps") over the generated subjects.
def continue_to_jokes(state: OverallState):
    """Return one ``Send`` per subject, routed to the ``generate_joke`` node.

    Each ``Send`` pairs the target node name with the ``JokeState`` payload
    that node will receive.
    """
    sends = []
    for subject in state["subjects"]:
        sends.append(Send("generate_joke", {"subject": subject}))
    return sends


# Reduce step: ask the model to pick the best of the collected jokes.
def best_joke(state: OverallState):
    """Select the best joke from ``state["jokes"]``.

    Each joke is shown to the model prefixed with its 0-based index, which is
    what ``BestJoke.id`` is defined to be.

    Returns:
        ``{"best_selected_joke": <chosen joke>}``.

    Raises:
        ValueError: If the model returns an index outside ``state["jokes"]``
            (a negative index would otherwise silently pick from the end).
    """
    candidates = state["jokes"]
    # Number the jokes so the "ID" the prompt asks for is actually visible.
    jokes = "\n\n".join(f"ID {idx}: {joke}" for idx, joke in enumerate(candidates))
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    if not 0 <= response.id < len(candidates):
        raise ValueError(
            f"Model picked joke id {response.id}, but only {len(candidates)} jokes exist"
        )
    return {"best_selected_joke": candidates[response.id]}


# Assemble the map-reduce graph:
# START -> generate_topics -(fan-out via Send)-> generate_joke -> best_joke -> END
graph = StateGraph(OverallState)
for node_name, node_fn in (
    ("generate_topics", generate_topics),
    ("generate_joke", generate_joke),
    ("best_joke", best_joke),
):
    graph.add_node(node_name, node_fn)

graph.add_edge(START, "generate_topics")
# `continue_to_jokes` returns Send objects; the list declares the possible targets.
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()

In [None]:
# Run the joke graph for one topic and print each streamed chunk.
for s in app.stream({"topic": "animals"}):
    print(s)

# Test the Retriever's local and global retrieval

In [None]:
# Exercise both retrieval modes of the Retriever.
from Utils import *
retriever = Retriever()
knowledge_DB = KnowledgeGraphDatabase()
# Presumably one-off setup steps for the knowledge graph; uncomment if needed.
# knowledge_DB.create_entity_vector_index()
# knowledge_DB.create_community_weight()
global_result = retriever.global_retrieve(0)
local_result = retriever.local_retrieve(["What is the meaning of life"])

In [None]:
# Inspect the local-retrieval result.
local_result

In [None]:
# Inspect the communities returned by global retrieval.
global_result["communities"]

In [None]:
import Config.constants as const
# NOTE(review): `json` is imported but not used in this cell.
import json
from MultiAgent import *
from Utils import *

retriever = Retriever()
multi_agent = MultiAgent_RAG()
# Disabled experiment: score every community for relevance to a query, in
# batches of const.NODE_BATCH_SIZE, using the multi-agent topic re-ranker.
# all_communities = retriever.global_retrieve(0)["communities"]

# batches = []
# for i in range(0, len(all_communities), const.NODE_BATCH_SIZE):
#     batch_communities = all_communities[i:i + const.NODE_BATCH_SIZE]
#     batches.append({
#         "user_query": "What is the meaning of life",
#         "sub_queries": [],
#         "batch_communities": batch_communities,
#         "batch_size": len(batch_communities),
#     })


# all_scores = multi_agent.topic_reranking_run_batch_async(node_batch_inputs=batches).relevant_scores
# print(all_scores)
# print(len(all_scores))
# print(len(all_communities))

# Classify a sample user query with the multi-agent pipeline (result is the cell output).
multi_agent.user_query_classification_run(user_query="Why does the author choose to use first-person point of view in this article?")





In [None]:
# Smoke-test hybrid retrieval against the "narrative_qa_full_gpu" collection
# populated in the Milvus embedding section.
from Utils import *
retriever = Retriever()
retriever.hybrid_retrieve(collection_name="narrative_qa_full_gpu", query_texts=["What is the meaning of life"])