In [1]:
from chain import chain, vector_search_as_retriever, model
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ChatMessageHistory

In [2]:
input_example = {
    "messages": [
        {
            "role": "user",
            "content": "What is DLT?",
        },
    ]
}

In [3]:
async for chunk in chain.astream(input_example):
    print(chunk, end="", flush=True)

Based on the context provided, DLT refers to Delta Live Tables, which is a platform for building and deploying data pipelines in Databricks. It provides a Python API for defining data pipelines, and the functions for this API are defined in the `dlt` module. Therefore, to use Delta Live Tables in your Python notebooks or files, you need to import the `dlt` module as follows:

```python
import dlt
```

Delta Live Tables focuses on the transformation step of data pipelines, using a "transform after load" architecture. It assumes that you already have a copy of your data in your database, and it provides features such as version control, documentation, and modularity. Delta Live Tables can be used with dbt Cloud, which is a development environment for data analysts and data engineers to transform data by writing select statements. dbt Cloud comes equipped with turnkey support for scheduling jobs, CI/CD, serving documentation, monitoring and alerting, and an integrated development environm

In [71]:
from operator import itemgetter


human_template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant. Answer all questions to the best of your ability.",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", human_template),
    ]
)

output_parser = StrOutputParser()

# def retrieve_preprocess(input: dict):
#     return input["input"]

# VS Index is getting a dict when it should be getting a str. Need to figure out how to preprocess this dict with a RunnableLambda
setup_and_retrieval = RunnableParallel(
    {
        "context": itemgetter("input")
        # | RunnableLambda(retrieve_preprocess)
        | vector_search_as_retriever,
        "question": RunnablePassthrough(),
        "history": itemgetter("history")
    }
)
chain = setup_and_retrieval | prompt | model | output_parser

In [72]:
chain.get_graph().print_ascii()

                                    +-----------------------------------------+                              
                                    | Parallel<context,question,history>Input |                              
                                    +-----------------------------------------+                              
                                 *******                  *                *******                           
                          *******                          *                      *******                    
                      ****                                 *                             *******             
+-----------------------------+                            *                                    ****         
| Lambda(itemgetter('input')) |                            *                                       *         
+-----------------------------+                            *                                       *         
          

In [35]:
from langchain_core.runnables.history import RunnableWithMessageHistory

# demo_ephemeral_chat_history_for_chain = ChatMessageHistory()

# chain_with_message_history = RunnableWithMessageHistory(
#     chain,
#     lambda session_id: demo_ephemeral_chat_history_for_chain,
#     input_messages_key="input",
#     history_messages_key="chat_history",
# )

In [52]:
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]


with_message_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

In [53]:
with_message_history.get_graph().print_ascii()

                                             +------------------------+                                               
                                             | Parallel<history>Input |                                               
                                             +------------------------+                                               
                                                ***               ***                                                 
                                             ***                     ***                                              
                                           **                           **                                            
                           +------------------------+               +-------------+                                   
                           | Lambda(_enter_history) |               | Passthrough |                                   
                           +--------------------

In [73]:
with_message_history.ainvoke(
    {"input": "what is dlt"},
    {"configurable": {"session_id": "unused"}},
)

<coroutine object RunnableBindingBase.ainvoke at 0x1610c4940>

In [74]:
async for chunk in with_message_history.astream({"input": "what is dlt"},
    {"configurable": {"session_id": "unused"}}):
    print(chunk, end="", flush=True)



Based on the provided context, dlt is the Python module for Delta Live Tables. Delta Live Tables is a service for building and deploying reliable data pipelines on the Databricks platform. The dlt module provides Python APIs for defining and executing Delta Live Tables pipelines.

In [65]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | model
    | StrOutputParser()
)

rag_chain_with_source = RunnableParallel(
    {"context": vector_search_as_retriever, "question": RunnablePassthrough()}
).assign(answer=with_message_history)

In [66]:
rag_chain_with_source.get_graph().print_ascii()

                                                                 +---------------------------------+                                                       
                                                                 | Parallel<context,question>Input |                                                       
                                                                 +---------------------------------+                                                       
                                                                         ***              ***                                                              
                                                                      ***                    ***                                                           
                                                                    **                          **                                                         
                                                     +----------

In [64]:
rag_chain_with_source.invoke(
    {"input": "what is dlt"},
    {"configurable": {"session_id": "unused"}},
)



Exception: Response content b'{"error_code":"MALFORMED_REQUEST","message":"Could not parse request object: Expected Scalar value for String field \'query_text\'\\n at [Source: (ByteArrayInputStream); line: 1, column: 133]\\n at [Source: java.io.ByteArrayInputStream@d32b042; line: 1, column: 133]"}', status_code 400

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-1053' coro=<py_anext.<locals>.anext_impl() done, defined at /Users/taylor.isbell/.pyenv/versions/3.11.7/envs/lakehouse_app/lib/python3.11/site-packages/langchain_core/utils/aiter.py:55> exception=KeyError('context')>
Traceback (most recent call last):
  File "/Users/taylor.isbell/.pyenv/versions/3.11.7/envs/lakehouse_app/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 62, in anext_impl
    return await __anext__(iterator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/taylor.isbell/.pyenv/versions/3.11.7/envs/lakehouse_app/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3313, in atransform
    async for chunk in self._atransform_stream_with_config(
  File "/Users/taylor.isbell/.pyenv/versions/3.11.7/envs/lakehouse_app/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1967, in _atransform_stream_with_config
    chunk: Output = await asyncio.cr

ImportError: Could not import wikipedia python package. Please install it with `pip install wikipedia`.