-
Notifications
You must be signed in to change notification settings - Fork 13.3k
/
vector_db.py
74 lines (61 loc) · 2.71 KB
/
vector_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Question-answering with sources over a vector database."""
import warnings
from typing import Any, Dict, List
from langchain_core.callbacks import (
AsyncCallbackManagerForChainRun,
CallbackManagerForChainRun,
)
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.vectorstores import VectorStore
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.qa_with_sources.base import BaseQAWithSourcesChain
class VectorDBQAWithSourcesChain(BaseQAWithSourcesChain):
    """Question-answering with sources over a vector database.

    Documents are fetched with ``vectorstore.similarity_search`` and may then
    be trimmed to fit a token budget. Instantiating the class emits a
    deprecation warning that points to ``RetrievalQAWithSourcesChain``.
    """

    vectorstore: VectorStore = Field(exclude=True)
    """Vector Database to connect to."""
    k: int = 4
    """Number of results to return from store"""
    reduce_k_below_max_tokens: bool = False
    """Reduce the number of results to return from store based on tokens limit"""
    max_tokens_limit: int = 3375
    """Restrict the docs to return from store based on tokens,
    enforced only for StuffDocumentChain and if reduce_k_below_max_tokens is
    set to true"""
    search_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Extra search args."""

    def _reduce_tokens_below_limit(self, docs: List[Document]) -> List[Document]:
        """Drop trailing documents until the token total fits the limit.

        Trimming happens only when ``reduce_k_below_max_tokens`` is set and
        the combine-documents chain is a ``StuffDocumentsChain``; otherwise
        every document is kept.

        Args:
            docs: Documents in the order returned by the vector store.

        Returns:
            A leading slice of ``docs`` whose summed token count does not
            exceed ``max_tokens_limit``. The slice is empty when even a
            single document cannot fit.
        """
        num_docs = len(docs)
        if self.reduce_k_below_max_tokens and isinstance(
            self.combine_documents_chain, StuffDocumentsChain
        ):
            tokens = [
                self.combine_documents_chain.llm_chain._get_num_tokens(doc.page_content)
                for doc in docs
            ]
            token_count = sum(tokens)
            # Stop at zero documents: without this bound, a limit that can
            # never be satisfied (e.g. a negative one) pushes ``num_docs``
            # below zero, after which ``tokens[num_docs]`` and
            # ``docs[:num_docs]`` index from the end of the list.
            while num_docs > 0 and token_count > self.max_tokens_limit:
                num_docs -= 1
                token_count -= tokens[num_docs]
        return docs[:num_docs]

    def _get_docs(
        self, inputs: Dict[str, Any], *, run_manager: CallbackManagerForChainRun
    ) -> List[Document]:
        """Fetch the documents relevant to the question in ``inputs``.

        Args:
            inputs: Chain inputs; the question is read from
                ``inputs[self.question_key]``.
            run_manager: Callback manager for the run (not used here).

        Returns:
            The similarity-search results, trimmed by
            ``_reduce_tokens_below_limit``.
        """
        question = inputs[self.question_key]
        docs = self.vectorstore.similarity_search(
            question, k=self.k, **self.search_kwargs
        )
        return self._reduce_tokens_below_limit(docs)

    async def _aget_docs(
        self, inputs: Dict[str, Any], *, run_manager: AsyncCallbackManagerForChainRun
    ) -> List[Document]:
        """Async document retrieval; not supported by this chain.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("VectorDBQAWithSourcesChain does not support async")

    @root_validator()
    def raise_deprecation(cls, values: Dict) -> Dict:
        """Warn that this chain is deprecated and return ``values`` unchanged."""
        warnings.warn(
            "`VectorDBQAWithSourcesChain` is deprecated - "
            "please use `from langchain.chains import RetrievalQAWithSourcesChain`"
        )
        return values

    @property
    def _chain_type(self) -> str:
        """Identifier string for this chain type."""
        return "vector_db_qa_with_sources_chain"