diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 3eb545812fa1..ad5c95037565 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -36,6 +36,11 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta, field_index_meta, segcore_config, SegmentType::Growing)) { + recreate_index(); +} + +void +VectorFieldIndexing::recreate_index() { index_ = std::make_unique>( config_->GetIndexType(), config_->GetMetricType(), @@ -128,6 +133,8 @@ VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset, } } catch (SegcoreError& error) { LOG_ERROR("growing sparse index build error: {}", error.what()); + recreate_index(); + index_cur_ = 0; return; } index_cur_.fetch_add(rows); @@ -204,6 +211,7 @@ VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset, index_->BuildWithDataset(dataset, conf); } catch (SegcoreError& error) { LOG_ERROR("growing index build error: {}", error.what()); + recreate_index(); return; } index_cur_.fetch_add(vec_num); diff --git a/internal/core/src/segcore/FieldIndexing.h b/internal/core/src/segcore/FieldIndexing.h index 4a77a2dbaf59..19de3974747e 100644 --- a/internal/core/src/segcore/FieldIndexing.h +++ b/internal/core/src/segcore/FieldIndexing.h @@ -232,6 +232,8 @@ class VectorFieldIndexing : public FieldIndexing { get_search_params(const SearchInfo& searchInfo) const; private: + void + recreate_index(); // current number of rows in index. std::atomic index_cur_ = 0; // whether the growing index has been built. diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 463a31ae47f7..e059d3b4423e 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -627,22 +627,35 @@ SegmentGrowingImpl::bulk_subscript_sparse_float_vector_impl( milvus::proto::schema::SparseFloatArray* output) const { AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data"); - // if index has finished building index, grab from index + // if index has finished building, grab from index without any + // synchronization operations. if (indexing_record_.SyncDataWithIndex(field_id)) { indexing_record_.GetDataFromIndex( field_id, seg_offsets, count, 0, output); return; } - // else copy from raw data - std::lock_guard guard(chunk_mutex_); - SparseRowsToProto( - [&](size_t i) { - auto offset = seg_offsets[i]; - return offset != INVALID_SEG_OFFSET ? vec_raw->get_element(offset) - : nullptr; - }, - count, - output); + { + std::lock_guard guard(chunk_mutex_); + // check again after lock to make sure: if index has finished building + // after the above check but before we grabbed the lock, we should grab + // from index as the data in chunk may have been removed in + // try_remove_chunks. + if (!indexing_record_.SyncDataWithIndex(field_id)) { + // copy from raw data + SparseRowsToProto( + [&](size_t i) { + auto offset = seg_offsets[i]; + return offset != INVALID_SEG_OFFSET + ? vec_raw->get_element(offset) + : nullptr; + }, + count, + output); + return; + } + // else: release lock and copy from index + } + indexing_record_.GetDataFromIndex(field_id, seg_offsets, count, 0, output); } template @@ -675,25 +688,38 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id, // HasRawData interface guarantees that data can be fetched from growing segment AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data"); - // when data is in sync with index + + // if index has finished building, grab from index without any + // synchronization operations. if (indexing_record_.SyncDataWithIndex(field_id)) { indexing_record_.GetDataFromIndex( field_id, seg_offsets, count, element_sizeof, output_raw); return; } - // else copy from chunk - std::lock_guard guard(chunk_mutex_); - auto output_base = reinterpret_cast(output_raw); - for (int i = 0; i < count; ++i) { - auto dst = output_base + i * element_sizeof; - auto offset = seg_offsets[i]; - if (offset == INVALID_SEG_OFFSET) { - memset(dst, 0, element_sizeof); - } else { - auto src = (const uint8_t*)vec.get_element(offset); - memcpy(dst, src, element_sizeof); + { + std::lock_guard guard(chunk_mutex_); + // check again after lock to make sure: if index has finished building + // after the above check but before we grabbed the lock, we should grab + // from index as the data in chunk may have been removed in + // try_remove_chunks. + if (!indexing_record_.SyncDataWithIndex(field_id)) { + auto output_base = reinterpret_cast(output_raw); + for (int i = 0; i < count; ++i) { + auto dst = output_base + i * element_sizeof; + auto offset = seg_offsets[i]; + if (offset == INVALID_SEG_OFFSET) { + memset(dst, 0, element_sizeof); + } else { + auto src = (const uint8_t*)vec.get_element(offset); + memcpy(dst, src, element_sizeof); + } + } + return; } + // else: release lock and copy from index } + indexing_record_.GetDataFromIndex( + field_id, seg_offsets, count, element_sizeof, output_raw); } template