Skip to content

Commit c9495ba

Browse files
obdevshenyunlongJLY2015
authored andcommitted
[CP] fix semantic index post creating failed and the failure of embedding for overly long data.
Co-authored-by: shenyunlong.syl <ylshen0919@gmail.com> Co-authored-by: JLY2015 <1623359870@qq.com>
1 parent 176a6e1 commit c9495ba

10 files changed

+644
-252
lines changed

src/share/ob_debug_sync_point.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,8 @@ class ObString;
757757
ACT(BEFORE_INC_MAJOR_DDL_MERGE_UPDATE_TABLE_STORE,)\
758758
ACT(AFTER_INC_MAJOR_MERGE_GET_SSTABLE,)\
759759
ACT(AFTER_GET_MTL_TENANT_LOCK,)\
760+
ACT(BEFORE_DO_EMBEDDING_TASK,)\
761+
ACT(BEFORE_COMPLETE_EMBEDDING_TASK,)\
760762
ACT(MAX_DEBUG_SYNC_POINT,)
761763

762764
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

src/share/vector_index/ob_hybrid_vector_refresh_task.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
508508
storage::ObTableScanParam *&table_scan_param = task_ctx->table_scan_param_;
509509
schema::ObTableParam *&table_param = task_ctx->table_param_;
510510
storage::ObValueRowIterator &delta_delete_iter = task_ctx->delta_delete_iter_;
511+
ObCollationType col_type = CS_TYPE_INVALID;
511512
int64_t dim = 0;
512513
int64_t loop_cnt = 0;
513514
int64_t http_timeout_us = 0;
@@ -528,6 +529,8 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
528529
LOG_WARN("unexpected error", K(ret), KPC(task_ctx));
529530
} else if (OB_FAIL(adaptor.get_dim(dim))) {
530531
LOG_WARN("get dim failed", K(ret));
532+
} else if (OB_FAIL(ObVectorIndexUtil::get_index_column_collation_type(tenant_id_, adaptor.get_embedded_table_id(), col_type))) {
533+
LOG_WARN("failed to get chunc column col_type", K(ret), K(adaptor));
531534
} else {
532535
if (OB_NOT_NULL(tsc_iter) || OB_NOT_NULL(table_scan_param) || OB_NOT_NULL(table_param)) {
533536
if (OB_ISNULL(tsc_iter) || OB_ISNULL(table_scan_param) || OB_ISNULL(table_param)) {
@@ -656,6 +659,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
656659
const ObAiModelEndpointInfo *endpoint = task_ctx->endpoint_; // endpoint should not be null after init.
657660
task_ctx->embedding_task_ = new(task_buf)ObEmbeddingTask(task_ctx->allocator_);
658661
ObPluginVectorIndexService *service = MTL(ObPluginVectorIndexService *);
662+
659663
if (OB_ISNULL(service)) {
660664
ret = OB_ERR_UNEXPECTED;
661665
LOG_WARN("unexpected null ptr", K(ret), KPC(service));
@@ -664,7 +668,8 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
664668
} else if (OB_FAIL(ob_write_string(task_ctx->allocator_, endpoint->get_url(), url, true))) {
665669
LOG_WARN("fail to write string", K(ret));
666670
} else if (OB_FAIL(task_ctx->embedding_task_->init(url, endpoint->get_request_model_name(),
667-
endpoint->get_provider(), access_key, chunk_array, dim, http_timeout_us, http_max_retries))) {
671+
endpoint->get_provider(), access_key, chunk_array, col_type, dim, http_timeout_us,
672+
http_max_retries, ctx_->task_status_.task_id_, ObEmbeddingTasSourceType::ASYNC_INDEX))) {
668673
LOG_WARN("failed to init embedding task", K(ret), KPC(endpoint));
669674
} else {
670675
ObEmbeddingTaskHandler *embedding_handler = nullptr;

src/share/vector_index/ob_vector_embedding_handler.cpp

Lines changed: 385 additions & 208 deletions
Large diffs are not rendered by default.

src/share/vector_index/ob_vector_embedding_handler.h

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ class ObEmbeddingTaskPhaseManager {
155155
}
156156
};
157157

158+
enum class ObEmbeddingTasSourceType
159+
{
160+
INDEX_PIPELINE = 0,
161+
ASYNC_INDEX = 1,
162+
};
163+
158164
class ObEmbeddingTaskHandler;
159165

160166
// Constants for field lengths
@@ -169,10 +175,14 @@ class ObEmbeddingTask
169175
const ObString &provider,
170176
const ObString &user_key,
171177
const ObIArray<ObString> &input_chunks,
178+
const ObCollationType col_type,
172179
int64_t dimension,
173180
int64_t http_timeout_us,
174181
int64_t http_max_retries,
175-
storage::ObEmbeddingIOCallbackHandle *cb_handle = nullptr);
182+
int64_t source_task_id,
183+
ObEmbeddingTasSourceType source_task_type,
184+
storage::ObEmbeddingIOCallbackHandle *cb_handle = nullptr,
185+
bool always_retry = false);
176186
template <typename ThreadPoolType>
177187
int do_work(ThreadPoolType *thread_pool);
178188
int64_t get_task_id() const { return task_id_; }
@@ -181,29 +191,56 @@ class ObEmbeddingTask
181191

182192
TO_STRING_KV(K_(is_inited),
183193
K_(task_id),
194+
K_(phase),
184195
K_(model_url),
185196
K_(model_name),
186-
K_(user_key),
187197
K(input_chunks_.count()),
188198
K(output_vectors_.count()),
189199
K_(dimension),
190200
K_(batch_size),
201+
K_(current_batch_idx),
202+
K_(http_total_retry_count),
203+
K_(http_retry_count),
204+
K_(http_max_retry_count),
205+
K_(http_retry_start_time_us),
206+
K_(http_last_retry_time_us),
191207
K_(processed_chunks),
192208
K_(total_chunks),
193-
K_(process_callback_offset));
209+
K_(process_callback_offset),
210+
K_(col_type),
211+
K_(current_input_token_limit),
212+
K_(need_cancel),
213+
K_(source_task_id),
214+
K_(source_task_type),
215+
K_(always_retry),
216+
K_(next_handle_task_time_us),
217+
K_(internal_error_code));
194218
bool is_completed();
195219
void retain_if_managed();
196220
void release_if_managed();
197221
int get_async_result(ObArray<float*> &output_vectors);
198222
// 公共方法用于外部设置任务失败
199223
int mark_task_failed(int error_code);
200224
int maybe_callback();
201-
int wait_for_completion();
225+
// Wait for task completion with periodic check interval
226+
// @param check_interval_us: the interval to wait before returning, must be > 0
227+
// @return:
228+
// - OB_INVALID_ARGUMENT: if check_interval_us <= 0
229+
// - OB_NOT_INIT: if task is not initialized
230+
// - OB_SUCCESS: if task completed (callback_done_ is true)
231+
// - OB_TIMEOUT: if task has exceeded wait_for_completion_timeout_us_ (real timeout)
232+
// - OB_EAGAIN: if wait timed out but task not yet exceeded total timeout,
233+
// caller should check cancel status and retry
234+
// Usage: call in a loop, check for cancel between calls, handle OB_TIMEOUT as task failure
235+
int wait_for_completion(int64_t check_interval_us);
202236
int wake_up();
203237
void disable_callback();
204238
void set_callback_done();
205239
bool need_callback() { return cb_handle_ != nullptr ? true : false; };
206-
240+
void set_need_cancel() { ATOMIC_STORE(&need_cancel_, true); }
241+
bool need_cancel() const { return ATOMIC_LOAD(&need_cancel_); }
242+
common::ObCurTraceId::TraceId get_trace_id() const { return trace_id_; }
243+
void set_always_retry(bool always_retry) { always_retry_ = always_retry; }
207244
public:
208245
static const ObString MODEL_URL_NAME;
209246
static const ObString MODEL_NAME_NAME;
@@ -215,28 +252,35 @@ class ObEmbeddingTask
215252
static const ObString USER_KEY_NAME;
216253
static const ObString INPUT_NAME;
217254
static const ObString DIMENSIONS_NAME;
255+
static const ObString REQUEST_TOO_LARGE_ERROR_MSG;
218256

219257
static const int64_t HTTP_REQUEST_TIMEOUT; // 20 seconds
220258

221259
// Reschedule related constants
222260
static const int64_t MAX_RESCHEDULE_RETRY_CNT;
223261
static const int64_t RESCHEDULE_RETRY_INTERVAL_US;
262+
static const int64_t MAX_NEXT_HANDLE_INTERVAL_US;
224263

225264
// HTTP retry related constants
226265
static const int64_t MAX_HTTP_RETRY_CNT;
227266
static const int64_t HTTP_RETRY_BASE_INTERVAL_US;
228267
static const int64_t HTTP_RETRY_MAX_INTERVAL_US;
229268
static const int64_t HTTP_RETRY_MULTIPLIER;
269+
static const int64_t ALWAYS_RETRY_MIN_INTERVAL_US;
270+
static const int64_t MIN_EMBEDDING_MODEL_RPM;
271+
static const int64_t EMBEDDING_MODEL_WAIT_RATIO;
230272

231273
// Callback related constants
232274
static const int64_t CALLBACK_BATCH_SIZE;
275+
static const int64_t MAX_INPUT_TOKEN; // Default max token count for each input: 512
233276

234277
private:
278+
void init_members(); // Common initialization for all constructors
235279
void reset();
236280
bool is_finished() const; // Internal use only - no lock needed
237-
void set_stop();
281+
void set_stop(int ret_code);
238282
int set_phase(ObEmbeddingTaskPhase new_phase);
239-
int complete_task(ObEmbeddingTaskPhase new_phase, int result_code, bool finished = true);
283+
int complete_task(int result_code, bool finished = true);
240284
int start_async_work();
241285
int check_async_progress();
242286

@@ -256,14 +300,24 @@ class ObEmbeddingTask
256300
int parse_embedding_response(const char *response_data, size_t response_size);
257301

258302
// Helper methods for retry logic
259-
bool should_retry_http_request(int64_t http_error_code) const;
303+
bool should_retry_http_request(int64_t http_error_code, const ObString &http_error_msg) const;
260304
bool is_batch_size_related_error(int64_t http_error_code) const;
261305
int64_t calculate_retry_interval() const;
262306
int adjust_batch_size_for_retry();
307+
int adjust_input_token_limit_for_retry();
263308
void reset_retry_state();
264309
int map_http_error_to_internal_error(int64_t http_error_code) const;
265310
void try_increase_batch_size();
266311
int init_curl_handler(const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us);
312+
bool is_request_too_large(int64_t http_error_code, const ObString &content) const;
313+
int truncate_text_by_token_count(ObString &text, const ObCollationType cs_type, const int64_t max_token_count) const;
314+
bool can_retry_request() const { return http_retry_count_ < http_max_retry_count_; }
315+
bool is_ready_to_handle() const; // Check if current time exceeds next_handle_task_time_us_
316+
int finish_task();
317+
int try_rescheule_task();
318+
int calc_max_wait_completion_time_us(int64_t http_timeout_us, int64_t http_max_retry_count,
319+
int64_t batch_size, int64_t input_chunks_count,
320+
int64_t &max_wait_completion_time_us) const;
267321

268322
struct HttpResponseData {
269323
HttpResponseData(ObIAllocator &allocator) : data(nullptr), size(0), allocator(allocator) {}
@@ -365,20 +419,31 @@ class ObEmbeddingTask
365419
int64_t wait_for_completion_timeout_us_; // For controlling the maximum timeout of waiting for completion
366420

367421
bool need_retry_flag_;
422+
bool always_retry_; // If true, task can retry even after exceeding max retry count, but must wait at least 3 minutes
423+
int64_t last_exceeded_retry_time_us_; // Time when retry count was exceeded
368424

369425
// Batch size adjustment for retry
370426
uint32_t original_batch_size_;
371427
bool batch_size_adjusted_;
372428
uint32_t current_batch_size_;
373429
uint32_t successful_requests_count_;
374430

431+
// Input token limit adjustment for retry (when request too large)
432+
int64_t current_input_token_limit_;
433+
375434
ObThreadCond task_cond_;
376435
bool callback_done_;
377436

378437
// TODO(fanfangyao.ffy): use taskhandle to manage task reference count
379438
// ref_cnt_ is only used to track the reference count of the post create embedding task
380439
int64_t ref_cnt_;
381-
440+
ObCollationType col_type_;
441+
bool need_cancel_;
442+
common::ObCurTraceId::TraceId trace_id_;
443+
int64_t source_task_id_;
444+
ObEmbeddingTasSourceType source_task_type_;
445+
int64_t next_handle_task_time_us_;
446+
ObEmbeddingTaskHandler *thread_pool_;
382447
private:
383448
DISALLOW_COPY_AND_ASSIGN(ObEmbeddingTask);
384449
};

src/share/vector_index/ob_vector_index_util.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3622,6 +3622,70 @@ int ObVectorIndexUtil::check_index_param(
36223622

36233623
return ret;
36243624
}
3625+
// index_table_id must be table which has vector column
3626+
int ObVectorIndexUtil::get_index_column_collation_type(
3627+
const int64_t tenant_id,
3628+
const uint64_t index_table_id,
3629+
ObCollationType &col_type)
3630+
{
3631+
int ret = OB_SUCCESS;
3632+
const ObTableSchema *data_table_schema = nullptr;
3633+
const ObTableSchema *table_schema = nullptr;
3634+
int64_t main_table_id = OB_INVALID_ID;
3635+
ObArray<uint64_t> tmp_column_ids;
3636+
col_type = CS_TYPE_INVALID;
3637+
ObSchemaGetterGuard schema_guard;
3638+
3639+
if (!is_valid_tenant_id(tenant_id) || OB_INVALID_ID == index_table_id) {
3640+
ret = OB_INVALID_ARGUMENT;
3641+
LOG_WARN("invalid argument", K(ret), K(index_table_id));
3642+
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
3643+
LOG_WARN("fail to get tenant schema guard", K(ret), K(MTL_ID()));
3644+
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, index_table_id, table_schema))) {
3645+
LOG_WARN("fail to get table scheam", K(ret), K(tenant_id), K(index_table_id));
3646+
} else if (OB_ISNULL(table_schema)) {
3647+
ret = OB_TABLE_NOT_EXIST;
3648+
LOG_INFO("table not exit", K(ret), K(tenant_id), K(index_table_id));
3649+
} else if (OB_FALSE_IT(main_table_id = table_schema->get_data_table_id())) {
3650+
} else if (OB_INVALID_ID == main_table_id) {
3651+
ret = OB_ERR_UNEXPECTED;
3652+
LOG_WARN("unexpected invalid id", K(ret), K(main_table_id));
3653+
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, main_table_id, data_table_schema))) {
3654+
LOG_WARN("fail to get table scheam", K(ret), K(tenant_id), K(index_table_id));
3655+
} else if (OB_ISNULL(data_table_schema)) {
3656+
ret = OB_TABLE_NOT_EXIST;
3657+
LOG_INFO("table not exit", K(ret), K(tenant_id), K(main_table_id));
3658+
} else if (OB_FAIL(table_schema->get_column_ids(tmp_column_ids))) {
3659+
LOG_WARN("fail to get index table all column ids", K(ret), K(data_table_schema));
3660+
} else {
3661+
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_column_ids.count() && col_type == CS_TYPE_INVALID; ++i) {
3662+
const ObColumnSchemaV2 *col_schema = data_table_schema->get_column_schema(tmp_column_ids[i]);
3663+
if (OB_ISNULL(col_schema)) {
3664+
ret = OB_ERR_UNEXPECTED;
3665+
LOG_WARN("unexpected null column schema ptr", K(ret));
3666+
} else if (!col_schema->is_vec_hnsw_vector_column()) {
3667+
// only need vector column
3668+
} else {
3669+
ObArray<uint64_t> cascaded_column_ids;
3670+
if (OB_FAIL(col_schema->get_cascaded_column_ids(cascaded_column_ids))) {
3671+
LOG_WARN("failed to get cascaded column ids", K(ret));
3672+
} else {
3673+
for (int64_t j = 0; OB_SUCC(ret) && j < cascaded_column_ids.count() && col_type == CS_TYPE_INVALID; ++j) {
3674+
const ObColumnSchemaV2 *cascaded_column = NULL;
3675+
if (OB_ISNULL(cascaded_column = data_table_schema->get_column_schema(cascaded_column_ids.at(j)))) {
3676+
ret = OB_ERR_UNEXPECTED;
3677+
LOG_WARN("unexpected cascaded column", K(ret));
3678+
} else {
3679+
col_type = cascaded_column->get_collation_type();
3680+
LOG_DEBUG("get vector index collation type", K(col_type));
3681+
}
3682+
}
3683+
}
3684+
}
3685+
}
3686+
}
3687+
return ret;
3688+
}
36253689

36263690
int ObVectorIndexUtil::get_vector_index_type(
36273691
sql::ObRawExpr *&raw_expr,

src/share/vector_index/ob_vector_index_util.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,9 @@ class ObVectorIndexUtil final
832832
const ObString &new_idx_params,
833833
const ObTableSchema &index_table_schema,
834834
bool &need_embedding_when_rebuild);
835+
836+
static int get_index_column_collation_type(const int64_t tenant_id, const uint64_t index_table_id, ObCollationType &col_type);
837+
835838
private:
836839
static void save_column_schema(
837840
const ObColumnSchemaV2 *&old_column,

src/storage/ddl/ob_ddl_pipeline.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ ObVectorIndexTabletContext::ObVectorIndexTabletContext()
9393
lob_inrow_threshold_(0), rowkey_cnt_(0), column_cnt_(0), snapshot_version_(0), index_type_(share::VIAT_MAX), helper_(nullptr),
9494
allocator_("VecIndexCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
9595
memory_context_(MTL(ObPluginVectorIndexService *)->get_memory_context()),
96-
all_vsag_use_mem_(MTL(ObPluginVectorIndexService *)->get_all_vsag_use_mem())
96+
all_vsag_use_mem_(MTL(ObPluginVectorIndexService *)->get_all_vsag_use_mem()),
97+
table_id_(0)
9798
{
9899

99100
}
@@ -120,6 +121,8 @@ int ObVectorIndexTabletContext::init(
120121
rowkey_cnt_ = ddl_table_schema.table_item_.rowkey_column_num_;
121122
column_cnt_ = ddl_table_schema.column_items_.count();
122123
snapshot_version_ = snapshot_version;
124+
table_id_ = ddl_table_schema.table_id_;
125+
123126
if (schema::is_vec_index_snapshot_data_type(index_type)) {
124127
if (OB_FAIL(init_hnsw_index(ddl_table_schema))) {
125128
LOG_WARN("init hnsw index failed", K(ret));
@@ -1661,13 +1664,18 @@ int ObHNSWEmbeddingOperator::init(const ObTabletID &tablet_id)
16611664
ret = OB_ERR_UNEXPECTED;
16621665
LOG_WARN("error unexpected, vector index ctx is null", K(ret));
16631666
} else {
1667+
const uint64_t table_id = vector_index_ctx->table_id_;
16641668
vec_dim_ = vector_index_ctx->vec_dim_;
16651669
rowkey_cnt_ = vector_index_ctx->rowkey_cnt_;
16661670
text_col_idx_ = vector_index_ctx->vector_chunk_col_idx_;
16671671
extra_column_idxs_.reset();
16681672
ObVectorIndexParam index_param;
1673+
ObSchemaGetterGuard schema_guard;
1674+
ObCollationType col_type = CS_TYPE_INVALID;
16691675

1670-
if (OB_FAIL(vector_index_ctx->build_extra_column_idxs(static_cast<int32_t>(text_col_idx_), extra_column_idxs_))) {
1676+
if (OB_FAIL(ObVectorIndexUtil::get_index_column_collation_type(MTL_ID(), table_id, col_type))) {
1677+
LOG_WARN("fail to get vector column collation type", K(ret), K(text_col_idx_), K(table_id));
1678+
} else if (OB_FAIL(vector_index_ctx->build_extra_column_idxs(static_cast<int32_t>(text_col_idx_), extra_column_idxs_))) {
16711679
LOG_WARN("build_extra_column_idxs failed", K(ret), K(text_col_idx_));
16721680
} else if (OB_FAIL(ObVectorIndexUtil::parser_params_from_string(vector_index_ctx->vec_idx_param_, ObVectorIndexType::VIT_HNSW_INDEX, index_param, false))) {
16731681
LOG_WARN("failed to parser params from string", K(ret));
@@ -1679,12 +1687,12 @@ int ObHNSWEmbeddingOperator::init(const ObTabletID &tablet_id)
16791687
ret = OB_ALLOCATE_MEMORY_FAILED;
16801688
LOG_WARN("failed to alloc ObEmbeddingTaskMgr", K(ret));
16811689
} else {
1682-
embedmgr_ = new (buf) ObEmbeddingTaskMgr();
1690+
embedmgr_ = new (buf) ObEmbeddingTaskMgr(*this);
16831691
}
16841692
}
16851693

16861694
if (OB_SUCC(ret)) {
1687-
if (OB_FAIL(embedmgr_->init(model_id_))) {
1695+
if (OB_FAIL(embedmgr_->init(model_id_, col_type))) {
16881696
embedmgr_->~ObEmbeddingTaskMgr();
16891697
op_allocator_.free(embedmgr_);
16901698
embedmgr_ = nullptr;

src/storage/ddl/ob_ddl_pipeline.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(snapshot_version), K_(i
169169
common::ObArenaAllocator allocator_;
170170
lib::MemoryContext &memory_context_;
171171
uint64_t *all_vsag_use_mem_;
172+
uint64_t table_id_;
172173
};
173174

174175
class ObVectorIndexRowIterator

0 commit comments

Comments
 (0)