Skip to content

Commit

Permalink
(db/snapshot): Add some visitors (#2800)
Browse files Browse the repository at this point in the history
* (db/snapshot): add more visitors

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add some Iterators

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update visitors

Signed-off-by: peng.xu <peng.xu@zilliz.com>
  • Loading branch information
XuPeng-SH committed Jul 10, 2020
1 parent 5857f64 commit 9d4febd
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 65 deletions.
90 changes: 76 additions & 14 deletions core/src/db/SnapshotVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,64 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) {
return handler->GetStatus();
}

SegmentFileVisitor::Ptr
SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) {
SegmentFieldElementVisitor::Ptr
SegmentFieldElementVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id,
snapshot::ID_TYPE field_element_id) {
if (!ss) {
return nullptr;
}

auto file = ss->GetResource<snapshot::SegmentFile>(segment_file_id);
auto element = ss->GetResource<snapshot::FieldElement>(field_element_id);
if (!element) {
return nullptr;
}

auto visitor = std::make_shared<SegmentFieldElementVisitor>();
visitor->SetFieldElement(element);
auto segment = ss->GetResource<snapshot::Segment>(segment_id);
if (!segment) {
return nullptr;
}

auto file = ss->GetSegmentFile(segment_id, field_element_id);
if (!file) {
return nullptr;
}

auto visitor = std::make_shared<SegmentFileVisitor>();
visitor->SetFile(file);
auto field_element = ss->GetResource<snapshot::FieldElement>(file->GetFieldElementId());
auto field = ss->GetResource<snapshot::Field>(field_element->GetFieldId());
return visitor;
}

// Builds the visitor for a single field of a single segment.
//
// Resolves `field_id` in snapshot `ss`, stores the field on a new SegmentFieldVisitor, and then
// runs a FieldElementIterator whose executor attaches one SegmentFieldElementVisitor for every
// field element whose GetFieldId() equals `field_id`.
//
// Params:
//   ss         - scoped snapshot to read from; an empty snapshot yields nullptr.
//   segment_id - segment forwarded to SegmentFieldElementVisitor::Build for each element.
//   field_id   - field to visit; yields nullptr when the snapshot has no such Field resource.
//
// Returns the populated visitor, or nullptr on the two failure cases above.
SegmentFieldVisitor::Ptr
SegmentFieldVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id) {
    if (!ss) {
        return nullptr;
    }

    auto field = ss->GetResource<snapshot::Field>(field_id);
    if (!field) {
        return nullptr;
    }

    auto visitor = std::make_shared<SegmentFieldVisitor>();
    visitor->SetField(field);
    // Only the field is stored here; field elements are attached one by one through
    // InsertElement() in the executor below, so there is no single field element to set.

    // The executor captures locals by reference.
    // NOTE(review): presumably Iterate() invokes it synchronously before this function
    // returns -- confirm, otherwise the captures would dangle.
    auto executor = [&](const snapshot::FieldElement::Ptr& field_element,
                        snapshot::FieldElementIterator* itr) -> Status {
        // Skip elements that belong to a different field.
        if (field_element->GetFieldId() != field_id) {
            return Status::OK();
        }
        auto element_visitor = SegmentFieldElementVisitor::Build(ss, segment_id, field_element->GetID());
        // An element for which no visitor can be built is skipped rather than reported
        // as an error, so the iteration keeps going.
        if (!element_visitor) {
            return Status::OK();
        }
        visitor->InsertElement(element_visitor);
        return Status::OK();
    };

    auto iterator = std::make_shared<snapshot::FieldElementIterator>(ss, executor);
    // NOTE(review): the result of the iteration is not inspected here.
    iterator->Iterate();

    return visitor;
}

Expand All @@ -72,17 +113,38 @@ SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id
auto visitor = std::make_shared<SegmentVisitor>();
visitor->SetSegment(segment);

auto& file_ids = ss->GetSegmentFileIds(segment_id);
for (auto id : file_ids) {
auto file_visitor = SegmentFileVisitor::Build(ss, id);
if (!file_visitor) {
return nullptr;
auto executor = [&](const snapshot::Field::Ptr& field, snapshot::FieldIterator* itr) -> Status {
auto field_visitor = SegmentFieldVisitor::Build(ss, segment_id, field->GetID());
if (!field_visitor) {
return Status::OK();
}
visitor->InsertSegmentFile(file_visitor);
}
visitor->InsertField(field_visitor);

return Status::OK();
};

auto iterator = std::make_shared<snapshot::FieldIterator>(ss, executor);
iterator->Iterate();

return visitor;
}

std::string
SegmentVisitor::ToString() const {
std::stringstream ss;
ss << "SegmentVisitor[" << GetSegment()->GetID() << "]: \n";
auto& field_visitors = GetFieldVisitors();
for (auto& fkv : field_visitors) {
ss << " Field[" << fkv.first << "]\n";
auto& fe_visitors = fkv.second->GetElementVistors();
for (auto& fekv : fe_visitors) {
ss << " FieldElement[" << fekv.first << "] ";
ss << "SegmentFile [" << fekv.second->GetFile()->GetID() << "]\n";
}
}

return ss.str();
}

} // namespace engine
} // namespace milvus
100 changes: 75 additions & 25 deletions core/src/db/SnapshotVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,61 +36,106 @@ class SnapshotVisitor {
Status status_;
};

class SegmentFileVisitor {
class SegmentFieldElementVisitor {
public:
using Ptr = std::shared_ptr<SegmentFileVisitor>;
using Ptr = std::shared_ptr<SegmentFieldElementVisitor>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id);
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_element_id);

SegmentFileVisitor() = default;
SegmentFieldElementVisitor() = default;

void
SetFieldElement(snapshot::FieldElementPtr field_element) {
field_element_ = field_element;
}
void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}

const snapshot::FieldElementPtr
GetElement() const {
return field_element_;
}
const snapshot::SegmentFilePtr
GetFile() const {
return file_;
}
const snapshot::FieldPtr

protected:
snapshot::FieldElementPtr field_element_;
snapshot::SegmentFilePtr file_;
};

class SegmentFieldVisitor {
public:
using Ptr = std::shared_ptr<SegmentFieldVisitor>;
using ElementT = typename SegmentFieldElementVisitor::Ptr;
using ElementsMapT = std::map<snapshot::ID_TYPE, ElementT>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id);

SegmentFieldVisitor() = default;

const ElementsMapT&
GetElementVistors() const {
return elements_map_;
}
const snapshot::FieldPtr&
GetField() const {
return field_;
}
const snapshot::FieldElementPtr
GetFieldElement() const {
return field_element_;
}

void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}
void
SetField(snapshot::FieldPtr field) {
field_ = field;
}

void
SetFieldElement(snapshot::FieldElementPtr field_element) {
field_element_ = field_element;
InsertElement(ElementT element) {
elements_map_[element->GetElement()->GetID()] = element;
}

protected:
snapshot::SegmentFilePtr file_;
ElementsMapT elements_map_;
snapshot::FieldPtr field_;
snapshot::FieldElementPtr field_element_;
};

class SegmentVisitor {
public:
using Ptr = std::shared_ptr<SegmentVisitor>;
using FileT = typename SegmentFileVisitor::Ptr;
using FilesMapT = std::map<snapshot::ID_TYPE, FileT>;
using FieldVisitorT = typename SegmentFieldVisitor::Ptr;
using IdMapT = std::map<snapshot::ID_TYPE, FieldVisitorT>;
using NameMapT = std::map<std::string, FieldVisitorT>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id);
SegmentVisitor() = default;

const FilesMapT&
GetSegmentFiles() const {
return files_map_;
const IdMapT&
GetFieldVisitors() const {
return id_map_;
}

FieldVisitorT
GetFieldVisitor(snapshot::ID_TYPE field_id) const {
auto it = id_map_.find(field_id);
if (it == id_map_.end()) {
return nullptr;
}
return it->second;
}
FieldVisitorT
GetFieldVisitor(const std::string& field_name) const {
auto it = name_map_.find(field_name);
if (it == name_map_.end()) {
return nullptr;
}
return it->second;
}

const snapshot::SegmentPtr&
GetSegment() const {
return segment_;
Expand All @@ -101,13 +146,18 @@ class SegmentVisitor {
segment_ = segment;
}
void
InsertSegmentFile(FileT segment_file) {
files_map_[segment_file->GetFile()->GetID()] = segment_file;
InsertField(FieldVisitorT field_visitor) {
id_map_[field_visitor->GetField()->GetID()] = field_visitor;
name_map_[field_visitor->GetField()->GetName()] = field_visitor;
}

std::string
ToString() const;

protected:
snapshot::SegmentPtr segment_;
FilesMapT files_map_;
IdMapT id_map_;
NameMapT name_map_;
};

} // namespace engine
Expand Down
33 changes: 15 additions & 18 deletions core/src/db/snapshot/IterateHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,22 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using ResourceT = T;
using ThisT = IterateHandler<ResourceT>;
using Ptr = std::shared_ptr<ThisT>;
using ExecutorT = std::function<Status(const typename T::Ptr&, ThisT*)>;

explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) {
explicit IterateHandler(ScopedSnapshotT ss, const ExecutorT& executor = {}) : ss_(ss), executor_(executor) {
}

virtual Status
PreIterate() {
return Status::OK();
}
virtual Status
Handle(const typename ResourceT::Ptr& resource) = 0;
Handle(const typename ResourceT::Ptr& resource) {
if (executor_) {
return executor_(resource, this);
}
return Status::OK();
}

virtual Status
PostIterate() {
Expand All @@ -59,26 +65,17 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
}

ScopedSnapshotT ss_;
ExecutorT executor_;
Status status_;
mutable std::mutex mtx_;
};

using IterateSegmentHandler = IterateHandler<Segment>;
using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>;
struct SegmentCollector : public IterateSegmentHandler {
using ResourceT = Segment;
using BaseT = IterateSegmentHandler;

explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor) : BaseT(ss), executor_(executor) {
}

Status
Handle(const typename ResourceT::Ptr& segment) override {
return executor_(segment, this);
}

SegmentExecutorT executor_;
};
using CollectionIterator = IterateHandler<Collection>;
using PartitionIterator = IterateHandler<Partition>;
using SegmentIterator = IterateHandler<Segment>;
using SegmentFileIterator = IterateHandler<SegmentFile>;
using FieldIterator = IterateHandler<Field>;
using FieldElementIterator = IterateHandler<FieldElement>;

} // namespace snapshot
} // namespace engine
Expand Down
15 changes: 15 additions & 0 deletions core/src/db/snapshot/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ Snapshot::GetFieldElement(const std::string& field_name, const std::string& fiel
return Status::OK();
}

// Looks up the segment file registered for the pair (field element, segment).
//
// element_segfiles_map_ is searched first by `field_element_id`, then the nested map by
// `segment_id`; the value found there is passed to GetResource<SegmentFile>().
// Returns nullptr when either lookup misses.
SegmentFilePtr
Snapshot::GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const {
    auto element_pos = element_segfiles_map_.find(field_element_id);
    if (element_pos != element_segfiles_map_.end()) {
        auto& per_segment = element_pos->second;
        auto segment_pos = per_segment.find(segment_id);
        if (segment_pos != per_segment.end()) {
            // presumably the mapped value is the SegmentFile id -- verify against the map's type
            return GetResource<SegmentFile>(segment_pos->second);
        }
    }

    return nullptr;
}

const std::string
Snapshot::ToString() const {
auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string {
Expand Down
3 changes: 3 additions & 0 deletions core/src/db/snapshot/Snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class Snapshot : public ReferenceProxy {
return it->second;
}

SegmentFilePtr
GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const;

ID_TYPE
GetLatestSchemaCommitId() const {
return latest_schema_commit_id_;
Expand Down
10 changes: 3 additions & 7 deletions core/unittest/ssdb/test_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,20 +244,16 @@ TEST_F(SSDBTest, VisitorTest) {
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());

auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status {
auto executor = [&] (const Segment::Ptr& segment, SegmentIterator* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (!visitor) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
}
auto& files_map = visitor->GetSegmentFiles();
for (auto& kv : files_map) {
std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl;
std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl;
}
std::cout << visitor->ToString() << std::endl;
return Status::OK();
};

auto segment_handler = std::make_shared<milvus::engine::snapshot::SegmentCollector>(ss, executor);
auto segment_handler = std::make_shared<SegmentIterator>(ss, executor);
segment_handler->Iterate();
std::cout << segment_handler->GetStatus().ToString() << std::endl;
ASSERT_TRUE(segment_handler->GetStatus().ok());
Expand Down
3 changes: 2 additions & 1 deletion core/unittest/ssdb/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler<Segment>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
using PartitionIterator = milvus::engine::snapshot::PartitionIterator;
using SegmentIterator = milvus::engine::snapshot::SegmentIterator;
using SSDBImpl = milvus::engine::SSDBImpl;
using Status = milvus::Status;

Expand Down

0 comments on commit 9d4febd

Please sign in to comment.