Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot integrate #2854

Merged
merged 7 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2692 Milvus hangs during multi-thread concurrent search
- \#2739 Fix mishards start failed
- \#2752 Milvus formats vectors data to double-precision and return to http client
- \#2767 fix a bug of getting wrong nprobe limitation in knowhere on GPU version
- \#2767 Fix a bug of getting wrong nprobe limitation in knowhere on GPU version
- \#2776 Fix too many data copies during creating IVF index
- \#2813 To implemente RNSG IP

Expand Down
3 changes: 2 additions & 1 deletion core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, id_array, field_names);
std::string dir_root = options_.meta_.path_;
auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names);
handler->Iterate();
STATUS_CHECK(handler->GetStatus());

Expand Down
27 changes: 18 additions & 9 deletions core/src/db/SnapshotHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm

///////////////////////////////////////////////////////////////////////////////
GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr<milvus::server::Context>& context,
engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
engine::snapshot::ScopedSnapshotT ss,
const std::string& dir_root, const IDNumbers& ids,
const std::vector<std::string>& field_names)
: BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() {
: BaseT(ss),
context_(context),
dir_root_(dir_root),
ids_(ids),
field_names_(field_names),
vector_data_(),
attr_type_(),
attr_data_() {
for (auto& field_name : field_names_) {
auto field_ptr = ss_->GetField(field_name);
auto field_type = field_ptr->GetFtype();
Expand All @@ -126,25 +134,26 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
if (segment_visitor == nullptr) {
return Status(DB_ERROR, "Fail to build segment visitor with id " + std::to_string(segment->GetID()));
}
segment::SSSegmentReader segment_reader(segment_visitor);
segment::SSSegmentReader segment_reader(dir_root_, segment_visitor);

/* load UID's bloom filter file */
auto uid_field_visitor = segment_visitor->GetFieldVisitor(DEFAULT_UID_NAME);
auto uid_bf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
std::string uid_bf_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_bf_visitor->GetFile());

/* load UID's bloom filter file */
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
std::string uid_blf_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_blf_visitor->GetFile());

segment::IdBloomFilterPtr id_bloom_filter_ptr;
STATUS_CHECK(segment_reader.LoadBloomFilter(uid_bf_path, id_bloom_filter_ptr));
STATUS_CHECK(segment_reader.LoadBloomFilter(uid_blf_path, id_bloom_filter_ptr));

/* load UID's raw data */
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_RAW);
std::string uid_raw_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_raw_visitor->GetFile());
std::string uid_raw_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_raw_visitor->GetFile());
std::vector<segment::doc_id_t> uids;
STATUS_CHECK(segment_reader.LoadUids(uid_raw_path, uids));

/* load UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_del_visitor->GetFile());
std::string uid_del_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader.LoadDeletedDocs(uid_del_path, deleted_docs_ptr));

Expand Down
4 changes: 3 additions & 1 deletion core/src/db/SnapshotHandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::Seg
struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const std::string& dir_root, const IDNumbers& ids,
const std::vector<std::string>& field_names);

Status
Handle(const typename ResourceT::Ptr&) override;

const server::ContextPtr context_;
const std::string dir_root_;
const engine::IDNumbers ids_;
const std::vector<std::string> field_names_;
std::vector<engine::VectorsData> vector_data_;
Expand Down
6 changes: 3 additions & 3 deletions core/src/db/insert/SSMemManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "SSVectorSource.h"
#include "db/Constants.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "utils/Log.h"

namespace milvus {
Expand Down Expand Up @@ -128,17 +129,16 @@ SSMemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, con
return Status(DB_ERROR, err_msg + name);
}
break;
case meta::hybrid::DataType::VECTOR:
case meta::hybrid::DataType::VECTOR_FLOAT:
case meta::hybrid::DataType::VECTOR_BINARY: {
json params = field->GetParams();
if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}

int64_t dimension = params[VECTOR_DIMENSION_PARAM];
int64_t dimension = params[knowhere::meta::DIM];
int64_t row_size =
(ftype == meta::hybrid::DataType::VECTOR_BINARY) ? dimension / 8 : dimension * sizeof(float);
if (data_size != chunk->count_ * row_size) {
Expand Down
8 changes: 4 additions & 4 deletions core/src/db/insert/SSMemSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "metrics/Metrics.h"
#include "segment/SegmentReader.h"
#include "utils/Log.h"
Expand Down Expand Up @@ -109,7 +110,7 @@ SSMemSegment::CreateSegment() {
auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);

// create segment writer
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(visitor);
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(options_.meta_.path_, visitor);

return Status::OK();
}
Expand Down Expand Up @@ -152,17 +153,16 @@ SSMemSegment::GetSingleEntitySize(size_t& single_size) {
case meta::hybrid::DataType::INT64:
single_size += sizeof(uint64_t);
break;
case meta::hybrid::DataType::VECTOR:
case meta::hybrid::DataType::VECTOR_FLOAT:
case meta::hybrid::DataType::VECTOR_BINARY: {
json params = field->GetParams();
if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}

int64_t dimension = params[VECTOR_DIMENSION_PARAM];
int64_t dimension = params[knowhere::meta::DIM];
if (ftype == meta::hybrid::DataType::VECTOR_BINARY) {
single_size += (dimension / 8);
} else {
Expand Down
5 changes: 3 additions & 2 deletions core/src/db/snapshot/CompoundOperations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
auto ctx = context;
ctx.segment_id = context_.new_segment->GetID();
ctx.partition_id = context_.new_segment->GetPartitionId();
ctx.collection_id = GetStartedSS()->GetCollectionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
STATUS_CHECK(new_sf_op->Push());
STATUS_CHECK(new_sf_op->GetResource(created));
Expand Down Expand Up @@ -613,8 +614,8 @@ CreateCollectionOperation::DoExecute(Store& store) {
auto& field_schema = field_kv.first;
auto& field_elements = field_kv.second;
FieldPtr field;
status =
store.CreateResource<Field>(Field(field_schema->GetName(), field_idx, field_schema->GetFtype()), field);
status = store.CreateResource<Field>(
Field(field_schema->GetName(), field_idx, field_schema->GetFtype(), field_schema->GetParams()), field);
auto f_ctx_p = ResourceContextBuilder<Field>().SetOp(meta::oUpdate).CreatePtr();
AddStepWithLsn(*field, c_context_.lsn, f_ctx_p);
MappingT element_ids = {};
Expand Down
3 changes: 2 additions & 1 deletion core/src/db/snapshot/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ResourceGCEvent : public Event {
STATUS_CHECK((*sd_op)(store));

/* TODO: physically clean resource */
std::string res_path = GetResPath<ResourceT>(res_);
std::string res_path = GetResPath<ResourceT>(dir_root_, res_);
/* if (!boost::filesystem::exists(res_path)) { */
/* return Status::OK(); */
/* } */
Expand All @@ -68,6 +68,7 @@ class ResourceGCEvent : public Event {

private:
class ResourceT::Ptr res_;
std::string dir_root_;
};

} // namespace snapshot
Expand Down
40 changes: 24 additions & 16 deletions core/src/db/snapshot/ResourceHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,65 @@

#include <memory>
#include <string>
#include <vector>

#include "db/snapshot/Resources.h"
#include "utils/Status.h"

namespace milvus::engine::snapshot {

static const char* COLLECTION_PREFIX = "C_";
static const char* PARTITION_PREFIX = "P_";
static const char* SEGMENT_PREFIX = "S_";
static const char* SEGMENT_FILE_PREFIX = "F_";

template <class ResourceT>
inline std::string
GetResPath(const typename ResourceT::Ptr& res_ptr) {
GetResPath(const std::string& root, const typename ResourceT::Ptr& res_ptr) {
return std::string();
}

template <>
inline std::string
GetResPath<Collection>(const Collection::Ptr& res_ptr) {
GetResPath<Collection>(const std::string& root, const Collection::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetID();

return ss.str();
}

template <>
inline std::string
GetResPath<Partition>(const Partition::Ptr& res_ptr) {
GetResPath<Partition>(const std::string& root, const Partition::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetID();

return ss.str();
}

template <>
inline std::string
GetResPath<Segment>(const Segment::Ptr& res_ptr) {
GetResPath<Segment>(const std::string& root, const Segment::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetPartitionId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
ss << SEGMENT_PREFIX << res_ptr->GetID();

return ss.str();
}

template <>
inline std::string
GetResPath<SegmentFile>(const SegmentFile::Ptr& res_ptr) {
GetResPath<SegmentFile>(const std::string& root, const SegmentFile::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetPartitionId() << "/";
ss << res_ptr->GetSegmentId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
ss << SEGMENT_PREFIX << res_ptr->GetSegmentId() << "/";
ss << SEGMENT_FILE_PREFIX << res_ptr->GetID();

return ss.str();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/db/snapshot/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ Snapshot::ToString() const {
ss << to_matrix_string(fc_m, row_element_size, 2);
for (auto& fe_id : fc_m) {
auto fe = GetResource<FieldElement>(fe_id);
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName();
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName() << " CID=" << fe->GetCollectionId();
}
}

Expand Down
33 changes: 19 additions & 14 deletions core/src/segment/SSSegmentReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
namespace milvus {
namespace segment {

SSSegmentReader::SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) {
SSSegmentReader::SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor)
: dir_root_(dir_root), segment_visitor_(segment_visitor) {
auto& segment_ptr = segment_visitor_->GetSegment();
std::string directory = engine::snapshot::GetResPath<engine::snapshot::Segment>(segment_ptr);
std::string directory =
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_root_, segment_visitor->GetSegment());

storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
Expand All @@ -55,36 +57,39 @@ SSSegmentReader::Load() {
try {
// auto& ss_codec = codec::SSCodec::instance();

auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);

/* load UID's raw data */
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string uid_raw_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_raw_visitor->GetFile());
std::vector<segment::doc_id_t> uids;
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_raw_visitor->GetFile());
STATUS_CHECK(LoadUids(uid_raw_path, segment_ptr_->vectors_ptr_->GetMutableUids()));

/* load UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_del_visitor->GetFile());
segment::DeletedDocsPtr deleted_docs_ptr;
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
STATUS_CHECK(LoadDeletedDocs(uid_del_path, segment_ptr_->deleted_docs_ptr_));

/* load other data */
Status s;
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
for (auto& f_kv : field_visitors_map) {
auto& field_visitor = f_kv.second;
auto& field = field_visitor->GetField();
for (auto& file_kv : field_visitor->GetElementVistors()) {
auto& field_element_visitor = file_kv.second;
auto& fv = f_kv.second;
auto& field = fv->GetField();
for (auto& file_kv : fv->GetElementVistors()) {
auto& fev = file_kv.second;
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, fev->GetFile());
if (!s.ok()) {
LOG_ENGINE_WARNING_ << "Cannot get resource path";
}

auto& segment_file = field_element_visitor->GetFile();
auto& segment_file = fev->GetFile();
if (segment_file == nullptr) {
continue;
}
auto file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(segment_file);
auto& field_element = field_element_visitor->GetElement();
auto& field_element = fev->GetElement();

if ((field->GetFtype() == engine::FieldType::VECTOR_FLOAT ||
field->GetFtype() == engine::FieldType::VECTOR_BINARY) &&
Expand Down
3 changes: 2 additions & 1 deletion core/src/segment/SSSegmentReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace segment {

class SSSegmentReader {
public:
explicit SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor);
explicit SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor);

// TODO(zhiru)
Status
Expand Down Expand Up @@ -70,6 +70,7 @@ class SSSegmentReader {
engine::SegmentVisitorPtr segment_visitor_;
storage::FSHandlerPtr fs_ptr_;
SegmentPtr segment_ptr_;
std::string dir_root_;
};

using SSSegmentReaderPtr = std::shared_ptr<SSSegmentReader>;
Expand Down