Skip to content

Commit

Permalink
snapshot integrate (#2854)
Browse files Browse the repository at this point in the history
* add test_segment

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update interface GetResPath

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* retry ci

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update SSSegmentWriter

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
  • Loading branch information
cydrain committed Jul 15, 2020
1 parent 4cb8383 commit d473c7a
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 101 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2692 Milvus hangs during multi-thread concurrent search
- \#2739 Fix mishards start failed
- \#2752 Milvus formats vectors data to double-precision and return to http client
- \#2767 fix a bug of getting wrong nprobe limitation in knowhere on GPU version
- \#2767 Fix a bug of getting wrong nprobe limitation in knowhere on GPU version
- \#2776 Fix too many data copies during creating IVF index
- \#2813 To implement RNSG IP

Expand Down
3 changes: 2 additions & 1 deletion core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, id_array, field_names);
std::string dir_root = options_.meta_.path_;
auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names);
handler->Iterate();
STATUS_CHECK(handler->GetStatus());

Expand Down
27 changes: 18 additions & 9 deletions core/src/db/SnapshotHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm

///////////////////////////////////////////////////////////////////////////////
GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr<milvus::server::Context>& context,
engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
engine::snapshot::ScopedSnapshotT ss,
const std::string& dir_root, const IDNumbers& ids,
const std::vector<std::string>& field_names)
: BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() {
: BaseT(ss),
context_(context),
dir_root_(dir_root),
ids_(ids),
field_names_(field_names),
vector_data_(),
attr_type_(),
attr_data_() {
for (auto& field_name : field_names_) {
auto field_ptr = ss_->GetField(field_name);
auto field_type = field_ptr->GetFtype();
Expand All @@ -126,25 +134,26 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
if (segment_visitor == nullptr) {
return Status(DB_ERROR, "Fail to build segment visitor with id " + std::to_string(segment->GetID()));
}
segment::SSSegmentReader segment_reader(segment_visitor);
segment::SSSegmentReader segment_reader(dir_root_, segment_visitor);

/* load UID's bloom filter file */
auto uid_field_visitor = segment_visitor->GetFieldVisitor(DEFAULT_UID_NAME);
auto uid_bf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
std::string uid_bf_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_bf_visitor->GetFile());

/* load UID's bloom filter file */
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
std::string uid_blf_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_blf_visitor->GetFile());

segment::IdBloomFilterPtr id_bloom_filter_ptr;
STATUS_CHECK(segment_reader.LoadBloomFilter(uid_bf_path, id_bloom_filter_ptr));
STATUS_CHECK(segment_reader.LoadBloomFilter(uid_blf_path, id_bloom_filter_ptr));

/* load UID's raw data */
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_RAW);
std::string uid_raw_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_raw_visitor->GetFile());
std::string uid_raw_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_raw_visitor->GetFile());
std::vector<segment::doc_id_t> uids;
STATUS_CHECK(segment_reader.LoadUids(uid_raw_path, uids));

/* load UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_del_visitor->GetFile());
std::string uid_del_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader.LoadDeletedDocs(uid_del_path, deleted_docs_ptr));

Expand Down
4 changes: 3 additions & 1 deletion core/src/db/SnapshotHandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::Seg
struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const std::string& dir_root, const IDNumbers& ids,
const std::vector<std::string>& field_names);

Status
Handle(const typename ResourceT::Ptr&) override;

const server::ContextPtr context_;
const std::string dir_root_;
const engine::IDNumbers ids_;
const std::vector<std::string> field_names_;
std::vector<engine::VectorsData> vector_data_;
Expand Down
6 changes: 3 additions & 3 deletions core/src/db/insert/SSMemManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "SSVectorSource.h"
#include "db/Constants.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "utils/Log.h"

namespace milvus {
Expand Down Expand Up @@ -128,17 +129,16 @@ SSMemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, con
return Status(DB_ERROR, err_msg + name);
}
break;
case meta::hybrid::DataType::VECTOR:
case meta::hybrid::DataType::VECTOR_FLOAT:
case meta::hybrid::DataType::VECTOR_BINARY: {
json params = field->GetParams();
if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}

int64_t dimension = params[VECTOR_DIMENSION_PARAM];
int64_t dimension = params[knowhere::meta::DIM];
int64_t row_size =
(ftype == meta::hybrid::DataType::VECTOR_BINARY) ? dimension / 8 : dimension * sizeof(float);
if (data_size != chunk->count_ * row_size) {
Expand Down
8 changes: 4 additions & 4 deletions core/src/db/insert/SSMemSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "metrics/Metrics.h"
#include "segment/SegmentReader.h"
#include "utils/Log.h"
Expand Down Expand Up @@ -109,7 +110,7 @@ SSMemSegment::CreateSegment() {
auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);

// create segment writer
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(visitor);
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(options_.meta_.path_, visitor);

return Status::OK();
}
Expand Down Expand Up @@ -152,17 +153,16 @@ SSMemSegment::GetSingleEntitySize(size_t& single_size) {
case meta::hybrid::DataType::INT64:
single_size += sizeof(uint64_t);
break;
case meta::hybrid::DataType::VECTOR:
case meta::hybrid::DataType::VECTOR_FLOAT:
case meta::hybrid::DataType::VECTOR_BINARY: {
json params = field->GetParams();
if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}

int64_t dimension = params[VECTOR_DIMENSION_PARAM];
int64_t dimension = params[knowhere::meta::DIM];
if (ftype == meta::hybrid::DataType::VECTOR_BINARY) {
single_size += (dimension / 8);
} else {
Expand Down
5 changes: 3 additions & 2 deletions core/src/db/snapshot/CompoundOperations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
auto ctx = context;
ctx.segment_id = context_.new_segment->GetID();
ctx.partition_id = context_.new_segment->GetPartitionId();
ctx.collection_id = GetStartedSS()->GetCollectionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
STATUS_CHECK(new_sf_op->Push());
STATUS_CHECK(new_sf_op->GetResource(created));
Expand Down Expand Up @@ -613,8 +614,8 @@ CreateCollectionOperation::DoExecute(Store& store) {
auto& field_schema = field_kv.first;
auto& field_elements = field_kv.second;
FieldPtr field;
status =
store.CreateResource<Field>(Field(field_schema->GetName(), field_idx, field_schema->GetFtype()), field);
status = store.CreateResource<Field>(
Field(field_schema->GetName(), field_idx, field_schema->GetFtype(), field_schema->GetParams()), field);
auto f_ctx_p = ResourceContextBuilder<Field>().SetOp(meta::oUpdate).CreatePtr();
AddStepWithLsn(*field, c_context_.lsn, f_ctx_p);
MappingT element_ids = {};
Expand Down
3 changes: 2 additions & 1 deletion core/src/db/snapshot/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ResourceGCEvent : public Event {
STATUS_CHECK((*sd_op)(store));

/* TODO: physically clean resource */
std::string res_path = GetResPath<ResourceT>(res_);
std::string res_path = GetResPath<ResourceT>(dir_root_, res_);
/* if (!boost::filesystem::exists(res_path)) { */
/* return Status::OK(); */
/* } */
Expand All @@ -68,6 +68,7 @@ class ResourceGCEvent : public Event {

private:
class ResourceT::Ptr res_;
std::string dir_root_;
};

} // namespace snapshot
Expand Down
40 changes: 24 additions & 16 deletions core/src/db/snapshot/ResourceHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,65 @@

#include <memory>
#include <string>
#include <vector>

#include "db/snapshot/Resources.h"
#include "utils/Status.h"

namespace milvus::engine::snapshot {

static const char* COLLECTION_PREFIX = "C_";
static const char* PARTITION_PREFIX = "P_";
static const char* SEGMENT_PREFIX = "S_";
static const char* SEGMENT_FILE_PREFIX = "F_";

template <class ResourceT>
inline std::string
GetResPath(const typename ResourceT::Ptr& res_ptr) {
GetResPath(const std::string& root, const typename ResourceT::Ptr& res_ptr) {
return std::string();
}

template <>
inline std::string
GetResPath<Collection>(const Collection::Ptr& res_ptr) {
GetResPath<Collection>(const std::string& root, const Collection::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetID();

return ss.str();
}

template <>
inline std::string
GetResPath<Partition>(const Partition::Ptr& res_ptr) {
GetResPath<Partition>(const std::string& root, const Partition::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetID();

return ss.str();
}

template <>
inline std::string
GetResPath<Segment>(const Segment::Ptr& res_ptr) {
GetResPath<Segment>(const std::string& root, const Segment::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetPartitionId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
ss << SEGMENT_PREFIX << res_ptr->GetID();

return ss.str();
}

template <>
inline std::string
GetResPath<SegmentFile>(const SegmentFile::Ptr& res_ptr) {
GetResPath<SegmentFile>(const std::string& root, const SegmentFile::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetPartitionId() << "/";
ss << res_ptr->GetSegmentId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
ss << SEGMENT_PREFIX << res_ptr->GetSegmentId() << "/";
ss << SEGMENT_FILE_PREFIX << res_ptr->GetID();

return ss.str();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/db/snapshot/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ Snapshot::ToString() const {
ss << to_matrix_string(fc_m, row_element_size, 2);
for (auto& fe_id : fc_m) {
auto fe = GetResource<FieldElement>(fe_id);
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName();
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName() << " CID=" << fe->GetCollectionId();
}
}

Expand Down
33 changes: 19 additions & 14 deletions core/src/segment/SSSegmentReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
namespace milvus {
namespace segment {

SSSegmentReader::SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) {
SSSegmentReader::SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor)
: dir_root_(dir_root), segment_visitor_(segment_visitor) {
auto& segment_ptr = segment_visitor_->GetSegment();
std::string directory = engine::snapshot::GetResPath<engine::snapshot::Segment>(segment_ptr);
std::string directory =
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_root_, segment_visitor->GetSegment());

storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
Expand All @@ -55,36 +57,39 @@ SSSegmentReader::Load() {
try {
// auto& ss_codec = codec::SSCodec::instance();

auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);

/* load UID's raw data */
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string uid_raw_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_raw_visitor->GetFile());
std::vector<segment::doc_id_t> uids;
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_raw_visitor->GetFile());
STATUS_CHECK(LoadUids(uid_raw_path, segment_ptr_->vectors_ptr_->GetMutableUids()));

/* load UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_del_visitor->GetFile());
segment::DeletedDocsPtr deleted_docs_ptr;
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
STATUS_CHECK(LoadDeletedDocs(uid_del_path, segment_ptr_->deleted_docs_ptr_));

/* load other data */
Status s;
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
for (auto& f_kv : field_visitors_map) {
auto& field_visitor = f_kv.second;
auto& field = field_visitor->GetField();
for (auto& file_kv : field_visitor->GetElementVistors()) {
auto& field_element_visitor = file_kv.second;
auto& fv = f_kv.second;
auto& field = fv->GetField();
for (auto& file_kv : fv->GetElementVistors()) {
auto& fev = file_kv.second;
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, fev->GetFile());
if (!s.ok()) {
LOG_ENGINE_WARNING_ << "Cannot get resource path";
}

auto& segment_file = field_element_visitor->GetFile();
auto& segment_file = fev->GetFile();
if (segment_file == nullptr) {
continue;
}
auto file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(segment_file);
auto& field_element = field_element_visitor->GetElement();
auto& field_element = fev->GetElement();

if ((field->GetFtype() == engine::FieldType::VECTOR_FLOAT ||
field->GetFtype() == engine::FieldType::VECTOR_BINARY) &&
Expand Down
3 changes: 2 additions & 1 deletion core/src/segment/SSSegmentReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace segment {

class SSSegmentReader {
public:
explicit SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor);
explicit SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor);

// TODO(zhiru)
Status
Expand Down Expand Up @@ -70,6 +70,7 @@ class SSSegmentReader {
engine::SegmentVisitorPtr segment_visitor_;
storage::FSHandlerPtr fs_ptr_;
SegmentPtr segment_ptr_;
std::string dir_root_;
};

using SSSegmentReaderPtr = std::shared_ptr<SSSegmentReader>;
Expand Down
Loading

0 comments on commit d473c7a

Please sign in to comment.