In [1]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [2]:
import lmql
from enum import Enum
import inspect
import textwrap
import asyncio
import chromadb
from chromadb.utils import embedding_functions
import requests
import re
import os
import pandas as pd
from functools import lru_cache
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
    Any,
    Union,
    ClassVar,
    Dict,
    Generator,
    List,
    Optional,
    Protocol,
    Tuple,
    Type,
    Optional,
    TypeVar,
    Callable,
    AsyncGenerator,
    TypedDict,
    Generic,
    Coroutine,
    Set,
    cast,
)
from getpass import getpass
from itertools import chain
from uuid import UUID, uuid4
from glob import glob
from pathlib import Path

DJ_URL = f"http://localhost:8000"

<IPython.core.display.Javascript object>

In [3]:
@lru_cache(1)
def get_chroma():
    return chromadb.Client()

<IPython.core.display.Javascript object>

In [4]:
@dataclass
class VectorStore:
    collection_name: str

    def __post_init__(self):
        ef = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="all-MiniLM-L6-v2"
        )
        self.client = get_chroma()
        self.collection = self.client.get_or_create_collection(
            self.collection_name, embedding_function=ef
        )

<IPython.core.display.Javascript object>

In [5]:
OPENAI_API_KEY = "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"
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

<IPython.core.display.Javascript object>

In [6]:
metrics_json = requests.get(
    f"{DJ_URL}/metrics",
).json()

metrics = pd.DataFrame(metrics_json)

<IPython.core.display.Javascript object>

In [7]:
dimensions = set(d.split(".")[0] for d in metrics.dimensions.sum())

<IPython.core.display.Javascript object>

In [8]:
dimensions = [
    requests.get(
        f"{DJ_URL}/nodes/{d}",
    ).json()
    for d in dimensions
]

<IPython.core.display.Javascript object>

In [9]:
dimensions = pd.DataFrame(dimensions)

<IPython.core.display.Javascript object>

In [10]:
dimensions_metrics = {}
for m, ds in zip(
    metrics.name,
    metrics.dimensions.apply(lambda l: {d.split(".")[0] for d in l}).tolist(),
):
    for d in ds:
        dimensions_metrics[d] = dimensions_metrics.get(d, [])
        dimensions_metrics[d].append(m)

<IPython.core.display.Javascript object>

In [11]:
dimensions["metrics"] = dimensions.name.map(dimensions_metrics)

<IPython.core.display.Javascript object>

In [12]:
metrics_vectorstore = VectorStore(collection_name="metrics")
dimensions_vectorstore = VectorStore(collection_name="dimensions")
knowledge_vectorstore = VectorStore(collection_name="knowledge")

Using embedded DuckDB without persistence: data will be transient


<IPython.core.display.Javascript object>

In [13]:
def window_document(
    file_name: str, document_text: str, window_size: int = 200, overlap: int = 50
):
    """
    Splits a document into overlapping windows of fixed size.

    Args:
        document (str): The document to split.
        window_size (int): The word size of each window.
        overlap (int): The amount of word overlap between adjacent windows.

    Returns:
        List[str]: A list of overlapping windows.
    """

    document = re.split(r"\s+", document_text)
    title = (
        re.split(r"[._-]+", file_name)
        + re.split(r"\s+", document_text.split("\n")[0])[:10]
    )
    windows = []
    start = 0
    end = window_size
    while end <= len(document):
        windows.append(" ".join((title if start != 0 else []) + document[start:end]))
        start += window_size - overlap
        end += window_size - overlap
    if end > len(document) and start < len(document):
        windows.append(" ".join(title + document[start:]))
    return windows

<IPython.core.display.Javascript object>

In [14]:
knowledge_files = glob("../examples/knowledge/*.txt")

<IPython.core.display.Javascript object>

In [15]:
knowledge_doc_texts = {}
for kd in knowledge_files:
    with open(kd) as f:
        knowledge_doc_texts[".".join(Path(kd).name.split(".")[:-1])] = f.read()

<IPython.core.display.Javascript object>

In [16]:
knowledge_docs = []
for kd, doc in knowledge_doc_texts.items():
    for idx, passage in enumerate(window_document(kd, doc)):
        knowledge_docs.append(
            {
                "ids": kd + f"_{idx}",
                "documents": passage,
                "metadatas": {"file": kd, "part": idx},
            }
        )
knowledge_docs = pd.DataFrame(knowledge_docs)

<IPython.core.display.Javascript object>

In [17]:
knowledge_vectorstore.collection.add(**knowledge_docs.to_dict(orient="list"))

<IPython.core.display.Javascript object>

In [18]:
metric_docs = pd.DataFrame(
    [
        {
            "ids": str(m.id),
            "documents": m.description,
            "metadatas": {
                "name": m["name"],
                "query": m.query,
                "dimensions": str(m.dimensions),
            },
        }
        for _, m in metrics.iterrows()
    ]
)

metrics_vectorstore.collection.add(**metric_docs.to_dict(orient="list"))

<IPython.core.display.Javascript object>

In [19]:
dimension_docs = pd.DataFrame(
    [
        {
            "ids": str(d.node_revision_id),
            "documents": d.description,
            "metadatas": {
                "name": d["name"],
                "query": d.query,
                "metrics": str(d.metrics),
            },
        }
        for _, d in dimensions.iterrows()
    ]
)

dimensions_vectorstore.collection.add(**metric_docs.to_dict(orient="list"))

<IPython.core.display.Javascript object>

In [20]:
SOURCE_PATCH = {}

try:
    getsourcelines
except NameError:
    getsourcelines = inspect.getsourcelines


def monkey_patch_getsourcelines(object):
    if object in SOURCE_PATCH:
        return SOURCE_PATCH[object].splitlines(keepends=True), 0
    return getsourcelines(object)


inspect.getsourcelines = monkey_patch_getsourcelines

<IPython.core.display.Javascript object>

In [21]:
T = TypeVar("T")


def required_value(message: str, return_type: Type[T]) -> Callable[[], T]:
    def raise_message() -> T:
        raise ValueError(message)

    return raise_message


class Stringable(Protocol):
    def __str__(self) -> str:
        pass


SchemaDict = Dict[str, Union[Type[str], Type[int], "SchemaDict"]]


@dataclass
class ToolSchema:
    """
    Final answer value produced from an agent
    """

    schema_dict: TypedDict
    _compiled: bool = field(init=False, default=False)
    _body: Optional[str] = field(init=False, default=None)
    _where: bool = field(init=False, default=False)

    @property
    def body(self):
        self._compile()
        return self._body

    @property
    def code(self):
        self._compile()
        return (
            self.body.replace('\\"[', "")
            .replace("]", "")
            .replace('\\"', '"')
            .strip()[1:-1]
        )

    @property
    def where(self):
        self._compile()
        return self._where

    def _compile(self):
        if self._compiled:
            return
        schema_dict = self.schema_dict.__annotations__
        if not schema_dict:
            self._body = ""
            self._where = ""
            return
        where = []
        code = []
        prefix = self.schema_dict.__name__ + "_"

        def _helper(schema, key, end=False):
            if schema == int:
                variable = (prefix + key).upper()
                where.append(f'INT({variable}) and STOPS_AT({variable}, ",")')
                return variable
            if schema == str:
                variable = (prefix + key).upper()
                where.append(f"""STOPS_AT({variable}, '"')""")
                return variable
            if not isinstance(schema, dict):
                raise Exception(f"Unnacceptable type in schema: `{schema}`")
            result = "{{"
            for idx, (key, value) in enumerate(schema.items()):
                if "[" in key or "]" in key:
                    raise Exception("schema keys cannot have `[` or `]`")
                variable = _helper(value, key=key, end=idx == len(schema))
                quote = '\\"' if value == str else ""
                result += f'\\"{key}\\": {quote}[{variable}], '
            result = result[:-2] + "}}"
            return result

        self._body = _helper(schema_dict, key="")
        self._where = " and ".join(where)


@dataclass
class Tool:
    default_description: ClassVar[str]
    default_ref_name: ClassVar[str]
    input_schema: ClassVar[ToolSchema]
    model_identifier: Optional[str] = None
    description_: Optional[str] = None
    ref_name_: Optional[str] = None
    max_uses_per_query: int = cast(int, float("inf"))

    @property
    def description(self):
        return self.description_ or self.default_description

    @property
    def ref_name(self):
        return self.ref_name_ or self.default_ref_name

    async def __call__(self, action: "Action") -> "Observation":
        raise NotImplementedError()


@dataclass
class Utterance:
    utterance_: Stringable
    timestamp: datetime = field(default_factory=datetime.utcnow)
    context: str = ""
    parent_: Optional["Utterance"] = None
    id: UUID = field(default_factory=uuid4)
    session_: Optional["Session"] = field(default=None, init=False)
    marker: ClassVar[str] = ""

    def __post_init__(self):
        self.session = self.parent_ and self.parent_.session

    @property
    def parent(self):
        return self.parent_

    @parent.setter
    def parent(self, parent: "Utterance"):
        if parent is not None:
            self.session = parent.session
            self.parent_ = parent

    def __str__(self):
        return self.marker + self.utterance

    def history(self, n: Optional[int] = None) -> Generator:
        n_ = n or float("inf")
        curr = self
        while n_ > 0 and (curr is not None):
            yield curr
            curr = curr.parent
            n_ -= 1

    @property
    def session(self):
        if self.session_ is not None:
            return self.session_
        if self.parent is not None:
            return self.parent.session
        return None

    @session.setter
    def session(self, session: "Session"):
        self.session_ = session

    @property
    def utterance(self):
        return str(self.utterance_)

<IPython.core.display.Javascript object>

In [22]:
@dataclass
class User(Utterance):
    """
    Utterance from a user
    """

    marker = "User: "


@dataclass
class Observation(Utterance):
    """
    Value produced from a tool
    """

    marker = "Observation: "
    tool: Tool = field(
        default_factory=required_value("`tool` is required for an Observation.", Tool)
    )


@dataclass
class Action(Utterance):
    """
    Value produced from a tool
    """

    utterance_: dict
    marker = "Action: "
    agent: "Agent" = field(
        default_factory=required_value(
            "`agent` is required for an Action.", lambda: Agent()
        )
    )


@dataclass
class Thought(Utterance):
    """
    Value produced from an agent
    """

    agent: "Agent" = field(
        default_factory=required_value(
            "`agent` is required for a Thought.", lambda: Agent()
        )
    )
    marker = "Thought: "


@dataclass
class Answer(Utterance):
    """
    Final answer value produced from an agent
    """

    agent: "Agent" = field(
        default_factory=required_value(
            "`agent` is required for a Answer.", lambda: Agent()
        )
    )
    marker = "Answer: "

<IPython.core.display.Javascript object>

In [23]:
class SessionStatus(Enum):
    DISCONNECTED = "DISCONNECTED"
    LIVE = "LIVE"
    TIMEOUT = "TIMEOUT"

<IPython.core.display.Javascript object>

In [24]:
@dataclass
class Session:
    agent: "Agent"  # sessions are with an agent
    agent_utterances: Set[
        Union[Type[Action], Type[Observation], Type[Thought], Type[Answer]]
    ] = field(
        default_factory=lambda: {Action, Observation, Thought, Answer}
    )  # this determines how verbose the agent will be
    session_id: UUID = field(default_factory=uuid4)
    status: SessionStatus = SessionStatus.LIVE
    utterance: Optional[str] = field(default=None, init=False)
    timestamp: datetime = field(default_factory=datetime.utcnow)
    timeout: int = 60 * 10
    sessions: ClassVar[Dict[UUID, "Session"]] = {}

    def __post_init__(self):
        Session.sessions[self.session_id] = self

    def check_quit(self, utterance: Utterance) -> bool:
        if utterance is None or utterance.utterance.strip() in ("", "quit", "exit"):
            self.status = SessionStatus.DISCONNECTED
            return True
        return False

    async def __call__(self):
        while True:
            # wait for user input
            user: User = yield
            
            # session is disconnected if a user utterance is none or empty
            if self.check_quit(user):
                return
            user.session = self
            user.parent = self.utterance
            self.utterance = user
            # agent gives all it's utterances in response to the user utterance
            async for response in agent(self.utterance):
                if self.check_quit(response):
                    return
                self.utterance = response
                # only send utterances we're asked to send
                if type(response) in self.agent_utterances:
                    yield response
            yield None


<IPython.core.display.Javascript object>

In [25]:
@dataclass
class VectorStoreMemory:
    utterance: Optional[Utterance] = None
    vector_store: Optional[VectorStore] = None
    default_k: int = 3


#     @property
#     def session_id(self) -> Optional[UUID]:
#         return self.utterance and self.utterance.session_id

#     async def add_memories(self, utterances: List[Utterance]):
#         for utterance in utterances:
#             if self.session_id is not None and utterance.session_id != self.session_id:
#                 raise Exception("utterances belong to the same session as this memory!")
#         if self.vector_store is None:
#             self.vector_store = VectorStore(str(self.session_id))
#         await self.vector_store.coll

#     async def search(self, query: str, k: Optional[int] = None):
#         k = k or self.default_k


@dataclass
class Agent:
    description: str
    ref_name: str
    query: Callable[["Agent", Any, ...], Coroutine[Any, Any, Utterance]]
    tools: List[Type[Tool]]
    model_identifier: str
    decoder: str = "argmax"
    memory: Optional[VectorStoreMemory] = None
    _run: Callable[[Any, ...], Coroutine[Any, Any, Utterance]] = field(
        default=None, init=False
    )
    queue: asyncio.Queue[Utterance] = field(default_factory=asyncio.Queue, init=False)

    def __post_init__(self):
        assert self.tools, "This agent requires some tools"

    async def __call__(self, session: Session) -> Utterance:
        raise NotImplementedError()

    async def asend(self, utterance: Utterance):
        await self.queue.put(utterance)

    async def run(self, *args):
        if self._run is None:
            self._run = self._compile_query(self.query)
        return await self._run(self, *args)

    def _compile_query(
        self, f: Callable[["Agent", Any, ...], Coroutine[Any, Any, Utterance]]
    ):
        sig = inspect.signature(f)
        assert (
            list(sig.parameters.values())[0].name == "agent"
        ), "First parameter to query must be `agent`"
        source = (
            "async def _f"
            + str(sig)
            + ":\n"
            + ("    '''" + f.__doc__.format(**self.__dict__) + "\n    '''")
        )
        #         print(source)
        exec(source)
        SOURCE_PATCH[locals().get("_f")] = source
        return lmql.query(locals().get("_f"))

    async def __call__(
        self, utterances: Set[Union[Type[Observation], Type[Thought], Type[Answer]]]
    ) -> AsyncGenerator[Optional[Utterance], Utterance]:
        raise NotImplementedError()

<IPython.core.display.Javascript object>

In [26]:
@dataclass
class KnowledgeSearchTool(Tool):
    default_description = "Search for knowledge documents."
    default_ref_name = "knowledge_search"
    input_schema = ToolSchema(TypedDict("KnowledgeQuery", {"query": str}))
    n_docs: int = 3
    threshold: float = 0.0

    async def __call__(self, action: Action) -> Observation:
        query = action.utterance_["query"]
        results = knowledge_vectorstore.collection.query(
            query_texts=query, n_results=self.n_docs
        )
        res = ""
        for meta, doc in zip(results["metadatas"], results["documents"]):
            res += f"{meta}: {doc}\n"
        return Observation(tool=self, utterance_=res)

<IPython.core.display.Javascript object>

In [33]:
async def standard_query(
    agent, convo: str, thought_filter: List[str], tool_filter: List[str]
):
    '''
    {decoder}
        """
        The following is a conversation between a User and an AI Agent.
        The Agent is talkative and provides lots of specific details from its context.
        The Agent has Thoughts, uses Tools by providing Tool Input, and ultimately provides Answers.
        If the Agent cannot answer a question using its tools, it truthfully says it does not know.
        The Agent uses thoughful reasoning like so:

        Thought: use tool
        Tool: agent selects appropriate tool
        Tool Input: thoroughly descriptive input for the tool to work
        ===
        Thought: final answer
        Answer: agent describes the answer
        ===
        Thought: no answer
        Answer: Agent explains why it could not find an answer

        {tools_prompt}

        Conversation:
        {{convo}}"""
        "Thought: [THOUGHT]\\n"
        thought = Thought(utterance_ = THOUGHT, agent = agent)
        await agent.asend(though)
        while True:
            if THOUGHT == 'use tool':
                "Tool: [TOOL]\\n"
                {tool_body}
                await agent.asend(observation)
            elif THOUGHT == 'final answer':
                "Answer: [ANSWER]\\n"
                await agent.asend(Answer(utterance_ = ANSWER, agent = agent, parent_ = thought))
                break
            else:
                await agent.asend(Answer(utterance_ = "I apologize, but I did not find an answer.", agent = agent, parent_ = thought))
                break
    from
        "{model_identifier}"
    where
        THOUGHT in [
            thought
            for thought in ["use tool", "final answer", "no answer"]
            if thought not in thought_filter
        ] and
        TOOL in [
            thought
            for thought in {tool_names}
            if thought not in tool_filter
        ] and
        STOPS_AT(THOUGHT, "\\n") and
        STOPS_AT(TOOL, "\\n") and
        {tool_conditions}
    '''

<IPython.core.display.Javascript object>

In [34]:
class StandardAgent(Agent):
    "A standard agent that can answer queries and solve tasks with tools."

    def __init__(
        self,
        *,
        description: str = "",
        ref_name: str = "standard",
        query=standard_query,
        history_length: int = 3,
        history_utterances: Set[Type[Utterance]] = {User, Answer},
        **kwargs,
    ):
        super().__init__(
            query=query,
            description=description or StandardAgent.__doc__,
            ref_name=ref_name,
            **kwargs,
        )
        self.history_length = history_length
        self.history_utterances = history_utterances
        self.tools_prompt = "Here are the tools you choose from:\n" + "\n".join(
            f"            {tool.ref_name}: {tool.description}" for tool in self.tools
        )
        self.tool_refs = {tool.ref_name: tool for tool in self.tools}
        tool_body = []
        for tool in self.tools:
            tool_body.append(f"if TOOL=='{tool.ref_name}':")
            tool_body.append(
                f'                    "Tool Input: {tool.input_schema.body}"'
            )
            tool_body.append(
                f"                    action = Action(utterance_ = {tool.input_schema.code}, agent = agent, parent_ = thought)"
            )
            tool_body.append(
                f"                    observation = await agent.tool_refs.get(TOOL)(action); observation.parent = action"
            )
        self.tool_body = "\n".join(tool_body)
        self.tool_conditions = " and\n".join(
            tool.input_schema.where for tool in self.tools
        )
        self.tool_names = list(self.tool_refs.keys())

    async def __call__(
        self, utterance: Utterance
    ) -> AsyncGenerator[Optional[Utterance], Optional[Utterance]]:
        while True:
            history = []
            for utterance in utterance.history():
                if type(utterance) in self.history_utterances:
                    history.append(utterance)
                if len(history) == self.history_length:
                    break

            history = history[::-1]
            convo = "\n".join(str(u) for u in history) + "\n"
            thought_filter = []
            tool_filter = []

            asyncio.create_task(self.run(convo, thought_filter, tool_filter))

            while response := await self.queue.get():
                if not isinstance(response, Utterance):
                    break
                response_origin = list(response.history())[-1]
                response_origin.parent = utterance
                yield response

<IPython.core.display.Javascript object>

In [35]:
tools = [KnowledgeSearchTool()]
agent = StandardAgent(model_identifier="openai/davinci", tools=tools)

<IPython.core.display.Javascript object>

In [36]:
session = Session(agent)

<IPython.core.display.Javascript object>

In [37]:
async def main():
    session_loop = session()
    # Start the generator
    await session_loop.asend(None)
    while True:
        # Read input from the command line
        user_input = await asyncio.to_thread(input, "Ask something: ")
        user_utterance = User(user_input)
        while response := await session_loop.asend(user_utterance):
            print(response)

<IPython.core.display.Javascript object>

In [38]:
await main()

Ask something: what is the capital of spain
Observation: [{'file': 'most-profitable-municipalities', 'part': 1}, {'file': 'most-profitable-municipalities', 'part': 0}, {'file': 'road-closure-guide-and-tips', 'part': 0}]: ['most profitable municipalities Most Profitable Municipalities  infrastructure, and we have been able to provide a range of repair and maintenance services to support this goal. Our staff has formed strong relationships with the local officials, and we are proud to be a part of the community. 3. County C: County C is a large municipality with a diverse population and a range of infrastructure needs. We have been working with the county to provide repair services for its road network, and this has proven to be a profitable venture for us. The county has also been supportive of our efforts to provide innovative solutions to some of its infrastructure challenges, and we look forward to continuing this partnership. Overall, we are proud of the work that we have done in th

<IPython.core.display.Javascript object>