diff --git a/core/src/db/SnapshotVisitor.cpp b/core/src/db/SnapshotVisitor.cpp index b0300f38f044..4000523ae129 100644 --- a/core/src/db/SnapshotVisitor.cpp +++ b/core/src/db/SnapshotVisitor.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/SnapshotVisitor.h" +#include #include "db/SnapshotHandlers.h" #include "db/meta/MetaTypes.h" #include "db/snapshot/Snapshots.h" @@ -38,5 +39,50 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) { return handler->GetStatus(); } +SegmentFileVisitor::Ptr +SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) { + if (!ss) { + return nullptr; + } + + auto file = ss->GetResource(segment_file_id); + if (!file) { + return nullptr; + } + + auto visitor = std::make_shared(); + visitor->SetFile(file); + auto field_element = ss->GetResource(file->GetFieldElementId()); + auto field = ss->GetResource(field_element->GetFieldId()); + visitor->SetField(field); + visitor->SetFieldElement(field_element); + return visitor; +} + +SegmentVisitor::Ptr +SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id) { + if (!ss) { + return nullptr; + } + auto segment = ss->GetResource(segment_id); + if (!segment) { + return nullptr; + } + + auto visitor = std::make_shared(); + visitor->SetSegment(segment); + + auto& file_ids = ss->GetSegmentFileIds(segment_id); + for (auto id : file_ids) { + auto file_visitor = SegmentFileVisitor::Build(ss, id); + if (!file_visitor) { + return nullptr; + } + visitor->InsertSegmentFile(file_visitor); + } + + return visitor; +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotVisitor.h b/core/src/db/SnapshotVisitor.h index 3cd018896843..48cd3df5b831 100644 --- a/core/src/db/SnapshotVisitor.h +++ b/core/src/db/SnapshotVisitor.h @@ -14,7 +14,9 @@ #include "db/meta/FilesHolder.h" #include "db/snapshot/Snapshot.h" +#include #include +#include #include namespace milvus { @@ -34,5 +36,79 @@ class SnapshotVisitor { Status status_; }; +class SegmentFileVisitor { + public: + using Ptr = std::shared_ptr; + + static Ptr + Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id); + + SegmentFileVisitor() = default; + + const snapshot::SegmentFilePtr + GetFile() const { + return file_; + } + const snapshot::FieldPtr + GetField() const { + return field_; + } + const snapshot::FieldElementPtr + GetFieldElement() const { + return field_element_; + } + + void + SetFile(snapshot::SegmentFilePtr file) { + file_ = file; + } + void + SetField(snapshot::FieldPtr field) { + field_ = field; + } + void + SetFieldElement(snapshot::FieldElementPtr field_element) { + field_element_ = field_element; + } + + protected: + snapshot::SegmentFilePtr file_; + snapshot::FieldPtr field_; + snapshot::FieldElementPtr field_element_; +}; + +class SegmentVisitor { + public: + using Ptr = std::shared_ptr; + using FileT = typename SegmentFileVisitor::Ptr; + using FilesMapT = std::map; + + static Ptr + Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id); + SegmentVisitor() = default; + + const FilesMapT& + GetSegmentFiles() const { + return files_map_; + } + const snapshot::SegmentPtr& + GetSegment() const { + return segment_; + } + + void + SetSegment(snapshot::SegmentPtr segment) { + segment_ = segment; + } + void + InsertSegmentFile(FileT segment_file) { + files_map_[segment_file->GetFile()->GetID()] = segment_file; + } + + protected: + snapshot::SegmentPtr segment_; + FilesMapT files_map_; +}; + } // namespace engine } // namespace milvus diff --git a/core/src/db/snapshot/Operations.h b/core/src/db/snapshot/Operations.h index 7ec39cf27295..5a0ad86b6f74 100644 --- a/core/src/db/snapshot/Operations.h +++ b/core/src/db/snapshot/Operations.h @@ -33,9 +33,9 @@ namespace engine { namespace snapshot { using CheckStaleFunc = std::function; -using StepsHolderT = std::tuple; +using StepsHolderT = std::tuple; enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound }; diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index c33c5907b2c7..c1716d1f3a90 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -95,6 +95,7 @@ Snapshot::Snapshot(ID_TYPE ss_id) { AddResource(field_element); AddResource(segment_file); element_segfiles_map_[field_element_id][segment_id] = segment_file_id; + seg_segfiles_map_[segment_id].insert(segment_file_id); } } } diff --git a/core/src/db/snapshot/Snapshot.h b/core/src/db/snapshot/Snapshot.h index 0866f51e1680..67ba724f6091 100644 --- a/core/src/db/snapshot/Snapshot.h +++ b/core/src/db/snapshot/Snapshot.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -107,6 +108,15 @@ class Snapshot : public ReferenceProxy { return GetResources().cbegin()->second.Get(); } + const std::set& + GetSegmentFileIds(ID_TYPE segment_id) const { + auto it = seg_segfiles_map_.find(segment_id); + if (it == seg_segfiles_map_.end()) { + return empty_set_; + } + return it->second; + } + ID_TYPE GetLatestSchemaCommitId() const { return latest_schema_commit_id_; @@ -325,11 +335,13 @@ class Snapshot : public ReferenceProxy { std::map partition_names_map_; std::map> field_element_names_map_; std::map> element_segfiles_map_; + std::map> seg_segfiles_map_; std::map seg_segc_map_; std::map p_pc_map_; ID_TYPE latest_schema_commit_id_ = 0; std::map p_max_seg_num_; LSN_TYPE max_lsn_; + std::set empty_set_; }; using GCHandler = std::function; diff --git a/core/unittest/ssdb/test_db.cpp b/core/unittest/ssdb/test_db.cpp index 32319915154c..bef8c345efde 100644 --- a/core/unittest/ssdb/test_db.cpp +++ b/core/unittest/ssdb/test_db.cpp @@ -18,6 +18,9 @@ #include #include "ssdb/utils.h" +#include "db/SnapshotVisitor.h" + +using SegmentVisitor = milvus::engine::SegmentVisitor; milvus::Status CreateCollection(std::shared_ptr db, const std::string& collection_name, const LSN_TYPE& lsn) { @@ -200,3 +203,61 @@ TEST_F(SSDBTest, IndexTest) { } } } + +TEST_F(SSDBTest, VisitorTest) { + LSN_TYPE lsn = 0; + auto next_lsn = [&]() -> decltype(lsn) { + return ++lsn; + }; + + std::string c1 = "c1"; + auto status = CreateCollection(db_, c1, next_lsn()); + ASSERT_TRUE(status.ok()); + + std::stringstream p_name; + auto num = RandomInt(1, 3); + for (auto i = 0; i < num; ++i) { + p_name.str(""); + p_name << "partition_" << i; + status = db_->CreatePartition(c1, p_name.str()); + ASSERT_TRUE(status.ok()); + } + + ScopedSnapshotT ss; + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + + auto new_total = 0; + auto& partitions = ss->GetResources(); + for (auto& kv : partitions) { + num = RandomInt(1, 3); + for (auto i = 0; i < num; ++i) { + ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok()); + } + new_total += num; + } + + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + + auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status { + auto visitor = SegmentVisitor::Build(ss, segment->GetID()); + if (!visitor) { + return Status(milvus::SS_ERROR, "Cannot build segment visitor"); + } + auto& files_map = visitor->GetSegmentFiles(); + for (auto& kv : files_map) { + std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl; + std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl; + } + return Status::OK(); + }; + + auto segment_handler = std::make_shared(ss, executor); + segment_handler->Iterate(); + std::cout << segment_handler->GetStatus().ToString() << std::endl; + ASSERT_TRUE(segment_handler->GetStatus().ok()); +} diff --git a/core/unittest/ssdb/utils.h b/core/unittest/ssdb/utils.h index f39f9a2912b7..0e4e0b8a70a1 100644 --- a/core/unittest/ssdb/utils.h +++ b/core/unittest/ssdb/utils.h @@ -70,6 +70,7 @@ using TQueue = milvus::BlockingQueue>; using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation; using ParamsField = milvus::engine::snapshot::ParamsField; using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler; +using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler; using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler; using SSDBImpl = milvus::engine::SSDBImpl; using Status = milvus::Status; @@ -119,6 +120,22 @@ struct PartitionCollector : public IteratePartitionHandler { std::vector partition_names_; }; +using SegmentExecutorT = std::function; +struct SegmentCollector : public IterateSegmentHandler { + using ResourceT = Segment; + using BaseT = IterateSegmentHandler; + + explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor) + : BaseT(ss), executor_(executor) {} + + Status + Handle(const typename ResourceT::Ptr& segment) override { + return executor_(segment, this); + } + + SegmentExecutorT executor_; +}; + using FilterT = std::function; struct SegmentFileCollector : public IterateSegmentFileHandler { using ResourceT = SegmentFile;