llm-app example updates and version bumps (#6471)
GitOrigin-RevId: e291a674bebd9f3f7e16e75fe3ac39754ccd51e6
berkecanrizai authored and Manul from Pathway committed May 17, 2024
1 parent dd038ce commit 7e6a329
Showing 8 changed files with 103 additions and 352 deletions.
2 changes: 1 addition & 1 deletion examples/pipelines/demo-document-indexing/README.md
@@ -128,7 +128,7 @@ Alternatively, you can launch just the indexing pipeline as a single Docker cont

```bash
docker build -t vector_indexer .
docker run -v `pwd`/files-for-indexing:/app/files-for-indexing vector_indexer
docker run -v `pwd`/files-for-indexing:/app/files-for-indexing -p 8000:8000 vector_indexer
```

The volume overlay is important - without it, docker will not see changes to files under the `files-for-indexing` folder.
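
Once the container is up, the published port makes the indexer's REST API reachable from the host. A minimal sketch of a retrieval query from Python (assuming the container exposes the same `/v1/retrieve` route as the question-answering pipeline's vector store shown later in this commit; the query text is illustrative):

```python
import requests

# Port 8000 is published by the `docker run -p 8000:8000` command above.
response = requests.post(
    "http://localhost:8000/v1/retrieve",
    json={"query": "What is in the indexed files?", "k": 2},
    timeout=30,
)
print(response.json())
```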
4 changes: 1 addition & 3 deletions examples/pipelines/demo-document-indexing/requirements.txt
@@ -1,5 +1,3 @@
pathway==0.8.2
pathway[all]~=0.11.0
python-dotenv==1.0.1
litellm==1.17.3
unstructured[all-docs]==0.10.28
mpmath==1.3.0
28 changes: 2 additions & 26 deletions examples/pipelines/demo-question-answering/README.md
@@ -39,7 +39,6 @@ This example spawns a lightweight webserver that accepts queries on six possible
### LLM and RAG capabilities
- `/v1/pw_ai_answer` to ask questions about your documents, or directly talk with your LLM;
- `/v1/pw_ai_summary` to summarize a list of texts;
- `/v1/pw_ai_aggregate_responses` to make a summary of a question for different documents and answers;

See the [using the app section](###Using-the-app) to learn how to use the provided endpoints.
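
For example, the summarization endpoint accepts a list of texts and an optional model override. A minimal sketch of such a request from Python (field names follow the `SummarizeQuerySchema` visible in this commit's `app.py`; it is assumed the refactored app keeps the same fields):

```python
import requests

# Summarize two short texts via the /v1/pw_ai_summary endpoint.
payload = {
    "text_list": [
        "January sales rose by 12% compared to December.",
        "February sales were flat, with higher marketing spend.",
    ],
    "model": "gpt-3.5-turbo",  # optional; the server falls back to its configured default
}
response = requests.post("http://0.0.0.0:8000/v1/pw_ai_summary", json=payload, timeout=60)
print(response.json())
```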

@@ -274,8 +273,7 @@ curl -X 'POST' \
-H 'Content-Type: application/json' \
-d '{
"query": "string",
"metadata_filter": "string",
"k": 0
"k": 2
}'
```

@@ -336,28 +334,6 @@ curl -X 'POST' \

Specifying the GPT model with `"model": "gpt-4"` is also possible.
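
A minimal sketch of such a request from Python (the `prompt` and `model` fields follow the `/v1/pw_ai_answer` schema shown in this commit's `app.py`; the question is illustrative):

```python
import requests

# Ask a question over the indexed documents and override the chat model for this request.
payload = {
    "prompt": "What are the main conclusions of the indexed reports?",
    "model": "gpt-4",
}
response = requests.post("http://0.0.0.0:8000/v1/pw_ai_answer", json=payload, timeout=120)
print(response.json())
```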

#### Aggregating different responses

Aggregating is useful when you have a number of responses for different texts or files for a given question.
It organizes responses and creates an executive outlook.

An example curl query is as follows:

```bash
curl -X 'POST' \
'http://0.0.0.0:8000/v1/pw_ai_aggregate_responses' \
-H 'accept: */*' \
-H 'Content-Type: application/json' \
-d '{
"question": "Is there any action required from the marketing team?",
"answers": [
"File a.pdf - We need approval for social media campaign from marketing dept.",
"File b.pdf - There are no action points.",
"Budget approval is needed from head of marketing."
]
}'
```

This endpoint also supports setting a different model in the query.

To execute similar curl queries as above, you can visit [ai-pipelines page](https://pathway.com/solutions/ai-pipelines/) and try out the queries from the Swagger UI.
@@ -367,4 +343,4 @@ To execute similar curl queries as above, you can visit [ai-pipelines page](http

First, you can try adding your files and seeing changes in the index. To test index updates, simply add more files to the `data` folder.

If you are using Google Drive or other sources, simply upload your files there.
242 changes: 18 additions & 224 deletions examples/pipelines/demo-question-answering/app.py
@@ -1,85 +1,36 @@
import json
import logging
import sys
from enum import Enum

import click
import pathway as pw
import pathway.io.fs as io_fs
import pathway.io.gdrive as io_gdrive
import yaml
from dotenv import load_dotenv
from pathway.internals.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, prompts
from pathway.xpacks.llm.parsers import ParseUnstructured
from pathway.xpacks.llm.splitters import TokenCountSplitter
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, parsers, splitters
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.vector_store import VectorStoreServer

load_dotenv()


class AIResponseType(Enum):
SHORT = "short"
LONG = "long"


def _unwrap_udf(func):
if isinstance(func, pw.UDF):
return func.__wrapped__
return func


@pw.udf
def prep_rag_prompt(
prompt: str, docs: list[pw.Json], filter: str | None, response_type: str
) -> str:
if filter is None:
return prompt

docs = docs.value # type: ignore

try:
docs = [{"text": doc["text"], "path": doc["metadata"]["path"]} for doc in docs]

except Exception:
print("No context was found.")

if response_type == AIResponseType.SHORT.value:
prompt_func = _unwrap_udf(prompts.prompt_short_qa)
else:
prompt_func = _unwrap_udf(prompts.prompt_citing_qa)
return prompt_func(prompt, docs)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)


@pw.udf
def prompt_aggregate(question: str, answers: list[str]) -> str:
summary_data = "\n".join(answers)

summaries_str = json.dumps(summary_data, indent=2)

prompt = f"""Given a json with client names and responses
to the question: "{question}".
Categorize clients stance according to their policy and list them separately.
Use the question and answers to separate them with good logic according to question.
Use Markdown formatting starting with header level 2 (##).
Company Policies: ```{summaries_str}```
Answer:"""

return prompt
load_dotenv()


def data_sources(source_configs) -> list[pw.Table]:
sources = []
for source_config in source_configs:
if source_config["kind"] == "local":
source = io_fs.read(
source = pw.io.fs.read(
**source_config["config"],
format="binary",
with_metadata=True,
)
sources.append(source)
elif source_config["kind"] == "gdrive":
source = io_gdrive.read(
source = pw.io.gdrive.read(
**source_config["config"],
with_metadata=True,
)
@@ -100,148 +51,6 @@ def data_sources(source_configs) -> list[pw.Table]:
return sources


class PathwayRAG:
class PWAIQuerySchema(pw.Schema):
prompt: str
filters: str | None = pw.column_definition(default_value=None)
model: str | None = pw.column_definition(default_value="gpt-3.5-turbo")
response_type: str = pw.column_definition(default_value="short") # short | long

class SummarizeQuerySchema(pw.Schema):
text_list: list[str]
model: str | None = pw.column_definition(default_value="gpt-3.5-turbo")

class AggregateQuerySchema(pw.Schema):
question: str
answers: list[str]
model: str | None = pw.column_definition(default_value="gpt-3.5-turbo")

def __init__(
self,
*docs: pw.Table,
llm: pw.UDF,
embedder: pw.UDF,
splitter: pw.UDF,
parser: pw.UDF = ParseUnstructured(),
doc_post_processors=None,
) -> None:
self.llm = llm

self.embedder = embedder

self.vector_server = VectorStoreServer(
*docs,
embedder=embedder,
splitter=splitter,
parser=parser,
doc_post_processors=doc_post_processors,
)

@pw.table_transformer
def pw_ai_query(self, pw_ai_queries: pw.Table[PWAIQuerySchema]) -> pw.Table:
"""Main function for RAG applications that answer questions
based on available information."""

pw_ai_results = pw_ai_queries + self.vector_server.retrieve_query(
pw_ai_queries.select(
metadata_filter=pw.this.filters,
filepath_globpattern=pw.cast(str | None, None),
query=pw.this.prompt,
k=6,
)
).select(
docs=pw.this.result,
)

pw_ai_results += pw_ai_results.select(
rag_prompt=prep_rag_prompt(
pw.this.prompt, pw.this.docs, pw.this.filters, pw.this.response_type
)
)
pw_ai_results += pw_ai_results.select(
result=self.llm(
llms.prompt_chat_single_qa(pw.this.rag_prompt),
model=pw.this.model,
)
)
return pw_ai_results

@pw.table_transformer
def summarize_query(
self, summarize_queries: pw.Table[SummarizeQuerySchema]
) -> pw.Table:
summarize_results = summarize_queries.select(
pw.this.model,
prompt=prompts.prompt_summarize(pw.this.text_list),
)
summarize_results += summarize_results.select(
result=self.llm(
llms.prompt_chat_single_qa(pw.this.prompt),
model=pw.this.model,
)
)
return summarize_results

@pw.table_transformer
def aggregate_query(
self, aggregate_queries: pw.Table[AggregateQuerySchema]
) -> pw.Table:
aggregate_results = aggregate_queries.select(
pw.this.model,
prompt=prompt_aggregate(pw.this.question, pw.this.answers),
)
aggregate_results += aggregate_results.select(
result=self.llm(
llms.prompt_chat_single_qa(pw.this.prompt),
model=pw.this.model,
)
)
return aggregate_results

def build_server(self, host: str, port: int) -> None:
"""Adds HTTP connectors to input tables"""

webserver = pw.io.http.PathwayWebserver(host=host, port=port)

# connect http endpoint to output writer
def serve(route, schema, handler):
queries, writer = pw.io.http.rest_connector(
webserver=webserver,
route=route,
schema=schema,
autocommit_duration_ms=50,
delete_completed_queries=True,
)
writer(handler(queries))

serve(
"/v1/retrieve",
self.vector_server.RetrieveQuerySchema,
self.vector_server.retrieve_query,
)
serve(
"/v1/statistics",
self.vector_server.StatisticsQuerySchema,
self.vector_server.statistics_query,
)
serve(
"/v1/pw_list_documents",
self.vector_server.InputsQuerySchema,
self.vector_server.inputs_query,
)
serve("/v1/pw_ai_answer", self.PWAIQuerySchema, self.pw_ai_query)
serve(
"/v1/pw_ai_summary",
self.SummarizeQuerySchema,
self.summarize_query,
)
serve(
"/v1/pw_ai_aggregate_responses",
self.AggregateQuerySchema,
self.aggregate_query,
)


@click.command()
@click.option("--config_file", default="config.yaml", help="Config file to be used.")
def run(
@@ -257,8 +66,6 @@ def run(
cache_strategy=DiskCache(),
)

text_splitter = TokenCountSplitter(max_tokens=400)

chat = llms.OpenAIChat(
model=GPT_MODEL,
retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
@@ -269,31 +76,18 @@
host_config = configuration["host_config"]
host, port = host_config["host"], host_config["port"]

rag_app = PathwayRAG(
doc_store = VectorStoreServer(
*data_sources(configuration["sources"]),
embedder=embedder,
llm=chat,
splitter=text_splitter,
splitter=splitters.TokenCountSplitter(max_tokens=400),
parser=parsers.ParseUnstructured(),
)

rag_app.build_server(host=host, port=port)
rag_app = BaseRAGQuestionAnswerer(llm=chat, indexer=doc_store)

if configuration["cache_options"].get("with_cache", True):
print("Running with cache enabled.")
cache_backend = pw.persistence.Backend.filesystem(
configuration["cache_options"].get("cache_folder", "./Cache")
)
persistence_config = pw.persistence.Config.simple_config(
cache_backend,
persistence_mode=pw.PersistenceMode.UDF_CACHING,
)
else:
persistence_config = None
rag_app.build_server(host=host, port=port)

pw.run(
monitoring_level=pw.MonitoringLevel.NONE,
persistence_config=persistence_config,
)
rag_app.run_server(with_cache=True, terminate_on_error=False)


if __name__ == "__main__":
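The net effect of the `app.py` changes is that the hand-rolled `PathwayRAG` class, its prompt helpers, and the explicit persistence setup are replaced by the library's `BaseRAGQuestionAnswerer` wired to a `VectorStoreServer`. A condensed sketch of the new wiring, with configuration values that the real script reads from `config.yaml` inlined here as assumptions:

```python
import pathway as pw
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, parsers, splitters
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.vector_store import VectorStoreServer

# Local files read as a binary source with metadata, as data_sources() does for "local" kinds.
docs = pw.io.fs.read("./data", format="binary", with_metadata=True)

# Embedder and chat model with disk caching and retries, mirroring run() above.
embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())
chat = llms.OpenAIChat(
    model="gpt-3.5-turbo",  # assumption: the real app takes GPT_MODEL from config.yaml
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
    cache_strategy=DiskCache(),
)

# Parsing, splitting and embedding are handled by library components.
doc_store = VectorStoreServer(
    docs,
    embedder=embedder,
    splitter=splitters.TokenCountSplitter(max_tokens=400),
    parser=parsers.ParseUnstructured(),
)

# BaseRAGQuestionAnswerer exposes the /v1/retrieve and /v1/pw_ai_* endpoints itself.
rag_app = BaseRAGQuestionAnswerer(llm=chat, indexer=doc_store)
rag_app.build_server(host="0.0.0.0", port=8000)
rag_app.run_server(with_cache=True, terminate_on_error=False)
```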
4 changes: 1 addition & 3 deletions examples/pipelines/demo-question-answering/requirements.txt
@@ -1,5 +1,3 @@
pathway==0.8.4
pathway[all]>=0.11.0
python-dotenv==1.0.1
litellm==1.17.3
unstructured[all-docs]==0.10.28
mpmath==1.3.0