Skip to content

Commit

Permalink
snapshot integrate GetEntityByID (#2753)
Browse files Browse the repository at this point in the history
* code opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add some APIs for SSDBImpl

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* partially add GetVectorById

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* snapshot opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix typo

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update GetVectorByID framework

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename GetResFiles to GetResPath

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update GetVectorByIdSegmentHandler

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add GetEntityByID

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update DataType

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update ParamField

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
  • Loading branch information
cydrain committed Jul 7, 2020
1 parent 43496d5 commit 1b4e49a
Show file tree
Hide file tree
Showing 16 changed files with 421 additions and 269 deletions.
23 changes: 21 additions & 2 deletions core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field
}

Status
SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force) {
SSDBImpl::PreloadCollection(const server::ContextPtr& context, const std::string& collection_name, bool force) {
CHECK_INITIALIZED;

snapshot::ScopedSnapshotT ss;
Expand All @@ -328,6 +327,26 @@ SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, con
return handler->GetStatus();
}

/**
 * Look up entities by id in a collection and return their vector and attribute data.
 *
 * @param collection_name  collection whose snapshot is searched
 * @param id_array         ids of the entities to fetch
 * @param field_names      fields to read for every entity
 * @param vector_data      [out] receives the handler's collected vector data
 * @param attr_type        [out] receives the handler's collected attribute types
 * @param attr_data        [out] receives the handler's collected attribute data
 * @return Status::OK() on success; the outputs are only assigned when the snapshot
 *         lookup and the segment iteration both report success.
 */
Status
SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
                        const std::vector<std::string>& field_names, std::vector<VectorsData>& vector_data,
                        std::vector<meta::hybrid::DataType>& attr_type, std::vector<AttrsData>& attr_data) {
    CHECK_INITIALIZED;

    // Resolve the snapshot of the requested collection.
    snapshot::ScopedSnapshotT scoped_ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(scoped_ss, collection_name));

    // Walk the snapshot's segments; the handler is created without a server context.
    const auto segment_visitor =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, scoped_ss, id_array, field_names);
    segment_visitor->Iterate();
    STATUS_CHECK(segment_visitor->GetStatus());

    // Transfer the collected results to the caller instead of copying them.
    vector_data = std::move(segment_visitor->vector_data_);
    attr_type = std::move(segment_visitor->attr_type_);
    attr_data = std::move(segment_visitor->attr_data_);

    return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 6 additions & 2 deletions core/src/db/SSDBImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class SSDBImpl {
GetCollectionRowCount(const std::string& collection_name, uint64_t& row_count);

Status
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force = false);
PreloadCollection(const server::ContextPtr& context, const std::string& collection_name, bool force = false);

Status
CreatePartition(const std::string& collection_name, const std::string& partition_name);
Expand All @@ -83,6 +82,11 @@ class SSDBImpl {
Status
DropIndex(const std::string& collection_name, const std::string& field_name, const std::string& field_element_name);

// Fetch entities by id from the snapshot of `collection_name`.
// `id_array` lists the entity ids and `field_names` the fields to read.
// On success `vector_data`, `attr_type` and `attr_data` are overwritten with the
// results gathered by the segment handler; on failure they are left unassigned.
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<engine::VectorsData>& vector_data,
std::vector<meta::hybrid::DataType>& attr_type, std::vector<engine::AttrsData>& attr_data);

private:
void
InternalFlush(const std::string& collection_id = "");
Expand Down
125 changes: 125 additions & 0 deletions core/src/db/SnapshotHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@

#include "db/SnapshotHandlers.h"
#include "db/meta/MetaTypes.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/Snapshot.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/SegmentReader.h"

#include <unordered_map>
#include <utility>

namespace milvus {
namespace engine {
Expand Down Expand Up @@ -62,6 +69,7 @@ LoadVectorFieldHandler::Handle(const snapshot::FieldPtr& field) {
return status;
}

///////////////////////////////////////////////////////////////////////////////
SegmentsToSearchCollector::SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, meta::FilesHolder& holder)
: BaseT(ss), holder_(holder) {
}
Expand Down Expand Up @@ -93,5 +101,122 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm
holder_.MarkFile(schema);
}

///////////////////////////////////////////////////////////////////////////////
/**
 * Create a segment handler that looks up a set of entity ids in snapshot `ss`.
 *
 * @param context      server context stored in context_ (callers may pass a null pointer)
 * @param ss           snapshot handed to the base IterateHandler
 * @param ids          entity ids to look up; copied into ids_
 * @param field_names  names of the fields to read for each entity; copied into field_names_
 *
 * The result containers (vector_data_, attr_type_, attr_data_) start out empty.
 * The parameter type is spelled server::ContextPtr to match the declaration in
 * SnapshotHandlers.h.
 */
GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const server::ContextPtr& context,
                                                         snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
                                                         const std::vector<std::string>& field_names)
    : BaseT(ss), context_(context), ids_(ids), field_names_(field_names) {
}

/**
 * Look up every requested id in one segment and record the entities found there.
 *
 * For each id in ids_ the segment's bloom filter, uid list and deleted-docs list are
 * consulted. Ids that are absent from the segment or marked deleted are skipped.
 * For a hit, the requested fields are read at the entity's offset and stored in the
 * result slot that has the same position as the id has in ids_.
 *
 * @param segment  segment to search
 * @return Status::OK() when the segment was processed; an error status if segment
 *         data cannot be loaded, a requested field is unknown, or a field has an
 *         unsupported type.
 */
Status
GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
    LOG_ENGINE_DEBUG_ << "Get entity by id in segment " << segment->GetID();

    // Load bloom filter
    std::string segment_dir = snapshot::GetResPath<snapshot::Segment>(segment);
    segment::SegmentReader segment_reader(segment_dir);

    segment::IdBloomFilterPtr id_bloom_filter_ptr;
    STATUS_CHECK(segment_reader.LoadBloomFilter(id_bloom_filter_ptr));

    // Load uids; they are used to confirm an id is present and to find its offset.
    std::vector<segment::doc_id_t> uids;
    STATUS_CHECK(segment_reader.LoadUids(uids));

    // Load the deleted-docs list so deleted entities can be skipped.
    segment::DeletedDocsPtr deleted_docs_ptr;
    STATUS_CHECK(segment_reader.LoadDeletedDocs(deleted_docs_ptr));
    auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();

    // The result vectors start out empty, so make room for one slot per requested id.
    // Slots are addressed by the id's position in ids_, never by the id value itself:
    // an id is an arbitrary entity identifier and would index far outside the vectors.
    if (vector_data_.size() < ids_.size()) {
        vector_data_.resize(ids_.size());
    }
    if (attr_data_.size() < ids_.size()) {
        attr_data_.resize(ids_.size());
    }

    for (size_t slot = 0; slot < ids_.size(); ++slot) {
        const auto id = ids_[slot];

        /* fast check using bloom filter */
        if (!id_bloom_filter_ptr->Check(id)) {
            continue;
        }

        /* check if id really exists in uids */
        auto found = std::find(uids.begin(), uids.end(), id);
        if (found == uids.end()) {
            continue;
        }

        /* check if this id is deleted */
        auto offset = std::distance(uids.begin(), found);
        auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset);
        if (deleted != deleted_docs.end()) {
            continue;
        }

        AttrsData& attr_ref = attr_data_[slot];
        VectorsData& vector_ref = vector_data_[slot];

        std::unordered_map<std::string, std::vector<uint8_t>> raw_attrs;
        for (auto& field_name : field_names_) {
            auto field_ptr = ss_->GetField(field_name);
            // NOTE(review): assumes GetField reports an unknown name with a null pointer - confirm.
            if (field_ptr == nullptr) {
                std::string msg = "Field " + field_name + " not found";
                return Status(DB_ERROR, msg);
            }
            auto field_type = field_ptr->GetFtype();

            if (field_type == static_cast<int64_t>(meta::hybrid::DataType::VECTOR_BINARY)) {
                // The dimension is only read for vector fields; attribute fields do not need it.
                auto field_params = field_ptr->GetParams();
                auto dim = field_params[knowhere::meta::DIM].get<int64_t>();

                size_t vector_size = dim / 8;  // one bit per dimension
                std::vector<uint8_t> raw_vector;
                STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector));

                vector_ref.vector_count_ = 1;
                vector_ref.binary_data_.swap(raw_vector);
            } else if (field_type == static_cast<int64_t>(meta::hybrid::DataType::VECTOR_FLOAT)) {
                auto field_params = field_ptr->GetParams();
                auto dim = field_params[knowhere::meta::DIM].get<int64_t>();

                size_t vector_size = dim * sizeof(float);
                std::vector<uint8_t> raw_vector;
                STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector));

                // Reinterpret the raw bytes as floats via a copy into a float buffer.
                vector_ref.vector_count_ = 1;
                std::vector<float> float_vector;
                float_vector.resize(dim);
                memcpy(float_vector.data(), raw_vector.data(), vector_size);
                vector_ref.float_data_.swap(float_vector);
            } else {
                // Scalar attribute: the byte width of one value depends on the field type.
                size_t num_bytes;
                switch (field_type) {
                    case static_cast<int64_t>(meta::hybrid::DataType::INT8): {
                        num_bytes = sizeof(int8_t);
                        break;
                    }
                    case static_cast<int64_t>(meta::hybrid::DataType::INT16): {
                        num_bytes = sizeof(int16_t);
                        break;
                    }
                    case static_cast<int64_t>(meta::hybrid::DataType::INT32): {
                        num_bytes = sizeof(int32_t);
                        break;
                    }
                    case static_cast<int64_t>(meta::hybrid::DataType::INT64): {
                        num_bytes = sizeof(int64_t);
                        break;
                    }
                    case static_cast<int64_t>(meta::hybrid::DataType::FLOAT): {
                        num_bytes = sizeof(float);
                        break;
                    }
                    case static_cast<int64_t>(meta::hybrid::DataType::DOUBLE): {
                        num_bytes = sizeof(double);
                        break;
                    }
                    default: {
                        std::string msg = "Field type of " + field_name + " not supported";
                        return Status(DB_ERROR, msg);
                    }
                }
                std::vector<uint8_t> raw_attr;
                STATUS_CHECK(segment_reader.LoadAttrs(field_name, offset * num_bytes, num_bytes, raw_attr));
                raw_attrs.emplace(field_name, std::move(raw_attr));
            }
        }

        attr_ref.attr_count_ = 1;
        attr_ref.attr_data_ = std::move(raw_attrs);
    }

    return Status::OK();
}
} // namespace engine
} // namespace milvus
31 changes: 26 additions & 5 deletions core/src/db/SnapshotHandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,41 @@

#pragma once

#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"
#include "server/context/Context.h"
#include "utils/Log.h"

#include <memory>
#include <string>
#include <vector>

namespace milvus {
namespace engine {

struct LoadVectorFieldElementHandler : public snapshot::IterateHandler<snapshot::FieldElement> {
using ResourceT = snapshot::FieldElement;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldElementHandler(const std::shared_ptr<server::Context>& context, snapshot::ScopedSnapshotT ss,
LoadVectorFieldElementHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const snapshot::FieldPtr& field);

Status
Handle(const typename ResourceT::Ptr&) override;

const std::shared_ptr<server::Context>& context_;
const snapshot::FieldPtr& field_;
const server::ContextPtr context_;
const snapshot::FieldPtr field_;
};

struct LoadVectorFieldHandler : public snapshot::IterateHandler<snapshot::Field> {
using ResourceT = snapshot::Field;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldHandler(const std::shared_ptr<server::Context>& context, snapshot::ScopedSnapshotT ss);
LoadVectorFieldHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss);

Status
Handle(const typename ResourceT::Ptr&) override;

const std::shared_ptr<server::Context>& context_;
const server::ContextPtr context_;
};

struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::SegmentCommit> {
Expand All @@ -56,5 +59,23 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::Seg
meta::FilesHolder& holder_;
};

///////////////////////////////////////////////////////////////////////////////
// Segment iteration handler that looks up a list of entity ids and collects the
// vector and attribute data of the requested fields.
// SSDBImpl::GetEntityByID creates one, calls Iterate(), checks GetStatus() and then
// moves the three result members out.
struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
// `context` may be a null pointer; `ss` is forwarded to the base handler;
// `ids` and `field_names` are copied into the members below.
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
const std::vector<std::string>& field_names);

// Processes one segment: reads the requested fields of every id found there.
Status
Handle(const typename ResourceT::Ptr&) override;

// Server context supplied at construction (stored by value).
const server::ContextPtr context_;
// Entity ids to look up.
const engine::IDNumbers ids_;
// Names of the fields to read for each entity.
const std::vector<std::string> field_names_;
// Vector field data collected by Handle.
std::vector<engine::VectorsData> vector_data_;
// Attribute types; NOTE(review): not written by Handle in the visible implementation.
std::vector<meta::hybrid::DataType> attr_type_;
// Attribute field data collected by Handle.
std::vector<engine::AttrsData> attr_data_;
};

} // namespace engine
} // namespace milvus
3 changes: 3 additions & 0 deletions core/src/db/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ typedef segment::doc_id_t IDNumber;
typedef IDNumber* IDNumberPtr;
typedef std::vector<IDNumber> IDNumbers;

typedef faiss::Index::distance_t VectorDistance;
typedef std::vector<VectorDistance> VectorDistances;

typedef std::vector<faiss::Index::idx_t> ResultIds;
typedef std::vector<faiss::Index::distance_t> ResultDistances;

Expand Down
42 changes: 13 additions & 29 deletions core/src/db/meta/MetaTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,21 @@ using Table2FileRef = std::map<std::string, File2RefCount>;
namespace hybrid {

enum class DataType {
INT8 = 1,
INT16 = 2,
INT32 = 3,
INT64 = 4,
NONE = 0,
BOOL = 1,
INT8 = 2,
INT16 = 3,
INT32 = 4,
INT64 = 5,

STRING = 20,

BOOL = 30,
FLOAT = 10,
DOUBLE = 11,

FLOAT = 40,
DOUBLE = 41,
STRING = 20,

VECTOR = 100,
UNKNOWN = 9999,
VECTOR_BINARY = 100,
VECTOR_FLOAT = 101,
VECTOR = 200,
};

struct VectorFieldSchema {
Expand All @@ -133,27 +134,10 @@ struct VectorFieldsSchema {
using VectorFieldSchemaPtr = std::shared_ptr<VectorFieldSchema>;

struct FieldSchema {
typedef enum {
INT8 = 1,
INT16 = 2,
INT32 = 3,
INT64 = 4,

STRING = 20,

BOOL = 30,

FLOAT = 40,
DOUBLE = 41,

VECTOR = 100,
UNKNOWN = 9999,
} FIELD_TYPE;

// TODO(yukun): need field_id?
std::string collection_id_;
std::string field_name_;
int32_t field_type_ = (int)INT8;
int32_t field_type_;
std::string field_params_;
};

Expand Down
19 changes: 8 additions & 11 deletions core/src/db/snapshot/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,14 @@ class ResourceGCEvent : public Event {
STATUS_CHECK((*sd_op)(store));

/* TODO: physically clean resource */
std::vector<std::string> res_file_list;
STATUS_CHECK(GetResFiles<ResourceT>(res_file_list, res_));
for (auto& res_file : res_file_list) {
if (!boost::filesystem::exists(res_file)) {
continue;
}
if (boost::filesystem::is_directory(res_file)) {
boost::filesystem::remove_all(res_file);
} else {
boost::filesystem::remove(res_file);
}
std::string res_path = GetResPath<ResourceT>(res_);
if (!boost::filesystem::exists(res_path)) {
return Status::OK();
}
if (boost::filesystem::is_directory(res_path)) {
boost::filesystem::remove_all(res_path);
} else {
boost::filesystem::remove(res_path);
}

/* remove resource from meta */
Expand Down

0 comments on commit 1b4e49a

Please sign in to comment.