Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(db/snapshot): Add some visitors #2800

Merged
merged 4 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 76 additions & 14 deletions core/src/db/SnapshotVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,64 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) {
return handler->GetStatus();
}

SegmentFileVisitor::Ptr
SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) {
SegmentFieldElementVisitor::Ptr
SegmentFieldElementVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id,
snapshot::ID_TYPE field_element_id) {
if (!ss) {
return nullptr;
}

auto file = ss->GetResource<snapshot::SegmentFile>(segment_file_id);
auto element = ss->GetResource<snapshot::FieldElement>(field_element_id);
if (!element) {
return nullptr;
}

auto visitor = std::make_shared<SegmentFieldElementVisitor>();
visitor->SetFieldElement(element);
auto segment = ss->GetResource<snapshot::Segment>(segment_id);
if (!segment) {
return nullptr;
}

auto file = ss->GetSegmentFile(segment_id, field_element_id);
if (!file) {
return nullptr;
}

auto visitor = std::make_shared<SegmentFileVisitor>();
visitor->SetFile(file);
auto field_element = ss->GetResource<snapshot::FieldElement>(file->GetFieldElementId());
auto field = ss->GetResource<snapshot::Field>(field_element->GetFieldId());
return visitor;
}

SegmentFieldVisitor::Ptr
SegmentFieldVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id) {
if (!ss) {
return nullptr;
}

auto field = ss->GetResource<snapshot::Field>(field_id);
if (!field) {
return nullptr;
}

auto visitor = std::make_shared<SegmentFieldVisitor>();
visitor->SetField(field);
visitor->SetFieldElement(field_element);

auto executor = [&](const snapshot::FieldElement::Ptr& field_element,
snapshot::FieldElementIterator* itr) -> Status {
if (field_element->GetFieldId() != field_id) {
return Status::OK();
}
auto element_visitor = SegmentFieldElementVisitor::Build(ss, segment_id, field_element->GetID());
if (!element_visitor) {
return Status::OK();
}
visitor->InsertElement(element_visitor);
return Status::OK();
};

auto iterator = std::make_shared<snapshot::FieldElementIterator>(ss, executor);
iterator->Iterate();

return visitor;
}

Expand All @@ -72,17 +113,38 @@ SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id
auto visitor = std::make_shared<SegmentVisitor>();
visitor->SetSegment(segment);

auto& file_ids = ss->GetSegmentFileIds(segment_id);
for (auto id : file_ids) {
auto file_visitor = SegmentFileVisitor::Build(ss, id);
if (!file_visitor) {
return nullptr;
auto executor = [&](const snapshot::Field::Ptr& field, snapshot::FieldIterator* itr) -> Status {
auto field_visitor = SegmentFieldVisitor::Build(ss, segment_id, field->GetID());
if (!field_visitor) {
return Status::OK();
}
visitor->InsertSegmentFile(file_visitor);
}
visitor->InsertField(field_visitor);

return Status::OK();
};

auto iterator = std::make_shared<snapshot::FieldIterator>(ss, executor);
iterator->Iterate();

return visitor;
}

std::string
SegmentVisitor::ToString() const {
std::stringstream ss;
ss << "SegmentVisitor[" << GetSegment()->GetID() << "]: \n";
auto& field_visitors = GetFieldVisitors();
for (auto& fkv : field_visitors) {
ss << " Field[" << fkv.first << "]\n";
auto& fe_visitors = fkv.second->GetElementVistors();
for (auto& fekv : fe_visitors) {
ss << " FieldElement[" << fekv.first << "] ";
ss << "SegmentFile [" << fekv.second->GetFile()->GetID() << "]\n";
}
}

return ss.str();
}

} // namespace engine
} // namespace milvus
100 changes: 75 additions & 25 deletions core/src/db/SnapshotVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,61 +36,106 @@ class SnapshotVisitor {
Status status_;
};

class SegmentFileVisitor {
class SegmentFieldElementVisitor {
public:
using Ptr = std::shared_ptr<SegmentFileVisitor>;
using Ptr = std::shared_ptr<SegmentFieldElementVisitor>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id);
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_element_id);

SegmentFileVisitor() = default;
SegmentFieldElementVisitor() = default;

void
SetFieldElement(snapshot::FieldElementPtr field_element) {
field_element_ = field_element;
}
void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}

const snapshot::FieldElementPtr
GetElement() const {
return field_element_;
}
const snapshot::SegmentFilePtr
GetFile() const {
return file_;
}
const snapshot::FieldPtr

protected:
snapshot::FieldElementPtr field_element_;
snapshot::SegmentFilePtr file_;
};

class SegmentFieldVisitor {
public:
using Ptr = std::shared_ptr<SegmentFieldVisitor>;
using ElementT = typename SegmentFieldElementVisitor::Ptr;
using ElementsMapT = std::map<snapshot::ID_TYPE, ElementT>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id);

SegmentFieldVisitor() = default;

const ElementsMapT&
GetElementVistors() const {
return elements_map_;
}
const snapshot::FieldPtr&
GetField() const {
return field_;
}
const snapshot::FieldElementPtr
GetFieldElement() const {
return field_element_;
}

void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}
void
SetField(snapshot::FieldPtr field) {
field_ = field;
}

void
SetFieldElement(snapshot::FieldElementPtr field_element) {
field_element_ = field_element;
InsertElement(ElementT element) {
elements_map_[element->GetElement()->GetID()] = element;
}

protected:
snapshot::SegmentFilePtr file_;
ElementsMapT elements_map_;
snapshot::FieldPtr field_;
snapshot::FieldElementPtr field_element_;
};

class SegmentVisitor {
public:
using Ptr = std::shared_ptr<SegmentVisitor>;
using FileT = typename SegmentFileVisitor::Ptr;
using FilesMapT = std::map<snapshot::ID_TYPE, FileT>;
using FieldVisitorT = typename SegmentFieldVisitor::Ptr;
using IdMapT = std::map<snapshot::ID_TYPE, FieldVisitorT>;
using NameMapT = std::map<std::string, FieldVisitorT>;

static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id);
SegmentVisitor() = default;

const FilesMapT&
GetSegmentFiles() const {
return files_map_;
const IdMapT&
GetFieldVisitors() const {
return id_map_;
}

FieldVisitorT
GetFieldVisitor(snapshot::ID_TYPE field_id) const {
auto it = id_map_.find(field_id);
if (it == id_map_.end()) {
return nullptr;
}
return it->second;
}
FieldVisitorT
GetFieldVisitor(const std::string& field_name) const {
auto it = name_map_.find(field_name);
if (it == name_map_.end()) {
return nullptr;
}
return it->second;
}

const snapshot::SegmentPtr&
GetSegment() const {
return segment_;
Expand All @@ -101,13 +146,18 @@ class SegmentVisitor {
segment_ = segment;
}
void
InsertSegmentFile(FileT segment_file) {
files_map_[segment_file->GetFile()->GetID()] = segment_file;
InsertField(FieldVisitorT field_visitor) {
id_map_[field_visitor->GetField()->GetID()] = field_visitor;
name_map_[field_visitor->GetField()->GetName()] = field_visitor;
}

std::string
ToString() const;

protected:
snapshot::SegmentPtr segment_;
FilesMapT files_map_;
IdMapT id_map_;
NameMapT name_map_;
};

} // namespace engine
Expand Down
33 changes: 15 additions & 18 deletions core/src/db/snapshot/IterateHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,22 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using ResourceT = T;
using ThisT = IterateHandler<ResourceT>;
using Ptr = std::shared_ptr<ThisT>;
using ExecutorT = std::function<Status(const typename T::Ptr&, ThisT*)>;

explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) {
explicit IterateHandler(ScopedSnapshotT ss, const ExecutorT& executor = {}) : ss_(ss), executor_(executor) {
}

virtual Status
PreIterate() {
return Status::OK();
}
virtual Status
Handle(const typename ResourceT::Ptr& resource) = 0;
Handle(const typename ResourceT::Ptr& resource) {
if (executor_) {
return executor_(resource, this);
}
return Status::OK();
}

virtual Status
PostIterate() {
Expand All @@ -59,26 +65,17 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
}

ScopedSnapshotT ss_;
ExecutorT executor_;
Status status_;
mutable std::mutex mtx_;
};

using IterateSegmentHandler = IterateHandler<Segment>;
using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>;
struct SegmentCollector : public IterateSegmentHandler {
using ResourceT = Segment;
using BaseT = IterateSegmentHandler;

explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor) : BaseT(ss), executor_(executor) {
}

Status
Handle(const typename ResourceT::Ptr& segment) override {
return executor_(segment, this);
}

SegmentExecutorT executor_;
};
using CollectionIterator = IterateHandler<Collection>;
using PartitionIterator = IterateHandler<Partition>;
using SegmentIterator = IterateHandler<Segment>;
using SegmentFileIterator = IterateHandler<SegmentFile>;
using FieldIterator = IterateHandler<Field>;
using FieldElementIterator = IterateHandler<FieldElement>;

} // namespace snapshot
} // namespace engine
Expand Down
15 changes: 15 additions & 0 deletions core/src/db/snapshot/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ Snapshot::GetFieldElement(const std::string& field_name, const std::string& fiel
return Status::OK();
}

// Resolves the segment file registered for a (field element, segment) pair.
// element_segfiles_map_ is keyed by field element id first and by segment id
// second; nullptr is returned when either key is absent. Otherwise the stored
// segment file id is resolved through GetResource<SegmentFile>.
SegmentFilePtr
Snapshot::GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const {
    auto element_it = element_segfiles_map_.find(field_element_id);
    if (element_it != element_segfiles_map_.end()) {
        const auto& files_by_segment = element_it->second;
        auto file_it = files_by_segment.find(segment_id);
        if (file_it != files_by_segment.end()) {
            return GetResource<SegmentFile>(file_it->second);
        }
    }

    return nullptr;
}

const std::string
Snapshot::ToString() const {
auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string {
Expand Down
3 changes: 3 additions & 0 deletions core/src/db/snapshot/Snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class Snapshot : public ReferenceProxy {
return it->second;
}

SegmentFilePtr
GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const;

ID_TYPE
GetLatestSchemaCommitId() const {
return latest_schema_commit_id_;
Expand Down
10 changes: 3 additions & 7 deletions core/unittest/ssdb/test_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,20 +244,16 @@ TEST_F(SSDBTest, VisitorTest) {
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());

auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status {
auto executor = [&] (const Segment::Ptr& segment, SegmentIterator* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (!visitor) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
}
auto& files_map = visitor->GetSegmentFiles();
for (auto& kv : files_map) {
std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl;
std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl;
}
std::cout << visitor->ToString() << std::endl;
return Status::OK();
};

auto segment_handler = std::make_shared<milvus::engine::snapshot::SegmentCollector>(ss, executor);
auto segment_handler = std::make_shared<SegmentIterator>(ss, executor);
segment_handler->Iterate();
std::cout << segment_handler->GetStatus().ToString() << std::endl;
ASSERT_TRUE(segment_handler->GetStatus().ok());
Expand Down
3 changes: 2 additions & 1 deletion core/unittest/ssdb/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler<Segment>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
using PartitionIterator = milvus::engine::snapshot::PartitionIterator;
using SegmentIterator = milvus::engine::snapshot::SegmentIterator;
using SSDBImpl = milvus::engine::SSDBImpl;
using Status = milvus::Status;

Expand Down