diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/.gitignore b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/.gitignore new file mode 100644 index 0000000000000..8ead961e42aed --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/.gitignore @@ -0,0 +1,156 @@ +llama_index/_static +.DS_Store +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +bin/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +etc/ +include/ +lib/ +lib64/ +parts/ +sdist/ +share/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +.ruff_cache + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints +notebooks/ + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +pyvenv.cfg + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Jetbrains +.idea +modules/ +*.swp + +# VsCode +.vscode + +# pipenv +Pipfile +Pipfile.lock + +# pyright +pyrightconfig.json + +# local test file +tests/test_local.py diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/BUILD b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/BUILD new file mode 100644 index 0000000000000..0896ca890d8bf --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/BUILD @@ -0,0 +1,3 @@ +poetry_requirements( + name="poetry", +) diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/Makefile b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/Makefile new file mode 100644 index 0000000000000..b9eab05aa3706 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/Makefile @@ -0,0 +1,17 @@ +GIT_ROOT ?= $(shell git rev-parse --show-toplevel) + +help: ## Show all Makefile targets. + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[33m%-30s\033[0m %s\n", $$1, $$2}' + +format: ## Run code autoformatters (black). + pre-commit install + git ls-files | xargs pre-commit run black --files + +lint: ## Run linters: pre-commit (black, ruff, codespell) and mypy + pre-commit install && git ls-files | xargs pre-commit run --show-diff-on-failure --files + +test: ## Run tests via pytest. + pytest tests + +watch-docs: ## Build and watch documentation. 
+ sphinx-autobuild docs/ docs/_build/html --open-browser --watch $(GIT_ROOT)/llama_index/ diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/README.md b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/README.md new file mode 100644 index 0000000000000..e2e9e3cd50231 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/README.md @@ -0,0 +1,46 @@ +# LlamaIndex Indices Integration: Managed-Dashscope + +## Installation + +```shell +pip install llama-index-indices-managed-dashscope +``` + +## Usage + +```python +import os +from llama_index.core.schema import QueryBundle +from llama_index.readers.dashscope.base import DashScopeParse +from llama_index.readers.dashscope.utils import ResultType + +os.environ["DASHSCOPE_API_KEY"] = "your_api_key_here" +os.environ["DASHSCOPE_WORKSPACE_ID"] = "your_workspace_here" + +# init retriever from scratch +from llama_index.indices.managed.dashscope.retriever import ( + DashScopeCloudRetriever, +) + + +file_list = [ + # your files (accept doc, docx, pdf) +] + +parse = DashScopeParse(result_type=ResultType.DASHCOPE_DOCMIND) +documents = parse.load_data(file_path=file_list) + +# create a new index +index = DashScopeCloudIndex.from_documents( + documents, + "my_first_index", + verbose=True, +) + +# # connect to an existing index +# index = DashScopeCloudIndex("my_first_index") + +retriever = index.as_retriever() +nodes = retriever.retrieve("test query") +print(nodes) +``` diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/BUILD b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/BUILD new file mode 100644 index 0000000000000..db46e8d6c978c --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/BUILD @@ -0,0 +1 @@ +python_sources() diff --git 
a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/__init__.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/__init__.py new file mode 100644 index 0000000000000..7ade8ae4c86b7 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/__init__.py @@ -0,0 +1,5 @@ +from llama_index.indices.managed.dashscope.base import DashScopeCloudIndex +from llama_index.indices.managed.dashscope.retriever import DashScopeCloudRetriever + + +__all__ = ["DashScopeCloudIndex", "DashScopeCloudRetriever"] diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/api_utils.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/api_utils.py new file mode 100644 index 0000000000000..635a57a46ca3b --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/api_utils.py @@ -0,0 +1,122 @@ +import json +from typing import List, Optional + +from llama_index.indices.managed.dashscope.transformations import ( + DashScopeConfiguredTransformation, +) +from llama_index.core.schema import BaseNode, TransformComponent + + +def default_transformations() -> List[TransformComponent]: + """Default transformations.""" + from llama_index.node_parser.dashscope import DashScopeJsonNodeParser + from llama_index.embeddings.dashscope import ( + DashScopeEmbedding, + DashScopeTextEmbeddingModels, + DashScopeTextEmbeddingType, + ) + + node_parser = DashScopeJsonNodeParser() + document_embedder = DashScopeEmbedding( + model_name=DashScopeTextEmbeddingModels.TEXT_EMBEDDING_V2, + text_type=DashScopeTextEmbeddingType.TEXT_TYPE_DOCUMENT, + ) + return [ + node_parser, + document_embedder, + ] + + +def get_pipeline_create( + name: str, + 
transformations: Optional[List[TransformComponent]] = None, + documents: Optional[List[BaseNode]] = None, +) -> dict: + configured_transformations: List[DashScopeConfiguredTransformation] = [] + for transformation in transformations: + try: + configured_transformations.append( + DashScopeConfiguredTransformation.from_component(transformation) + ) + except ValueError: + raise ValueError(f"Unsupported transformation: {type(transformation)}") + + configured_transformation_items: List[Dict] = [] + for item in configured_transformations: + configured_transformation_items.append( + { + "component": json.loads(item.component.json()), + "configurable_transformation_type": item.configurable_transformation_type.name, + } + ) + data_sources = [ + { + "source_type": "DATA_CENTER_FILE", + "component": { + "doc_ids": [doc.node_id for doc in documents], + }, + } + ] + return { + "name": name, + "pipeline_type": "MANAGED_SHARED", + "configured_transformations": configured_transformation_items, + "data_sources": data_sources, + "data_sinks": [ + { + "sink_type": "ES", + } + ], + # for debug + "data_type": "structured", + "config_model": "recommend", + } + + +def get_doc_insert( + transformations: Optional[List[TransformComponent]] = None, + documents: Optional[List[BaseNode]] = None, +) -> dict: + configured_transformations: List[DashScopeConfiguredTransformation] = [] + for transformation in transformations: + try: + configured_transformations.append( + DashScopeConfiguredTransformation.from_component(transformation) + ) + except ValueError: + raise ValueError(f"Unsupported transformation: {type(transformation)}") + + configured_transformation_items: List[Dict] = [] + for item in configured_transformations: + configured_transformation_items.append( + { + "component": json.loads(item.component.json()), + "configurable_transformation_type": item.configurable_transformation_type.name, + } + ) + data_sources = [ + { + "source_type": "DATA_CENTER_FILE", + "component": { + "doc_ids": 
[doc.node_id for doc in documents], + }, + } + ] + return { + "configured_transformations": configured_transformation_items, + "data_sources": data_sources, + } + + +def get_doc_delete(ref_doc_ids: List[str]) -> dict: + data_sources = [ + { + "source_type": "DATA_CENTER_FILE", + "component": { + "doc_ids": ref_doc_ids, + }, + } + ] + return { + "data_sources": data_sources, + } diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/base.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/base.py new file mode 100644 index 0000000000000..09a2cca2ba38d --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/base.py @@ -0,0 +1,280 @@ +"""Managed index. + +A managed Index - where the index is accessible via some API that +interfaces a managed service. + +""" +import os +from typing import Any, List, Optional, Type, Union +from enum import Enum +import requests +import json + +from llama_index.core.base.base_query_engine import BaseQueryEngine +from llama_index.core.base.base_retriever import BaseRetriever +from llama_index.core.callbacks.base import CallbackManager +from llama_index.core.indices.managed.base import BaseManagedIndex +from llama_index.core.schema import BaseNode, Document, TransformComponent +from llama_index.core.settings import Settings + +from llama_index.indices.managed.dashscope.api_utils import ( + get_pipeline_create, + default_transformations, + get_doc_insert, + get_doc_delete, +) +from llama_index.indices.managed.dashscope.utils import ( + run_ingestion, + get_pipeline_id, +) +from llama_index.indices.managed.dashscope.constants import ( + DASHSCOPE_DEFAULT_BASE_URL, + UPSERT_PIPELINE_ENDPOINT, + START_PIPELINE_ENDPOINT, + CHECK_INGESTION_ENDPOINT, + PIPELINE_SIMPLE_ENDPOINT, + INSERT_DOC_ENDPOINT, + DELETE_DOC_ENDPOINT, +) + + +class 
Status(Enum): + ERROR = "ERROR" + SUCCESS = "Success" + PENDING = "PENDING" + RUNNING = "RUNNING" + CANCELED = "CANCELED" + FAILED = "FAILED" + FINISHED = "FINISHED" + + +class DashScopeCloudIndex(BaseManagedIndex): + """DashScope Cloud Platform Index.""" + + def __init__( + self, + name: str, + nodes: Optional[List[BaseNode]] = None, + transformations: Optional[List[TransformComponent]] = None, + timeout: int = 60, + workspace_id: Optional[str] = None, + api_key: Optional[str] = None, + base_url: Optional[str] = DASHSCOPE_DEFAULT_BASE_URL, + show_progress: bool = False, + callback_manager: Optional[CallbackManager] = None, + **kwargs: Any, + ) -> None: + """Initialize the Platform Index.""" + self.name = name + self.transformations = transformations or [] + + if nodes is not None: + raise ValueError( + "DashScopeCloudIndex does not support nodes on initialization" + ) + + self.workspace_id = workspace_id or os.environ.get("DASHSCOPE_WORKSPACE_ID") + self._api_key = api_key or os.environ.get("DASHSCOPE_API_KEY") + self._base_url = os.environ.get("DASHSCOPE_BASE_URL", None) or base_url + self._headers = { + "Content-Type": "application/json", + "Accept-Encoding": "utf-8", + "X-DashScope-WorkSpace": self.workspace_id, + "Authorization": "Bearer " + self._api_key, + "X-DashScope-OpenAPISource": "CloudSDK", + } + self._timeout = timeout + self._show_progress = show_progress + self._service_context = None + self._callback_manager = callback_manager or Settings.callback_manager + + @classmethod + def from_documents( # type: ignore + cls: Type["DashScopeCloudIndex"], + documents: List[Document], + name: str, + transformations: Optional[List[TransformComponent]] = None, + workspace_id: Optional[str] = None, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + timeout: int = 60, + verbose: bool = True, + **kwargs: Any, + ) -> "DashScopeCloudIndex": + """Build a DashScope index from a sequence of documents.""" + pipeline_create = get_pipeline_create( + name, 
transformations or default_transformations(), documents + ) + + workspace_id = workspace_id or os.environ.get("DASHSCOPE_WORKSPACE_ID") + api_key = api_key or os.environ.get("DASHSCOPE_API_KEY") + base_url = ( + base_url + or os.environ.get("DASHSCOPE_BASE_URL", None) + or DASHSCOPE_DEFAULT_BASE_URL + ) + headers = { + "Content-Type": "application/json", + "Accept-Encoding": "utf-8", + "X-DashScope-WorkSpace": workspace_id, + "Authorization": "Bearer " + api_key, + "X-DashScope-OpenAPISource": "CloudSDK", + } + + response = requests.put( + base_url + UPSERT_PIPELINE_ENDPOINT, + data=json.dumps(pipeline_create), + headers=headers, + ) + response_text = response.json() + pipeline_id = response_text.get("id", None) + + if response_text.get("code", "") != Status.SUCCESS.value or pipeline_id is None: + raise ValueError( + f"Failed to create index: {response_text.get('message', '')}\n{response_text}" + ) + if verbose: + print(f"Starting creating index {name}, pipeline_id: {pipeline_id}") + + response = requests.post( + base_url + START_PIPELINE_ENDPOINT.format(pipeline_id=pipeline_id), + headers=headers, + ) + response_text = response.json() + ingestion_id = response_text.get("ingestionId", None) + + if ( + response_text.get("code", "") != Status.SUCCESS.value + or ingestion_id is None + ): + raise ValueError( + f"Failed to start ingestion: {response_text.get('message', '')}\n{response_text}" + ) + if verbose: + print(f"Starting ingestion for index {name}, ingestion_id: {ingestion_id}") + + ingestion_status, failed_docs = run_ingestion( + base_url + + CHECK_INGESTION_ENDPOINT.format( + pipeline_id=pipeline_id, ingestion_id=ingestion_id + ), + headers, + verbose, + ) + + if verbose: + print(f"ingestion_status {ingestion_status}") + print(f"failed_docs: {failed_docs}") + + if ingestion_status == "FAILED": + print("Index {name} created failed!") + return None + + if verbose: + print(f"Index {name} created successfully!") + + return cls( + name, + 
transformations=transformations, + workspace_id=workspace_id, + api_key=api_key, + base_url=base_url, + timeout=timeout, + **kwargs, + ) + + def as_retriever(self, **kwargs: Any) -> BaseRetriever: + """Return a Retriever for this managed index.""" + from llama_index.indices.managed.dashscope.retriever import ( + DashScopeCloudRetriever, + ) + + return DashScopeCloudRetriever( + self.name, + **kwargs, + ) + + def as_query_engine(self, **kwargs: Any) -> BaseQueryEngine: + from llama_index.core.query_engine.retriever_query_engine import ( + RetrieverQueryEngine, + ) + + kwargs["retriever"] = self.as_retriever(**kwargs) + return RetrieverQueryEngine.from_args(**kwargs) + + def _insert( + self, + documents: List[Document], + transformations: Optional[List[TransformComponent]] = None, + verbose: bool = True, + **insert_kwargs: Any, + ) -> None: + """Insert a set of documents (each a node).""" + pipeline_id = get_pipeline_id( + self._base_url + PIPELINE_SIMPLE_ENDPOINT, + self._headers, + {"pipeline_name": self.name}, + ) + doc_insert = get_doc_insert( + transformations or default_transformations(), + documents, + ) + response = requests.put( + self._base_url + INSERT_DOC_ENDPOINT.format(pipeline_id=pipeline_id), + data=json.dumps(doc_insert), + headers=self._headers, + ) + response_text = response.json() + ingestion_id = response_text.get("ingestionId", None) + if ( + response_text.get("code", "") != Status.SUCCESS.value + or ingestion_id is None + ): + raise ValueError( + f"Failed to insert documents: {response_text.get('message', '')}\n{response_text}" + ) + + ingestion_status, failed_docs = run_ingestion( + self._base_url + + CHECK_INGESTION_ENDPOINT.format( + pipeline_id=pipeline_id, ingestion_id=ingestion_id + ), + self._headers, + verbose, + ) + + if verbose: + print(f"ingestion_status {ingestion_status}") + print(f"failed_docs: {failed_docs}") + + def delete_ref_doc( + self, + ref_doc_ids: Union[str, List[str]], + verbose: bool = True, + **delete_kwargs: Any, + ) 
-> None: + """Delete documents in index.""" + if isinstance(ref_doc_ids, str): + ref_doc_ids = [ref_doc_ids] + pipeline_id = get_pipeline_id( + self._base_url + PIPELINE_SIMPLE_ENDPOINT, + self._headers, + {"pipeline_name": self.name}, + ) + doc_delete = get_doc_delete(ref_doc_ids) + response = requests.post( + self._base_url + DELETE_DOC_ENDPOINT.format(pipeline_id=pipeline_id), + json=doc_delete, + headers=self._headers, + ) + response_text = response.json() + if response_text.get("code", "") != Status.SUCCESS.value: + raise ValueError( + f"Failed to delete documents: {response_text.get('message', '')}\n{response_text}" + ) + if verbose: + print(f"Delete documents {ref_doc_ids} successfully!") + + def update_ref_doc(self, document: Document, **update_kwargs: Any) -> None: + """Update a document and it's corresponding nodes.""" + raise NotImplementedError("update_ref_doc not implemented.") diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/constants.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/constants.py new file mode 100644 index 0000000000000..0e1f769fc2d39 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/constants.py @@ -0,0 +1,10 @@ +DASHSCOPE_DEFAULT_BASE_URL = "https://dashscope.aliyuncs.com" +UPSERT_PIPELINE_ENDPOINT = "/api/v1/indices/pipeline" +START_PIPELINE_ENDPOINT = "/api/v1/indices/pipeline/{pipeline_id}/managed_ingest" +CHECK_INGESTION_ENDPOINT = ( + "/api/v1/indices/pipeline/{pipeline_id}/managed_ingest/{ingestion_id}/status" +) +RETRIEVE_PIPELINE_ENDPOINT = "/api/v1/indices/pipeline/{pipeline_id}/retrieve" +PIPELINE_SIMPLE_ENDPOINT = "/api/v1/indices/pipeline_simple" +INSERT_DOC_ENDPOINT = "/api/v1/indices/pipeline/{pipeline_id}/documents" +DELETE_DOC_ENDPOINT = "/api/v1/indices/pipeline/{pipeline_id}/delete" diff --git 
a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/retriever.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/retriever.py new file mode 100644 index 0000000000000..3a51902103d3d --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/retriever.py @@ -0,0 +1,195 @@ +import logging +from typing import List, Dict, Optional +import os + +from llama_index.core.base.base_retriever import BaseRetriever +from llama_index.core.callbacks.base import CallbackManager +from llama_index.core.callbacks.schema import CBEventType, EventPayload +from llama_index.core.schema import NodeWithScore, QueryBundle, TextNode, QueryType +from llama_index.core.instrumentation.events.retrieval import ( + RetrievalEndEvent, + RetrievalStartEvent, +) +import llama_index.core.instrumentation as instrument + +from llama_index.indices.managed.dashscope import utils +from llama_index.indices.managed.dashscope.constants import ( + DASHSCOPE_DEFAULT_BASE_URL, + RETRIEVE_PIPELINE_ENDPOINT, + PIPELINE_SIMPLE_ENDPOINT, +) + +dispatcher = instrument.get_dispatcher(__name__) + +logger = logging.getLogger(__name__) + + +class DashScopeCloudRetriever(BaseRetriever): + """Initialize the DashScopeCloud Retriever.""" + + def __init__( + self, + index_name: str, + api_key: Optional[str] = None, + workspace_id: Optional[str] = None, + dense_similarity_top_k: Optional[int] = 100, + sparse_similarity_top_k: Optional[int] = 100, + enable_rewrite: Optional[bool] = False, + rewrite_model_name: Optional[str] = "conv-rewrite-qwen-1.8b", + enable_reranking: Optional[bool] = True, + rerank_model_name: Optional[str] = "gte-rerank-hybrid", + rerank_min_score: Optional[float] = 0.0, + rerank_top_n: Optional[int] = 5, + callback_manager: Optional[CallbackManager] = None, + **kwargs, + ) -> None: + self.index_name = index_name + 
self.workspace_id = workspace_id or os.environ.get("DASHSCOPE_WORKSPACE_ID") + self._api_key = api_key or os.environ.get("DASHSCOPE_API_KEY") + self.dense_similarity_top_k = dense_similarity_top_k + self.sparse_similarity_top_k = sparse_similarity_top_k + self.enable_rewrite = enable_rewrite + self.rewrite_model_name = rewrite_model_name + self.enable_reranking = enable_reranking + self.rerank_model_name = rerank_model_name + self.rerank_min_score = rerank_min_score + self.rerank_top_n = rerank_top_n + + self.headers = { + "Content-Type": "application/json", + "Accept-Encoding": "utf-8", + "X-DashScope-WorkSpace": self.workspace_id, + "Authorization": self._api_key, + "X-DashScope-OpenAPISource": "CloudSDK", + } + + base_url = ( + os.environ.get("DASHSCOPE_BASE_URL", None) or DASHSCOPE_DEFAULT_BASE_URL + ) + self.pipeline_id = utils.get_pipeline_id( + base_url + PIPELINE_SIMPLE_ENDPOINT, + self.headers, + {"pipeline_name": self.index_name}, + ) + + self.base_url = base_url + RETRIEVE_PIPELINE_ENDPOINT.format( + pipeline_id=self.pipeline_id + ) + super().__init__(callback_manager) + + @dispatcher.span + def retrieve( + self, str_or_query_bundle: QueryType, query_history: List[Dict] = None + ) -> List[NodeWithScore]: + """Retrieve nodes given query. + + Args: + str_or_query_bundle (QueryType): Either a query string or + a QueryBundle object. 
+ + """ + dispatch_event = dispatcher.get_dispatch_event() + + self._check_callback_manager() + dispatch_event( + RetrievalStartEvent( + str_or_query_bundle=str_or_query_bundle, + ) + ) + if isinstance(str_or_query_bundle, str): + query_bundle = QueryBundle(str_or_query_bundle) + else: + query_bundle = str_or_query_bundle + with self.callback_manager.as_trace("query"): + with self.callback_manager.event( + CBEventType.RETRIEVE, + payload={EventPayload.QUERY_STR: query_bundle.query_str}, + ) as retrieve_event: + nodes = self._retrieve(query_bundle, query_history=query_history) + nodes = self._handle_recursive_retrieval(query_bundle, nodes) + retrieve_event.on_end( + payload={EventPayload.NODES: nodes}, + ) + dispatch_event( + RetrievalEndEvent( + str_or_query_bundle=str_or_query_bundle, + nodes=nodes, + ) + ) + return nodes + + async def _aretrieve( + self, query_bundle: QueryBundle, query_history: List[Dict] = None + ) -> List[NodeWithScore]: + return self._retrieve(query_bundle, query_history=query_history) + + @dispatcher.span + async def aretrieve( + self, str_or_query_bundle: QueryType, query_history: List[Dict] = None + ) -> List[NodeWithScore]: + self._check_callback_manager() + dispatch_event = dispatcher.get_dispatch_event() + + dispatch_event( + RetrievalStartEvent( + str_or_query_bundle=str_or_query_bundle, + ) + ) + if isinstance(str_or_query_bundle, str): + query_bundle = QueryBundle(str_or_query_bundle) + else: + query_bundle = str_or_query_bundle + with self.callback_manager.as_trace("query"): + with self.callback_manager.event( + CBEventType.RETRIEVE, + payload={EventPayload.QUERY_STR: query_bundle.query_str}, + ) as retrieve_event: + nodes = await self._aretrieve( + query_bundle=query_bundle, query_history=query_history + ) + nodes = await self._ahandle_recursive_retrieval( + query_bundle=query_bundle, nodes=nodes + ) + retrieve_event.on_end( + payload={EventPayload.NODES: nodes}, + ) + dispatch_event( + RetrievalEndEvent( + 
str_or_query_bundle=str_or_query_bundle, + nodes=nodes, + ) + ) + return nodes + + def _retrieve(self, query_bundle: QueryBundle, **kwargs) -> List[NodeWithScore]: + # init params + params = { + "query": query_bundle.query_str, + "dense_similarity_top_k": self.dense_similarity_top_k, + "sparse_similarity_top_k": self.sparse_similarity_top_k, + "enable_rewrite": self.enable_rewrite, + "rewrite": [ + { + "model_name": self.rewrite_model_name, + "class_name": "DashScopeTextRewrite", + } + ], + "enable_reranking": self.enable_reranking, + "rerank": [ + { + "model_name": self.rerank_model_name, + } + ], + "rerank_min_score": self.rerank_min_score, + "rerank_top_n": self.rerank_top_n, + } + # extract query_history for multi-turn query rewrite + if "query_history" in kwargs: + params["query_hisory"] = kwargs.get("query_history") + + response_data = utils.post(self.base_url, headers=self.headers, params=params) + nodes = [] + for ele in response_data["nodes"]: + text_node = TextNode.parse_obj(ele["node"]) + nodes.append(NodeWithScore(node=text_node, score=ele["score"])) + return nodes diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/transformations.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/transformations.py new file mode 100644 index 0000000000000..a43822c1576c8 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/transformations.py @@ -0,0 +1,118 @@ +""" +This module maintains the list of transformations that are supported by dashscope. 
+""" + +from enum import Enum +from typing import Generic, TypeVar + +from llama_index.core.bridge.pydantic import ( + Field, + GenericModel, + ValidationError, +) + +from llama_index.core.schema import BaseComponent +from llama_index.core.ingestion.transformations import ( + TransformationCategories, + ConfigurableTransformation, +) + + +def dashscope_build_configurable_transformation_enum(): + """ + Build an enum of configurable transformations. + But conditional on if the corresponding component is available. + """ + + class ConfigurableComponent(Enum): + @classmethod + def from_component( + cls, component: BaseComponent + ) -> "ConfigurableTransformations": + component_class = type(component) + for component_type in cls: + if component_type.value.component_type == component_class: + return component_type + raise ValueError( + f"Component {component} is not a supported transformation component." + ) + + def build_configured_transformation( + self, component: BaseComponent + ) -> "DashScopeConfiguredTransformation": + component_type = self.value.component_type + if not isinstance(component, component_type): + raise ValueError( + f"The enum value {self} is not compatible with component of " + f"type {type(component)}" + ) + return DashScopeConfiguredTransformation[component_type]( # type: ignore + component=component, name=self.value.name + ) + + enum_members = [] + + # Node parsers + try: + from llama_index.node_parser.dashscope import DashScopeJsonNodeParser + + enum_members.append( + ( + "DASHSCOPE_JSON_NODE_PARSER", + ConfigurableTransformation( + name="DashScope Json Node Parser", + transformation_category=TransformationCategories.NODE_PARSER, + component_type=DashScopeJsonNodeParser, + ), + ) + ) + except (ImportError, ValidationError): + pass + + # Embeddings + try: + from llama_index.embeddings.dashscope import ( + DashScopeEmbedding, + ) # pants: no-infer-dep + + enum_members.append( + ( + "DASHSCOPE_EMBEDDING", + ConfigurableTransformation( + 
name="DashScope Embedding", + transformation_category=TransformationCategories.EMBEDDING, + component_type=DashScopeEmbedding, + ), + ) + ) + except (ImportError, ValidationError): + pass + + return ConfigurableComponent("ConfigurableTransformations", enum_members) + + +ConfigurableTransformations = dashscope_build_configurable_transformation_enum() + +T = TypeVar("T", bound=BaseComponent) + + +class DashScopeConfiguredTransformation(GenericModel, Generic[T]): + """ + A class containing metadata & implementation for a transformation in a dashscope pipeline. + """ + + name: str + component: T = Field(description="Component that implements the transformation") + + @classmethod + def from_component(cls, component: BaseComponent) -> "ConfiguredTransformation": + """ + Build a ConfiguredTransformation from a component in dashscope. + """ + return ConfigurableTransformations.from_component( + component + ).build_configured_transformation(component) + + @property + def configurable_transformation_type(self) -> ConfigurableTransformations: + return ConfigurableTransformations.from_component(self.component) diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/utils.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/utils.py new file mode 100644 index 0000000000000..46a0a5dd0dbc7 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/llama_index/indices/managed/dashscope/utils.py @@ -0,0 +1,58 @@ +import requests +import time + + +def post(base_url: str, headers: dict, params: dict): + response = requests.post(base_url, headers=headers, json=params) + if response.status_code != 200: + raise RuntimeError(response.text) + response_dict = response.json() + if response_dict["code"] != "Success": + raise RuntimeError(response_dict) + return response_dict + + +def get(base_url: str, headers: dict, params: dict): + 
response = requests.get(base_url, headers=headers, params=params) + if response.status_code != 200: + raise RuntimeError(response.text) + + response_dict = response.json() + if response_dict["code"] != "Success": + raise RuntimeError(response_dict) + return response_dict + + +def get_pipeline_id(base_url: str, headers: dict, params: dict): + response_dict = get(base_url, headers, params) + return response_dict.get("id", "") + + +def run_ingestion(request_url: str, headers: dict, verbose: bool = False): + ingestion_status = "" + failed_docs = [] + + while True: + response = requests.get( + request_url, + headers=headers, + ) + try: + response_text = response.json() + except Exception as e: + print(f"Failed to get response: \n{response.text}\nretrying...") + continue + + if response_text.get("code", "") != "Success": + print( + f"Failed to get ingestion status: {response_text.get('message', '')}\n{response_text}\nretrying..." + ) + continue + ingestion_status = response_text.get("ingestion_status", "") + failed_docs = response_text.get("failed_docs", "") + if verbose: + print(f"Current status: {ingestion_status}") + if ingestion_status in ["COMPLETED", "FAILED"]: + break + time.sleep(5) + return ingestion_status, failed_docs diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/pyproject.toml b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/pyproject.toml new file mode 100644 index 0000000000000..e9be531a5a344 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/pyproject.toml @@ -0,0 +1,59 @@ +[build-system] +build-backend = "poetry.core.masonry.api" +requires = ["poetry-core"] + +[tool.codespell] +check-filenames = true +check-hidden = true +# Feel free to un-skip examples, and experimental, you will just need to +# work through many typos (--write-changes and --interactive will help) +skip = "*.csv,*.html,*.json,*.jsonl,*.pdf,*.txt,*.ipynb" + +[tool.llamahub] 
+contains_example = false +import_path = "llama_index.indices.managed.dashscope" + +[tool.llamahub.class_authors] +DashScopeCloudIndex = "phantomgrapes" + +[tool.mypy] +disallow_untyped_defs = true +# Remove venv skip when integrated with pre-commit +exclude = ["_static", "build", "examples", "notebooks", "venv"] +ignore_missing_imports = true +python_version = "3.8" + +[tool.poetry] +authors = ["Ruixue Ding "] +description = "llama-index indices managed-dashscope integration" +license = "MIT" +name = "llama-index-indices-managed-dashscope" +packages = [{include = "llama_index/"}] +readme = "README.md" +version = "0.1.1" + +[tool.poetry.dependencies] +python = ">=3.8.1,<4.0" +llama-index-core = "^0.10.0" +llama-index-embeddings-dashscope = ">=0.1.3" +llama-index-readers-dashscope = ">=0.1.1" +llama-index-node-parser-dashscope = ">=0.1.2" + +[tool.poetry.group.dev.dependencies] +black = {extras = ["jupyter"], version = "<=23.9.1,>=23.7.0"} +codespell = {extras = ["toml"], version = ">=v2.2.6"} +ipython = "8.10.0" +jupyter = "^1.0.0" +mypy = "0.991" +pre-commit = "3.2.0" +pylint = "2.15.10" +pytest = "7.2.1" +pytest-mock = "3.11.1" +ruff = "0.0.292" +tree-sitter-languages = "^1.8.0" +types-Deprecated = ">=0.1.0" +types-PyYAML = "^6.0.12.12" +types-protobuf = "^4.24.0.4" +types-redis = "4.5.5.0" +types-requests = "2.28.11.8" # TODO: unpin when mypy>0.991 +types-setuptools = "67.1.0.0" diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/tests/BUILD b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/tests/BUILD new file mode 100644 index 0000000000000..dabf212d7e716 --- /dev/null +++ b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/tests/BUILD @@ -0,0 +1 @@ +python_tests() diff --git a/llama-index-integrations/indices/llama-index-indices-managed-dashscope/tests/__init__.py b/llama-index-integrations/indices/llama-index-indices-managed-dashscope/tests/__init__.py new file mode 100644 index 
def test_class():
    """DashScopeCloudIndex must inherit from BaseManagedIndex."""
    ancestor_names = {base.__name__ for base in DashScopeCloudIndex.__mro__}
    assert BaseManagedIndex.__name__ in ancestor_names
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +.ruff_cache + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints +notebooks/ + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +pyvenv.cfg + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Jetbrains +.idea +modules/ +*.swp + +# VsCode +.vscode + +# pipenv +Pipfile +Pipfile.lock + +# pyright +pyrightconfig.json diff --git a/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/BUILD b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/BUILD new file mode 100644 index 0000000000000..0896ca890d8bf --- /dev/null +++ b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/BUILD @@ -0,0 +1,3 @@ +poetry_requirements( + name="poetry", +) diff --git a/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/Makefile b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/Makefile new file mode 100644 index 0000000000000..b9eab05aa3706 --- /dev/null +++ b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/Makefile @@ -0,0 +1,17 @@ +GIT_ROOT ?= $(shell git rev-parse --show-toplevel) + +help: ## Show all Makefile targets. + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[33m%-30s\033[0m %s\n", $$1, $$2}' + +format: ## Run code autoformatters (black). + pre-commit install + git ls-files | xargs pre-commit run black --files + +lint: ## Run linters: pre-commit (black, ruff, codespell) and mypy + pre-commit install && git ls-files | xargs pre-commit run --show-diff-on-failure --files + +test: ## Run tests via pytest. + pytest tests + +watch-docs: ## Build and watch documentation. 
+ sphinx-autobuild docs/ docs/_build/html --open-browser --watch $(GIT_ROOT)/llama_index/ diff --git a/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/README.md b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/README.md new file mode 100644 index 0000000000000..727f405fd25a4 --- /dev/null +++ b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/README.md @@ -0,0 +1,48 @@ +# LlamaIndex Node_Parser-Relational Integration: Dashscope + +Transform your documents into nodes with ease using the Dashscope integration for LlamaIndex. This tool allows for precise control over chunk size, overlap size, and more, tailored for the Dashscope reader output format. + +## Installation + +```shell +pip install llama-index-node-parser-dashscope +``` + +## Quick Start + +Get up and running with just a few lines of code: + +```python +import json +import os +from llama_index.node_parser.relational.dashscope import ( + DashScopeJsonNodeParser, +) +from llama_index.core.ingestion import IngestionPipeline +from llama_index.core.schema import Document + +# Set your Dashscope API key in the environment +os.environ["DASHSCOPE_API_KEY"] = "your_api_key_here" + +documents = [ + # Prepare your documents obtained from the Dashscope reader +] + +# Initialize the DashScope JsonNodeParser +node_parser = DashScopeJsonNodeParser( + chunk_size=100, overlap_size=0, separator=" |,|,|。|?|!|\n|\?|\!" +) + +# Set up the ingestion pipeline with the node parser +pipeline = IngestionPipeline(transformations=[node_parser]) + +# Process the documents and print the resulting nodes +nodes = pipeline.run(documents=documents, show_progress=True) +for node in nodes: + print(node) +``` + +## Configuration + +- API Key: You need a Dashscope API key to begin. Set it in your environment as shown in the Quick Start section. +- Document Preparation: Your documents must be in the Dashscope reader output format. 
class DashScopeJsonNodeParser(BaseElementNodeParser):
    """DashScope Json format element node parser.

    Splits a json format document from DashScope Parse into Text Nodes and Index Nodes
    corresponding to embedded objects (e.g. tables). Splitting is delegated to
    the remote DashScope splitter service.
    """

    try_count_limit: int = Field(
        default=10, description="Maximum number of retry attempts."
    )
    chunk_size: int = Field(default=500, description="Size of each chunk to process.")
    overlap_size: int = Field(
        default=100, description="Overlap size between consecutive chunks."
    )
    separator: str = Field(
        default=" |,|,|。|?|!|\n|\\?|\\!",
        description="Separator characters for splitting texts.",
    )
    input_type: str = Field(default="idp", description="parse format type.")
    language: str = Field(
        default="cn",
        description="language of tokenizor, accept cn, en, any. Notice that mode will be slow.",
    )

    @classmethod
    def class_name(cls) -> str:
        """Serialization name of this parser."""
        return "DashScopeJsonNodeParser"

    def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
        """Get nodes from node.

        Sends the node's parsed-JSON content to the DashScope splitter
        service (retrying up to ``try_count_limit`` times) and converts the
        returned chunks into TextNodes.
        """
        ftype = node.metadata.get("parse_fmt_type", self.input_type)
        assert ftype in [
            "DASHSCOPE_DOCMIND",
            "idp",
        ], f"Unexpected parse_fmt_type: {node.metadata.get('parse_fmt_type', '')}"

        # Normalize the public format marker to the service-side name.
        ftype_map = {
            "DASHSCOPE_DOCMIND": "idp",
        }

        my_input = {
            "text": json.loads(node.get_content()),
            "file_type": ftype_map.get(ftype, ftype),
            "chunk_size": self.chunk_size,
            "overlap_size": self.overlap_size,
            # Fix: honor the configured ``language`` field; this was
            # previously hard-coded to "cn", silently ignoring the setting.
            "language": self.language,
            "separator": self.separator,
        }

        try_count = 0
        response_text = self.post_service(my_input)
        while response_text is None and try_count < self.try_count_limit:
            try_count += 1
            response_text = self.post_service(my_input)
        if response_text is None:
            logging.error("DashScopeJsonNodeParser Failed to get response from service")
            return []

        return self.parse_result(response_text, node)

    def post_service(self, my_input: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """POST the split request to DashScope.

        Returns the chunk list on success, or None on any failure so the
        caller can retry.

        Raises:
            ValueError: if DASHSCOPE_API_KEY is not set in the environment.
        """
        DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", None)
        if DASHSCOPE_API_KEY is None:
            logging.error("DASHSCOPE_API_KEY is not set")
            raise ValueError("DASHSCOPE_API_KEY is not set")
        headers = {
            "Content-Type": "application/json",
            "Accept-Encoding": "utf-8",
            "Authorization": "Bearer " + DASHSCOPE_API_KEY,
        }
        service_url = (
            os.getenv("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com")
            + "/api/v1/indices/component/configed_transformations/spliter"
        )
        try:
            response = requests.post(
                service_url, data=json.dumps(my_input), headers=headers
            )
            response_text = response.json()
            if "chunkService" in response_text:
                return response_text["chunkService"]["chunkResult"]
            else:
                logging.error(f"{response_text}, try again.")
                return None
        except Exception as e:
            # Treated as transient: log and let the caller retry.
            logging.error(f"{e}, try again.")
            return None

    def parse_result(
        self, content_json: List[Dict[str, Any]], document: TextNode
    ) -> List[BaseNode]:
        """Convert the service's chunk dicts into TextNodes carrying the source metadata."""
        nodes = []
        for data in content_json:
            # Node text = title, optional hierarchical title, then content.
            text = "\n".join(
                [data["title"], data.get("hier_title", ""), data["content"]]
            )
            nodes.append(
                TextNode(
                    metadata=document.metadata,
                    text=text,
                    excluded_embed_metadata_keys=document.excluded_embed_metadata_keys,
                    excluded_llm_metadata_keys=document.excluded_llm_metadata_keys,
                )
            )
        return nodes

    def extract_elements(
        self,
        text: str,
        mode: Optional[str] = "json",
        node_id: Optional[str] = None,
        node_metadata: Optional[Dict[str, Any]] = None,
        table_filters: Optional[List[Callable]] = None,
        **kwargs: Any,
    ) -> List[Element]:
        """Unused by this parser; required by the BaseElementNodeParser interface."""
        return []
+check-filenames = true +check-hidden = true +# Feel free to un-skip examples, and experimental, you will just need to +# work through many typos (--write-changes and --interactive will help) +skip = "*.csv,*.html,*.json,*.jsonl,*.pdf,*.txt,*.ipynb" + +[tool.llamahub] +contains_example = false +import_path = "llama_index.node_parser.dashscope" + +[tool.llamahub.class_authors] +DashScopeJsonNodeParser = "phantomgrapes" + +[tool.mypy] +disallow_untyped_defs = true +# Remove venv skip when integrated with pre-commit +exclude = ["_static", "build", "examples", "notebooks", "venv"] +ignore_missing_imports = true +python_version = "3.8" + +[tool.poetry] +authors = ["Ruixue Ding "] +description = "llama-index node_parser dashscope integration" +license = "MIT" +name = "llama-index-node-parser-dashscope" +packages = [{include = "llama_index/"}] +readme = "README.md" +version = "0.1.2" + +[tool.poetry.dependencies] +python = ">=3.8.1,<4.0" +llama-index-core = "^0.10.0" +requests = "*" + +[tool.poetry.group.dev.dependencies] +black = {extras = ["jupyter"], version = "<=23.9.1,>=23.7.0"} +codespell = {extras = ["toml"], version = ">=v2.2.6"} +ipython = "8.10.0" +jupyter = "^1.0.0" +mypy = "0.991" +pre-commit = "3.2.0" +pylint = "2.15.10" +pytest = "7.2.1" +pytest-mock = "3.11.1" +ruff = "0.0.292" +tree-sitter-languages = "^1.8.0" +types-Deprecated = ">=0.1.0" +types-PyYAML = "^6.0.12.12" +types-protobuf = "^4.24.0.4" +types-redis = "4.5.5.0" +types-requests = "2.28.11.8" # TODO: unpin when mypy>0.991 +types-setuptools = "67.1.0.0" diff --git a/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/BUILD b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/BUILD new file mode 100644 index 0000000000000..dabf212d7e716 --- /dev/null +++ b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/BUILD @@ -0,0 +1 @@ +python_tests() diff --git 
a/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/__init__.py b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/test_node_parser_relational_dashscope.py b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/test_node_parser_relational_dashscope.py new file mode 100644 index 0000000000000..69ebb4bf0bd97 --- /dev/null +++ b/llama-index-integrations/node_parser/llama-index-node-parser-relational-dashscope/tests/test_node_parser_relational_dashscope.py @@ -0,0 +1,7 @@ +from llama_index.node_parser.dashscope import DashScopeJsonNodeParser +from llama_index.core.node_parser.relational.base_element import BaseElementNodeParser + + +def test_class(): + names_of_base_classes = [b.__name__ for b in DashScopeJsonNodeParser.__mro__] + assert BaseElementNodeParser.__name__ in names_of_base_classes diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/.gitignore b/llama-index-integrations/readers/llama-index-readers-dashscope/.gitignore new file mode 100644 index 0000000000000..990c18de22908 --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/.gitignore @@ -0,0 +1,153 @@ +llama_index/_static +.DS_Store +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +bin/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +etc/ +include/ +lib/ +lib64/ +parts/ +sdist/ +share/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +.ruff_cache + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints +notebooks/ + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +pyvenv.cfg + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Jetbrains +.idea +modules/ +*.swp + +# VsCode +.vscode + +# pipenv +Pipfile +Pipfile.lock + +# pyright +pyrightconfig.json diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/BUILD b/llama-index-integrations/readers/llama-index-readers-dashscope/BUILD new file mode 100644 index 0000000000000..0896ca890d8bf --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/BUILD @@ -0,0 +1,3 @@ +poetry_requirements( + name="poetry", +) diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/Makefile b/llama-index-integrations/readers/llama-index-readers-dashscope/Makefile new file mode 100644 index 0000000000000..b9eab05aa3706 --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/Makefile @@ -0,0 +1,17 @@ +GIT_ROOT ?= $(shell git rev-parse --show-toplevel) + +help: ## Show all Makefile targets. + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[33m%-30s\033[0m %s\n", $$1, $$2}' + +format: ## Run code autoformatters (black). + pre-commit install + git ls-files | xargs pre-commit run black --files + +lint: ## Run linters: pre-commit (black, ruff, codespell) and mypy + pre-commit install && git ls-files | xargs pre-commit run --show-diff-on-failure --files + +test: ## Run tests via pytest. + pytest tests + +watch-docs: ## Build and watch documentation. 
+	sphinx-autobuild docs/ docs/_build/html --open-browser --watch $(GIT_ROOT)/llama_index/
diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/README.md b/llama-index-integrations/readers/llama-index-readers-dashscope/README.md
new file mode 100644
index 0000000000000..88fff9283b7d4
--- /dev/null
+++ b/llama-index-integrations/readers/llama-index-readers-dashscope/README.md
@@ -0,0 +1,50 @@
+# LlamaIndex Readers Integration: Dashscope
+
+## Installation
+
+```shell
+pip install llama-index-readers-dashscope
+```
+
+## Usage
+
+```python
+from llama_index.readers.dashscope.base import DashScopeParse
+from llama_index.readers.dashscope.utils import ResultType
+
+file_list = [
+    # your files (accept doc, docx, pdf)
+]
+
+parse = DashScopeParse(result_type=ResultType.DASHSCOPE_DOCMIND)
+documents = parse.load_data(file_path=file_list)
+```
+
+## Reader Setting:
+
+A full list of retriever settings/kwargs is below:
+
+- api_key: Optional[str] -- Your dashscope API key, which can be passed in through environment variables or parameters.
+  The parameter settings will override the results from the environment variables
+- workspace_id: Optional[str] -- Your dashscope workspace_id, which can be passed in through environment variables or
+  parameters. The parameter settings will override the results from the environment variables
+- base_url: Optional[str] -- The base url for the Dashscope API. The default value is "https://dashscope.aliyuncs.com".
+  The parameter settings will override the results from the environment variables.
+- result_type: Optional[ResultType] -- The result type for the parser. The default value is ResultType.DASHSCOPE_DOCMIND.
+- num_workers: Optional[int] -- The number of workers to use sending API requests for parsing. The default value is 4,
+  greater than 0, less than 10.
+- check_interval: Optional[int] -- The interval in seconds to check if the parsing is done. The default value is 5.
+- max_timeout: Optional[int] -- The maximum timeout in seconds to wait for the parsing to finish. The default value is 3600. +- verbose: Optional[bool] -- Whether to print the progress of the parsing. The default value is True. +- show_progress: Optional[bool] -- Show progress when parsing multiple files. The default value is True. +- ignore_errors: Optional[bool] -- Whether or not to ignore and skip errors raised during parsing. The default value is + True. + +## Reader Input: + +- file_path: Union[str, List[str]] -- The file path or list of file paths to parse. + +## Reader Output: + +- List[llama_index.core.schema.Document] -- The list of documents parsed from the file. + - text: str -- The text of the document from DASHCOPE_DOCMIND. diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/BUILD b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/BUILD new file mode 100644 index 0000000000000..db46e8d6c978c --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/BUILD @@ -0,0 +1 @@ +python_sources() diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/__init__.py b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/__init__.py new file mode 100644 index 0000000000000..141dfbaa5c9ee --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/__init__.py @@ -0,0 +1,3 @@ +from llama_index.readers.dashscope.base import DashScopeParse, ResultType + +__all__ = ["DashScopeParse", "ResultType"] diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/base.py b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/base.py new file mode 100644 index 0000000000000..192e84f9c9d24 --- /dev/null 
+++ b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/base.py @@ -0,0 +1,432 @@ +import os +import asyncio +import httpx +import time +from pathlib import Path +from tenacity import ( + retry, + wait_exponential, + before_sleep_log, + after_log, + retry_if_exception_type, + stop_after_delay, +) +from typing import List, Optional, Union + +from llama_index.core.async_utils import run_jobs +from llama_index.core.bridge.pydantic import Field, validator +from llama_index.core.readers.base import BasePydanticReader +from llama_index.core.schema import Document +from llama_index.readers.dashscope.utils import * + +from llama_index.readers.dashscope.domain.lease_domains import ( + DownloadFileLeaseResult, + UploadFileLeaseResult, + AddFileResult, + QueryFileResult, + DatahubDataStatusEnum, +) + +DASHSCOPE_DEFAULT_BASE_URL = "https://dashscope.aliyuncs.com" +DASHSCOPE_DEFAULT_DC_CATEGORY = os.getenv( + "DASHSCOPE_DEFAULT_DC_CATEGORY", default="default" +) + +logger = get_stream_logger(name=__name__) + + +class DashScopeParse(BasePydanticReader): + """A smart-parser for files.""" + + api_key: str = Field(default="", description="The API key for the DashScope API.") + workspace_id: str = Field( + default="", + description="The Workspace for the DashScope API.If not set, " + "it will use the default workspace.", + ) + category_id: str = Field( + default=DASHSCOPE_DEFAULT_DC_CATEGORY, + description="The dc category for the DashScope API.If not set, " + "it will use the default dc category.", + ) + base_url: str = Field( + default=DASHSCOPE_DEFAULT_BASE_URL, + description="The base URL of the DashScope Parsing API.", + ) + result_type: ResultType = Field( + default=ResultType.DASHSCOPE_DOCMIND, + description="The result type for the parser.", + ) + num_workers: int = Field( + default=4, + gt=0, + lt=10, + description="The number of workers to use sending API requests for parsing.", + ) + check_interval: int = Field( + default=5, + 
description="The interval in seconds to check if the parsing is done.", + ) + max_timeout: int = Field( + default=3600, + description="The maximum timeout in seconds to wait for the parsing to finish.", + ) + verbose: bool = Field( + default=True, description="Whether to print the progress of the parsing." + ) + show_progress: bool = Field( + default=True, description="Show progress when parsing multiple files." + ) + ignore_errors: bool = Field( + default=True, + description="Whether or not to ignore and skip errors raised during parsing.", + ) + parse_result: bool = Field( + default=True, + description="Whether or not to return parsed text content.", + ) + + @validator("api_key", pre=True, always=True) + def validate_api_key(cls, v: str) -> str: + """Validate the API key.""" + if not v: + import os + + api_key = os.getenv("DASHSCOPE_API_KEY", None) + if api_key is None: + raise ValueError("The API key [DASHSCOPE_API_KEY] is required.") + return api_key + + return v + + @validator("workspace_id", pre=True, always=True) + def validate_workspace_id(cls, v: str) -> str: + """Validate the Workspace.""" + if not v: + import os + + return os.getenv("DASHSCOPE_WORKSPACE_ID", "") + + return v + + @validator("category_id", pre=True, always=True) + def validate_category_id(cls, v: str) -> str: + """Validate the category.""" + if not v: + import os + + return os.getenv("DASHSCOPE_CATEGORY_ID", DASHSCOPE_DEFAULT_DC_CATEGORY) + return v + + @validator("base_url", pre=True, always=True) + def validate_base_url(cls, v: str) -> str: + """Validate the base URL.""" + if v and v != DASHSCOPE_DEFAULT_BASE_URL: + return v + else: + url = ( + os.getenv("DASHSCOPE_BASE_URL", None) + or "https://dashscope.aliyuncs.com" + ) + if url and not url.startswith(("http://", "https://")): + raise ValueError( + "The DASHSCOPE_BASE_URL must start with http or https. 
" + ) + return url or DASHSCOPE_DEFAULT_BASE_URL + + def _get_dashscope_header(self): + return { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + "X-DashScope-WorkSpace": f"{self.workspace_id}", + "X-DashScope-OpenAPISource": "CloudSDK", + } + + # upload a document and get back a job_id + async def _create_job( + self, file_path: str, extra_info: Optional[dict] = None + ) -> str: + file_path = str(file_path) + UploadFileLeaseResult.is_file_valid(file_path=file_path) + + headers = self._get_dashscope_header() + + # load data + with open(file_path, "rb") as f: + upload_file_lease_result = self.__upload_lease(file_path, headers) + + upload_file_lease_result.upload(file_path, f) + + url = f"{self.base_url}/api/v1/datacenter/category/{self.category_id}/add_file" + async with httpx.AsyncClient(timeout=self.max_timeout) as client: + response = await client.post( + url, + headers=headers, + json={ + "lease_id": upload_file_lease_result.lease_id, + "parser": ResultType.DASHSCOPE_DOCMIND.value, + }, + ) + add_file_result = dashscope_response_handler( + response, "add_file", AddFileResult, url=url + ) + + return add_file_result.file_id + + @retry( + stop=stop_after_delay(60), + wait=wait_exponential(multiplier=5, max=60), + before_sleep=before_sleep_log(logger, logging.INFO), + after=after_log(logger, logging.INFO), + reraise=True, + retry=retry_if_exception_type(RetryException), + ) + def __upload_lease(self, file_path, headers): + url = f"{self.base_url}/api/v1/datacenter/category/{self.category_id}/upload_lease" + try: + with httpx.Client(timeout=self.max_timeout) as client: + response = client.post( + url, + headers=headers, + json={ + "file_name": os.path.basename(file_path), + "size_bytes": os.path.getsize(file_path), + "content_md5": get_file_md5(file_path), + }, + ) + except httpx.ConnectTimeout: + raise RetryException("Connect timeout") + except httpx.ReadTimeout: + raise RetryException("Read timeout") + except httpx.NetworkError: 
+ raise RetryException("Network error") + + upload_file_lease_result = dashscope_response_handler( + response, "upload_lease", UploadFileLeaseResult, url=url + ) + logger.info( + f"{file_path} upload lease result: {upload_file_lease_result.lease_id}" + ) + return upload_file_lease_result + + async def _get_job_result( + self, data_id: str, result_type: str, verbose: bool = False + ) -> dict: + result_url = f"{self.base_url}/api/v1/datacenter/category/{self.category_id}/file/{data_id}/download_lease" + status_url = f"{self.base_url}/api/v1/datacenter/category/{self.category_id}/file/{data_id}/query" + + headers = self._get_dashscope_header() + + start = time.time() + tries = 0 + while True: + await asyncio.sleep(1) + tries += 1 + query_file_result = await self._dashscope_query( + data_id, headers, status_url + ) + + status = query_file_result.status + if DatahubDataStatusEnum.PARSE_SUCCESS.value == status: + async with httpx.AsyncClient(timeout=self.max_timeout) as client: + response = await client.post( + result_url, headers=headers, json={"file_id": data_id} + ) + down_file_lease_result = dashscope_response_handler( + response, + "download_lease", + DownloadFileLeaseResult, + url=result_url, + ) + if self.parse_result: + return { + result_type: down_file_lease_result.download(escape=True), + "job_id": data_id, + } + else: + return {result_type: "{}", "job_id": data_id} + elif ( + DatahubDataStatusEnum.PARSING.value == status + or DatahubDataStatusEnum.INIT.value == status + ): + end = time.time() + if end - start > self.max_timeout: + raise Exception(f"Timeout while parsing the file: {data_id}") + if verbose and tries % 5 == 0: + print(".", end="", flush=True) + + await asyncio.sleep(self.check_interval) + + continue + else: + raise Exception( + f"Failed to parse the file: {data_id}, status: {status}" + ) + + @retry( + stop=stop_after_delay(60), + wait=wait_exponential(multiplier=5, max=60), + before_sleep=before_sleep_log(logger, logging.INFO), + 
after=after_log(logger, logging.INFO), + reraise=True, + retry=retry_if_exception_type(RetryException), + ) + async def _dashscope_query(self, data_id, headers, status_url): + try: + async with httpx.AsyncClient(timeout=self.max_timeout) as client: + response = await client.post( + status_url, headers=headers, json={"file_id": data_id} + ) + return dashscope_response_handler( + response, "query", QueryFileResult, url=status_url + ) + except httpx.ConnectTimeout: + raise RetryException("Connect timeout") + except httpx.ReadTimeout: + raise RetryException("Read timeout") + except httpx.NetworkError: + raise RetryException("Network error") + + async def _aload_data( + self, file_path: str, extra_info: Optional[dict] = None, verbose: bool = False + ) -> List[Document]: + """Load data from the input path.""" + try: + data_id = await self._create_job(file_path, extra_info=extra_info) + logger.info(f"Started parsing the file [{file_path}] under [{data_id}]") + + result = await self._get_job_result( + data_id, self.result_type.value, verbose=verbose + ) + + document = Document( + text=result[self.result_type.value], + metadata=extra_info or {}, + ) + document.id_ = data_id + + return [document] + + except Exception as e: + logger.error(f"Error while parsing the file '{file_path}':{e!s}") + if self.ignore_errors: + return [] + else: + raise + + async def aload_data( + self, file_path: Union[List[str], str], extra_info: Optional[dict] = None + ) -> List[Document]: + """Load data from the input path.""" + if isinstance(file_path, (str, Path)): + return await self._aload_data( + file_path, extra_info=extra_info, verbose=self.verbose + ) + elif isinstance(file_path, list): + jobs = [ + self._aload_data( + f, + extra_info=extra_info, + verbose=self.verbose and not self.show_progress, + ) + for f in file_path + ] + try: + results = await run_jobs( + jobs, + workers=self.num_workers, + desc="Parsing files", + show_progress=self.show_progress, + ) + + # return flattened results + 
return [item for sublist in results for item in sublist] + except RuntimeError as e: + if nest_asyncio_err in str(e): + raise RuntimeError(nest_asyncio_msg) + else: + raise + else: + raise ValueError( + "The input file_path must be a string or a list of strings." + ) + + def load_data( + self, file_path: Union[List[str], str], extra_info: Optional[dict] = None + ) -> List[Document]: + extra_info = {"parse_fmt_type": ResultType.DASHSCOPE_DOCMIND.value} + """Load data from the input path.""" + try: + return asyncio.run(self.aload_data(file_path, extra_info)) + except RuntimeError as e: + if nest_asyncio_err in str(e): + raise RuntimeError(nest_asyncio_msg) + else: + raise + + async def _aget_json( + self, file_path: str, extra_info: Optional[dict] = None + ) -> List[dict]: + """Load data from the input path.""" + try: + job_id = await self._create_job(file_path, extra_info=extra_info) + if self.verbose: + logger.info("Started parsing the file under job_id %s" % job_id) + + result = await self._get_job_result( + job_id, ResultType.DASHSCOPE_DOCMIND.value + ) + result["job_id"] = job_id + result["file_path"] = file_path + return [result] + + except Exception as e: + logger.info(f"Error while parsing the file '{file_path}':", e) + if self.ignore_errors: + return [] + else: + raise + + async def aget_json( + self, file_path: Union[List[str], str], extra_info: Optional[dict] = None + ) -> List[dict]: + """Load data from the input path.""" + if isinstance(file_path, (str, Path)): + return await self._aget_json(file_path, extra_info=extra_info) + elif isinstance(file_path, list): + jobs = [self._aget_json(f, extra_info=extra_info) for f in file_path] + try: + results = await run_jobs( + jobs, + workers=self.num_workers, + desc="Parsing files", + show_progress=self.show_progress, + ) + + # return flattened results + return [item for sublist in results for item in sublist] + except RuntimeError as e: + if nest_asyncio_err in str(e): + raise RuntimeError(nest_asyncio_msg) + 
else: + raise + else: + raise ValueError( + "The input file_path must be a string or a list of strings." + ) + + def get_json_result( + self, file_path: Union[List[str], str], extra_info: Optional[dict] = None + ) -> List[dict]: + extra_info = {"parse_fmt_type": ResultType.DASHSCOPE_DOCMIND.value} + """Parse the input path.""" + try: + return asyncio.run(self.aget_json(file_path, extra_info)) + except RuntimeError as e: + if nest_asyncio_err in str(e): + raise RuntimeError(nest_asyncio_msg) + else: + raise + + def get_images(self, json_result: List[dict], download_path: str) -> List[dict]: + raise NotImplementedError diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/BUILD b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/BUILD new file mode 100644 index 0000000000000..db46e8d6c978c --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/BUILD @@ -0,0 +1 @@ +python_sources() diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/base_domains.py b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/base_domains.py new file mode 100644 index 0000000000000..d6e0e77cc32dc --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/base_domains.py @@ -0,0 +1,7 @@ +from abc import ABC + + +class DictToObject(ABC): + @classmethod + def from_dict(cls, data: dict): + pass diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/lease_domains.py b/llama-index-integrations/readers/llama-index-readers-dashscope/llama_index/readers/dashscope/domain/lease_domains.py new file mode 100644 index 0000000000000..bfe0b56ab36d4 --- /dev/null +++ 
import os
import json
import requests
from enum import Enum
from llama_index.readers.dashscope.domain.base_domains import DictToObject
from llama_index.readers.dashscope.utils import get_stream_logger

logger = get_stream_logger(name=__name__)


class FileUploadMethod(Enum):
    """Supported mechanisms for uploading a file to DashScope."""

    OSS_PreSignedUrl = "OSS.PreSignedUrl"

    @classmethod
    def from_value(cls, value):
        """Return the member whose value equals ``value``; raise if unknown."""
        for member in cls:
            if member.value == value:
                return member
        raise ValueError(f"No enum member found for value '{value}'")


class UploadFileParameter:
    """Interface for upload parameters; concrete subclasses implement upload()."""

    def upload(self, file_name: str, file: object):
        pass


class OssPreSignedUrlParameter(UploadFileParameter, DictToObject):
    """Upload parameters for the OSS pre-signed-URL upload method."""

    def __init__(self, url: str, method: str, headers: dict):
        self.url = url
        self.method = method
        self.headers = headers

    @classmethod
    def from_dict(cls, data: dict) -> "OssPreSignedUrlParameter":
        """Build from an API dict; requires ``url``, ``method`` and ``headers``."""
        if "method" not in data:
            raise ValueError("OssPreSignedUrlParameter method key is required")
        if "headers" not in data:
            raise ValueError("OssPreSignedUrlParameter headers key is required")
        if "url" not in data:
            raise ValueError("OssPreSignedUrlParameter url key is required")
        else:
            return OssPreSignedUrlParameter(
                data["url"], data["method"], data["headers"]
            )

    def upload(self, file_name: str, file: object):
        """PUT/POST ``file`` to the pre-signed URL; raise on any non-200 response."""
        logger.info(f"Start upload {file_name}.")
        try:
            if self.method == "PUT":
                response = requests.put(self.url, data=file, headers=self.headers)
            elif self.method == "POST":
                response = requests.post(self.url, data=file, headers=self.headers)
            else:
                raise Exception(f"Upload {file_name} unsupported method: {self.method}")
            if response.status_code != 200:
                raise Exception(
                    f"Upload {file_name} failed with status code: {response.status_code} \n {self.url} \n {self.headers} \n {response.text}"
                )
            logger.info(f"Upload {file_name} success.")
        # Failures are logged at error level (was info) before re-raising.
        except requests.ConnectionError as ce:
            logger.error(f"Upload {file_name} Error connecting to {self.url}: {ce}")
            raise
        except requests.RequestException as e:
            logger.error(
                f"Upload {file_name} An error occurred while uploading to {self.url}: {e}"
            )
            raise
        except Exception as e:
            # Catch-all so non-requests failures are also logged before propagating.
            logger.error(
                f"Upload {file_name} An error occurred while uploading to {self.url}: {e}"
            )
            raise


class UploadFileLeaseResult(DictToObject):
    """Result of an upload-lease request: the method, its parameters and lease id."""

    def __init__(self, type: str, param: UploadFileParameter, lease_id: str):
        self.type: str = type
        self.param: UploadFileParameter = param
        self.lease_id: str = lease_id

    @classmethod
    def from_dict(cls, data: dict) -> "UploadFileLeaseResult":
        """Build from an API dict; only OSS.PreSignedUrl uploads are supported."""
        if "lease_id" not in data:
            raise ValueError("UploadFileLeaseResult lease_id key is required")
        if "param" not in data:
            raise ValueError("UploadFileLeaseResult param key is required")
        if "type" not in data:
            raise ValueError("UploadFileLeaseResult type key is required")
        else:
            if data["type"] == FileUploadMethod.OSS_PreSignedUrl.value:
                return cls(
                    data["type"],
                    OssPreSignedUrlParameter.from_dict(data["param"]),
                    data["lease_id"],
                )
            else:
                raise ValueError(f"Unsupported upload type: {data['type']}")

    @staticmethod
    def is_file_valid(file_path: str) -> None:
        """Validate that ``file_path`` is non-blank and points to an existing file.

        Raises:
            ValueError: If the path is None, empty, or whitespace only.
            FileNotFoundError: If no file exists at the path.
        """
        if file_path is None or file_path.strip() == "":
            # Fixed: was an f-string with no placeholders ("can't blank").
            raise ValueError("file_path can't be blank.")
        file_path = str(file_path)

        # NOTE(review): file-extension validation against SUPPORTED_FILE_TYPES
        # is intentionally not performed here; the backend decides acceptance.

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"The file {file_path} does not exist.")

    def upload(self, file_name: str, file: object):
        """Dispatch the upload to the lease's parameter object."""
        if self.type == FileUploadMethod.OSS_PreSignedUrl.value:
            self.param.upload(file_name, file)
        else:
            raise ValueError(f"Invalid upload method: {self.type}")


class AddFileResult(DictToObject):
    """Result of registering an uploaded file: its id and the parser used."""

    def __init__(self, file_id: str, parser: str):
        self.file_id = file_id
        self.parser = parser

    @classmethod
    def from_dict(cls, data: dict):
        """Build from an API dict; missing keys default to empty strings."""
        default_values = {"file_id": "", "parser": ""}

        file_id = data.get("file_id", default_values["file_id"])
        parser = data.get("parser", default_values["parser"])

        return cls(file_id, parser)


class QueryFileResult(DictToObject):
    """Status record for a file registered in the datacenter."""

    def __init__(
        self,
        file_id: str,
        status: str,
        file_name: str,
        file_type: str,
        parser: str,
        size_bytes: int,
        upload_time: str,
        category: str,
    ):
        self.file_id = file_id
        self.status = status
        self.file_name = file_name
        self.file_type = file_type
        self.parser = parser
        self.size_bytes = size_bytes
        self.upload_time = upload_time
        self.category = category

    @classmethod
    def from_dict(cls, data: dict):
        """
        Creates an instance of `QueryFileResult` from a dictionary.

        Args:
            data (dict): A dictionary containing the necessary keys and values corresponding to the class attributes.

        Returns:
            QueryFileResult: An instance of `QueryFileResult` populated with data from the input dictionary.
        """
        default_values = {
            "file_id": "",
            "status": "",
            "file_name": "",
            "file_type": "",
            "parser": "",
            "size_bytes": 0,
            "upload_time": "",
            "category": "",
        }

        return cls(
            file_id=data.get("file_id", default_values["file_id"]),
            status=data.get("status", default_values["status"]),
            file_name=data.get("file_name", default_values["file_name"]),
            file_type=data.get("file_type", default_values["file_type"]),
            parser=data.get("parser", default_values["parser"]),
            size_bytes=data.get("size_bytes", default_values["size_bytes"]),
            upload_time=data.get("upload_time", default_values["upload_time"]),
            category=data.get("category", default_values["category"]),
        )


class FileDownloadType(Enum):
    """Supported mechanisms for downloading a parse result."""

    HTTP = "HTTP"

    @classmethod
    def from_value(cls, value):
        """Return the member whose value equals ``value``; raise if unknown."""
        for member in cls:
            if member.value == value:
                return member
        raise ValueError(f"No enum member found for value '{value}'")


class HttpDownloadParameter(DictToObject):
    """Parameters (url/method/headers) for an HTTP result download."""

    def __init__(self, url, method, headers) -> None:
        self.url = url
        self.method = method
        self.headers = headers

    @classmethod
    def from_dict(cls, data: dict):
        """Build from an API dict; missing keys default to GET with no headers."""
        default_values = {"url": "", "method": "GET", "headers": {}}

        return cls(
            url=data.get("url", default_values["url"]),
            method=data.get("method", default_values["method"]),
            headers=data.get("headers", default_values["headers"]),
        )


class DownloadFileLeaseResult(DictToObject):
    """Result of a download-lease request; can fetch the parsed payload."""

    def __init__(self, file_id, lease_id, file_name, type, param) -> None:
        self.file_id = file_id
        self.lease_id = lease_id
        self.file_name = file_name
        self.type = type
        self.param = param

    @classmethod
    def from_dict(cls, data: dict):
        """Build from an API dict; ``param`` is mandatory, other keys default."""
        if "param" not in data:
            raise ValueError("download_lease result param is required")

        return cls(
            file_id=data.get("file_id", ""),
            lease_id=data.get("lease_id", ""),
            file_name=data.get("file_name", ""),
            type=FileDownloadType.from_value(
                data.get("type", FileDownloadType.HTTP.value)
            ),
            param=HttpDownloadParameter.from_dict(data["param"]),
        )

    def download(self, escape: bool = False):
        """Fetch the parsed payload over HTTP GET.

        With ``escape=True`` the payload is re-serialized as a JSON string
        literal (double-encoded) — this is what DashScopeParse relies on.
        """
        if self.type == FileDownloadType.HTTP:
            if self.param.method == "GET":
                json_bytes = requests.get(
                    url=self.param.url, headers=self.param.headers
                ).content
                json_str = json_bytes.decode("utf-8")
                if escape:
                    return json.dumps(json_str, ensure_ascii=False)
                else:
                    return json_str
            else:
                raise ValueError(f"Invalid download method: {self.param.method}")
        else:
            raise ValueError(f"Invalid download type: {self.type}")


class DatahubDataStatusEnum(Enum):
    """Lifecycle states of a file in the datacenter parse pipeline."""

    INIT = "INIT"
    PARSING = "PARSING"
    PARSE_SUCCESS = "PARSE_SUCCESS"
    PARSE_FAILED = "PARSE_FAILED"

    @classmethod
    def from_value(cls, value):
        """Return the member whose value equals ``value``; raise if unknown."""
        for member in cls:
            if member.value == value:
                return member
        raise ValueError(f"No enum member found for value '{value}'")


# --- llama_index/readers/dashscope/utils.py (module header) ---
import hashlib
import logging

from enum import Enum
from httpx._models import Response
from typing import Dict, Any, Type, TypeVar
from llama_index.readers.dashscope.domain.base_domains import DictToObject

T = TypeVar("T", bound=DictToObject)

# Asyncio error messages
nest_asyncio_err = "cannot be called from a running event loop"
nest_asyncio_msg = "The event loop is already running. Add `import nest_asyncio; nest_asyncio.apply()` to your code to fix this issue."
def get_stream_logger(name="dashscope-parser", level=logging.INFO, format_string=None):
    """Return a named logger with exactly one stream handler attached.

    Fixed: the original appended a new StreamHandler on every call, so calling
    this twice with the same ``name`` duplicated every log line. The handler is
    now attached only if the logger has none yet.
    """
    if not format_string:
        format_string = "%(asctime)s %(name)s [%(levelname)s] %(thread)d : %(message)s"
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter(format_string))
        logger.addHandler(handler)
    return logger


def get_file_md5(file_path):
    """Return the hex MD5 digest of the file at ``file_path``, read in 8 KiB chunks."""
    with open(file_path, "rb") as f:
        md5 = hashlib.md5()
        while chunk := f.read(8192):
            md5.update(chunk)
    return md5.hexdigest()


def generate_request_id():
    """Generate a random request id."""
    import uuid

    return str(uuid.uuid4())


def __is_response_successful(response_data: Dict[str, Any]) -> bool:
    """Check if the response data indicates a successful operation."""
    return ("code" in response_data) and (
        response_data["code"] == "Success" or response_data["code"] == "success"
    )


def __raise_exception(response: "Response", process: str) -> None:
    """Raise a ValueError describing the failed ``process`` step."""
    error_message = f"Failed to {process}: {response.text}"
    raise ValueError(error_message)


class RetryException(Exception):
    """
    Custom exception class to indicate a situation where an operation needs to be retried.

    This exception should be raised when an operation fails due to anticipated recoverable reasons,
    suggesting to the caller that a retry logic might be appropriate.
    """

    def __init__(
        self, message="Operation failed, requiring a retry", cause=None
    ) -> None:
        """
        Initialize a RetryException instance.

        :param message: Detailed information about the exception, a string by default set as "Operation failed, requiring a retry"
        :param cause: The original exception object that caused this exception, optional
        """
        super().__init__(message)
        self.cause = cause

    def __str__(self) -> str:
        """
        Return a string representation of the exception, including the original exception information if present.

        :return: String representation of the exception details
        """
        if self.cause:
            return f"{super().__str__()} caused by: {self.cause}"
        else:
            return super().__str__()


def __raise_exception_for_retry(response: "Response", process: str) -> None:
    """Raise a RetryException describing the failed ``process`` step (retryable)."""
    error_message = f"Failed to {process}: {response.text}"
    raise RetryException(cause=error_message)


logger = get_stream_logger(name="DashScopeResponseHandler")


def dashscope_response_handler(
    response: "Response", process: str, result_class: "Type[T]", url: str = ""
) -> "T":
    """Handle the response from the DashScope API.

    Validates the HTTP status, the JSON envelope and the success code, then
    builds ``result_class`` from the envelope's ``data`` payload.

    Raises:
        ValueError: On any non-retryable failure.
        RetryException: On HTTP 429, so callers wrapped in tenacity retry.
    """
    if response is None:
        raise ValueError(
            f"DashScopeParse {process} [URL:{url}] http response object is none."
        )

    if not isinstance(process, str) or not process:
        raise ValueError(
            "DashScopeParse func [dashscope_response_handler] process parameter is empty."
        )

    if response.status_code != 200:
        logger.error(
            f"DashScopeParse {process} [URL:{url}] response http status code is not 200: [{response.status_code}:{response.text}]"
        )
        # 429 (rate limited) is the only retryable HTTP failure.
        if response.status_code == 429:
            __raise_exception_for_retry(response, process)
        __raise_exception(response, process)
    try:
        response_data = response.json()
    except Exception:
        logger.error(
            f"DashScopeParse {process} [URL:{url}] response data is not json: {response.text}."
        )
        __raise_exception(response, process)

    if not __is_response_successful(response_data):
        logger.error(
            f"DashScopeParse {process} [URL:{url}] response fail: {response.text}."
        )
        __raise_exception(response, process)

    if "data" not in response_data:
        logger.error(
            f"DashScopeParse {process} [URL:{url}] response data does not contain 'data' key: {response_data}."
        )
        __raise_exception(response, process)
    # "query" is polled in a loop; logging its request_id every poll would spam.
    if "request_id" in response_data and process != "query":
        logger.info(
            f"DashScopeParse {process} [URL:{url}] request_id: {response_data['request_id']}."
        )
    return result_class.from_dict(response_data["data"])


class ResultType(Enum):
    """The result type for the parser."""

    DASHSCOPE_DOCMIND = "DASHSCOPE_DOCMIND"


SUPPORTED_FILE_TYPES = [".pdf", ".doc", ".docx"]
+license = "MIT" +name = "llama-index-readers-dashscope" +packages = [{include = "llama_index/"}] +readme = "README.md" +version = "0.1.1" + +[tool.poetry.dependencies] +python = ">=3.8.1,<4.0" +llama-index-core = "^0.10.0" +oss2 = "^2.18.5" +retrying = "^1.3.4" + +[tool.poetry.group.dev.dependencies] +black = {extras = ["jupyter"], version = "<=23.9.1,>=23.7.0"} +codespell = {extras = ["toml"], version = ">=v2.2.6"} +flask = "3.0.3" +ipython = "8.10.0" +jupyter = "^1.0.0" +mypy = "0.991" +pre-commit = "3.2.0" +pylint = "2.15.10" +pytest = "7.2.1" +pytest-mock = "3.11.1" +reportlab = "4.2.0" +ruff = "0.0.292" +tree-sitter-languages = "^1.8.0" +types-Deprecated = ">=0.1.0" +types-PyYAML = "^6.0.12.12" +types-protobuf = "^4.24.0.4" +types-redis = "4.5.5.0" +types-requests = "2.28.11.8" # TODO: unpin when mypy>0.991 +types-setuptools = "67.1.0.0" diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/tests/BUILD b/llama-index-integrations/readers/llama-index-readers-dashscope/tests/BUILD new file mode 100644 index 0000000000000..dabf212d7e716 --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/tests/BUILD @@ -0,0 +1 @@ +python_tests() diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/tests/__init__.py b/llama-index-integrations/readers/llama-index-readers-dashscope/tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/llama-index-integrations/readers/llama-index-readers-dashscope/tests/test_readers_dashscope.py b/llama-index-integrations/readers/llama-index-readers-dashscope/tests/test_readers_dashscope.py new file mode 100644 index 0000000000000..2f7f1686b1527 --- /dev/null +++ b/llama-index-integrations/readers/llama-index-readers-dashscope/tests/test_readers_dashscope.py @@ -0,0 +1,7 @@ +from llama_index.readers.dashscope import DashScopeParse +from llama_index.core.readers.base import BasePydanticReader + + +def test_class(): + names_of_base_classes = 
[b.__name__ for b in DashScopeParse.__mro__]
+    # Public contract: DashScopeParse must remain a BasePydanticReader subclass.
+    assert BasePydanticReader.__name__ in names_of_base_classes