From c12398e35ec631faad5c9e72e8b542da72992232 Mon Sep 17 00:00:00 2001
From: Vignesh Aigal
Date: Fri, 18 Aug 2023 15:35:43 -0700
Subject: [PATCH 1/2] Support data source sync

---
 apps/tasks.py                                 | 28 ++++++++++++-
 client/src/pages/data.jsx                     | 25 +++++++++--
 datasources/apis.py                           | 13 +++++-
 .../handlers/datasource_type_interface.py     | 22 +++++++++-
 datasources/handlers/website/url.py           | 41 +++++++++++--------
 datasources/serializers.py                    | 25 ++++++++++-
 datasources/urls.py                           |  4 ++
 7 files changed, 133 insertions(+), 25 deletions(-)

diff --git a/apps/tasks.py b/apps/tasks.py
index d571ad12da2..dd15d7e2693 100644
--- a/apps/tasks.py
+++ b/apps/tasks.py
@@ -30,7 +30,9 @@ def add_data_entry_task(datasource: DataSource, datasource_entry_items: List[Dat
         )
         try:
             result = datasource_entry_handler.add_entry(datasource_entry_item)
-            datasource_entry_object.config = result.config
+            config_entry = result.config
+            config_entry["input"] = datasource_entry_item.dict()
+            datasource_entry_object.config = config_entry
             datasource_entry_object.size = result.size
             datasource_entry_object.status = DataSourceEntryStatus.READY
             datasource_entries_size += result.size
@@ -85,6 +87,30 @@ def delete_data_entry_task(datasource: DataSource, entry_data: DataSourceEntry):
     datasource.save()
     return datasource_entry_items

+def resync_data_entry_task(datasource: DataSource, entry_data: DataSourceEntry):
+    logger.info(f'Resyncing task for data_source_entry: %s' % str(entry_data))
+
+    datasource_entry_handler_cls = DataSourceTypeFactory.get_datasource_type_handler(
+        datasource.type,
+    )
+    datasource_entry_handler: DataSourceProcessor = datasource_entry_handler_cls(
+        datasource,
+    )
+    entry_data.status = DataSourceEntryStatus.PROCESSING
+    entry_data.save()
+    old_size = entry_data.size
+
+    result = datasource_entry_handler.resync_entry(entry_data.config)
+    entry_data.size = result.size
+    config_entry = result.config
+    config_entry["input"] = entry_data.config["input"]
+    entry_data.config = config_entry
+    entry_data.status = DataSourceEntryStatus.READY
+    entry_data.save()
+
+    datasource.size = datasource.size - old_size + result.size
+    datasource.save()
+

 def delete_data_source_task(datasource):
     datasource_type = datasource.type
diff --git a/client/src/pages/data.jsx b/client/src/pages/data.jsx
index f7a4e34c8d6..2b4c71b3543 100644
--- a/client/src/pages/data.jsx
+++ b/client/src/pages/data.jsx
@@ -15,6 +15,7 @@ import { TextareaAutosize } from "@mui/base";

 import DeleteOutlineOutlinedIcon from "@mui/icons-material/DeleteOutlineOutlined";
 import AddOutlinedIcon from "@mui/icons-material/AddOutlined";
+import SyncOutlinedIcon from "@mui/icons-material/SyncOutlined";
 import PeopleOutlineOutlinedIcon from "@mui/icons-material/PeopleOutlineOutlined";
 import PersonOutlineOutlinedIcon from "@mui/icons-material/PersonOutlineOutlined";

@@ -313,8 +314,24 @@ export default function DataPage() {
       title: "Action",
       key: "operation",
       render: (record) => {
+        const isAdhocSyncSupported = record?.sync_config;
+
         return (
-
+
+          {isAdhocSyncSupported && (
+            <IconButton
+              onClick={() => {
+                axios()
+                  .post(`/api/datasource_entries/${record.uuid}/resync`)
+                  .then((response) => {
+                    reloadDataSourceEntries();
+                    reloadDataSourceEntries();
+                  });
+              }}
+            >
+              <SyncOutlinedIcon />
+            </IconButton>
+          )}
         );
       },

@@ -348,8 +365,10 @@
         onRow={(record, rowIndex) => {
           return {
             onClick: (event) => {
-              setDataSourceEntryData(record);
-              setDataSourceEntryDrawerOpen(true);
+              if (event.target.tagName === "TD") {
+                setDataSourceEntryData(record);
+                setDataSourceEntryDrawerOpen(true);
+              }
             },
           };
         }}
diff --git a/datasources/apis.py b/datasources/apis.py
index effea17a73e..027d21e6e96 100644
--- a/datasources/apis.py
+++ b/datasources/apis.py
@@ -13,7 +13,7 @@
 from .serializers import DataSourceEntrySerializer
 from .serializers import DataSourceSerializer
 from .serializers import DataSourceTypeSerializer
-from apps.tasks import add_data_entry_task
+from apps.tasks import add_data_entry_task, resync_data_entry_task
 from apps.tasks import delete_data_entry_task
 from apps.tasks import delete_data_source_task
 from common.utils.utils import extract_urls_from_sitemap
@@ -87,7 +87,18 @@ def text_content(self, request, uid):
         )
         return DRFResponse({'content': content, 'metadata': metadata})

+    def resync(self, request, uid):
+        datasource_entry_object = get_object_or_404(
+            DataSourceEntry, uuid=uuid.UUID(uid),
+        )
+        if datasource_entry_object.datasource.owner != request.user:
+            return DRFResponse(status=404)
+        resync_data_entry_task(
+            datasource_entry_object.datasource, datasource_entry_object,
+        )
+
+        return DRFResponse(status=202)

 class DataSourceViewSet(viewsets.ModelViewSet):
     queryset = DataSource.objects.all()
     serializer_class = DataSourceSerializer
diff --git a/datasources/handlers/datasource_type_interface.py b/datasources/handlers/datasource_type_interface.py
index b681e4a9aa2..a4df8f7d349 100644
--- a/datasources/handlers/datasource_type_interface.py
+++ b/datasources/handlers/datasource_type_interface.py
@@ -1,3 +1,4 @@
+from enum import Enum
 import json
 import logging
 from string import Template
@@ -56,13 +57,23 @@ class DataSourceEntryItem(BaseModel):
     metadata: dict = {}
     data: Optional[dict] = None

-
+class DataSourceSyncType(str, Enum):
+    FULL = 'full'
+    INCREMENTAL = 'incremental'
+
+class DataSourceSyncConfiguration(_Schema):
+    sync_type: str = 'full'
+

 class DataSourceProcessor(ProcessorInterface[BaseInputType, None, None]):
     @classmethod
     def get_content_key(cls) -> str:
         datasource_type_interface = cls.__orig_bases__[0]
         return datasource_type_interface.__args__[0].get_content_key()

+
+    @classmethod
+    def get_sync_configuration(cls) -> Optional[dict]:
+        return None

     @classmethod
     def get_weaviate_schema(cls, class_name: str) -> dict:
@@ -158,6 +169,7 @@ def _get_document_embeddings(self, text: str) -> OpenAIEmbeddingOutput:
         return None

     def add_entry(self, data: dict) -> Optional[DataSourceEntryItem]:
+        logger.info(f'Adding data_source_entry: {data}')
         documents = self.get_data_documents(data)

         documents = map(
@@ -206,7 +218,13 @@ def delete_entry(self, data: dict) -> None:
             self.vectorstore._client.data_object.delete(
                 document_id, self.datasource_class_name,
             )
-
+
+    def resync_entry(self, data: dict) -> Optional[DataSourceEntryItem]:
+        # Delete old data
+        self.delete_entry(data)
+        # Add new data
+        return self.add_entry(DataSourceEntryItem(**data["input"]))
+
     def delete_all_entries(self) -> None:
         self.vectorstore._client.schema.delete_class(
             self.datasource_class_name,
diff --git a/datasources/handlers/website/url.py b/datasources/handlers/website/url.py
index b57f3014775..a2c495c3e54 100644
--- a/datasources/handlers/website/url.py
+++ b/datasources/handlers/website/url.py
@@ -1,5 +1,5 @@
 import logging
-from typing import List
+from typing import Any, List
 from typing import Optional

 from pydantic import Field
@@ -9,7 +9,7 @@
 from common.utils.text_extract import ExtraParams
 from common.utils.splitter import SpacyTextSplitter
 from common.utils.utils import extract_urls_from_sitemap
-from datasources.handlers.datasource_type_interface import DataSourceEntryItem
+from datasources.handlers.datasource_type_interface import DataSourceEntryItem, DataSourceSyncConfiguration, DataSourceSyncType
 from datasources.handlers.datasource_type_interface import DataSourceSchema
 from datasources.handlers.datasource_type_interface import DataSourceProcessor
 from datasources.handlers.datasource_type_interface import WEAVIATE_SCHEMA
@@ -54,6 +54,26 @@ def name() -> str:
     @staticmethod
     def slug() -> str:
         return 'url'
+
+    @classmethod
+    def get_sync_configuration(cls) -> Optional[dict]:
+        return DataSourceSyncConfiguration(sync_type=DataSourceSyncType.FULL).dict()
+
+    def get_url_data(self, url: str) -> Optional[DataSourceEntryItem]:
+        if not url.startswith('https://') and not url.startswith('http://'):
+            url = f'https://{url}'
+
+        text = extract_text_from_url(
+            url, extra_params=ExtraParams(openai_key=self.openai_key),
+        )
+        docs = [
+            Document(
+                page_content_key=self.get_content_key(), page_content=t, metadata={
+                    'source': url,
+                },
+            ) for t in SpacyTextSplitter(chunk_size=1500, length_func=len).split_text(text)
+        ]
+        return docs

     def validate_and_process(self, data: dict) -> List[DataSourceEntryItem]:
         entry = URLSchema(**data)
@@ -83,22 +103,11 @@ def validate_and_process(self, data: dict) -> List[DataSourceEntryItem]:

         return list(map(lambda entry: DataSourceEntryItem(name=entry, data={'url': entry}), urls + sitemap_urls))

+
+
     def get_data_documents(self, data: DataSourceEntryItem) -> Optional[DataSourceEntryItem]:
         url = data.data['url']
-        if not url.startswith('https://') and not url.startswith('http://'):
-            url = f'https://{url}'
-
-        text = extract_text_from_url(
-            url, extra_params=ExtraParams(openai_key=self.openai_key),
-        )
-        docs = [
-            Document(
-                page_content_key=self.get_content_key(), page_content=t, metadata={
-                    'source': url,
-                },
-            ) for t in SpacyTextSplitter(chunk_size=1500, length_func=len).split_text(text)
-        ]
-        return docs
+        return self.get_url_data(url)

     def similarity_search(self, query: str, *args, **kwargs) -> List[dict]:
         return super().similarity_search(query, *args, **kwargs)
diff --git a/datasources/serializers.py b/datasources/serializers.py
index 741ab1a2692..d59f9787938 100644
--- a/datasources/serializers.py
+++ b/datasources/serializers.py
@@ -11,6 +11,7 @@ class DataSourceTypeSerializer(serializers.ModelSerializer):

     entry_config_schema = serializers.SerializerMethodField()
     entry_config_ui_schema = serializers.SerializerMethodField()
+    sync_config = serializers.SerializerMethodField()

     def get_entry_config_schema(self, obj):
         datasource_type_handler_cls = DataSourceTypeFactory.get_datasource_type_handler(
@@ -27,12 +28,20 @@ def get_entry_config_ui_schema(self, obj):
         if datasource_type_handler_cls is None:
             return {}
         return datasource_type_handler_cls.get_input_ui_schema()
+
+    def get_sync_config(self, obj):
+        datasource_type_handler_cls = DataSourceTypeFactory.get_datasource_type_handler(
+            obj,
+        )
+        if datasource_type_handler_cls is None:
+            return None
+        return datasource_type_handler_cls.get_sync_configuration()

     class Meta:
         model = DataSourceType
         fields = [
             'id', 'name', 'description',
-            'entry_config_schema', 'entry_config_ui_schema',
+            'entry_config_schema', 'entry_config_ui_schema', 'sync_config'
         ]


@@ -53,10 +62,22 @@ class Meta:

 class DataSourceEntrySerializer(serializers.ModelSerializer):
     datasource = DataSourceSerializer()
+    sync_config = serializers.SerializerMethodField()
+
+    def get_sync_config(self, obj):
+        datasource_type_handler_cls = DataSourceTypeFactory.get_datasource_type_handler(
+            obj.datasource.type,
+        )
+        if datasource_type_handler_cls is None:
+            return None
+        if "input" not in obj.config:
+            return None
+
+        return datasource_type_handler_cls.get_sync_configuration()

     class Meta:
         model = DataSourceEntry
         fields = [
             'uuid', 'datasource', 'config',
-            'name', 'size', 'status', 'created_at', 'updated_at',
+            'name', 'size', 'status', 'created_at', 'updated_at', 'sync_config'
         ]
diff --git a/datasources/urls.py b/datasources/urls.py
index 748e07a9fc9..eecf42fc853 100644
--- a/datasources/urls.py
+++ b/datasources/urls.py
@@ -48,4 +48,8 @@
         'api/datasource_entries/<uid>/text_content',
         apis.DataSourceEntryViewSet.as_view({'get': 'text_content'}),
     ),
+    path(
+        'api/datasource_entries/<uid>/resync',
+        apis.DataSourceEntryViewSet.as_view({'post': 'resync'}),
+    ),
 ]

From 22161a8b1d8860c99e795f8cd8775d50da27c921 Mon Sep 17 00:00:00 2001
From: Vignesh Aigal
Date: Wed, 23 Aug 2023 20:11:00 -0700
Subject: [PATCH 2/2] Address review comments

---
 apps/tasks.py                                     | 6 +++---
 datasources/handlers/datasource_type_interface.py | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/apps/tasks.py b/apps/tasks.py
index dd15d7e2693..abb53a0b148 100644
--- a/apps/tasks.py
+++ b/apps/tasks.py
@@ -30,9 +30,9 @@ def add_data_entry_task(datasource: DataSource, datasource_entry_items: List[Dat
         )
         try:
             result = datasource_entry_handler.add_entry(datasource_entry_item)
-            config_entry = result.config
-            config_entry["input"] = datasource_entry_item.dict()
-            datasource_entry_object.config = config_entry
+            datasource_entry_config = result.config
+            datasource_entry_config["input"] = datasource_entry_item.dict()
+            datasource_entry_object.config = datasource_entry_config
             datasource_entry_object.size = result.size
             datasource_entry_object.status = DataSourceEntryStatus.READY
             datasource_entries_size += result.size
diff --git a/datasources/handlers/datasource_type_interface.py b/datasources/handlers/datasource_type_interface.py
index a4df8f7d349..849513a9cb6 100644
--- a/datasources/handlers/datasource_type_interface.py
+++ b/datasources/handlers/datasource_type_interface.py
@@ -62,7 +62,7 @@ class DataSourceSyncType(str, Enum):
     INCREMENTAL = 'incremental'

 class DataSourceSyncConfiguration(_Schema):
-    sync_type: str = 'full'
+    sync_type: DataSourceSyncType = 'full'


 class DataSourceProcessor(ProcessorInterface[BaseInputType, None, None]):
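
Reviewer note (not part of the patch series): the sketch below shows one way the new resync endpoint could be exercised against a local development server once both patches are applied. It is an illustration under stated assumptions, not project tooling: the base URL, the session-cookie authentication, and the entry UUID are placeholders, and depending on the project's DRF settings a CSRF token header may also be required.

    # Illustrative sketch only -- assumes a local dev server and an authenticated
    # Django session cookie; adjust both to your environment.
    import requests

    BASE_URL = "http://localhost:8000"                    # assumed dev server address
    ENTRY_UUID = "00000000-0000-0000-0000-000000000000"   # placeholder entry uuid

    session = requests.Session()
    session.cookies.set("sessionid", "your-session-cookie")  # assumed session auth

    # POST to the route added in datasources/urls.py. The view runs
    # resync_data_entry_task synchronously: it deletes the old vector store
    # objects via resync_entry and re-adds the entry from the saved "input" config.
    resp = session.post(f"{BASE_URL}/api/datasource_entries/{ENTRY_UUID}/resync")
    print(resp.status_code)  # 202 on success, 404 if the entry is not owned by the caller

Only data source types whose handler reports a sync configuration expose this action in the UI (the URL handler in datasources/handlers/website/url.py returns a full-sync DataSourceSyncConfiguration): the serializers surface it as sync_config, and data.jsx gates the sync button on record?.sync_config.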