Skip to content

Commit

Permalink
(db/snapshot): Add some helper visitors (#2777)
Browse files Browse the repository at this point in the history
* (db/snapshot): add Segment related visitors

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error

Signed-off-by: peng.xu <peng.xu@zilliz.com>
  • Loading branch information
XuPeng-SH committed Jul 8, 2020
1 parent b7d9c2a commit af573f9
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 3 deletions.
46 changes: 46 additions & 0 deletions core/src/db/SnapshotVisitor.cpp
Expand Up @@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/SnapshotVisitor.h"
#include <sstream>
#include "db/SnapshotHandlers.h"
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Snapshots.h"
Expand Down Expand Up @@ -38,5 +39,50 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) {
return handler->GetStatus();
}

SegmentFileVisitor::Ptr
SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) {
if (!ss) {
return nullptr;
}

auto file = ss->GetResource<snapshot::SegmentFile>(segment_file_id);
if (!file) {
return nullptr;
}

auto visitor = std::make_shared<SegmentFileVisitor>();
visitor->SetFile(file);
auto field_element = ss->GetResource<snapshot::FieldElement>(file->GetFieldElementId());
auto field = ss->GetResource<snapshot::Field>(field_element->GetFieldId());
visitor->SetField(field);
visitor->SetFieldElement(field_element);
return visitor;
}

SegmentVisitor::Ptr
SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id) {
if (!ss) {
return nullptr;
}
auto segment = ss->GetResource<snapshot::Segment>(segment_id);
if (!segment) {
return nullptr;
}

auto visitor = std::make_shared<SegmentVisitor>();
visitor->SetSegment(segment);

auto& file_ids = ss->GetSegmentFileIds(segment_id);
for (auto id : file_ids) {
auto file_visitor = SegmentFileVisitor::Build(ss, id);
if (!file_visitor) {
return nullptr;
}
visitor->InsertSegmentFile(file_visitor);
}

return visitor;
}

} // namespace engine
} // namespace milvus
76 changes: 76 additions & 0 deletions core/src/db/SnapshotVisitor.h
Expand Up @@ -14,7 +14,9 @@
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"

#include <map>
#include <memory>
#include <set>
#include <string>

namespace milvus {
Expand All @@ -34,5 +36,79 @@ class SnapshotVisitor {
Status status_;
};

class SegmentFileVisitor {
public:
using Ptr = std::shared_ptr<SegmentFileVisitor>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id);

SegmentFileVisitor() = default;

const snapshot::SegmentFilePtr
GetFile() const {
return file_;
}
const snapshot::FieldPtr
GetField() const {
return field_;
}
const snapshot::FieldElementPtr
GetFieldElement() const {
return field_element_;
}

void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}
void
SetField(snapshot::FieldPtr field) {
field_ = field;
}
void
SetFieldElement(snapshot::FieldElementPtr field_element) {
field_element_ = field_element;
}

protected:
snapshot::SegmentFilePtr file_;
snapshot::FieldPtr field_;
snapshot::FieldElementPtr field_element_;
};

class SegmentVisitor {
public:
using Ptr = std::shared_ptr<SegmentVisitor>;
using FileT = typename SegmentFileVisitor::Ptr;
using FilesMapT = std::map<snapshot::ID_TYPE, FileT>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id);
SegmentVisitor() = default;

const FilesMapT&
GetSegmentFiles() const {
return files_map_;
}
const snapshot::SegmentPtr&
GetSegment() const {
return segment_;
}

void
SetSegment(snapshot::SegmentPtr segment) {
segment_ = segment;
}
void
InsertSegmentFile(FileT segment_file) {
files_map_[segment_file->GetFile()->GetID()] = segment_file;
}

protected:
snapshot::SegmentPtr segment_;
FilesMapT files_map_;
};

} // namespace engine
} // namespace milvus
6 changes: 3 additions & 3 deletions core/src/db/snapshot/Operations.h
Expand Up @@ -33,9 +33,9 @@ namespace engine {
namespace snapshot {

using CheckStaleFunc = std::function<Status(ScopedSnapshotT&)>;
using StepsHolderT = std::tuple<CollectionCommit::SetT, Collection::SetT, SchemaCommit::SetT, FieldCommit::SetT,
Field::SetT, FieldElement::SetT, PartitionCommit::SetT, Partition::SetT,
SegmentCommit::SetT, Segment::SetT, SegmentFile::SetT>;
using StepsHolderT = std::tuple<SegmentFile::SetT, SegmentCommit::SetT, Segment::SetT, PartitionCommit::SetT,
Partition::SetT, FieldElement::SetT, FieldCommit::SetT, Field::SetT, SchemaCommit::SetT,
CollectionCommit::SetT, Collection::SetT>;

enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound };

Expand Down
1 change: 1 addition & 0 deletions core/src/db/snapshot/Snapshot.cpp
Expand Up @@ -95,6 +95,7 @@ Snapshot::Snapshot(ID_TYPE ss_id) {
AddResource<FieldElement>(field_element);
AddResource<SegmentFile>(segment_file);
element_segfiles_map_[field_element_id][segment_id] = segment_file_id;
seg_segfiles_map_[segment_id].insert(segment_file_id);
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/db/snapshot/Snapshot.h
Expand Up @@ -20,6 +20,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <thread>
Expand Down Expand Up @@ -107,6 +108,15 @@ class Snapshot : public ReferenceProxy {
return GetResources<CollectionCommit>().cbegin()->second.Get();
}

const std::set<ID_TYPE>&
GetSegmentFileIds(ID_TYPE segment_id) const {
auto it = seg_segfiles_map_.find(segment_id);
if (it == seg_segfiles_map_.end()) {
return empty_set_;
}
return it->second;
}

ID_TYPE
GetLatestSchemaCommitId() const {
return latest_schema_commit_id_;
Expand Down Expand Up @@ -325,11 +335,13 @@ class Snapshot : public ReferenceProxy {
std::map<std::string, ID_TYPE> partition_names_map_;
std::map<std::string, std::map<std::string, ID_TYPE>> field_element_names_map_;
std::map<ID_TYPE, std::map<ID_TYPE, ID_TYPE>> element_segfiles_map_;
std::map<ID_TYPE, std::set<ID_TYPE>> seg_segfiles_map_;
std::map<ID_TYPE, ID_TYPE> seg_segc_map_;
std::map<ID_TYPE, ID_TYPE> p_pc_map_;
ID_TYPE latest_schema_commit_id_ = 0;
std::map<ID_TYPE, NUM_TYPE> p_max_seg_num_;
LSN_TYPE max_lsn_;
std::set<ID_TYPE> empty_set_;
};

using GCHandler = std::function<void(Snapshot::Ptr)>;
Expand Down
61 changes: 61 additions & 0 deletions core/unittest/ssdb/test_db.cpp
Expand Up @@ -18,6 +18,9 @@
#include <algorithm>

#include "ssdb/utils.h"
#include "db/SnapshotVisitor.h"

using SegmentVisitor = milvus::engine::SegmentVisitor;

milvus::Status
CreateCollection(std::shared_ptr<SSDBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
Expand Down Expand Up @@ -200,3 +203,61 @@ TEST_F(SSDBTest, IndexTest) {
}
}
}

TEST_F(SSDBTest, VisitorTest) {
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};

std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());

std::stringstream p_name;
auto num = RandomInt(1, 3);
for (auto i = 0; i < num; ++i) {
p_name.str("");
p_name << "partition_" << i;
status = db_->CreatePartition(c1, p_name.str());
ASSERT_TRUE(status.ok());
}

ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());

SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);

auto new_total = 0;
auto& partitions = ss->GetResources<Partition>();
for (auto& kv : partitions) {
num = RandomInt(1, 3);
for (auto i = 0; i < num; ++i) {
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok());
}
new_total += num;
}

status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());

auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (!visitor) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
}
auto& files_map = visitor->GetSegmentFiles();
for (auto& kv : files_map) {
std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl;
std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl;
}
return Status::OK();
};

auto segment_handler = std::make_shared<SegmentCollector>(ss, executor);
segment_handler->Iterate();
std::cout << segment_handler->GetStatus().ToString() << std::endl;
ASSERT_TRUE(segment_handler->GetStatus().ok());
}
17 changes: 17 additions & 0 deletions core/unittest/ssdb/utils.h
Expand Up @@ -70,6 +70,7 @@ using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler<Segment>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
using SSDBImpl = milvus::engine::SSDBImpl;
using Status = milvus::Status;
Expand Down Expand Up @@ -119,6 +120,22 @@ struct PartitionCollector : public IteratePartitionHandler {
std::vector<std::string> partition_names_;
};

using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>;
struct SegmentCollector : public IterateSegmentHandler {
using ResourceT = Segment;
using BaseT = IterateSegmentHandler;

explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor)
: BaseT(ss), executor_(executor) {}

Status
Handle(const typename ResourceT::Ptr& segment) override {
return executor_(segment, this);
}

SegmentExecutorT executor_;
};

using FilterT = std::function<bool(SegmentFile::Ptr)>;
struct SegmentFileCollector : public IterateSegmentFileHandler {
using ResourceT = SegmentFile;
Expand Down

0 comments on commit af573f9

Please sign in to comment.