# Pathway

>[Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.

In the notebook, we'll demo the `SelfQueryRetriever` wrapped around a `Pathway` vector store. 

## Creating a Pathway vector store
First we'll want to create a Pathway vector store and seed it with some data. We've created a small demo set of documents that contain summaries of movies.

**Note:** The self-query retriever requires you to have `lark` installed (`pip install lark`). We also need the `pathway` package.

In [None]:
#!pip install lark


In [None]:
#!pip install pathway


We want to use `OpenAIEmbeddings` so we have to get the OpenAI API Key.

In [1]:
import getpass
import os

os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

In [None]:
import pathway as pw
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import PathwayVectorServer
import logging
logging.disable(logging.INFO)


# This creates a `pathway` connector that reads a single file.
data_sources = []
data_sources.append(
    pw.io.fs.read(
        "../../../../../templates/rag-pathway/sample_documents",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )
)

# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
#     pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))

# Choose proper LangChain document transformers
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_API_KEY"])

# The PathwayVectorServer is a wrapper over pathway.xpacks.llm.vector_store to accept LangChain transforments
# Fell free to fork it to develop bespoke document processing pipelines.
vector_server = PathwayVectorServer(
    *data_sources,
    embedder=embeddings_model,
    splitter=text_splitter,
)
vector_server.run_server(host="127.0.0.1", port=8765, threaded=True, with_cache=False)

We now instantiate and configure the client

In [3]:
from langchain.vectorstores import PathwayVectorClient

client = PathwayVectorClient(
    host="127.0.0.1",
    port=8765,
)

## Creating our self-querying retriever

Now we can instantiate our retriever. To do this we'll need to provide some information upfront about the metadata fields that our documents support and a short description of the document contents. Pathway data sources come with some default metadata containing e.g. a modification date or an owner.

In [4]:
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.chat_models import ChatOpenAI
from langchain.retrievers.self_query.base import SelfQueryRetriever

metadata_field_info = [
    AttributeInfo(
        name="created_at",
        description="The timestamp indicating when the source was created.",
        type="integer",
    ),
    AttributeInfo(
        name="modified_at",
        description="The timestamp indicating when the source was modified.",
        type="integer",
    ),
    AttributeInfo(
        name="owner",
        description="The name of the owner of the source file",
        type="string",
    ),
    AttributeInfo(
        name="path", description="Path of the source file", type="string"
    ),
]
document_content_description = "Information about Pathway"
llm = ChatOpenAI(temperature=0)
retriever = SelfQueryRetriever.from_llm(
    llm,
    client,
    document_content_description,
    metadata_field_info,
)

## Testing it out
And now we can try actually using our retriever!

In [5]:
# This example only specifies a relevant query
retriever.get_relevant_documents("What are the features of Pathway?")

[Document(page_content="Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.", metadata={'created_at': 1702922696, 'modified_at': 1702922696, 'owner': None, 'path': '/Users/mlewandowski/langchain/templates/rag-pathway/sample_documents/repo_readme.md'}),
 Document(page_content="P

In [6]:
# This example specifies a query and a filter
retriever.get_relevant_documents("What are the features of Pathway? Tell me the answer using only the file sources containing what-you-get in the path.")

[Document(page_content="Pathway also relies on a powerful Rust engine to ensure high performance for your pipelines, no matter if you are dealing with batch or streaming data.\nPathway engine makes the utmost of Rust speed and memory safety to provide efficient parallel and distributed processing without being limited by Python's [GIL](https://en.wikipedia.org/w/index.php?title=Global_interpreter_lock&oldid=1144836295).\n\nPathway takes the best of both worlds and efficiently associates the convenience of Python with the power of Rust.\n\n\n## 2. Incremental computation\n\nPathway's engine incrementally processes data updates. This means that the minimum work needed by any algorithm or transformation is performed to refresh its results when fresh data arrives.\n\n## 3. An ML-friendly code life cycle\n\n### Writing your code", metadata={'created_at': 1702922696, 'modified_at': 1702922696, 'owner': None, 'path': '/Users/mlewandowski/langchain/templates/rag-pathway/sample_documents/what-y

In [7]:
# This example specifies a composite filter
retriever.get_relevant_documents(
    "What are the features of Pathway? Tell me the answer using only the file sources containing readme in the path and modified later than 10th December 2023."
)

[Document(page_content="Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.", metadata={'created_at': 1702922696, 'modified_at': 1702922696, 'owner': None, 'path': '/Users/mlewandowski/langchain/templates/rag-pathway/sample_documents/repo_readme.md'}),
 Document(page_content="P

## Filter k

We can also use the self query retriever to specify `k`: the number of documents to fetch.

We can do this by passing `enable_limit=True` to the constructor.

In [8]:
retriever = SelfQueryRetriever.from_llm(
    llm,
    client,
    document_content_description,
    metadata_field_info,
    enable_limit=True,
    verbose=True,
)

In [9]:
# This example only specifies a relevant query
retriever.get_relevant_documents("What are the two features of Pathway?")

[Document(page_content="Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.", metadata={'created_at': 1702922696, 'modified_at': 1702922696, 'owner': None, 'path': '/Users/mlewandowski/langchain/templates/rag-pathway/sample_documents/repo_readme.md'}),
 Document(page_content="P