In [None]:
import json
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.vectorstores import InMemoryVectorStore
from langchain.vectorstores.base import VectorStoreRetriever
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv
load_dotenv()


True

In [71]:
import textwrap

def pretty_print(text, width=80):
    """Print ``text`` to stdout wrapped to ``width`` columns, one paragraph at a time.

    Literal backslash-n sequences are converted to real newlines and zero-width
    spaces (U+200B) are removed. The text is then split on blank lines; each
    paragraph is stripped, dedented, wrapped and printed, followed by one empty
    line. The function prints only and returns ``None``.
    """
    # Normalise escaped newlines and strip zero-width spaces before splitting.
    normalized = text.replace("\\n", "\n").replace("\u200b", "")
    # replace_whitespace=False leaves single newlines inside a paragraph as they are.
    filler = textwrap.TextWrapper(width=width, replace_whitespace=False)

    for block in normalized.split("\n\n"):
        # strip + dedent so source indentation does not leak into the output
        print(filler.fill(textwrap.dedent(block.strip())))
        print()  # blank separator line after every paragraph


In [23]:
RETRIEVAL_SOURCES_PATH = '../constants/retrieval_sources.json'

def _get_src_urls(src_path: str = RETRIEVAL_SOURCES_PATH):
    with open(src_path) as retrieval_src_file:
        retrieval_srcs = json.load(retrieval_src_file)
        urls = retrieval_srcs['bsky'] + retrieval_srcs['skyware']
        return urls

In [73]:
# Load every configured documentation page. WebBaseLoader(url).load() is called
# once per URL, so `docs` holds one load() result per URL (indexed docs[i][j] below).
# Later cells read the globals `urls` and `docs`.
urls = _get_src_urls()
docs = [WebBaseLoader(url).load() for url in urls]

# Sanity check: preview the first 1000 characters of the first loaded page.
docs[0][0].page_content.strip()[:1000]

'The AT Protocol | Bluesky\n\n\n\n\n\n\nSkip to main contentBlueskyDocsBlogShowcaseGitHubSearchGet StartedTutorialsStarter TemplatesAdvanced GuidesThe AT ProtocolFederation ArchitectureLinks, mentions, and rich textRate LimitsLabels and moderationPostsTimestampsFirehoseResolving IdentitiesCustom SchemasBackfilling the NetworkRead-After-WriteService AuthPDS EntrywayoEmbed and Post Embed WidgetAction Intent LinksOAuth Client ImplementationAPI Hosts and AuthHTTP ReferenceSupportAdvanced GuidesThe AT ProtocolOn this pageThe AT Protocol\nThe AT Protocol (Authenticated Transfer Protocol, or atproto) is a standard for public conversation and an open-source framework for building social apps.\nIt creates a standard format for user identity, follows, and data on social apps, allowing apps to interoperate and users to move across them freely. It is a federated network with account portability.\nBasic Concepts\u200b\nIdentity\u200b\nUsers are identified by domain names on the AT Protocol. These d

In [95]:
# Flatten the per-URL load() results into a single list of documents.
docs_list = [item for sublist in docs for item in sublist]
# Splitter built via from_tiktoken_encoder, so chunk_size / chunk_overlap are
# presumably counted in tiktoken tokens rather than characters — TODO confirm.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=100,
    chunk_overlap=50
)
# `doc_splits` is the chunk list that gets embedded in the vector store cell.
doc_splits = text_splitter.split_documents(docs_list)

In [30]:
# Inspect the text of the first chunk produced by the splitter.
doc_splits[0].page_content.strip()

'The AT Protocol | Bluesky'

In [52]:
# Embed every chunk with OpenAIEmbeddings and keep the vectors in an in-memory store
# (nothing is persisted; the store is rebuilt each time this cell runs).
vectorstore = InMemoryVectorStore.from_documents(
    documents=doc_splits,
    embedding=OpenAIEmbeddings()
)
# Retriever over the store with as_retriever()'s default settings.
retriever = vectorstore.as_retriever()

{}


In [51]:
from langchain.tools.retriever import create_retriever_tool

# Wrap the retriever as a tool. The second and third arguments are presumably the
# tool's name and description; the name matches the "retrieve_bsky_docs" tool
# calls used in the hand-built message states further down.
retriever_tool = create_retriever_tool(
    retriever,
    "retrieve_bsky_docs",
    "Search and return information about the Bluesky social app and Bluesky labelers.",
)

In [69]:
# Use retrieval tool defined above to search for information about "label configuration".
# The tool returns the retrieved chunks as one string; how many chunks come back
# is determined by the retriever.
# The result gets its own name: rebinding `docs` (the list of loaded pages used by
# the chunking cell) to a string would break that cell when it is re-run.

label_config_docs = retriever_tool.invoke({'query': 'label configuration'})
pretty_print(label_config_docs)

Some examples of the definitions you might use for a label

Label configuration
The user may choose to hide, warn, or ignore each label from
filter the labeled content from feeds and listings. Ignore just ignores the
label. If adult content is not enabled in preferences, the behavior should force
to hide with no override.
Labelers
Labelers publish labels through a labeling
service. They also receive reports through a reporting service.

For this guide, we’ve set up an example labeler with four labels for a user to
choose from: fire, water, air, and earth. These labels have been set to “inform”
because they’re intended to be used for informational purposes, “warn” severity
so that they appear on user profiles and posts, and no blur because these are
not moderation labels.

Developers building client applications should understand how to apply labels
(#2) and user controls (#3). For more complete details, see the Labels
Specification.
Labels
Labels are published by moderation services, w

Evaluate doc relevance

In [None]:
from pydantic import BaseModel, Field
from typing import Literal
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState

# Relevance-grading prompt template. {context} and {question} are filled in with
# str.format by evaluate_documents; the text is sent to the grader model verbatim.
EVAL_PROMPT = (
    "You are an evaluator assessing the relevance of a retrieved document to a user question. \n"
    "Here is the retrieved document: \n\n {context} \n\n"
    "Here is the user question: {question} \n"
    "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
    "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."
)

# Structured-output schema for the relevance grader (passed to with_structured_output).
# NOTE(review): the class docstring and the Field description are presumably sent to
# the model as part of the schema — confirm before rewording either of them.
class EvaluateDocuments(BaseModel):
    """Evaluate documents using a binary score for relevance check"""
    # Expected to be 'yes' or 'no', but declared as a plain str, so any other
    # string the model produces is accepted by validation.
    binary_score: str = Field(
        description="Relevance score: 'yes' if relevant, or 'no' if not relevant"
    )

# Chat model used only for relevance grading in evaluate_documents (temperature=0).
eval_model = init_chat_model("openai:gpt-4.1", temperature=0)

def evaluate_documents(state: MessagesState) -> Literal['generate_answer', 'rewrite_question']:
    """Grade the retrieved context against the question and choose the next step.

    The question is read from the first message in ``state['messages']`` and the
    retrieved context from the last one. Both are inserted into ``EVAL_PROMPT``
    and sent to ``eval_model`` with ``EvaluateDocuments`` as the structured
    output schema.

    Args:
        state: Mapping with a ``'messages'`` list; the first and last items must
            expose a ``.content`` attribute.

    Returns:
        ``'generate_answer'`` when the grader's score is "yes" (ignoring case and
        surrounding whitespace), otherwise ``'rewrite_question'``.
    """
    question = state['messages'][0].content
    context = state['messages'][-1].content

    prompt = EVAL_PROMPT.format(context=context, question=question)

    response = eval_model.with_structured_output(EvaluateDocuments).invoke(
        [{'role': 'user', 'content': prompt}]
    )
    # binary_score is a free-form str, so tolerate variants such as "Yes" or
    # " yes" instead of silently treating them as a rejection.
    score = (response.binary_score or '').strip().lower()

    return 'generate_answer' if score == 'yes' else 'rewrite_question'

In [137]:
from langchain_core.messages import convert_to_messages

# Hand-built state for exercising evaluate_documents: a user question, an assistant
# message carrying one "retrieve_bsky_docs" tool call, and the tool's reply.
# NOTE(review): `input` shadows the Python builtin of the same name for the rest of
# the kernel session; later cells rebind and read this same global.
input = {
    "messages": convert_to_messages(
        [
            {
                # messages[0]: read by evaluate_documents as the question
                "role": "user",
                "content": "What does the Bluesky documentation say about Personal Data Servers?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_bsky_docs",
                        "args": {"query": "bluesky personal data server"},
                    }
                ],
            },
            # messages[-1]: read by evaluate_documents as the retrieved context.
            # "sike!" is a deliberately irrelevant placeholder, not real retriever output.
            {"role": "tool", "content": "sike!", "tool_call_id": "1"},
        ]
    )
}

print(input)
# Last expression: the routing decision is shown as the cell output.
evaluate_documents(input)

{'messages': [HumanMessage(content='What does the Bluesky documentation say about Personal Data Servers?', additional_kwargs={}, response_metadata={}), AIMessage(content='', additional_kwargs={}, response_metadata={}, tool_calls=[{'name': 'retrieve_bsky_docs', 'args': {'query': 'bluesky personal data server'}, 'id': '1', 'type': 'tool_call'}]), ToolMessage(content='sike!', tool_call_id='1')]}


2025-08-11 00:43:44 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


'rewrite_question'

Rewrite question

In [83]:
# Chat model used by rewrite_question and generate_answer (temperature=0).
response_model = init_chat_model("openai:gpt-4.1", temperature=0)

# Question-rewriting prompt template; {question} is filled in with str.format by
# rewrite_question. Adjacent string literals are concatenated into one prompt.
REWRITE_PROMPT = (
    "Look at the input and try to reason about the underlying semantic intent / meaning.\n"
    "Here is the initial question:"
    "\n ------- \n"
    "{question}"
    "\n ------- \n"
    "Formulate an improved question:"
)


def rewrite_question(state: MessagesState):
    """Ask the response model for an improved version of the first message's question.

    The content of ``state["messages"][0]`` is inserted into ``REWRITE_PROMPT``
    and sent to ``response_model`` as a single user message.

    Returns:
        ``{"messages": [{"role": "user", "content": <rewritten question>}]}`` —
        a plain dict holding one user-message dict, not message objects.
    """
    original_question = state["messages"][0].content
    rewrite_request = {
        "role": "user",
        "content": REWRITE_PROMPT.format(question=original_question),
    }
    rewritten = response_model.invoke([rewrite_request])
    return {"messages": [{"role": "user", "content": rewritten.content}]}

In [114]:
# Same hand-built state as in the grading demo, rebuilt here to exercise rewrite_question.
# NOTE(review): `input` shadows the Python builtin; the following cell reads this
# global, so it cannot be renamed in this cell alone.
input = {
    "messages": convert_to_messages(
        [
            {
                # messages[0]: the question that rewrite_question rewrites
                "role": "user",
                "content": "What does the Bluesky documentation say about Personal Data Servers?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_bsky_docs",
                        "args": {"query": "bluesky personal data server"},
                    }
                ],
            },
            # Placeholder tool reply; rewrite_question does not read it.
            {"role": "tool", "content": "sike!", "tool_call_id": "1"},
        ]
    )
}

response = rewrite_question(input)
# print(response)
# rewrite_question returns plain dicts, hence ["content"] rather than .content
print(response["messages"][-1]["content"])

What information does the Bluesky documentation provide regarding Personal Data Servers, including their purpose, functionality, and role within the Bluesky ecosystem?


In [116]:
# Scratch check: rewrite_question returns a new dict and leaves `input` untouched,
# so the state printed here is still the original one and `input` is still a dict.
print(input)
# `response` is rebound here but not used again in this cell.
response = rewrite_question(input)
# input["messages"][0] = {"role": "user", "content": response} 
# print(type((input["messages"][0])))
print(type(input))

{'messages': [HumanMessage(content='What does the Bluesky documentation say about Personal Data Servers?', additional_kwargs={}, response_metadata={}), AIMessage(content='', additional_kwargs={}, response_metadata={}, tool_calls=[{'name': 'retrieve_bsky_docs', 'args': {'query': 'bluesky personal data server'}, 'id': '1', 'type': 'tool_call'}]), ToolMessage(content='sike!', tool_call_id='1')]}
<class 'dict'>


In [None]:
# Answer-generation prompt template; {question} and {context} are filled in with
# str.format by generate_answer. Adjacent string literals are concatenated, so
# every segment needs its own trailing separator.
GENERATE_PROMPT = (
    "You are an assistant for question-answering tasks about Bluesky's moderation system. Use the following pieces of retrieved context to answer the question.\n"
    "If the question asks for label configuration, label definitions, or moderation settings, provide the appropriate configuration in the correct format (JSON, code snippets, or structured data as needed).\n"
    "Try your best to suggest label names, descriptions, and severity, based on the provided question and context.\n"
    "If the question asks for general information or explanations, use three sentences maximum and keep the answer concise.\n"
    # The newline keeps the question from running straight into "Context:".
    "Question: {question}\n"
    "Context: {context}"
)

def generate_answer(state: MessagesState):
    """Answer the first message's question using the last message as context.

    ``GENERATE_PROMPT`` is filled with ``state["messages"][0].content`` as the
    question and ``state["messages"][-1].content`` as the context, then sent to
    ``response_model`` as a single user message.

    Returns:
        ``{"messages": [<model response>]}``.
    """
    history = state["messages"]
    filled_prompt = GENERATE_PROMPT.format(
        question=history[0].content,
        context=history[-1].content,
    )
    answer = response_model.invoke([{"role": "user", "content": filled_prompt}])
    return {"messages": [answer]}

**Notes on optimization testing:**

The below script tests RAG performance w/ a single query while modifying the following: 
* Chunk size & chunk overlap
    * Big chunks (less context loss, but more noise & memory use):
        * big chunk, high overlap
        * big chunk, low overlap
    * Small chunks (more specific, but more context loss):
        * small chunk, high overlap
        * small chunk, low overlap

We can repeat the above w/ different queries & diff models.

Our default LLM has been OpenAI's, but it would be helpful for us to find the optimal chunking for an Ollama model, as inputted data then remains private and responses are not censored.

Later exploration: diff chunking algorithms (what's the deal with late chunking?), different retrieval algorithms

In [None]:
## testing setup: ##
import logging
import sys

# force=True removes any handlers already attached to the root logger first.
# Without it, basicConfig is a silent no-op once anything else has configured
# logging in this kernel, and the format/level/stream below are never applied.
logging.basicConfig(format='%(asctime)s | %(levelname)s : %(message)s',
                     level=logging.INFO, stream=sys.stdout, force=True)

# Smoke test: should appear in the cell output using the format above.
logging.info('Hello world!')

# Sample state for the optimisation tests: same three messages as the earlier
# demo cells, annotated with what each consumer reads.
input_state = {
    "messages": convert_to_messages(
        [
            {
                # messages[0]: evaluate_documents, rewrite_question and
                # generate_answer all read the question from here.
                "role": "user",
                "content": "What does the Bluesky documentation say about Personal Data Servers?",
            },
            {
                # The assistant turn only records that the retriever tool was
                # called (id "1"); it carries no text of its own.
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_bsky_docs",
                        "args": {"query": "bluesky personal data server"},
                    }
                ],
            },
            # messages[-1]: evaluate_documents and generate_answer read the context
            # (i.e. retrieved docs) from here. It is a tool message because it is the
            # reply to tool call "1" above; "sike!" is a placeholder standing in for
            # real retriever output.
            {"role": "tool", "content": "sike!", "tool_call_id": "1"},
        ]
    )
}


2025-08-11 00:33:50 - INFO - Hello world!


In [None]:
# (logic for this func is sound!)
def build_retriever(CHUNK_SIZE=100, CHUNK_OVERLAP=50, embedder="openai") -> VectorStoreRetriever:
    """Load the configured docs, chunk them, embed them, and return a retriever.

    Args:
        CHUNK_SIZE: ``chunk_size`` passed to the tiktoken-based text splitter.
        CHUNK_OVERLAP: ``chunk_overlap`` passed to the text splitter.
        embedder: Name of the embedding backend. Only ``"openai"`` is implemented.

    Returns:
        A retriever (``as_retriever()`` defaults) over a freshly built
        in-memory vector store.

    Raises:
        ValueError: If ``embedder`` is anything other than ``"openai"``. The
            argument used to be ignored, which meant a request for a different
            (e.g. local) backend still sent every chunk to OpenAI.
    """
    # Validate before doing any network or embedding work.
    if embedder != "openai":
        raise ValueError(
            f"Unsupported embedder {embedder!r}: only 'openai' is implemented."
        )
    embedding_model = OpenAIEmbeddings()

    # fetching documentation: one load() result per URL, flattened into one list.
    # (The leftover preview expressions from the exploratory cells were removed;
    # they displayed nothing inside a function and raised IndexError on empty input.)
    docs_list = [
        doc
        for url in _get_src_urls()
        for doc in WebBaseLoader(url).load()
    ]

    # chunking docs; storing docs as vector embeddings:
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    doc_splits = text_splitter.split_documents(docs_list)

    vectorstore = InMemoryVectorStore.from_documents(
        documents=doc_splits,
        embedding=embedding_model
    )
    return vectorstore.as_retriever()

def retrieve_docs(retriever, query="label configuration") -> str:
    """Build a retrieval tool around ``retriever`` and return what it finds for ``query``.

    Args:
        retriever: The retriever to search with.
        query: Search text handed to the tool.

    Returns:
        The tool's output for ``query`` (the retrieved chunks as one string).
        The same text is also pretty-printed to stdout.
    """
    # Build the tool from the retriever that was passed in. Calling the
    # notebook-level `retriever_tool` here would ignore the argument and always
    # search the default index, whatever retriever the caller built.
    tool = create_retriever_tool(
        retriever,
        "retrieve_bsky_docs",
        "Search and return information about the Bluesky social app and Bluesky labelers.",
    )
    docs = tool.invoke({'query': query})

    # pretty_print() prints and returns None, so it must not be interpolated into
    # the log message (that logged "...: None"); log the header, then print.
    logging.info("docs retrieved for query %s:", query)
    pretty_print(docs)
    return docs

# TODO: make sure the docs retrieved by retriever are passed in through state.
def test_rag(state: "MessagesState", max_rewrites: int = 3):
    """Rewrite the question and re-retrieve until the docs are judged relevant, then answer.

    Each time ``evaluate_documents`` returns ``'rewrite_question'`` the question
    is rewritten, documents are retrieved again for the rewritten question, and a
    new state is built from them. The caller's ``state`` is never mutated.

    Args:
        state: Mapping with a ``'messages'`` list (question first, retrieved
            context last). If it also has a ``'retriever'`` entry, that retriever
            is used for re-retrieval; otherwise the notebook-level
            ``retriever_tool`` is used.
        max_rewrites: Upper bound on rewrite/retrieve rounds. Without a bound, a
            grader that keeps rejecting the docs would loop (and bill) forever.

    Returns:
        ``{"state": <generate_answer output>, "iterations": {rewritten question:
        docs retrieved for it}}``. The answer is generated from the last state
        reached, even if the rewrite budget ran out first.
    """
    # track all iterations of rewritten question & retrieved docs:
    iterations = {}
    rewrites_done = 0
    trial_retriever = state.get("retriever")

    def _retrieve(query):
        # Fall back to the notebook-level tool when the state carries no retriever.
        if trial_retriever is None:
            return retriever_tool.invoke({'query': query})
        # Joined with blank lines, presumably matching the tool's own output format.
        return "\n\n".join(doc.page_content for doc in trial_retriever.invoke(query))

    while evaluate_documents(state) == 'rewrite_question':
        if rewrites_done >= max_rewrites:
            logging.warning(
                "test_rag: docs still judged irrelevant after %d rewrite(s); "
                "answering with the last retrieved docs.", max_rewrites
            )
            break

        # rewrite_question returns plain dicts, hence ["content"].
        response = rewrite_question(state)
        new_question = response["messages"][-1]["content"]
        new_docs = _retrieve(new_question)

        iterations[new_question] = new_docs
        rewrites_done += 1

        # The rewritten question REPLACES the old one at messages[0] and the new
        # docs go to messages[-1], because those are the positions the grader,
        # the rewriter and the answer generator read. Keeping the old question at
        # index 0 would make every round rewrite the same text again.
        call_id = f"rewrite-{rewrites_done}"
        state = {
            **state,
            "messages": convert_to_messages(
                [
                    {"role": "user", "content": new_question},
                    {
                        "role": "assistant",
                        "content": "",
                        "tool_calls": [
                            {
                                "id": call_id,
                                "name": "retrieve_bsky_docs",
                                "args": {"query": new_question},
                            }
                        ],
                    },
                    {"role": "tool", "content": new_docs, "tool_call_id": call_id},
                ]
            ),
        }

    # return metadata after answer generation:
    return {
        "state": generate_answer(state),
        "iterations": iterations}

In [158]:
def trial_run(state: "MessagesState"):
    """Run ``test_rag`` once per chunk-size / chunk-overlap combination.

    For every combination a fresh retriever is built with those settings, the
    question from ``state["messages"][0]`` is retrieved against it, and the
    retrieved text is placed in a per-trial state as the final tool message.

    Args:
        state: Mapping with a ``'messages'`` list; only the first message's
            ``.content`` (the question) is used. The input is not mutated.

    Returns:
        A list with one dict per trial, holding ``chunk_size``,
        ``chunk_overlap``, ``rewrite_retrieve_iterations`` and ``final_state``.

    Note:
        Every trial re-downloads and re-embeds the sources via
        ``build_retriever``, so a full run makes nine index builds.
    """
    question = state["messages"][0].content
    output = []
    for chunk_size in [200, 500, 1000]:
        for overlap in [0, int(chunk_size*0.2), int(chunk_size*0.5)]:
            # Pass the loop's settings through; calling build_retriever() with no
            # arguments made every trial use the same default index.
            retriever = build_retriever(CHUNK_SIZE=chunk_size, CHUNK_OVERLAP=overlap)

            # Query this trial's own retriever directly so the context is
            # guaranteed to come from the index built just above.
            docs = "\n\n".join(doc.page_content for doc in retriever.invoke(question))

            # Per-trial state: question first, this trial's docs last. The
            # retriever is stored alongside so that any step that needs to
            # re-query can use the same index; steps that only read "messages"
            # ignore the extra key.
            trial_state = {
                **state,
                "messages": convert_to_messages(
                    [
                        {"role": "user", "content": question},
                        {
                            "role": "assistant",
                            "content": "",
                            "tool_calls": [
                                {
                                    "id": "1",
                                    "name": "retrieve_bsky_docs",
                                    "args": {"query": question},
                                }
                            ],
                        },
                        {"role": "tool", "content": docs, "tool_call_id": "1"},
                    ]
                ),
                "retriever": retriever,
            }
            test_output = test_rag(trial_state)

            # Append: assigning to `output` kept only the last trial's result.
            output.append({
                 "chunk_size": chunk_size,
                 "chunk_overlap": overlap,
                 "rewrite_retrieve_iterations": test_output["iterations"],
                 "final_state": test_output["state"]
                 })
    return output

In [None]:
# (ignore - Ran is figuring out indexing)
# The whole sketch is commented out: with only the loop header commented, the
# indented body below it raised IndentationError and stopped "Run All" here.
# Known gaps before re-enabling: `input_iterations` is never initialised, and
# `response` is the dict returned by rewrite_question, not the question text.
# # Rewrite question until evaluate_documents outputs "generate answer"
# while evaluate_documents(input) == 'rewrite_question':
#     logging.info("evaluate_documents called.")
#     response = rewrite_question(input)
#     input["messages"][0] = {"role": "user", "content": response}
#     input_iterations += response