In [1]:
import os
import logging
import sys

# Send log records at ERROR level and above to stdout (basicConfig already attaches the handler)
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)

In [3]:
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'

--2024-01-02 11:21:08--  https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md
Resolving gist.githubusercontent.com (gist.githubusercontent.com)... 198.18.7.170
Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|198.18.7.170|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8645 (8.4K) [text/plain]
Saving to: ‘data/pathway_readme.md’

2024-01-02 11:21:09 (14.7 MB/s) - ‘data/pathway_readme.md’ saved [8645/8645]

In [2]:
import pathway as pw

data_sources = []
# This creates a `pathway` connector that tracks
# all the files in the ./data directory
data_sources.append(
    pw.io.fs.read(
        "./data",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )
)

# This creates a connector that tracks files in Google Drive.
# Please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials.
# data_sources.append(
#     pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
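With `mode="streaming"`, the connector keeps tracking `./data`, so files that are added, changed, or deleted later are picked up by the running pipeline. The idea can be illustrated with a simple polling watcher. This is a minimal standard-library sketch, not how Pathway implements its connector; `snapshot`, `diff_snapshots`, and `watch` are hypothetical helpers introduced only for illustration.

```python
import os
import tempfile
import time
from typing import Callable, Dict, List, Tuple

def snapshot(directory: str) -> Dict[str, int]:
    """Hypothetical helper: map every file under `directory` to its mtime in nanoseconds."""
    state: Dict[str, int] = {}
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            state[path] = os.stat(path).st_mtime_ns
    return state

def diff_snapshots(old: Dict[str, int], new: Dict[str, int]) -> Tuple[List[str], List[str], List[str]]:
    """Hypothetical helper: return (added, modified, removed) paths between two snapshots."""
    added = sorted(p for p in new if p not in old)
    removed = sorted(p for p in old if p not in new)
    modified = sorted(p for p in new if p in old and new[p] != old[p])
    return added, modified, removed

def watch(directory: str,
          on_change: Callable[[List[str], List[str], List[str]], None],
          poll_seconds: float = 1.0,
          max_polls: int = 3) -> None:
    """Hypothetical polling loop: call on_change(...) whenever the directory contents change."""
    previous = snapshot(directory)
    for _ in range(max_polls):  # bounded here; a real watcher would loop until stopped
        time.sleep(poll_seconds)
        current = snapshot(directory)
        added, modified, removed = diff_snapshots(previous, current)
        if added or modified or removed:
            on_change(added, modified, removed)
        previous = current

if __name__ == "__main__":
    # Usage: create a file in a fresh directory and report it as "added".
    with tempfile.TemporaryDirectory() as data_dir:
        before = snapshot(data_dir)
        with open(os.path.join(data_dir, "notes.md"), "w") as f:
            f.write("hello")
        print(diff_snapshots(before, snapshot(data_dir)))
```

Polling by modification time is the simplest portable approach; production watchers often rely on OS change notifications instead.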

In [3]:
from llama_index.retrievers import PathwayVectorServer
from llama_index.embeddings import OpenAIEmbedding
from llama_index.node_parser import TokenTextSplitter

embed_model = OpenAIEmbedding(embed_batch_size=10)

transformations_example = [
    TokenTextSplitter(
        chunk_size=150,
        chunk_overlap=10,
        separator=" ",
    ),
    embed_model,
]

processing_pipeline = PathwayVectorServer(
    *data_sources,
    transformations=transformations_example,
)

# Define the host and port that the Pathway server will listen on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754

# `threaded=True` runs Pathway in detached mode (in a background thread); set it to False when running from a terminal or container
# For more information on `with_cache`, see https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
    host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)

<Thread(Thread-4 (run), started 11062505472)>

(Press CTRL+C to quit)
148
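`TokenTextSplitter(chunk_size=150, chunk_overlap=10, separator=" ")` cuts each document into windows of at most 150 tokens, with up to 10 tokens of overlap between consecutive windows so that text straddling a boundary keeps some context. The sketch below shows only the sliding-window arithmetic under a simplifying assumption: whitespace-separated words stand in for tokens, whereas LlamaIndex counts tokens with a tokenizer and handles boundaries differently. `split_with_overlap` is a hypothetical name.

```python
from typing import List

def split_with_overlap(text: str, chunk_size: int = 150, chunk_overlap: int = 10,
                       separator: str = " ") -> List[str]:
    """Hypothetical splitter: fixed-size word windows that overlap by `chunk_overlap` words."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    # Words are a stand-in for model tokens in this sketch.
    tokens = [t for t in text.split(separator) if t]
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(tokens), step):
        chunks.append(separator.join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):  # this window already reached the end
            break
    return chunks

# Usage: 25 words, windows of 10 with overlap 2 -> windows start at words 0, 8 and 16.
example = " ".join(str(i) for i in range(25))
for chunk in split_with_overlap(example, chunk_size=10, chunk_overlap=2):
    print(chunk)
```

Each window advances by `chunk_size - chunk_overlap` units, so the overlap trades some duplicated storage and embedding cost for context continuity.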


In [4]:
from llama_index.retrievers import PathwayRetriever

retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)
retriever.retrieve(str_or_query_bundle="what is pathway")

[NodeWithScore(node=TextNode(id_='2230ec7c-7f58-456f-9ee4-61800037d93b', embedding=None, metadata={'created_at': 1704165668, 'modified_at': 1704167994, 'owner': None, 'path': '/Users/lifcc/Desktop/code/C++/learn/StarLight/MachineLearning/LlamaIndex/pipeline/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='36fc0f59beeb40fc900b30aac1ca9ecc782f5499e9f64ed2ad1d6eace906ed27', text="Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Sti
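Each `NodeWithScore` pairs a retrieved chunk with a relevance score: the query is embedded and compared against the stored chunk embeddings, and the closest chunks are returned. A toy version of that ranking step is sketched below, assuming cosine similarity over plain Python lists; the metric and index used by the Pathway vector server may differ, the 2-D vectors stand in for real OpenAI embeddings, and `cosine` and `top_k` are hypothetical helpers.

```python
import math
from typing import Dict, List, Sequence, Tuple

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def top_k(query_vec: Sequence[float], index: Dict[str, Sequence[float]],
          k: int = 3) -> List[Tuple[str, float]]:
    """Hypothetical brute-force retriever: score every chunk, keep the k best."""
    scored = [(chunk_id, cosine(query_vec, vec)) for chunk_id, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

# Usage with made-up 2-D "embeddings" for three chunks.
toy_index = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
print(top_k([1.0, 0.1], toy_index, k=2))
```

Scoring every chunk is fine for a single README; larger collections typically use an approximate nearest-neighbour index instead.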

In [5]:
from llama_index.query_engine import RetrieverQueryEngine

query_engine = RetrieverQueryEngine.from_args(
    retriever,
)
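`RetrieverQueryEngine.from_args` pairs the retriever with a response synthesizer: the retrieved chunks are placed into a prompt together with the question, and an LLM writes the answer. The sketch below shows only the prompt-assembly step. The template wording is illustrative, loosely modeled on LlamaIndex's default question-answering prompt rather than copied from it, and `build_qa_prompt` is a hypothetical helper; the real synthesizer may also pack or refine the context across several LLM calls when it is long.

```python
from typing import List

# Illustrative template; not guaranteed to match LlamaIndex's exact wording.
QA_TEMPLATE = (
    "Context information is below.\n"
    "---------------------\n"
    "{context_str}\n"
    "---------------------\n"
    "Given the context information and not prior knowledge, answer the query.\n"
    "Query: {query_str}\n"
    "Answer: "
)

def build_qa_prompt(query: str, chunks: List[str], template: str = QA_TEMPLATE) -> str:
    """Hypothetical helper: join retrieved chunks (in rank order) and fill the template."""
    context_str = "\n\n".join(chunk.strip() for chunk in chunks)
    return template.format(context_str=context_str, query_str=query)

# Usage: two retrieved chunks and the question from the next cell.
print(build_qa_prompt(
    "Tell me about Pathway",
    ["Pathway is an open framework.", "It combines batch and streaming."],
))
```

Keeping the chunks in retrieval order means the most relevant context appears first in the prompt.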

In [6]:
response = query_engine.query("Tell me about Pathway")
print(str(response))

Pathway is an open framework designed for high-throughput and low-latency real-time data processing. It enables the creation of Python code that seamlessly integrates batch processing, streaming, and real-time APIs for LLM apps. The distributed runtime of Pathway, represented by the symbols 🦀-🐍, ensures the delivery of fresh results in data pipelines whenever new inputs and requests are received. Originally intended to be a time-saving tool for Python developers and ML/AI engineers dealing with live data sources, Pathway has evolved into a versatile and powerful tool applicable to various tasks, particularly in the realm of streaming in Python.
