Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot integrate GetEntityByID #2753

Merged
merged 17 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 21 additions & 2 deletions core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field
}

Status
SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force) {
SSDBImpl::PreloadCollection(const server::ContextPtr& context, const std::string& collection_name, bool force) {
CHECK_INITIALIZED;

snapshot::ScopedSnapshotT ss;
Expand All @@ -247,6 +246,26 @@ SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, con
return handler->GetStatus();
}

Status
SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<VectorsData>& vector_data,
std::vector<meta::hybrid::DataType>& attr_type, std::vector<AttrsData>& attr_data) {
CHECK_INITIALIZED;

snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, id_array, field_names);
handler->Iterate();
STATUS_CHECK(handler->GetStatus());

vector_data = std::move(handler->vector_data_);
attr_type = std::move(handler->attr_type_);
attr_data = std::move(handler->attr_data_);

return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 6 additions & 2 deletions core/src/db/SSDBImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class SSDBImpl {
GetCollectionRowCount(const std::string& collection_name, uint64_t& row_count);

Status
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force = false);
PreloadCollection(const server::ContextPtr& context, const std::string& collection_name, bool force = false);

Status
CreatePartition(const std::string& collection_name, const std::string& partition_name);
Expand All @@ -76,6 +75,11 @@ class SSDBImpl {
Status
DropIndex(const std::string& collection_name, const std::string& field_name, const std::string& field_element_name);

Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<engine::VectorsData>& vector_data,
std::vector<meta::hybrid::DataType>& attr_type, std::vector<engine::AttrsData>& attr_data);

private:
void
InternalFlush(const std::string& collection_id = "");
Expand Down
125 changes: 125 additions & 0 deletions core/src/db/SnapshotHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@

#include "db/SnapshotHandlers.h"
#include "db/meta/MetaTypes.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/Snapshot.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/SegmentReader.h"

#include <unordered_map>
#include <utility>

namespace milvus {
namespace engine {
Expand Down Expand Up @@ -62,6 +69,7 @@ LoadVectorFieldHandler::Handle(const snapshot::FieldPtr& field) {
return status;
}

///////////////////////////////////////////////////////////////////////////////
SegmentsToSearchCollector::SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, meta::FilesHolder& holder)
: BaseT(ss), holder_(holder) {
}
Expand Down Expand Up @@ -93,5 +101,122 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm
holder_.MarkFile(schema);
}

///////////////////////////////////////////////////////////////////////////////
GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr<milvus::server::Context>& context,
engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
const std::vector<std::string>& field_names)
: BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() {
}

Status
GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
LOG_ENGINE_DEBUG_ << "Get entity by id in segment " << segment->GetID();

// Load bloom filter
std::string segment_dir = snapshot::GetResPath<snapshot::Segment>(segment);
segment::SegmentReader segment_reader(segment_dir);

segment::IdBloomFilterPtr id_bloom_filter_ptr;
STATUS_CHECK(segment_reader.LoadBloomFilter(id_bloom_filter_ptr));

// Load uids and check if the id is indeed present. If yes, find its offset.
std::vector<segment::doc_id_t> uids;
STATUS_CHECK(segment_reader.LoadUids(uids));

// Check whether the id has been deleted
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader.LoadDeletedDocs(deleted_docs_ptr));
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();

for (auto id : ids_) {
AttrsData& attr_ref = attr_data_[id];
VectorsData& vector_ref = vector_data_[id];

/* fast check using bloom filter */
if (!id_bloom_filter_ptr->Check(id)) {
continue;
}

/* check if id really exists in uids */
auto found = std::find(uids.begin(), uids.end(), id);
if (found == uids.end()) {
continue;
}

/* check if this id is deleted */
auto offset = std::distance(uids.begin(), found);
auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset);
if (deleted != deleted_docs.end()) {
continue;
}

std::unordered_map<std::string, std::vector<uint8_t>> raw_attrs;
for (auto& field_name : field_names_) {
auto field_ptr = ss_->GetField(field_name);
auto field_params = field_ptr->GetParams();
auto dim = field_params[knowhere::meta::DIM].get<int64_t>();
auto field_type = field_ptr->GetFtype();

if (field_ptr->GetFtype() == (int64_t)meta::hybrid::DataType::VECTOR_BINARY) {
size_t vector_size = dim / 8;
std::vector<uint8_t> raw_vector;
STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector));

vector_ref.vector_count_ = 1;
vector_ref.binary_data_.swap(raw_vector);
} else if (field_ptr->GetFtype() == (int64_t)meta::hybrid::DataType::VECTOR_FLOAT) {
size_t vector_size = dim * sizeof(float);
std::vector<uint8_t> raw_vector;
STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector));

vector_ref.vector_count_ = 1;
std::vector<float> float_vector;
float_vector.resize(dim);
memcpy(float_vector.data(), raw_vector.data(), vector_size);
vector_ref.float_data_.swap(float_vector);
} else {
size_t num_bytes;
switch (field_type) {
case (int64_t)meta::hybrid::DataType::INT8: {
num_bytes = sizeof(int8_t);
break;
}
case (int64_t)meta::hybrid::DataType::INT16: {
num_bytes = sizeof(int16_t);
break;
}
case (int64_t)meta::hybrid::DataType::INT32: {
num_bytes = sizeof(int32_t);
break;
}
case (int64_t)meta::hybrid::DataType::INT64: {
num_bytes = sizeof(int64_t);
break;
}
case (int64_t)meta::hybrid::DataType::FLOAT: {
num_bytes = sizeof(float);
break;
}
case (int64_t)meta::hybrid::DataType::DOUBLE: {
num_bytes = sizeof(double);
break;
}
default: {
std::string msg = "Field type of " + field_name + " not supported";
return Status(DB_ERROR, msg);
}
}
std::vector<uint8_t> raw_attr;
STATUS_CHECK(segment_reader.LoadAttrs(field_name, offset * num_bytes, num_bytes, raw_attr));
raw_attrs.insert(std::make_pair(field_name, raw_attr));
}
}

attr_ref.attr_count_ = 1;
attr_ref.attr_data_ = raw_attrs;
}

return Status::OK();
}
} // namespace engine
} // namespace milvus
31 changes: 26 additions & 5 deletions core/src/db/SnapshotHandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,41 @@

#pragma once

#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"
#include "server/context/Context.h"
#include "utils/Log.h"

#include <memory>
#include <string>
#include <vector>

namespace milvus {
namespace engine {

struct LoadVectorFieldElementHandler : public snapshot::IterateHandler<snapshot::FieldElement> {
using ResourceT = snapshot::FieldElement;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldElementHandler(const std::shared_ptr<server::Context>& context, snapshot::ScopedSnapshotT ss,
LoadVectorFieldElementHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const snapshot::FieldPtr& field);

Status
Handle(const typename ResourceT::Ptr&) override;

const std::shared_ptr<server::Context>& context_;
const snapshot::FieldPtr& field_;
const server::ContextPtr context_;
const snapshot::FieldPtr field_;
};

struct LoadVectorFieldHandler : public snapshot::IterateHandler<snapshot::Field> {
using ResourceT = snapshot::Field;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldHandler(const std::shared_ptr<server::Context>& context, snapshot::ScopedSnapshotT ss);
LoadVectorFieldHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss);

Status
Handle(const typename ResourceT::Ptr&) override;

const std::shared_ptr<server::Context>& context_;
const server::ContextPtr context_;
};

struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::SegmentCommit> {
Expand All @@ -56,5 +59,23 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::Seg
meta::FilesHolder& holder_;
};

///////////////////////////////////////////////////////////////////////////////
struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
const std::vector<std::string>& field_names);

Status
Handle(const typename ResourceT::Ptr&) override;

const server::ContextPtr context_;
const engine::IDNumbers ids_;
const std::vector<std::string> field_names_;
std::vector<engine::VectorsData> vector_data_;
std::vector<meta::hybrid::DataType> attr_type_;
std::vector<engine::AttrsData> attr_data_;
};

} // namespace engine
} // namespace milvus
3 changes: 3 additions & 0 deletions core/src/db/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ typedef segment::doc_id_t IDNumber;
typedef IDNumber* IDNumberPtr;
typedef std::vector<IDNumber> IDNumbers;

typedef faiss::Index::distance_t VectorDistance;
typedef std::vector<VectorDistance> VectorDistances;

typedef std::vector<faiss::Index::idx_t> ResultIds;
typedef std::vector<faiss::Index::distance_t> ResultDistances;

Expand Down
42 changes: 13 additions & 29 deletions core/src/db/meta/MetaTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,21 @@ using Table2FileRef = std::map<std::string, File2RefCount>;
namespace hybrid {

enum class DataType {
INT8 = 1,
INT16 = 2,
INT32 = 3,
INT64 = 4,
NONE = 0,
BOOL = 1,
INT8 = 2,
INT16 = 3,
INT32 = 4,
INT64 = 5,

STRING = 20,

BOOL = 30,
FLOAT = 10,
DOUBLE = 11,

FLOAT = 40,
DOUBLE = 41,
STRING = 20,

VECTOR = 100,
UNKNOWN = 9999,
VECTOR_BINARY = 100,
VECTOR_FLOAT = 101,
VECTOR = 200,
};

struct VectorFieldSchema {
Expand All @@ -133,27 +134,10 @@ struct VectorFieldsSchema {
using VectorFieldSchemaPtr = std::shared_ptr<VectorFieldSchema>;

struct FieldSchema {
typedef enum {
INT8 = 1,
INT16 = 2,
INT32 = 3,
INT64 = 4,

STRING = 20,

BOOL = 30,

FLOAT = 40,
DOUBLE = 41,

VECTOR = 100,
UNKNOWN = 9999,
} FIELD_TYPE;

// TODO(yukun): need field_id?
std::string collection_id_;
std::string field_name_;
int32_t field_type_ = (int)INT8;
int32_t field_type_;
std::string field_params_;
};

Expand Down
19 changes: 8 additions & 11 deletions core/src/db/snapshot/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,14 @@ class ResourceGCEvent : public Event {
STATUS_CHECK((*sd_op)(store));

/* TODO: physically clean resource */
std::vector<std::string> res_file_list;
STATUS_CHECK(GetResFiles<ResourceT>(res_file_list, res_));
for (auto& res_file : res_file_list) {
if (!boost::filesystem::exists(res_file)) {
continue;
}
if (boost::filesystem::is_directory(res_file)) {
boost::filesystem::remove_all(res_file);
} else {
boost::filesystem::remove(res_file);
}
std::string res_path = GetResPath<ResourceT>(res_);
if (!boost::filesystem::exists(res_path)) {
return Status::OK();
}
if (boost::filesystem::is_directory(res_path)) {
boost::filesystem::remove_all(res_path);
} else {
boost::filesystem::remove(res_path);
}

/* remove resource from meta */
Expand Down