From cc306178efaa9d8e2ffd6462844c7e8d57b6fbfc Mon Sep 17 00:00:00 2001
From: jyong
Date: Mon, 1 Apr 2024 18:35:46 +0800
Subject: [PATCH 1/5] add keyword table s3 storage support

---
 .../rag/datasource/keyword/jieba/jieba.py     | 30 ++++++++++++----
 api/extensions/ext_storage.py                 | 14 ++++++++
 ...ab037c40_add_keyworg_table_storage_type.py | 34 +++++++++++++++++++
 api/models/dataset.py                         | 23 +++++++++++--
 4 files changed, 93 insertions(+), 8 deletions(-)
 create mode 100644 api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py

diff --git a/api/core/rag/datasource/keyword/jieba/jieba.py b/api/core/rag/datasource/keyword/jieba/jieba.py
index 344ef7babe3c2..e66254faeab38 100644
--- a/api/core/rag/datasource/keyword/jieba/jieba.py
+++ b/api/core/rag/datasource/keyword/jieba/jieba.py
@@ -2,6 +2,7 @@
 from collections import defaultdict
 from typing import Any, Optional
 
+from flask import current_app
 from pydantic import BaseModel
 
 from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler
@@ -9,6 +10,7 @@
 from core.rag.models.document import Document
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
+from extensions.ext_storage import storage
 from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment
 
 
@@ -108,6 +110,9 @@ def delete(self) -> None:
         if dataset_keyword_table:
             db.session.delete(dataset_keyword_table)
             db.session.commit()
+            if dataset_keyword_table.storage_type != 'local':
+                file_key = 'keyword_files/' + self.dataset.tenant_id + '/' + self.dataset.id + '.txt'
+                storage.delete(file_key)
 
     def _save_dataset_keyword_table(self, keyword_table):
         keyword_table_dict = {
@@ -118,20 +123,34 @@ def _save_dataset_keyword_table(self, keyword_table):
                 "table": keyword_table
             }
         }
-        self.dataset.dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
-        db.session.commit()
+
+        storage_type = current_app.config['STORAGE_TYPE']
+        if storage_type == 'local':
+            self.dataset.dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
+            db.session.commit()
+        else:
+            file_key = 'keyword_files/' + self.dataset.tenant_id + '/' + self.dataset.id + '.txt'
+            if storage.exists(file_key):
+                storage.delete(file_key)
+            storage.save(file_key, json.dumps(keyword_table_dict, cls=SetEncoder).encode('utf-8'))
 
     def _get_dataset_keyword_table(self) -> Optional[dict]:
         lock_name = 'keyword_indexing_lock_{}'.format(self.dataset.id)
         with redis_client.lock(lock_name, timeout=20):
             dataset_keyword_table = self.dataset.dataset_keyword_table
             if dataset_keyword_table:
-                if dataset_keyword_table.keyword_table_dict:
-                    return dataset_keyword_table.keyword_table_dict['__data__']['table']
+                keyword_table_dict = dataset_keyword_table.keyword_table_dict
+                if keyword_table_dict:
+                    return keyword_table_dict['__data__']['table']
             else:
+                storage_type = current_app.config['STORAGE_TYPE']
                 dataset_keyword_table = DatasetKeywordTable(
                     dataset_id=self.dataset.id,
-                    keyword_table=json.dumps({
+                    keyword_table='',
+                    storage_type=storage_type,
+                )
+                if storage_type == 'local':
+                    dataset_keyword_table.keyword_table = json.dumps({
                         '__type__': 'keyword_table',
                         '__data__': {
                             "index_id": self.dataset.id,
                             "summary": None,
                             "table": {}
                         }
                     }, cls=SetEncoder)
-                )
                 db.session.add(dataset_keyword_table)
                 db.session.commit()
diff --git a/api/extensions/ext_storage.py b/api/extensions/ext_storage.py
index 3a8e314d9273f..943cf4f58d467 100644
--- a/api/extensions/ext_storage.py
+++ b/api/extensions/ext_storage.py
@@ -172,6 +172,20 @@ def exists(self, filename):
 
         return os.path.exists(filename)
 
+    def delete(self, filename):
+        if self.storage_type == 's3':
+            self.client.delete_object(Bucket=self.bucket_name, Key=filename)
+        elif self.storage_type == 'azure-blob':
+            blob_container = self.client.get_container_client(container=self.bucket_name)
+            blob_container.delete_blob(filename)
+        else:
+            if not self.folder or self.folder.endswith('/'):
+                filename = self.folder + filename
+            else:
+                filename = self.folder + '/' + filename
+            if os.path.exists(filename):
+                os.remove(filename)
+
 
 storage = Storage()
diff --git a/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py b/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
new file mode 100644
index 0000000000000..a20e60e06877d
--- /dev/null
+++ b/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
@@ -0,0 +1,34 @@
+"""add-keyworg-table-storage-type
+
+Revision ID: 17b5ab037c40
+Revises: a8f9b3c45e4a
+Create Date: 2024-04-01 09:48:54.232201
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = '17b5ab037c40'
+down_revision = 'a8f9b3c45e4a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+
+    with op.batch_alter_table('dataset_keyword_tables', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('storage_type', sa.String(length=255), server_default=sa.text("'local'::character varying"), nullable=False))
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+
+    with op.batch_alter_table('dataset_keyword_tables', schema=None) as batch_op:
+        batch_op.drop_column('storage_type')
+
+    # ### end Alembic commands ###
diff --git a/api/models/dataset.py b/api/models/dataset.py
index 031bbe4dc7641..e549adfdfedcb 100644
--- a/api/models/dataset.py
+++ b/api/models/dataset.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import pickle
 from json import JSONDecodeError
 
@@ -6,6 +7,7 @@
 from sqlalchemy.dialects.postgresql import JSONB, UUID
 
 from extensions.ext_database import db
+from extensions.ext_storage import storage
 from models.account import Account
 from models.model import App, UploadFile
 
@@ -441,6 +443,7 @@ class DatasetKeywordTable(db.Model):
     id = db.Column(UUID, primary_key=True, server_default=db.text('uuid_generate_v4()'))
     dataset_id = db.Column(UUID, nullable=False, unique=True)
     keyword_table = db.Column(db.Text, nullable=False)
+    storage_type = db.Column(db.String(255), nullable=False, server_default=db.text("'local'::character varying"))
 
     @property
     def keyword_table_dict(self):
@@ -454,8 +457,24 @@ def object_hook(self, dct):
                     if isinstance(node_idxs, list):
                         dct[keyword] = set(node_idxs)
                 return dct
-
-        return json.loads(self.keyword_table, cls=SetDecoder) if self.keyword_table else None
+        # get dataset
+        dataset = Dataset.query.filter_by(
+            id=self.dataset_id
+        ).first()
+        if not dataset:
+            return None
+        if self.storage_type == 'local':
+            return json.loads(self.keyword_table, cls=SetDecoder) if self.keyword_table else None
+        else:
+            file_key = 'keyword_files/' + dataset.tenant_id + '/' + self.dataset_id + '.txt'
+            try:
+                keyword_table_text = storage.load_once(file_key)
+                if keyword_table_text:
+                    return json.loads(keyword_table_text.decode('utf-8'), cls=SetDecoder)
+                return None
+            except Exception as e:
+                logging.exception(str(e))
+                return None
 
 
 class Embedding(db.Model):
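
The first patch keeps the existing behaviour for local deployments but, for any other STORAGE_TYPE, moves the serialized keyword table out of Postgres into a JSON file named 'keyword_files/<tenant_id>/<dataset_id>.txt'. The SetEncoder used for serialization is referenced but not defined in this diff; the sketch below shows the implied round trip, assuming SetEncoder simply encodes sets as lists (the decoding side mirrors the SetDecoder.object_hook this patch adds to api/models/dataset.py):

    import json

    class SetEncoder(json.JSONEncoder):
        # Assumed behaviour of the SetEncoder referenced in jieba.py:
        # sets are not JSON-serializable, so emit them as lists.
        def default(self, obj):
            if isinstance(obj, set):
                return list(obj)
            return super().default(obj)

    def set_object_hook(dct):
        # Mirrors SetDecoder.object_hook from the api/models/dataset.py hunk:
        # any list value is rebuilt into a set on load.
        for keyword, node_idxs in dct.items():
            if isinstance(node_idxs, list):
                dct[keyword] = set(node_idxs)
        return dct

    table = {'__type__': 'keyword_table',
             '__data__': {'index_id': 'ds-1', 'summary': None,
                          'table': {'keyword': {'seg-1', 'seg-2'}}}}
    encoded = json.dumps(table, cls=SetEncoder).encode('utf-8')  # what storage.save() receives
    decoded = json.loads(encoded.decode('utf-8'), object_hook=set_object_hook)
    assert decoded['__data__']['table']['keyword'] == {'seg-1', 'seg-2'}

Note that in this first revision, delete() reads dataset_keyword_table.storage_type only after the row has been deleted and committed; patch 4 later renames the check but keeps this ordering.
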
From 6eee53efbd013e614e4bb5b06e2e9b359cf952f1 Mon Sep 17 00:00:00 2001
From: jyong
Date: Mon, 1 Apr 2024 18:43:39 +0800
Subject: [PATCH 2/5] add keyword table s3 storage support

---
 .../versions/17b5ab037c40_add_keyworg_table_storage_type.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py b/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
index a20e60e06877d..154b4c83acfce 100644
--- a/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
+++ b/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
@@ -5,9 +5,8 @@
 Create Date: 2024-04-01 09:48:54.232201
 
 """
-from alembic import op
 import sqlalchemy as sa
-from sqlalchemy.dialects import postgresql
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '17b5ab037c40'
From e74c51e1cf03216cb91551d30291ec7d4b15d4df Mon Sep 17 00:00:00 2001
From: jyong
Date: Mon, 1 Apr 2024 19:47:42 +0800
Subject: [PATCH 3/5] add keyword table s3 storage support

---
 api/.env.example                                   |  1 +
 api/config.py                                      |  2 ++
 api/core/rag/datasource/keyword/jieba/jieba.py     | 10 +++++-----
 .../17b5ab037c40_add_keyworg_table_storage_type.py |  4 ++--
 api/models/dataset.py                              |  4 ++--
 5 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/api/.env.example b/api/.env.example
index eb4b026fa7a58..c41888b5276f2 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -137,3 +137,4 @@ SSRF_PROXY_HTTP_URL=
 SSRF_PROXY_HTTPS_URL=
 
 BATCH_UPLOAD_LIMIT=10
+KEYWORD_DATA_SOURCE_TYPE=database
\ No newline at end of file
diff --git a/api/config.py b/api/config.py
index 211876a275ca2..7226412782f0b 100644
--- a/api/config.py
+++ b/api/config.py
@@ -62,6 +62,7 @@
     'KEYWORD_STORE': 'jieba',
     'BATCH_UPLOAD_LIMIT': 20,
     'TOOL_ICON_CACHE_MAX_AGE': 3600,
+    'KEYWORD_DATA_SOURCE_TYPE': 'database',
 }
 
 
@@ -303,6 +304,7 @@ def __init__(self):
 
         self.API_COMPRESSION_ENABLED = get_bool_env('API_COMPRESSION_ENABLED')
         self.TOOL_ICON_CACHE_MAX_AGE = get_env('TOOL_ICON_CACHE_MAX_AGE')
+        self.KEYWORD_DATA_SOURCE_TYPE = get_env('KEYWORD_DATA_SOURCE_TYPE')
 
 
 class CloudEditionConfig(Config):
diff --git a/api/core/rag/datasource/keyword/jieba/jieba.py b/api/core/rag/datasource/keyword/jieba/jieba.py
index e66254faeab38..72aafe0444d1e 100644
--- a/api/core/rag/datasource/keyword/jieba/jieba.py
+++ b/api/core/rag/datasource/keyword/jieba/jieba.py
@@ -124,8 +124,8 @@ def _save_dataset_keyword_table(self, keyword_table):
             }
         }
 
-        storage_type = current_app.config['STORAGE_TYPE']
-        if storage_type == 'local':
+        keyword_data_source_type = current_app.config['KEYWORD_DATA_SOURCE_TYPE']
+        if keyword_data_source_type == 'database':
             self.dataset.dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
             db.session.commit()
         else:
@@ -143,13 +143,13 @@ def _get_dataset_keyword_table(self) -> Optional[dict]:
                 if keyword_table_dict:
                     return keyword_table_dict['__data__']['table']
             else:
-                storage_type = current_app.config['STORAGE_TYPE']
+                keyword_data_source_type = current_app.config['KEYWORD_DATA_SOURCE_TYPE']
                 dataset_keyword_table = DatasetKeywordTable(
                     dataset_id=self.dataset.id,
                     keyword_table='',
-                    storage_type=storage_type,
+                    data_source_type=keyword_data_source_type,
                 )
-                if storage_type == 'local':
+                if keyword_data_source_type == 'database':
                     dataset_keyword_table.keyword_table = json.dumps({
                         '__type__': 'keyword_table',
                         '__data__': {
diff --git a/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py b/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
index 154b4c83acfce..77071484892cb 100644
--- a/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
+++ b/api/migrations/versions/17b5ab037c40_add_keyworg_table_storage_type.py
@@ -19,7 +19,7 @@ def upgrade():
     # ### commands auto generated by Alembic - please adjust! ###
 
     with op.batch_alter_table('dataset_keyword_tables', schema=None) as batch_op:
-        batch_op.add_column(sa.Column('storage_type', sa.String(length=255), server_default=sa.text("'local'::character varying"), nullable=False))
+        batch_op.add_column(sa.Column('data_source_type', sa.String(length=255), server_default=sa.text("'database'::character varying"), nullable=False))
 
     # ### end Alembic commands ###
 
@@ -28,6 +28,6 @@ def downgrade():
     # ### commands auto generated by Alembic - please adjust! ###
 
     with op.batch_alter_table('dataset_keyword_tables', schema=None) as batch_op:
-        batch_op.drop_column('storage_type')
+        batch_op.drop_column('data_source_type')
 
     # ### end Alembic commands ###
diff --git a/api/models/dataset.py b/api/models/dataset.py
index e549adfdfedcb..f90fc9abb7a55 100644
--- a/api/models/dataset.py
+++ b/api/models/dataset.py
@@ -443,7 +443,7 @@ class DatasetKeywordTable(db.Model):
     id = db.Column(UUID, primary_key=True, server_default=db.text('uuid_generate_v4()'))
     dataset_id = db.Column(UUID, nullable=False, unique=True)
     keyword_table = db.Column(db.Text, nullable=False)
-    storage_type = db.Column(db.String(255), nullable=False, server_default=db.text("'local'::character varying"))
+    data_source_type = db.Column(db.String(255), nullable=False, server_default=db.text("'database'::character varying"))
 
     @property
     def keyword_table_dict(self):
@@ -463,7 +463,7 @@ def object_hook(self, dct):
         ).first()
         if not dataset:
             return None
-        if self.storage_type == 'local':
+        if self.data_source_type == 'database':
             return json.loads(self.keyword_table, cls=SetDecoder) if self.keyword_table else None
         else:
             file_key = 'keyword_files/' + dataset.tenant_id + '/' + self.dataset_id + '.txt'
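
Patch 3 separates two previously conflated concerns: STORAGE_TYPE continues to pick the blob backend (local, S3, Azure Blob), while the new KEYWORD_DATA_SOURCE_TYPE decides where keyword tables live. It defaults to 'database', so existing deployments keep writing to the dataset_keyword_tables.keyword_table column; any other value routes the table through the storage layer instead. A minimal sketch of the branch this introduces, factored out of _save_dataset_keyword_table for clarity (the helper names are hypothetical; the config key and file layout are from the diff):

    from flask import current_app

    def keyword_file_key(tenant_id: str, dataset_id: str) -> str:
        # File layout shared by the save, load, and delete paths in this series.
        return 'keyword_files/' + tenant_id + '/' + dataset_id + '.txt'

    def keyword_table_in_database() -> bool:
        # 'database' (the default registered in api/config.py) keeps the JSON
        # in the dataset_keyword_tables row; anything else goes to storage.
        return current_app.config['KEYWORD_DATA_SOURCE_TYPE'] == 'database'
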
From 23118164f61f325732165a9b2b1f0447b412561b Mon Sep 17 00:00:00 2001
From: jyong
Date: Mon, 1 Apr 2024 19:56:31 +0800
Subject: [PATCH 4/5] add keyword table s3 storage support

---
 api/core/rag/datasource/keyword/jieba/jieba.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/api/core/rag/datasource/keyword/jieba/jieba.py b/api/core/rag/datasource/keyword/jieba/jieba.py
index 72aafe0444d1e..bcee0504b4ad2 100644
--- a/api/core/rag/datasource/keyword/jieba/jieba.py
+++ b/api/core/rag/datasource/keyword/jieba/jieba.py
@@ -110,7 +110,7 @@ def delete(self) -> None:
         if dataset_keyword_table:
             db.session.delete(dataset_keyword_table)
             db.session.commit()
-            if dataset_keyword_table.storage_type != 'local':
+            if dataset_keyword_table.data_source_type != 'database':
                 file_key = 'keyword_files/' + self.dataset.tenant_id + '/' + self.dataset.id + '.txt'
                 storage.delete(file_key)
From ec489c8e6af18bda72d84f7456a2d73259c6824a Mon Sep 17 00:00:00 2001
From: jyong
Date: Mon, 1 Apr 2024 20:10:49 +0800
Subject: [PATCH 5/5] add keyword table s3 storage support

---
 api/core/rag/datasource/keyword/jieba/jieba.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/api/core/rag/datasource/keyword/jieba/jieba.py b/api/core/rag/datasource/keyword/jieba/jieba.py
index bcee0504b4ad2..46478ec1316eb 100644
--- a/api/core/rag/datasource/keyword/jieba/jieba.py
+++ b/api/core/rag/datasource/keyword/jieba/jieba.py
@@ -123,10 +123,10 @@ def _save_dataset_keyword_table(self, keyword_table):
                 "table": keyword_table
             }
         }
-
-        keyword_data_source_type = current_app.config['KEYWORD_DATA_SOURCE_TYPE']
+        dataset_keyword_table = self.dataset.dataset_keyword_table
+        keyword_data_source_type = dataset_keyword_table.data_source_type
         if keyword_data_source_type == 'database':
-            self.dataset.dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
+            dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
             db.session.commit()
         else:
             file_key = 'keyword_files/' + self.dataset.tenant_id + '/' + self.dataset.id + '.txt'
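
With the final patch, the save path reads data_source_type from the row itself instead of the application config, so a dataset keeps the backend it was created with even if KEYWORD_DATA_SOURCE_TYPE changes later. An end-to-end sketch of the file-backed round trip, using an in-memory stand-in for extensions.ext_storage.storage (FakeStorage is illustrative and not part of the patch; the real object is configured by STORAGE_TYPE):

    import json

    class FakeStorage:
        # In-memory stand-in exposing the four storage calls this series uses.
        def __init__(self):
            self._files = {}

        def save(self, key, data):
            self._files[key] = data

        def exists(self, key):
            return key in self._files

        def load_once(self, key):
            return self._files[key]

        def delete(self, key):
            self._files.pop(key, None)

    storage = FakeStorage()
    file_key = 'keyword_files/tenant-1/dataset-1.txt'

    # Save path for a non-database row, mirroring _save_dataset_keyword_table:
    # remove any stale file, then write the fresh serialization.
    keyword_table_dict = {'__type__': 'keyword_table',
                          '__data__': {'index_id': 'dataset-1', 'summary': None,
                                       'table': {'keyword': ['seg-1']}}}
    if storage.exists(file_key):
        storage.delete(file_key)
    storage.save(file_key, json.dumps(keyword_table_dict).encode('utf-8'))

    # Load path, mirroring DatasetKeywordTable.keyword_table_dict.
    loaded = json.loads(storage.load_once(file_key).decode('utf-8'))
    assert loaded['__data__']['table'] == {'keyword': ['seg-1']}
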