From e7f811b8bef4817f7bb523bf52e4170d5faf23c2 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 2 Jul 2020 17:34:15 +0800 Subject: [PATCH 01/11] code opt Signed-off-by: yudong.cai --- core/src/db/DB.h | 3 -- core/src/db/SSDBImpl.cpp | 61 +++++++--------------------- core/src/db/SSDBImpl.h | 16 ++++---- core/src/db/snapshot/ResourceTypes.h | 4 +- 4 files changed, 25 insertions(+), 59 deletions(-) diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 30682b591145..6da0e5b978d0 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -11,7 +11,6 @@ #pragma once -#include #include #include #include @@ -28,8 +27,6 @@ namespace milvus { namespace engine { -class Env; - class DB { public: DB() = default; diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index 5b2be9302bd9..c97b72eb5687 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -11,6 +11,7 @@ #include "db/SSDBImpl.h" #include "db/snapshot/CompoundOperations.h" +#include "db/snapshot/ResourceTypes.h" #include "db/snapshot/Snapshots.h" #include "wal/WalDefinations.h" @@ -77,15 +78,11 @@ SSDBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) { CHECK_INITIALIZED; auto ctx = context; - if (options_.wal_enable_) { ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName()); } - auto op = std::make_shared(ctx); - auto status = op->Push(); - - return status; + return op->Push(); } Status @@ -94,41 +91,32 @@ SSDBImpl::DescribeCollection(const std::string& collection_name, snapshot::Colle CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); collection = ss->GetCollection(); - auto& fields = ss->GetResources(); for (auto& kv : fields) { fields_schema[kv.second.Get()] = ss->GetFieldElementsByField(kv.second->GetName()); } - return status; + return Status::OK(); } Status SSDBImpl::DropCollection(const std::string& name) { CHECK_INITIALIZED; - // dates partly delete files of the collection but currently we don't support LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << name; snapshot::ScopedSnapshotT ss; auto& snapshots = snapshot::Snapshots::GetInstance(); - auto status = snapshots.GetSnapshot(ss, name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshots.GetSnapshot(ss, name)); if (options_.wal_enable_) { // SS TODO /* wal_mgr_->DropCollection(ss->GetCollectionId()); */ } - status = snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits::max()); - return status; + return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits::max()); } Status @@ -154,13 +142,10 @@ Status SSDBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) { CHECK_INITIALIZED; - uint64_t lsn = 0; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); + snapshot::LSN_TYPE lsn = 0; if (options_.wal_enable_) { // SS TODO /* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */ @@ -175,13 +160,8 @@ SSDBImpl::CreatePartition(const std::string& collection_name, const std::string& snapshot::PartitionContext p_ctx; p_ctx.name = partition_name; snapshot::PartitionPtr partition; - status = op->CommitNewPartition(p_ctx, partition); - if (!status.ok()) { - return status; - } - - status = op->Push(); - return status; + STATUS_CHECK(op->CommitNewPartition(p_ctx, partition)); + return op->Push(); } Status @@ -189,10 +169,7 @@ SSDBImpl::DropPartition(const std::string& collection_name, const std::string& p CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); // SS TODO: Is below step needed? Or How to implement it? /* mem_mgr_->EraseMemVector(partition_name); */ @@ -200,9 +177,7 @@ SSDBImpl::DropPartition(const std::string& collection_name, const std::string& p snapshot::PartitionContext context; context.name = partition_name; auto op = std::make_shared(context, ss); - status = op->Push(); - - return status; + return op->Push(); } Status @@ -210,13 +185,10 @@ SSDBImpl::ShowPartitions(const std::string& collection_name, std::vectorGetPartitionNames()); - return status; + return Status::OK(); } Status @@ -225,10 +197,7 @@ SSDBImpl::PreloadCollection(const std::shared_ptr& context, con CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); auto handler = std::make_shared(context, ss); handler->Iterate(); diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 1cc53387c557..2532294c2775 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -32,6 +32,14 @@ class SSDBImpl { public: explicit SSDBImpl(const DBOptions& options); + ~SSDBImpl(); + + Status + Start(); + + Status + Stop(); + Status CreateCollection(const snapshot::CreateCollectionContext& context); @@ -61,14 +69,6 @@ class SSDBImpl { Status ShowPartitions(const std::string& collection_name, std::vector& partition_names); - ~SSDBImpl(); - - Status - Start(); - - Status - Stop(); - private: DBOptions options_; std::atomic initialized_; diff --git a/core/src/db/snapshot/ResourceTypes.h b/core/src/db/snapshot/ResourceTypes.h index 21b3ce508689..7c74fd0d18f0 100644 --- a/core/src/db/snapshot/ResourceTypes.h +++ b/core/src/db/snapshot/ResourceTypes.h @@ -23,8 +23,8 @@ using ID_TYPE = int64_t; using NUM_TYPE = int64_t; using FTYPE_TYPE = int64_t; using TS_TYPE = int64_t; -using LSN_TYPE = uint64_t; -using SIZE_TYPE = uint64_t; +using LSN_TYPE = int64_t; +using SIZE_TYPE = int64_t; using MappingT = std::set; enum FieldType { VECTOR, INT32 }; From 05ab16676130ced561f2e529472d32cb317da2eb Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 2 Jul 2020 18:44:07 +0800 Subject: [PATCH 02/11] add some APIs for SSDBImpl Signed-off-by: yudong.cai --- core/src/db/DBImpl.h | 43 +-------- core/src/db/SSDBImpl.cpp | 156 ++++++++++++++++++++++++++++++++- core/src/db/SSDBImpl.h | 30 +++++++ core/src/db/SimpleWaitNotify.h | 62 +++++++++++++ 4 files changed, 246 insertions(+), 45 deletions(-) create mode 100644 core/src/db/SimpleWaitNotify.h diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 5d6fca5d842d..f9a201943e56 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -12,7 +12,6 @@ #pragma once #include -#include #include #include #include @@ -27,6 +26,7 @@ #include "config/handler/EngineConfigHandler.h" #include "db/DB.h" #include "db/IndexFailedChecker.h" +#include "db/SimpleWaitNotify.h" #include "db/Types.h" #include "db/insert/MemManager.h" #include "db/merge/MergeManager.h" @@ -325,47 +325,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi std::thread bg_metric_thread_; std::thread bg_index_thread_; - struct SimpleWaitNotify { - bool notified_ = false; - std::mutex mutex_; - std::condition_variable cv_; - - void - Wait() { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait(lck); - } - notified_ = false; - } - - void - Wait_Until(const std::chrono::system_clock::time_point& tm_pint) { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait_until(lck, tm_pint); - } - notified_ = false; - } - - void - Wait_For(const std::chrono::system_clock::duration& tm_dur) { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait_for(lck, tm_dur); - } - notified_ = false; - } - - void - Notify() { - std::unique_lock lck(mutex_); - notified_ = true; - lck.unlock(); - cv_.notify_one(); - } - }; - SimpleWaitNotify swn_wal_; SimpleWaitNotify swn_flush_; SimpleWaitNotify swn_metric_; diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index c97b72eb5687..f390eb56f4ec 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -10,11 +10,16 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/SSDBImpl.h" +#include "cache/CpuCacheMgr.h" #include "db/snapshot/CompoundOperations.h" #include "db/snapshot/ResourceTypes.h" #include "db/snapshot/Snapshots.h" +#include "metrics/Metrics.h" +#include "metrics/SystemInfo.h" +#include "utils/Exception.h" #include "wal/WalDefinations.h" +#include #include #include @@ -22,6 +27,8 @@ namespace milvus { namespace engine { namespace { +constexpr int64_t BACKGROUND_METRIC_INTERVAL = 1; + static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!"); } // namespace @@ -46,9 +53,9 @@ SSDBImpl::~SSDBImpl() { Stop(); } -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// external api -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +// External APIs +//////////////////////////////////////////////////////////////////////////////// Status SSDBImpl::Start() { if (initialized_.load(std::memory_order_acquire)) { @@ -205,5 +212,148 @@ SSDBImpl::PreloadCollection(const std::shared_ptr& context, con return handler->GetStatus(); } +//////////////////////////////////////////////////////////////////////////////// +// Internal APIs +//////////////////////////////////////////////////////////////////////////////// +void +SSDBImpl::InternalFlush(const std::string& collection_id) { + wal::MXLogRecord record; + record.type = wal::MXLogType::Flush; + record.collection_id = collection_id; + ExecWalRecord(record); +} + +void +SSDBImpl::BackgroundFlushThread() { + SetThreadName("flush_thread"); + server::SystemInfo::GetInstance().Init(); + while (true) { + if (!initialized_.load(std::memory_order_acquire)) { + LOG_ENGINE_DEBUG_ << "DB background flush thread exit"; + break; + } + + InternalFlush(); + if (options_.auto_flush_interval_ > 0) { + swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_)); + } else { + swn_flush_.Wait(); + } + } +} + +void +SSDBImpl::StartMetricTask() { + server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL); + int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); + int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); + fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0); + + if (cache_total > 0) { + double cache_usage_double = cache_usage; + server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total); + } else { + server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0); + } + + server::Metrics::GetInstance().GpuCacheUsageGaugeSet(); + /* SS TODO */ + // uint64_t size; + // Size(size); + // server::Metrics::GetInstance().DataFileSizeGaugeSet(size); + server::Metrics::GetInstance().CPUUsagePercentSet(); + server::Metrics::GetInstance().RAMUsagePercentSet(); + server::Metrics::GetInstance().GPUPercentGaugeSet(); + server::Metrics::GetInstance().GPUMemoryUsageGaugeSet(); + server::Metrics::GetInstance().OctetsSet(); + + server::Metrics::GetInstance().CPUCoreUsagePercentSet(); + server::Metrics::GetInstance().GPUTemperature(); + server::Metrics::GetInstance().CPUTemperature(); + server::Metrics::GetInstance().PushToGateway(); +} + +void +SSDBImpl::BackgroundMetricThread() { + SetThreadName("metric_thread"); + server::SystemInfo::GetInstance().Init(); + while (true) { + if (!initialized_.load(std::memory_order_acquire)) { + LOG_ENGINE_DEBUG_ << "DB background metric thread exit"; + break; + } + + swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL)); + StartMetricTask(); + meta::FilesHolder::PrintInfo(); + } +} + +Status +SSDBImpl::ExecWalRecord(const wal::MXLogRecord& record) { + return Status::OK(); +} + +void +SSDBImpl::BackgroundWalThread() { + SetThreadName("wal_thread"); + server::SystemInfo::GetInstance().Init(); + + std::chrono::system_clock::time_point next_auto_flush_time; + auto get_next_auto_flush_time = [&]() { + return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_); + }; + if (options_.auto_flush_interval_ > 0) { + next_auto_flush_time = get_next_auto_flush_time(); + } + + InternalFlush(); + while (true) { + if (options_.auto_flush_interval_ > 0) { + if (std::chrono::system_clock::now() >= next_auto_flush_time) { + InternalFlush(); + next_auto_flush_time = get_next_auto_flush_time(); + } + } + + wal::MXLogRecord record; + auto error_code = wal_mgr_->GetNextRecord(record); + if (error_code != WAL_SUCCESS) { + LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error"; + break; + } + + if (record.type != wal::MXLogType::None) { + ExecWalRecord(record); + if (record.type == wal::MXLogType::Flush) { + // notify flush request to return + flush_req_swn_.Notify(); + + // if user flush all manually, update auto flush also + if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) { + next_auto_flush_time = get_next_auto_flush_time(); + } + } + + } else { + if (!initialized_.load(std::memory_order_acquire)) { + InternalFlush(); + flush_req_swn_.Notify(); + // SS TODO + // WaitMergeFileFinish(); + // WaitBuildIndexFinish(); + LOG_ENGINE_DEBUG_ << "WAL background thread exit"; + break; + } + + if (options_.auto_flush_interval_ > 0) { + swn_wal_.Wait_Until(next_auto_flush_time); + } else { + swn_wal_.Wait(); + } + } + } +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 2532294c2775..a735b4e35fa4 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -18,6 +18,7 @@ #include #include "db/Options.h" +#include "db/SimpleWaitNotify.h" #include "db/SnapshotHandlers.h" #include "db/snapshot/Context.h" #include "db/snapshot/ResourceTypes.h" @@ -69,10 +70,39 @@ class SSDBImpl { Status ShowPartitions(const std::string& collection_name, std::vector& partition_names); + private: + void + InternalFlush(const std::string& collection_id = ""); + + void + BackgroundFlushThread(); + + void + StartMetricTask(); + + void + BackgroundMetricThread(); + + Status + ExecWalRecord(const wal::MXLogRecord& record); + + void + BackgroundWalThread(); + private: DBOptions options_; std::atomic initialized_; + std::shared_ptr wal_mgr_; + std::shared_ptr bg_wal_thread_; + + std::shared_ptr bg_flush_thread_; + + SimpleWaitNotify swn_wal_; + SimpleWaitNotify swn_flush_; + SimpleWaitNotify swn_metric_; + + SimpleWaitNotify flush_req_swn_; }; // SSDBImpl } // namespace engine diff --git a/core/src/db/SimpleWaitNotify.h b/core/src/db/SimpleWaitNotify.h new file mode 100644 index 000000000000..a92a8ccee768 --- /dev/null +++ b/core/src/db/SimpleWaitNotify.h @@ -0,0 +1,62 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include + +namespace milvus { +namespace engine { + +struct SimpleWaitNotify { + bool notified_ = false; + std::mutex mutex_; + std::condition_variable cv_; + + void + Wait() { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait(lck); + } + notified_ = false; + } + + void + Wait_Until(const std::chrono::system_clock::time_point& tm_pint) { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait_until(lck, tm_pint); + } + notified_ = false; + } + + void + Wait_For(const std::chrono::system_clock::duration& tm_dur) { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait_for(lck, tm_dur); + } + notified_ = false; + } + + void + Notify() { + std::unique_lock lck(mutex_); + notified_ = true; + lck.unlock(); + cv_.notify_one(); + } +}; + +} // namespace engine +} // namespace milvus From 773561553eb0e2a3f8e38cf703e26056ac4af265 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Fri, 3 Jul 2020 17:27:12 +0800 Subject: [PATCH 03/11] partially add GetVectorById Signed-off-by: yudong.cai --- core/src/db/SSDBImpl.cpp | 364 +++++++++++++++++++++++++++++++ core/src/db/SSDBImpl.h | 18 ++ core/src/db/SnapshotHandlers.cpp | 37 ++++ core/src/db/SnapshotHandlers.h | 25 +++ 4 files changed, 444 insertions(+) diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index f390eb56f4ec..a296ea71de2b 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -16,6 +16,8 @@ #include "db/snapshot/Snapshots.h" #include "metrics/Metrics.h" #include "metrics/SystemInfo.h" +#include "segment/SegmentReader.h" +#include "segment/SegmentWriter.h" #include "utils/Exception.h" #include "wal/WalDefinations.h" @@ -212,9 +214,371 @@ SSDBImpl::PreloadCollection(const std::shared_ptr& context, con return handler->GetStatus(); } +Status +SSDBImpl::GetVectorsByID(const std::string& collection_name, const IDNumbers& id_array, + std::vector& vectors) { + CHECK_INITIALIZED; + + snapshot::ScopedSnapshotT ss; + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); + + meta::FilesHolder files_holder; + + std::vector partition_names; + partition_names = std::move(ss->GetPartitionNames()); + partition_names.push_back(collection_name); + + cache::CpuCacheMgr::GetInstance()->PrintInfo(); + STATUS_CHECK(GetVectorsByIdHelper(id_array, vectors, files_holder)); + cache::CpuCacheMgr::GetInstance()->PrintInfo(); + + return Status::OK(); +} + +//Status +//SSDBImpl::GetEntitiesByID(const std::string& collection_id, const milvus::engine::IDNumbers& id_array, +// std::vector& vectors, std::vector& attrs) { +// if (!initialized_.load(std::memory_order_acquire)) { +// return SHUTDOWN_ERROR; +// } +// +// bool has_collection; +// auto status = HasCollection(collection_id, has_collection); +// if (!has_collection) { +// LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: "; +// return Status(DB_NOT_FOUND, "Collection does not exist"); +// } +// if (!status.ok()) { +// return status; +// } +// +// engine::meta::CollectionSchema collection_schema; +// engine::meta::hybrid::FieldsSchema fields_schema; +// collection_schema.collection_id_ = collection_id; +// status = meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema); +// if (!status.ok()) { +// return status; +// } +// std::unordered_map attr_type; +// for (auto schema : fields_schema.fields_schema_) { +// if (schema.field_type_ == (int32_t)engine::meta::hybrid::DataType::VECTOR) { +// continue; +// } +// attr_type.insert(std::make_pair(schema.field_name_, (engine::meta::hybrid::DataType)schema.field_type_)); +// } +// +// meta::FilesHolder files_holder; +// std::vector file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, +// meta::SegmentSchema::FILE_TYPE::BACKUP}; +// +// status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); +// if (!status.ok()) { +// std::string err_msg = "Failed to get files for GetEntitiesByID: " + status.message(); +// LOG_ENGINE_ERROR_ << err_msg; +// return status; +// } +// +// std::vector partition_array; +// status = meta_ptr_->ShowPartitions(collection_id, partition_array); +// if (!status.ok()) { +// std::string err_msg = "Failed to get partitions for GetEntitiesByID: " + status.message(); +// LOG_ENGINE_ERROR_ << err_msg; +// return status; +// } +// for (auto& schema : partition_array) { +// status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder); +// if (!status.ok()) { +// std::string err_msg = "Failed to get files for GetEntitiesByID: " + status.message(); +// LOG_ENGINE_ERROR_ << err_msg; +// return status; +// } +// } +// +// if (files_holder.HoldFiles().empty()) { +// LOG_ENGINE_DEBUG_ << "No files to get vector by id from"; +// return Status(DB_NOT_FOUND, "Collection is empty"); +// } +// +// cache::CpuCacheMgr::GetInstance()->PrintInfo(); +// status = GetEntitiesByIdHelper(collection_id, id_array, attr_type, vectors, attrs, files_holder); +// cache::CpuCacheMgr::GetInstance()->PrintInfo(); +// +// return status; +//} + //////////////////////////////////////////////////////////////////////////////// // Internal APIs //////////////////////////////////////////////////////////////////////////////// +Status +SSDBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& vectors, + meta::FilesHolder& files_holder) { + // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal + milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); + LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size(); + + // sometimes not all of id_array can be found, we need to return empty vector for id not found + // for example: + // id_array = [1, -1, 2, -1, 3] + // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] + // the ID2RAW is to ensure returned vector sequence is consist with id_array + using ID2VECTOR = std::map; + ID2VECTOR map_id2vector; + + vectors.clear(); + + IDNumbers temp_ids = id_array; + for (auto& file : files) { + if (temp_ids.empty()) { + break; // all vectors found, no need to continue + } + // Load bloom filter + std::string segment_dir; + engine::utils::GetParentPath(file.location_, segment_dir); + segment::SegmentReader segment_reader(segment_dir); + segment::IdBloomFilterPtr id_bloom_filter_ptr; + auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr); + if (!status.ok()) { + return status; + } + + for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) { + int64_t vector_id = *it; + // each id must has a VectorsData + // if vector not found for an id, its VectorsData's vector_count = 0, else 1 + VectorsData& vector_ref = map_id2vector[vector_id]; + + // Check if the id is present in bloom filter. + if (id_bloom_filter_ptr->Check(vector_id)) { + // Load uids and check if the id is indeed present. If yes, find its offset. + std::vector uids; + auto status = segment_reader.LoadUids(uids); + if (!status.ok()) { + return status; + } + + auto found = std::find(uids.begin(), uids.end(), vector_id); + if (found != uids.end()) { + auto offset = std::distance(uids.begin(), found); + + // Check whether the id has been deleted + segment::DeletedDocsPtr deleted_docs_ptr; + status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); + if (!status.ok()) { + LOG_ENGINE_ERROR_ << status.message(); + return status; + } + auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); + + auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); + if (deleted == deleted_docs.end()) { + // Load raw vector + bool is_binary = utils::IsBinaryMetricType(file.metric_type_); + size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); + std::vector raw_vector; + status = + segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); + if (!status.ok()) { + LOG_ENGINE_ERROR_ << status.message(); + return status; + } + + vector_ref.vector_count_ = 1; + if (is_binary) { + vector_ref.binary_data_.swap(raw_vector); + } else { + std::vector float_vector; + float_vector.resize(file.dimension_); + memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); + vector_ref.float_data_.swap(float_vector); + } + temp_ids.erase(it); + continue; + } + } + } + + it++; + } + + // unmark file, allow the file to be deleted + files_holder.UnmarkFile(file); + } + + for (auto id : id_array) { + VectorsData& vector_ref = map_id2vector[id]; + + VectorsData data; + data.vector_count_ = vector_ref.vector_count_; + if (data.vector_count_ > 0) { + data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id + data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id + } + vectors.emplace_back(data); + } + + return Status::OK(); +} + +//Status +//SSDBImpl::GetEntitiesByIdHelper(const std::string& collection_id, const milvus::engine::IDNumbers& id_array, +// std::unordered_map& attr_type, +// std::vector& vectors, std::vector& attrs, +// milvus::engine::meta::FilesHolder& files_holder) { +// // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal +// milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); +// LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size(); +// +// // sometimes not all of id_array can be found, we need to return empty vector for id not found +// // for example: +// // id_array = [1, -1, 2, -1, 3] +// // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] +// // the ID2RAW is to ensure returned vector sequence is consist with id_array +// using ID2ATTR = std::map; +// using ID2VECTOR = std::map; +// ID2ATTR map_id2attr; +// ID2VECTOR map_id2vector; +// +// IDNumbers temp_ids = id_array; +// for (auto& file : files) { +// // Load bloom filter +// std::string segment_dir; +// engine::utils::GetParentPath(file.location_, segment_dir); +// segment::SegmentReader segment_reader(segment_dir); +// segment::IdBloomFilterPtr id_bloom_filter_ptr; +// segment_reader.LoadBloomFilter(id_bloom_filter_ptr); +// +// for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) { +// int64_t vector_id = *it; +// // each id must has a VectorsData +// // if vector not found for an id, its VectorsData's vector_count = 0, else 1 +// AttrsData& attr_ref = map_id2attr[vector_id]; +// VectorsData& vector_ref = map_id2vector[vector_id]; +// +// // Check if the id is present in bloom filter. +// if (id_bloom_filter_ptr->Check(vector_id)) { +// // Load uids and check if the id is indeed present. If yes, find its offset. +// std::vector uids; +// auto status = segment_reader.LoadUids(uids); +// if (!status.ok()) { +// return status; +// } +// +// auto found = std::find(uids.begin(), uids.end(), vector_id); +// if (found != uids.end()) { +// auto offset = std::distance(uids.begin(), found); +// +// // Check whether the id has been deleted +// segment::DeletedDocsPtr deleted_docs_ptr; +// status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << status.message(); +// return status; +// } +// auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); +// +// auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); +// if (deleted == deleted_docs.end()) { +// // Load raw vector +// bool is_binary = utils::IsBinaryMetricType(file.metric_type_); +// size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); +// std::vector raw_vector; +// status = +// segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << status.message(); +// return status; +// } +// +// std::unordered_map> raw_attrs; +// auto attr_it = attr_type.begin(); +// for (; attr_it != attr_type.end(); attr_it++) { +// size_t num_bytes; +// switch (attr_it->second) { +// case engine::meta::hybrid::DataType::INT8: { +// num_bytes = 1; +// break; +// } +// case engine::meta::hybrid::DataType::INT16: { +// num_bytes = 2; +// break; +// } +// case engine::meta::hybrid::DataType::INT32: { +// num_bytes = 4; +// break; +// } +// case engine::meta::hybrid::DataType::INT64: { +// num_bytes = 8; +// break; +// } +// case engine::meta::hybrid::DataType::FLOAT: { +// num_bytes = 4; +// break; +// } +// case engine::meta::hybrid::DataType::DOUBLE: { +// num_bytes = 8; +// break; +// } +// default: { +// std::string msg = "Field type of " + attr_it->first + " is wrong"; +// return Status{DB_ERROR, msg}; +// } +// } +// std::vector raw_attr; +// status = segment_reader.LoadAttrs(attr_it->first, offset * num_bytes, num_bytes, raw_attr); +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << status.message(); +// return status; +// } +// raw_attrs.insert(std::make_pair(attr_it->first, raw_attr)); +// } +// +// vector_ref.vector_count_ = 1; +// if (is_binary) { +// vector_ref.binary_data_.swap(raw_vector); +// } else { +// std::vector float_vector; +// float_vector.resize(file.dimension_); +// memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); +// vector_ref.float_data_.swap(float_vector); +// } +// +// attr_ref.attr_count_ = 1; +// attr_ref.attr_data_ = raw_attrs; +// attr_ref.attr_type_ = attr_type; +// temp_ids.erase(it); +// continue; +// } +// } +// } +// it++; +// } +// +// // unmark file, allow the file to be deleted +// files_holder.UnmarkFile(file); +// } +// +// for (auto id : id_array) { +// VectorsData& vector_ref = map_id2vector[id]; +// +// VectorsData data; +// data.vector_count_ = vector_ref.vector_count_; +// if (data.vector_count_ > 0) { +// data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id +// data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id +// } +// vectors.emplace_back(data); +// +// attrs.emplace_back(map_id2attr[id]); +// } +// +// if (vectors.empty()) { +// std::string msg = "Vectors not found in collection " + collection_id; +// LOG_ENGINE_DEBUG_ << msg; +// } +// +// return Status::OK(); +//} + void SSDBImpl::InternalFlush(const std::string& collection_id) { wal::MXLogRecord record; diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index a735b4e35fa4..98f7e333452f 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -70,7 +70,25 @@ class SSDBImpl { Status ShowPartitions(const std::string& collection_name, std::vector& partition_names); + Status + GetVectorsByID(const std::string& collection_name, const IDNumbers& id_array, + std::vector& vectors); + + Status + GetEntitiesByID(const std::string& collection_name, const IDNumbers& id_array, + std::vector& vectors, std::vector& attrs); + private: + Status + GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& vectors, + meta::FilesHolder& files_holder); + + Status + GetEntitiesByIdHelper(const std::string& collection_id, const IDNumbers& id_array, + std::unordered_map& attr_type, + std::vector& vectors, std::vector& attrs, + meta::FilesHolder& files_holder); + void InternalFlush(const std::string& collection_id = ""); diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index f16476a4d0bd..8bef23f28f9e 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -62,6 +62,7 @@ LoadVectorFieldHandler::Handle(const snapshot::FieldPtr& field) { return status; } +/////////////////////////////////////////////////////////////////////////////// SegmentsToSearchCollector::SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, meta::FilesHolder& holder) : BaseT(ss), holder_(holder) { } @@ -93,5 +94,41 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm holder_.MarkFile(schema); } +/////////////////////////////////////////////////////////////////////////////// +GetVectorByIdSegmentHandler::GetVectorByIdSegmentHandler(const std::shared_ptr& context, + milvus::engine::snapshot::ScopedSnapshotT ss, + const snapshot::PartitionPtr& partition) + : BaseT(ss), context_(context), partition_(partition) { +} + +Status +GetVectorByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { + if (partition_->GetID() != segment->GetPartitionId()) { + return Status::OK(); + } + + // SS TODO + + return Status::OK(); +} + +GetVectorByIdPartitionHandler::GetVectorByIdPartitionHandler(const std::shared_ptr& context, + milvus::engine::snapshot::ScopedSnapshotT ss) + : BaseT(ss), context_(context) { +} + +Status +GetVectorByIdPartitionHandler::Handle(const snapshot::PartitionPtr& partion) { + if (context_ && context_->IsConnectionBroken()) { + LOG_ENGINE_DEBUG_ << "Connection broken"; + return Status(DB_ERROR, "Connection broken"); + } + + auto segment_handler = std::make_shared(context_, ss_, partion); + segment_handler->Iterate(); + + return segment_handler->GetStatus(); +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotHandlers.h b/core/src/db/SnapshotHandlers.h index c695954e5a20..3186aad21e24 100644 --- a/core/src/db/SnapshotHandlers.h +++ b/core/src/db/SnapshotHandlers.h @@ -56,5 +56,30 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler { + using ResourceT = snapshot::Segment; + using BaseT = snapshot::IterateHandler; + GetVectorByIdSegmentHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss, + const snapshot::PartitionPtr& partition); + + Status + Handle(const typename ResourceT::Ptr&) override; + + const std::shared_ptr& context_; + const snapshot::PartitionPtr& partition_; +}; + +struct GetVectorByIdPartitionHandler : public snapshot::IterateHandler { + using ResourceT = snapshot::Partition; + using BaseT = snapshot::IterateHandler; + GetVectorByIdPartitionHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss); + + Status + Handle(const typename ResourceT::Ptr&) override; + + const std::shared_ptr& context_; +}; + } // namespace engine } // namespace milvus From 040fefbafe1869a3a03862fdcf877a455ae9e458 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Fri, 3 Jul 2020 20:05:07 +0800 Subject: [PATCH 04/11] snapshot opt Signed-off-by: yudong.cai --- core/src/db/snapshot/Snapshot.cpp | 114 ++++++++++++++++-------------- core/src/db/snapshot/Snapshot.h | 25 ++++--- 2 files changed, 73 insertions(+), 66 deletions(-) diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index 0ac5ea01d9ec..75814692a7ce 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -29,91 +29,99 @@ Snapshot::UnRefAll() { std::apply([this](auto&... resource) { ((DoUnRef(resource)), ...); }, resources_); } -Snapshot::Snapshot(ID_TYPE id) { - auto collection_commit = CollectionCommitsHolder::GetInstance().GetResource(id, false); - AddResource(collection_commit); - max_lsn_ = collection_commit->GetLsn(); - auto& schema_holder = SchemaCommitsHolder::GetInstance(); - auto current_schema = schema_holder.GetResource(collection_commit->GetSchemaId(), false); - AddResource(current_schema); - current_schema_id_ = current_schema->GetID(); +Snapshot::Snapshot(ID_TYPE ss_id) { + auto& collection_commits_holder = CollectionCommitsHolder::GetInstance(); + auto& collections_holder = CollectionsHolder::GetInstance(); + auto& schema_commits_holder = SchemaCommitsHolder::GetInstance(); auto& field_commits_holder = FieldCommitsHolder::GetInstance(); auto& fields_holder = FieldsHolder::GetInstance(); auto& field_elements_holder = FieldElementsHolder::GetInstance(); - - auto collection = CollectionsHolder::GetInstance().GetResource(collection_commit->GetCollectionId(), false); - AddResource(collection); - auto& mappings = collection_commit->GetMappings(); auto& partition_commits_holder = PartitionCommitsHolder::GetInstance(); auto& partitions_holder = PartitionsHolder::GetInstance(); - auto& segments_holder = SegmentsHolder::GetInstance(); auto& segment_commits_holder = SegmentCommitsHolder::GetInstance(); + auto& segments_holder = SegmentsHolder::GetInstance(); auto& segment_files_holder = SegmentFilesHolder::GetInstance(); - auto ssid = id; - for (auto& id : mappings) { - auto partition_commit = partition_commits_holder.GetResource(id, false); - auto partition = partitions_holder.GetResource(partition_commit->GetPartitionId(), false); + auto collection_commit = collection_commits_holder.GetResource(ss_id, false); + AddResource(collection_commit); + + max_lsn_ = collection_commit->GetLsn(); + auto schema_commit = schema_commits_holder.GetResource(collection_commit->GetSchemaId(), false); + AddResource(schema_commit); + + current_schema_id_ = schema_commit->GetID(); + auto collection = collections_holder.GetResource(collection_commit->GetCollectionId(), false); + AddResource(collection); + + auto& collection_commit_mappings = collection_commit->GetMappings(); + for (auto p_c_id : collection_commit_mappings) { + auto partition_commit = partition_commits_holder.GetResource(p_c_id, false); + auto partition_id = partition_commit->GetPartitionId(); + auto partition = partitions_holder.GetResource(partition_id, false); + auto partition_name = partition->GetName(); AddResource(partition_commit); - p_pc_map_[partition_commit->GetPartitionId()] = partition_commit->GetID(); + + p_pc_map_[partition_id] = partition_commit->GetID(); AddResource(partition); - partition_names_map_[partition->GetName()] = partition->GetID(); - p_max_seg_num_[partition->GetID()] = 0; - auto& s_c_mappings = partition_commit->GetMappings(); - /* std::cout << "SS-" << ssid << "PC_MAP=("; */ + partition_names_map_[partition_name] = partition_id; + p_max_seg_num_[partition_id] = 0; + /* std::cout << "SS-" << ss_id << "PC_MAP=("; */ /* for (auto id : s_c_mappings) { */ /* std::cout << id << ","; */ /* } */ /* std::cout << ")" << std::endl; */ - for (auto& s_c_id : s_c_mappings) { + auto& partition_commit_mappings = partition_commit->GetMappings(); + for (auto s_c_id : partition_commit_mappings) { auto segment_commit = segment_commits_holder.GetResource(s_c_id, false); - auto segment = segments_holder.GetResource(segment_commit->GetSegmentId(), false); - auto schema = schema_holder.GetResource(segment_commit->GetSchemaId(), false); - AddResource(schema); + auto segment_id = segment_commit->GetSegmentId(); + auto segment = segments_holder.GetResource(segment_id, false); + auto segment_schema_id = segment_commit->GetSchemaId(); + auto segment_schema = schema_commits_holder.GetResource(segment_schema_id, false); + auto segment_partition_id = segment->GetPartitionId(); + AddResource(segment_schema); AddResource(segment_commit); - if (segment->GetNum() > p_max_seg_num_[segment->GetPartitionId()]) { - p_max_seg_num_[segment->GetPartitionId()] = segment->GetNum(); + if (segment->GetNum() > p_max_seg_num_[segment_partition_id]) { + p_max_seg_num_[segment_partition_id] = segment->GetNum(); } AddResource(segment); - seg_segc_map_[segment->GetID()] = segment_commit->GetID(); - auto& s_f_mappings = segment_commit->GetMappings(); - for (auto& s_f_id : s_f_mappings) { + + seg_segc_map_[segment_id] = segment_commit->GetID(); + auto& segment_commit_mappings = segment_commit->GetMappings(); + for (auto s_f_id : segment_commit_mappings) { auto segment_file = segment_files_holder.GetResource(s_f_id, false); - auto field_element = field_elements_holder.GetResource(segment_file->GetFieldElementId(), false); + auto segment_file_id = segment_file->GetID(); + auto segment_file_element_id = segment_file->GetFieldElementId(); + auto field_element = field_elements_holder.GetResource(segment_file_element_id, false); AddResource(field_element); AddResource(segment_file); - auto entry = element_segfiles_map_.find(segment_file->GetFieldElementId()); - if (entry == element_segfiles_map_.end()) { - element_segfiles_map_[segment_file->GetFieldElementId()] = { - {segment_file->GetSegmentId(), segment_file->GetID()}}; - } else { - entry->second[segment_file->GetSegmentId()] = segment_file->GetID(); - } + element_segfiles_map_[segment_file_element_id][segment_id] = segment_file_id; } } } - for (auto& kv : GetResources()) { - if (kv.first > latest_schema_commit_id_) + auto& schema_commit_mappings = schema_commit->GetMappings(); + auto& schema_commits = GetResources(); + for (auto& kv : schema_commits) { + if (kv.first > latest_schema_commit_id_) { latest_schema_commit_id_ = kv.first; + } auto& schema_commit = kv.second; - auto& s_c_m = current_schema->GetMappings(); - for (auto field_commit_id : s_c_m) { + for (auto field_commit_id : schema_commit_mappings) { auto field_commit = field_commits_holder.GetResource(field_commit_id, false); AddResource(field_commit); - auto field = fields_holder.GetResource(field_commit->GetFieldId(), false); + + auto field_id = field_commit->GetFieldId(); + auto field = fields_holder.GetResource(field_id, false); + auto field_name = field->GetName(); AddResource(field); - field_names_map_[field->GetName()] = field->GetID(); - auto& f_c_m = field_commit->GetMappings(); - for (auto field_element_id : f_c_m) { + + field_names_map_[field_name] = field_id; + auto& field_commit_mappings = field_commit->GetMappings(); + for (auto field_element_id : field_commit_mappings) { auto field_element = field_elements_holder.GetResource(field_element_id, false); AddResource(field_element); - auto entry = field_element_names_map_.find(field->GetName()); - if (entry == field_element_names_map_.end()) { - field_element_names_map_[field->GetName()] = {{field_element->GetName(), field_element->GetID()}}; - } else { - entry->second[field_element->GetName()] = field_element->GetID(); - } + auto field_element_name = field_element->GetName(); + field_element_names_map_[field_name][field_element_name] = field_element_id; } } } diff --git a/core/src/db/snapshot/Snapshot.h b/core/src/db/snapshot/Snapshot.h index 534547ab916d..784f913c96fe 100644 --- a/core/src/db/snapshot/Snapshot.h +++ b/core/src/db/snapshot/Snapshot.h @@ -50,7 +50,7 @@ class Snapshot : public ReferenceProxy { return GetCollectionCommit()->GetID(); } - [[nodiscard]] ID_TYPE + ID_TYPE GetCollectionId() const { auto it = GetResources().cbegin(); return it->first; @@ -67,17 +67,17 @@ class Snapshot : public ReferenceProxy { return GetResource(id); } - [[nodiscard]] const std::string& + const std::string& GetName() const { return GetResources().cbegin()->second->GetName(); } - [[nodiscard]] size_t + size_t NumberOfPartitions() const { return GetResources().size(); } - [[nodiscard]] const LSN_TYPE& + const LSN_TYPE& GetMaxLsn() const { return max_lsn_; } @@ -107,7 +107,7 @@ class Snapshot : public ReferenceProxy { return GetResources().cbegin()->second.Get(); } - [[nodiscard]] ID_TYPE + ID_TYPE GetLatestSchemaCommitId() const { return latest_schema_commit_id_; } @@ -163,7 +163,7 @@ class Snapshot : public ReferenceProxy { handler->SetStatus(status); } - [[nodiscard]] std::vector + std::vector GetFieldNames() const { std::vector names; for (auto& kv : field_names_map_) { @@ -172,19 +172,19 @@ class Snapshot : public ReferenceProxy { return std::move(names); } - [[nodiscard]] bool + bool HasField(const std::string& name) const { auto it = field_names_map_.find(name); return it != field_names_map_.end(); } - [[nodiscard]] bool + bool HasFieldElement(const std::string& field_name, const std::string& field_element_name) const { auto id = GetFieldElementId(field_name, field_element_name); return id > 0; } - [[nodiscard]] ID_TYPE + ID_TYPE GetSegmentFileId(const std::string& field_name, const std::string& field_element_name, ID_TYPE segment_id) const { auto field_element_id = GetFieldElementId(field_name, field_element_name); auto it = element_segfiles_map_.find(field_element_id); @@ -198,13 +198,13 @@ class Snapshot : public ReferenceProxy { return its->second; } - [[nodiscard]] bool + bool HasSegmentFile(const std::string& field_name, const std::string& field_element_name, ID_TYPE segment_id) const { auto id = GetSegmentFileId(field_name, field_element_name, segment_id); return id > 0; } - [[nodiscard]] ID_TYPE + ID_TYPE GetFieldElementId(const std::string& field_name, const std::string& field_element_name) const { auto itf = field_element_names_map_.find(field_name); if (itf == field_element_names_map_.end()) @@ -280,7 +280,7 @@ class Snapshot : public ReferenceProxy { } template - [[nodiscard]] const typename ResourceT::ScopedMapT& + const typename ResourceT::ScopedMapT& GetResources() const { return std::get::value>(resources_); } @@ -293,7 +293,6 @@ class Snapshot : public ReferenceProxy { if (it == resources.end()) { return nullptr; } - return it->second.Get(); } From bca4f66126ba71a8309531bae4f5efcfa6e1b2ac Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Fri, 3 Jul 2020 20:32:08 +0800 Subject: [PATCH 05/11] fix typo Signed-off-by: yudong.cai --- core/src/db/snapshot/Snapshot.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index 75814692a7ce..376574c86116 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -90,11 +90,11 @@ Snapshot::Snapshot(ID_TYPE ss_id) { for (auto s_f_id : segment_commit_mappings) { auto segment_file = segment_files_holder.GetResource(s_f_id, false); auto segment_file_id = segment_file->GetID(); - auto segment_file_element_id = segment_file->GetFieldElementId(); - auto field_element = field_elements_holder.GetResource(segment_file_element_id, false); + auto field_element_id = segment_file->GetFieldElementId(); + auto field_element = field_elements_holder.GetResource(field_element_id, false); AddResource(field_element); AddResource(segment_file); - element_segfiles_map_[segment_file_element_id][segment_id] = segment_file_id; + element_segfiles_map_[field_element_id][segment_id] = segment_file_id; } } } From 5561cac1600020329f71f8c140b52bb790ae5b9f Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Sat, 4 Jul 2020 10:48:41 +0800 Subject: [PATCH 06/11] update GetVectorByID framework Signed-off-by: yudong.cai --- core/src/db/SSDBImpl.cpp | 319 ++----------------------------- core/src/db/SSDBImpl.h | 18 +- core/src/db/SnapshotHandlers.cpp | 130 ++++++++++--- core/src/db/SnapshotHandlers.h | 25 +-- core/src/db/Types.h | 5 + 5 files changed, 137 insertions(+), 360 deletions(-) diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index a296ea71de2b..e6868736570e 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -16,8 +16,6 @@ #include "db/snapshot/Snapshots.h" #include "metrics/Metrics.h" #include "metrics/SystemInfo.h" -#include "segment/SegmentReader.h" -#include "segment/SegmentWriter.h" #include "utils/Exception.h" #include "wal/WalDefinations.h" @@ -201,8 +199,7 @@ SSDBImpl::ShowPartitions(const std::string& collection_name, std::vector& context, const std::string& collection_name, - bool force) { +SSDBImpl::PreloadCollection(const server::ContextPtr& context, const std::string& collection_name, bool force) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; @@ -215,43 +212,29 @@ SSDBImpl::PreloadCollection(const std::shared_ptr& context, con } Status -SSDBImpl::GetVectorsByID(const std::string& collection_name, const IDNumbers& id_array, - std::vector& vectors) { +SSDBImpl::GetVectorByID(const std::string& collection_name, const VectorIds& id_array, + std::vector& vector_data) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - meta::FilesHolder files_holder; - - std::vector partition_names; - partition_names = std::move(ss->GetPartitionNames()); - partition_names.push_back(collection_name); - - cache::CpuCacheMgr::GetInstance()->PrintInfo(); - STATUS_CHECK(GetVectorsByIdHelper(id_array, vectors, files_holder)); - cache::CpuCacheMgr::GetInstance()->PrintInfo(); + auto handler = std::make_shared(nullptr, ss, id_array); + handler->Iterate(); + STATUS_CHECK(handler->GetStatus()); + vector_data = std::move(handler->data_); return Status::OK(); } -//Status -//SSDBImpl::GetEntitiesByID(const std::string& collection_id, const milvus::engine::IDNumbers& id_array, -// std::vector& vectors, std::vector& attrs) { -// if (!initialized_.load(std::memory_order_acquire)) { -// return SHUTDOWN_ERROR; -// } -// -// bool has_collection; -// auto status = HasCollection(collection_id, has_collection); -// if (!has_collection) { -// LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: "; -// return Status(DB_NOT_FOUND, "Collection does not exist"); -// } -// if (!status.ok()) { -// return status; -// } -// +Status +SSDBImpl::GetEntityByID(const std::string& collection_name, const VectorIds& id_array, + std::vector& vectors, std::vector& attrs) { + CHECK_INITIALIZED; + + snapshot::ScopedSnapshotT ss; + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); + // engine::meta::CollectionSchema collection_schema; // engine::meta::hybrid::FieldsSchema fields_schema; // collection_schema.collection_id_ = collection_id; @@ -304,281 +287,11 @@ SSDBImpl::GetVectorsByID(const std::string& collection_name, const IDNumbers& id // cache::CpuCacheMgr::GetInstance()->PrintInfo(); // // return status; -//} +} //////////////////////////////////////////////////////////////////////////////// // Internal APIs //////////////////////////////////////////////////////////////////////////////// -Status -SSDBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& vectors, - meta::FilesHolder& files_holder) { - // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal - milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); - LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size(); - - // sometimes not all of id_array can be found, we need to return empty vector for id not found - // for example: - // id_array = [1, -1, 2, -1, 3] - // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] - // the ID2RAW is to ensure returned vector sequence is consist with id_array - using ID2VECTOR = std::map; - ID2VECTOR map_id2vector; - - vectors.clear(); - - IDNumbers temp_ids = id_array; - for (auto& file : files) { - if (temp_ids.empty()) { - break; // all vectors found, no need to continue - } - // Load bloom filter - std::string segment_dir; - engine::utils::GetParentPath(file.location_, segment_dir); - segment::SegmentReader segment_reader(segment_dir); - segment::IdBloomFilterPtr id_bloom_filter_ptr; - auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr); - if (!status.ok()) { - return status; - } - - for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) { - int64_t vector_id = *it; - // each id must has a VectorsData - // if vector not found for an id, its VectorsData's vector_count = 0, else 1 - VectorsData& vector_ref = map_id2vector[vector_id]; - - // Check if the id is present in bloom filter. - if (id_bloom_filter_ptr->Check(vector_id)) { - // Load uids and check if the id is indeed present. If yes, find its offset. - std::vector uids; - auto status = segment_reader.LoadUids(uids); - if (!status.ok()) { - return status; - } - - auto found = std::find(uids.begin(), uids.end(), vector_id); - if (found != uids.end()) { - auto offset = std::distance(uids.begin(), found); - - // Check whether the id has been deleted - segment::DeletedDocsPtr deleted_docs_ptr; - status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); - if (!status.ok()) { - LOG_ENGINE_ERROR_ << status.message(); - return status; - } - auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); - - auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); - if (deleted == deleted_docs.end()) { - // Load raw vector - bool is_binary = utils::IsBinaryMetricType(file.metric_type_); - size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); - std::vector raw_vector; - status = - segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); - if (!status.ok()) { - LOG_ENGINE_ERROR_ << status.message(); - return status; - } - - vector_ref.vector_count_ = 1; - if (is_binary) { - vector_ref.binary_data_.swap(raw_vector); - } else { - std::vector float_vector; - float_vector.resize(file.dimension_); - memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); - vector_ref.float_data_.swap(float_vector); - } - temp_ids.erase(it); - continue; - } - } - } - - it++; - } - - // unmark file, allow the file to be deleted - files_holder.UnmarkFile(file); - } - - for (auto id : id_array) { - VectorsData& vector_ref = map_id2vector[id]; - - VectorsData data; - data.vector_count_ = vector_ref.vector_count_; - if (data.vector_count_ > 0) { - data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id - data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id - } - vectors.emplace_back(data); - } - - return Status::OK(); -} - -//Status -//SSDBImpl::GetEntitiesByIdHelper(const std::string& collection_id, const milvus::engine::IDNumbers& id_array, -// std::unordered_map& attr_type, -// std::vector& vectors, std::vector& attrs, -// milvus::engine::meta::FilesHolder& files_holder) { -// // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal -// milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); -// LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size(); -// -// // sometimes not all of id_array can be found, we need to return empty vector for id not found -// // for example: -// // id_array = [1, -1, 2, -1, 3] -// // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] -// // the ID2RAW is to ensure returned vector sequence is consist with id_array -// using ID2ATTR = std::map; -// using ID2VECTOR = std::map; -// ID2ATTR map_id2attr; -// ID2VECTOR map_id2vector; -// -// IDNumbers temp_ids = id_array; -// for (auto& file : files) { -// // Load bloom filter -// std::string segment_dir; -// engine::utils::GetParentPath(file.location_, segment_dir); -// segment::SegmentReader segment_reader(segment_dir); -// segment::IdBloomFilterPtr id_bloom_filter_ptr; -// segment_reader.LoadBloomFilter(id_bloom_filter_ptr); -// -// for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) { -// int64_t vector_id = *it; -// // each id must has a VectorsData -// // if vector not found for an id, its VectorsData's vector_count = 0, else 1 -// AttrsData& attr_ref = map_id2attr[vector_id]; -// VectorsData& vector_ref = map_id2vector[vector_id]; -// -// // Check if the id is present in bloom filter. -// if (id_bloom_filter_ptr->Check(vector_id)) { -// // Load uids and check if the id is indeed present. If yes, find its offset. -// std::vector uids; -// auto status = segment_reader.LoadUids(uids); -// if (!status.ok()) { -// return status; -// } -// -// auto found = std::find(uids.begin(), uids.end(), vector_id); -// if (found != uids.end()) { -// auto offset = std::distance(uids.begin(), found); -// -// // Check whether the id has been deleted -// segment::DeletedDocsPtr deleted_docs_ptr; -// status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); -// if (!status.ok()) { -// LOG_ENGINE_ERROR_ << status.message(); -// return status; -// } -// auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); -// -// auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); -// if (deleted == deleted_docs.end()) { -// // Load raw vector -// bool is_binary = utils::IsBinaryMetricType(file.metric_type_); -// size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); -// std::vector raw_vector; -// status = -// segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); -// if (!status.ok()) { -// LOG_ENGINE_ERROR_ << status.message(); -// return status; -// } -// -// std::unordered_map> raw_attrs; -// auto attr_it = attr_type.begin(); -// for (; attr_it != attr_type.end(); attr_it++) { -// size_t num_bytes; -// switch (attr_it->second) { -// case engine::meta::hybrid::DataType::INT8: { -// num_bytes = 1; -// break; -// } -// case engine::meta::hybrid::DataType::INT16: { -// num_bytes = 2; -// break; -// } -// case engine::meta::hybrid::DataType::INT32: { -// num_bytes = 4; -// break; -// } -// case engine::meta::hybrid::DataType::INT64: { -// num_bytes = 8; -// break; -// } -// case engine::meta::hybrid::DataType::FLOAT: { -// num_bytes = 4; -// break; -// } -// case engine::meta::hybrid::DataType::DOUBLE: { -// num_bytes = 8; -// break; -// } -// default: { -// std::string msg = "Field type of " + attr_it->first + " is wrong"; -// return Status{DB_ERROR, msg}; -// } -// } -// std::vector raw_attr; -// status = segment_reader.LoadAttrs(attr_it->first, offset * num_bytes, num_bytes, raw_attr); -// if (!status.ok()) { -// LOG_ENGINE_ERROR_ << status.message(); -// return status; -// } -// raw_attrs.insert(std::make_pair(attr_it->first, raw_attr)); -// } -// -// vector_ref.vector_count_ = 1; -// if (is_binary) { -// vector_ref.binary_data_.swap(raw_vector); -// } else { -// std::vector float_vector; -// float_vector.resize(file.dimension_); -// memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); -// vector_ref.float_data_.swap(float_vector); -// } -// -// attr_ref.attr_count_ = 1; -// attr_ref.attr_data_ = raw_attrs; -// attr_ref.attr_type_ = attr_type; -// temp_ids.erase(it); -// continue; -// } -// } -// } -// it++; -// } -// -// // unmark file, allow the file to be deleted -// files_holder.UnmarkFile(file); -// } -// -// for (auto id : id_array) { -// VectorsData& vector_ref = map_id2vector[id]; -// -// VectorsData data; -// data.vector_count_ = vector_ref.vector_count_; -// if (data.vector_count_ > 0) { -// data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id -// data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id -// } -// vectors.emplace_back(data); -// -// attrs.emplace_back(map_id2attr[id]); -// } -// -// if (vectors.empty()) { -// std::string msg = "Vectors not found in collection " + collection_id; -// LOG_ENGINE_DEBUG_ << msg; -// } -// -// return Status::OK(); -//} - void SSDBImpl::InternalFlush(const std::string& collection_id) { wal::MXLogRecord record; diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 98f7e333452f..cee5043aaf13 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -71,24 +71,14 @@ class SSDBImpl { ShowPartitions(const std::string& collection_name, std::vector& partition_names); Status - GetVectorsByID(const std::string& collection_name, const IDNumbers& id_array, - std::vector& vectors); + GetVectorByID(const std::string& collection_name, const IDNumbers& id_array, + std::vector& vector_data); Status - GetEntitiesByID(const std::string& collection_name, const IDNumbers& id_array, - std::vector& vectors, std::vector& attrs); + GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, + std::vector& vectors, std::vector& attrs); private: - Status - GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& vectors, - meta::FilesHolder& files_holder); - - Status - GetEntitiesByIdHelper(const std::string& collection_id, const IDNumbers& id_array, - std::unordered_map& attr_type, - std::vector& vectors, std::vector& attrs, - meta::FilesHolder& files_holder); - void InternalFlush(const std::string& collection_id = ""); diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index 8bef23f28f9e..bc9cbd74ad65 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -11,6 +11,7 @@ #include "db/SnapshotHandlers.h" #include "db/meta/MetaTypes.h" +#include "segment/SegmentReader.h" namespace milvus { namespace engine { @@ -96,39 +97,116 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm /////////////////////////////////////////////////////////////////////////////// GetVectorByIdSegmentHandler::GetVectorByIdSegmentHandler(const std::shared_ptr& context, - milvus::engine::snapshot::ScopedSnapshotT ss, - const snapshot::PartitionPtr& partition) - : BaseT(ss), context_(context), partition_(partition) { + engine::snapshot::ScopedSnapshotT ss, const VectorIds& ids) + : BaseT(ss), context_(context), vector_ids_(ids) { } Status GetVectorByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { - if (partition_->GetID() != segment->GetPartitionId()) { - return Status::OK(); - } - - // SS TODO + LOG_ENGINE_DEBUG_ << "Getting vector by id in segment " << segment->GetID(); + + // sometimes not all of id_array can be found, we need to return empty vector for id not found + // for example: + // id_array = [1, -1, 2, -1, 3] + // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] + // the ID2RAW is to ensure returned vector sequence is consist with id_array +// using ID2VECTOR = std::map; +// ID2VECTOR map_id2vector; +// +// vectors.clear(); +// +// IDNumbers temp_ids = id_array; +// for (auto& file : files) { +// if (temp_ids.empty()) { +// break; // all vectors found, no need to continue +// } +// // Load bloom filter +// std::string segment_dir; +// engine::utils::GetParentPath(file.location_, segment_dir); +// segment::SegmentReader segment_reader(segment_dir); +// segment::IdBloomFilterPtr id_bloom_filter_ptr; +// auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr); +// if (!status.ok()) { +// return status; +// } +// +// for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) { +// int64_t vector_id = *it; +// // each id must has a VectorsData +// // if vector not found for an id, its VectorsData's vector_count = 0, else 1 +// VectorsData& vector_ref = map_id2vector[vector_id]; +// +// // Check if the id is present in bloom filter. +// if (id_bloom_filter_ptr->Check(vector_id)) { +// // Load uids and check if the id is indeed present. If yes, find its offset. +// std::vector uids; +// auto status = segment_reader.LoadUids(uids); +// if (!status.ok()) { +// return status; +// } +// +// auto found = std::find(uids.begin(), uids.end(), vector_id); +// if (found != uids.end()) { +// auto offset = std::distance(uids.begin(), found); +// +// // Check whether the id has been deleted +// segment::DeletedDocsPtr deleted_docs_ptr; +// status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << status.message(); +// return status; +// } +// auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); +// +// auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); +// if (deleted == deleted_docs.end()) { +// // Load raw vector +// bool is_binary = utils::IsBinaryMetricType(file.metric_type_); +// size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); +// std::vector raw_vector; +// status = +// segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << status.message(); +// return status; +// } +// +// vector_ref.vector_count_ = 1; +// if (is_binary) { +// vector_ref.binary_data_.swap(raw_vector); +// } else { +// std::vector float_vector; +// float_vector.resize(file.dimension_); +// memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); +// vector_ref.float_data_.swap(float_vector); +// } +// temp_ids.erase(it); +// continue; +// } +// } +// } +// +// it++; +// } +// +// // unmark file, allow the file to be deleted +// files_holder.UnmarkFile(file); +// } +// +// for (auto id : id_array) { +// VectorsData& vector_ref = map_id2vector[id]; +// +// VectorsData data; +// data.vector_count_ = vector_ref.vector_count_; +// if (data.vector_count_ > 0) { +// data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id +// data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id +// } +// vectors.emplace_back(data); +// } return Status::OK(); } -GetVectorByIdPartitionHandler::GetVectorByIdPartitionHandler(const std::shared_ptr& context, - milvus::engine::snapshot::ScopedSnapshotT ss) - : BaseT(ss), context_(context) { -} - -Status -GetVectorByIdPartitionHandler::Handle(const snapshot::PartitionPtr& partion) { - if (context_ && context_->IsConnectionBroken()) { - LOG_ENGINE_DEBUG_ << "Connection broken"; - return Status(DB_ERROR, "Connection broken"); - } - - auto segment_handler = std::make_shared(context_, ss_, partion); - segment_handler->Iterate(); - - return segment_handler->GetStatus(); -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotHandlers.h b/core/src/db/SnapshotHandlers.h index 3186aad21e24..56afae3164e5 100644 --- a/core/src/db/SnapshotHandlers.h +++ b/core/src/db/SnapshotHandlers.h @@ -13,6 +13,7 @@ #include "db/meta/FilesHolder.h" #include "db/snapshot/Snapshot.h" +#include "db/Types.h" #include "server/context/Context.h" #include "utils/Log.h" @@ -30,8 +31,8 @@ struct LoadVectorFieldElementHandler : public snapshot::IterateHandler& context_; - const snapshot::FieldPtr& field_; + const server::ContextPtr context_; + const snapshot::FieldPtr field_; }; struct LoadVectorFieldHandler : public snapshot::IterateHandler { @@ -42,7 +43,7 @@ struct LoadVectorFieldHandler : public snapshot::IterateHandler Status Handle(const typename ResourceT::Ptr&) override; - const std::shared_ptr& context_; + const server::ContextPtr context_; }; struct SegmentsToSearchCollector : public snapshot::IterateHandler { @@ -61,24 +62,14 @@ struct GetVectorByIdSegmentHandler : public snapshot::IterateHandler; GetVectorByIdSegmentHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss, - const snapshot::PartitionPtr& partition); + const VectorIds& ids); Status Handle(const typename ResourceT::Ptr&) override; - const std::shared_ptr& context_; - const snapshot::PartitionPtr& partition_; -}; - -struct GetVectorByIdPartitionHandler : public snapshot::IterateHandler { - using ResourceT = snapshot::Partition; - using BaseT = snapshot::IterateHandler; - GetVectorByIdPartitionHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss); - - Status - Handle(const typename ResourceT::Ptr&) override; - - const std::shared_ptr& context_; + const server::ContextPtr context_; + const engine::VectorIds vector_ids_; + std::vector data_; }; } // namespace engine diff --git a/core/src/db/Types.h b/core/src/db/Types.h index 8ad2c4df7eb3..d48d996b3cb4 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -33,6 +33,11 @@ typedef segment::doc_id_t IDNumber; typedef IDNumber* IDNumberPtr; typedef std::vector IDNumbers; +typedef faiss::Index::idx_t VectorId; +typedef std::vector VectorIds; +typedef faiss::Index::distance_t VectorDistance; +typedef std::vector VectorDistances; + typedef std::vector ResultIds; typedef std::vector ResultDistances; From 86de1a74b67ad97c542c2e1112d922856c93ea4c Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Sat, 4 Jul 2020 12:20:40 +0800 Subject: [PATCH 07/11] rename GetResFiles to GetResPath Signed-off-by: yudong.cai --- core/src/db/snapshot/Event.h | 19 +++++++-------- core/src/db/snapshot/ResourceHelper.h | 34 ++++++++++++--------------- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/core/src/db/snapshot/Event.h b/core/src/db/snapshot/Event.h index b7396f75d651..bf91faa486c5 100644 --- a/core/src/db/snapshot/Event.h +++ b/core/src/db/snapshot/Event.h @@ -49,17 +49,14 @@ class ResourceGCEvent : public Event { STATUS_CHECK((*sd_op)(store)); /* TODO: physically clean resource */ - std::vector res_file_list; - STATUS_CHECK(GetResFiles(res_file_list, res_)); - for (auto& res_file : res_file_list) { - if (!boost::filesystem::exists(res_file)) { - continue; - } - if (boost::filesystem::is_directory(res_file)) { - boost::filesystem::remove_all(res_file); - } else { - boost::filesystem::remove(res_file); - } + std::string res_path = GetResPath(res_); + if (!boost::filesystem::exists(res_path)) { + return Status::OK(); + } + if (boost::filesystem::is_directory(res_path)) { + boost::filesystem::remove_all(res_path); + } else { + boost::filesystem::remove(res_path); } /* remove resource from meta */ diff --git a/core/src/db/snapshot/ResourceHelper.h b/core/src/db/snapshot/ResourceHelper.h index 995d48a222f6..b515ee961821 100644 --- a/core/src/db/snapshot/ResourceHelper.h +++ b/core/src/db/snapshot/ResourceHelper.h @@ -20,55 +20,51 @@ namespace milvus::engine::snapshot { template -inline Status -GetResFiles(std::vector& file_list, typename ResourceT::Ptr& res_ptr) { - return Status::OK(); +inline std::string +GetResPath(const typename ResourceT::Ptr& res_ptr) { + return std::string(); } template <> -inline Status -GetResFiles(std::vector& file_list, Collection::Ptr& res_ptr) { +inline std::string +GetResPath(const Collection::Ptr& res_ptr) { std::stringstream ss; ss << res_ptr->GetID(); - file_list.push_back(ss.str()); - return Status::OK(); + return ss.str(); } template <> -inline Status -GetResFiles(std::vector& file_list, Partition::Ptr& res_ptr) { +inline std::string +GetResPath(const Partition::Ptr& res_ptr) { std::stringstream ss; ss << res_ptr->GetCollectionId() << "/"; ss << res_ptr->GetID(); - file_list.push_back(ss.str()); - return Status::OK(); + return ss.str(); } template <> -inline Status -GetResFiles(std::vector& file_list, Segment::Ptr& res_ptr) { +inline std::string +GetResPath(const Segment::Ptr& res_ptr) { std::stringstream ss; ss << res_ptr->GetCollectionId() << "/"; ss << res_ptr->GetPartitionId() << "/"; ss << res_ptr->GetID(); - file_list.push_back(ss.str()); - return Status::OK(); + return ss.str(); } template <> -inline Status -GetResFiles(std::vector& file_list, SegmentFile::Ptr& res_ptr) { +inline std::string +GetResPath(const SegmentFile::Ptr& res_ptr) { std::stringstream ss; ss << res_ptr->GetCollectionId() << "/"; ss << res_ptr->GetPartitionId() << "/"; ss << res_ptr->GetSegmentId() << "/"; ss << res_ptr->GetID(); - file_list.push_back(ss.str()); - return Status::OK(); + return ss.str(); } } // namespace milvus::engine::snapshot From ccbc2b73f3bae49f152d5d6f15b365c7e5ad28bb Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Sat, 4 Jul 2020 12:21:12 +0800 Subject: [PATCH 08/11] update GetVectorByIdSegmentHandler Signed-off-by: yudong.cai --- core/src/db/SnapshotHandlers.cpp | 157 +++++++++++++------------------ 1 file changed, 64 insertions(+), 93 deletions(-) diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index bc9cbd74ad65..0085cfeb91b1 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -11,6 +11,7 @@ #include "db/SnapshotHandlers.h" #include "db/meta/MetaTypes.h" +#include "db/snapshot/ResourceHelper.h" #include "segment/SegmentReader.h" namespace milvus { @@ -98,7 +99,7 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm /////////////////////////////////////////////////////////////////////////////// GetVectorByIdSegmentHandler::GetVectorByIdSegmentHandler(const std::shared_ptr& context, engine::snapshot::ScopedSnapshotT ss, const VectorIds& ids) - : BaseT(ss), context_(context), vector_ids_(ids) { + : BaseT(ss), context_(context), vector_ids_(ids), data_() { } Status @@ -110,100 +111,70 @@ GetVectorByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { // id_array = [1, -1, 2, -1, 3] // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] // the ID2RAW is to ensure returned vector sequence is consist with id_array -// using ID2VECTOR = std::map; -// ID2VECTOR map_id2vector; + std::map map_id2vector; + + // Load bloom filter + std::string segment_dir = snapshot::GetResPath(segment); + segment::SegmentReader segment_reader(segment_dir); + segment::IdBloomFilterPtr id_bloom_filter_ptr; + STATUS_CHECK(segment_reader.LoadBloomFilter(id_bloom_filter_ptr)); + + for (auto vector_id : vector_ids_) { + // each id must has a VectorsData + // if vector not found for an id, its VectorsData's vector_count = 0, else 1 + VectorsData& vector_ref = map_id2vector[vector_id]; + + // Check if the id is present in bloom filter. + if (!id_bloom_filter_ptr->Check(vector_id)) { + continue; + } + + // Load uids and check if the id is indeed present. If yes, find its offset. + std::vector uids; + STATUS_CHECK(segment_reader.LoadUids(uids)); + + auto found = std::find(uids.begin(), uids.end(), vector_id); + if (found != uids.end()) { + auto offset = std::distance(uids.begin(), found); + + // Check whether the id has been deleted + segment::DeletedDocsPtr deleted_docs_ptr; + STATUS_CHECK(segment_reader.LoadDeletedDocs(deleted_docs_ptr)); + auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); + + auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); + if (deleted == deleted_docs.end()) { + // SS TODO + // Load raw vector +// bool is_binary = utils::IsBinaryMetricType(file.metric_type_); +// size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); +// std::vector raw_vector; +// STATUS_CHECK(segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector)); // -// vectors.clear(); -// -// IDNumbers temp_ids = id_array; -// for (auto& file : files) { -// if (temp_ids.empty()) { -// break; // all vectors found, no need to continue -// } -// // Load bloom filter -// std::string segment_dir; -// engine::utils::GetParentPath(file.location_, segment_dir); -// segment::SegmentReader segment_reader(segment_dir); -// segment::IdBloomFilterPtr id_bloom_filter_ptr; -// auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr); -// if (!status.ok()) { -// return status; -// } -// -// for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) { -// int64_t vector_id = *it; -// // each id must has a VectorsData -// // if vector not found for an id, its VectorsData's vector_count = 0, else 1 -// VectorsData& vector_ref = map_id2vector[vector_id]; -// -// // Check if the id is present in bloom filter. -// if (id_bloom_filter_ptr->Check(vector_id)) { -// // Load uids and check if the id is indeed present. If yes, find its offset. -// std::vector uids; -// auto status = segment_reader.LoadUids(uids); -// if (!status.ok()) { -// return status; -// } -// -// auto found = std::find(uids.begin(), uids.end(), vector_id); -// if (found != uids.end()) { -// auto offset = std::distance(uids.begin(), found); -// -// // Check whether the id has been deleted -// segment::DeletedDocsPtr deleted_docs_ptr; -// status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); -// if (!status.ok()) { -// LOG_ENGINE_ERROR_ << status.message(); -// return status; -// } -// auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); -// -// auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); -// if (deleted == deleted_docs.end()) { -// // Load raw vector -// bool is_binary = utils::IsBinaryMetricType(file.metric_type_); -// size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); -// std::vector raw_vector; -// status = -// segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); -// if (!status.ok()) { -// LOG_ENGINE_ERROR_ << status.message(); -// return status; -// } -// -// vector_ref.vector_count_ = 1; -// if (is_binary) { -// vector_ref.binary_data_.swap(raw_vector); -// } else { -// std::vector float_vector; -// float_vector.resize(file.dimension_); -// memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); -// vector_ref.float_data_.swap(float_vector); -// } -// temp_ids.erase(it); -// continue; -// } +// vector_ref.vector_count_ = 1; +// if (is_binary) { +// vector_ref.binary_data_.swap(raw_vector); +// } else { +// std::vector float_vector; +// float_vector.resize(file.dimension_); +// memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); +// vector_ref.float_data_.swap(float_vector); // } -// } -// -// it++; -// } -// -// // unmark file, allow the file to be deleted -// files_holder.UnmarkFile(file); -// } -// -// for (auto id : id_array) { -// VectorsData& vector_ref = map_id2vector[id]; -// -// VectorsData data; -// data.vector_count_ = vector_ref.vector_count_; -// if (data.vector_count_ > 0) { -// data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id -// data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id -// } -// vectors.emplace_back(data); -// } + } + } + } + + for (auto id : vector_ids_) { + VectorsData& vector_ref = map_id2vector[id]; + + VectorsData data; + data.vector_count_ = vector_ref.vector_count_; + if (data.vector_count_ > 0) { + data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id + data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id + } + data_.emplace_back(data); + } return Status::OK(); } From 6dd70ba7e4ef2ff467d32ba62c7266ec2f3f6c00 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Mon, 6 Jul 2020 22:19:11 +0800 Subject: [PATCH 09/11] add GetEntityByID Signed-off-by: yudong.cai --- core/src/db/SSDBImpl.cpp | 74 ++------------ core/src/db/SSDBImpl.h | 7 +- core/src/db/SnapshotHandlers.cpp | 159 +++++++++++++++++++------------ core/src/db/SnapshotHandlers.h | 21 ++-- core/src/db/Types.h | 2 - core/src/db/meta/MetaTypes.h | 23 ++--- core/src/db/snapshot/Resources.h | 6 +- core/src/db/snapshot/Snapshot.h | 1 - 8 files changed, 137 insertions(+), 156 deletions(-) diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index dd9fc2bbb04d..69086fd405a2 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -247,81 +247,23 @@ SSDBImpl::PreloadCollection(const server::ContextPtr& context, const std::string } Status -SSDBImpl::GetVectorByID(const std::string& collection_name, const VectorIds& id_array, - std::vector& vector_data) { +SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, + const std::vector& field_names, std::vector& vector_data, + std::vector& attr_type, std::vector& attr_data) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - auto handler = std::make_shared(nullptr, ss, id_array); + auto handler = std::make_shared(nullptr, ss, id_array, field_names); handler->Iterate(); STATUS_CHECK(handler->GetStatus()); - vector_data = std::move(handler->data_); - return Status::OK(); -} - -Status -SSDBImpl::GetEntityByID(const std::string& collection_name, const VectorIds& id_array, - std::vector& vectors, std::vector& attrs) { - CHECK_INITIALIZED; - - snapshot::ScopedSnapshotT ss; - STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); + vector_data = std::move(handler->vector_data_); + attr_type = std::move(handler->attr_type_); + attr_data = std::move(handler->attr_data_); -// engine::meta::CollectionSchema collection_schema; -// engine::meta::hybrid::FieldsSchema fields_schema; -// collection_schema.collection_id_ = collection_id; -// status = meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema); -// if (!status.ok()) { -// return status; -// } -// std::unordered_map attr_type; -// for (auto schema : fields_schema.fields_schema_) { -// if (schema.field_type_ == (int32_t)engine::meta::hybrid::DataType::VECTOR) { -// continue; -// } -// attr_type.insert(std::make_pair(schema.field_name_, (engine::meta::hybrid::DataType)schema.field_type_)); -// } -// -// meta::FilesHolder files_holder; -// std::vector file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, -// meta::SegmentSchema::FILE_TYPE::BACKUP}; -// -// status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); -// if (!status.ok()) { -// std::string err_msg = "Failed to get files for GetEntitiesByID: " + status.message(); -// LOG_ENGINE_ERROR_ << err_msg; -// return status; -// } -// -// std::vector partition_array; -// status = meta_ptr_->ShowPartitions(collection_id, partition_array); -// if (!status.ok()) { -// std::string err_msg = "Failed to get partitions for GetEntitiesByID: " + status.message(); -// LOG_ENGINE_ERROR_ << err_msg; -// return status; -// } -// for (auto& schema : partition_array) { -// status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder); -// if (!status.ok()) { -// std::string err_msg = "Failed to get files for GetEntitiesByID: " + status.message(); -// LOG_ENGINE_ERROR_ << err_msg; -// return status; -// } -// } -// -// if (files_holder.HoldFiles().empty()) { -// LOG_ENGINE_DEBUG_ << "No files to get vector by id from"; -// return Status(DB_NOT_FOUND, "Collection is empty"); -// } -// -// cache::CpuCacheMgr::GetInstance()->PrintInfo(); -// status = GetEntitiesByIdHelper(collection_id, id_array, attr_type, vectors, attrs, files_holder); -// cache::CpuCacheMgr::GetInstance()->PrintInfo(); -// -// return status; + return Status::OK(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 3003edad140e..0883153f2acc 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -75,13 +75,10 @@ class SSDBImpl { Status DropIndex(const std::string& collection_name, const std::string& field_name, const std::string& field_element_name); - Status - GetVectorByID(const std::string& collection_name, const IDNumbers& id_array, - std::vector& vector_data); - Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, - std::vector& vectors, std::vector& attrs); + const std::vector& field_names, std::vector& vector_data, + std::vector& attr_type, std::vector& attr_data); private: void diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index 0085cfeb91b1..592cd0f97e83 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -12,8 +12,13 @@ #include "db/SnapshotHandlers.h" #include "db/meta/MetaTypes.h" #include "db/snapshot/ResourceHelper.h" +#include "db/snapshot/Snapshot.h" +#include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "segment/SegmentReader.h" +#include +#include + namespace milvus { namespace engine { @@ -97,87 +102,121 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm } /////////////////////////////////////////////////////////////////////////////// -GetVectorByIdSegmentHandler::GetVectorByIdSegmentHandler(const std::shared_ptr& context, - engine::snapshot::ScopedSnapshotT ss, const VectorIds& ids) - : BaseT(ss), context_(context), vector_ids_(ids), data_() { +GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr& context, + engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids, + const std::vector& field_names) + : BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() { } Status -GetVectorByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { - LOG_ENGINE_DEBUG_ << "Getting vector by id in segment " << segment->GetID(); - - // sometimes not all of id_array can be found, we need to return empty vector for id not found - // for example: - // id_array = [1, -1, 2, -1, 3] - // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector] - // the ID2RAW is to ensure returned vector sequence is consist with id_array - std::map map_id2vector; +GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { + LOG_ENGINE_DEBUG_ << "Get entity by id in segment " << segment->GetID(); // Load bloom filter std::string segment_dir = snapshot::GetResPath(segment); segment::SegmentReader segment_reader(segment_dir); + segment::IdBloomFilterPtr id_bloom_filter_ptr; STATUS_CHECK(segment_reader.LoadBloomFilter(id_bloom_filter_ptr)); - for (auto vector_id : vector_ids_) { - // each id must has a VectorsData - // if vector not found for an id, its VectorsData's vector_count = 0, else 1 - VectorsData& vector_ref = map_id2vector[vector_id]; + // Load uids and check if the id is indeed present. If yes, find its offset. + std::vector uids; + STATUS_CHECK(segment_reader.LoadUids(uids)); + + // Check whether the id has been deleted + segment::DeletedDocsPtr deleted_docs_ptr; + STATUS_CHECK(segment_reader.LoadDeletedDocs(deleted_docs_ptr)); + auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); - // Check if the id is present in bloom filter. - if (!id_bloom_filter_ptr->Check(vector_id)) { + for (auto id : ids_) { + AttrsData& attr_ref = attr_data_[id]; + VectorsData& vector_ref = vector_data_[id]; + + /* fast check using bloom filter */ + if (!id_bloom_filter_ptr->Check(id)) { continue; } - // Load uids and check if the id is indeed present. If yes, find its offset. - std::vector uids; - STATUS_CHECK(segment_reader.LoadUids(uids)); - - auto found = std::find(uids.begin(), uids.end(), vector_id); - if (found != uids.end()) { - auto offset = std::distance(uids.begin(), found); - - // Check whether the id has been deleted - segment::DeletedDocsPtr deleted_docs_ptr; - STATUS_CHECK(segment_reader.LoadDeletedDocs(deleted_docs_ptr)); - auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); - - auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); - if (deleted == deleted_docs.end()) { - // SS TODO - // Load raw vector -// bool is_binary = utils::IsBinaryMetricType(file.metric_type_); -// size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float); -// std::vector raw_vector; -// STATUS_CHECK(segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector)); -// -// vector_ref.vector_count_ = 1; -// if (is_binary) { -// vector_ref.binary_data_.swap(raw_vector); -// } else { -// std::vector float_vector; -// float_vector.resize(file.dimension_); -// memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes); -// vector_ref.float_data_.swap(float_vector); -// } - } + /* check if id really exists in uids */ + auto found = std::find(uids.begin(), uids.end(), id); + if (found == uids.end()) { + continue; } - } - for (auto id : vector_ids_) { - VectorsData& vector_ref = map_id2vector[id]; + /* check if this id is deleted */ + auto offset = std::distance(uids.begin(), found); + auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset); + if (deleted != deleted_docs.end()) { + continue; + } - VectorsData data; - data.vector_count_ = vector_ref.vector_count_; - if (data.vector_count_ > 0) { - data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id - data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id + std::unordered_map> raw_attrs; + for (auto& field_name : field_names_) { + auto field_ptr = ss_->GetField(field_name); + auto field_params = field_ptr->GetParamsJson(); + auto dim = field_params[knowhere::meta::DIM].get(); + auto field_type = field_ptr->GetFtype(); + + if (field_ptr->GetFtype() == (int64_t)meta::hybrid::DataType::VECTOR_BINARY) { + size_t vector_size = dim / 8; + std::vector raw_vector; + STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector)); + + vector_ref.vector_count_ = 1; + vector_ref.binary_data_.swap(raw_vector); + } else if (field_ptr->GetFtype() == (int64_t)meta::hybrid::DataType::VECTOR_FLOAT) { + size_t vector_size = dim * sizeof(float); + std::vector raw_vector; + STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector)); + + vector_ref.vector_count_ = 1; + std::vector float_vector; + float_vector.resize(dim); + memcpy(float_vector.data(), raw_vector.data(), vector_size); + vector_ref.float_data_.swap(float_vector); + } else { + size_t num_bytes; + switch (field_type) { + case (int64_t)meta::hybrid::DataType::INT8: { + num_bytes = sizeof(int8_t); + break; + } + case (int64_t)meta::hybrid::DataType::INT16: { + num_bytes = sizeof(int16_t); + break; + } + case (int64_t)meta::hybrid::DataType::INT32: { + num_bytes = sizeof(int32_t); + break; + } + case (int64_t)meta::hybrid::DataType::INT64: { + num_bytes = sizeof(int64_t); + break; + } + case (int64_t)meta::hybrid::DataType::FLOAT: { + num_bytes = sizeof(float); + break; + } + case (int64_t)meta::hybrid::DataType::DOUBLE: { + num_bytes = sizeof(double); + break; + } + default: { + std::string msg = "Field type of " + field_name + " not supported"; + return Status(DB_ERROR, msg); + } + } + std::vector raw_attr; + STATUS_CHECK(segment_reader.LoadAttrs(field_name, offset * num_bytes, num_bytes, raw_attr)); + raw_attrs.insert(std::make_pair(field_name, raw_attr)); + } } - data_.emplace_back(data); + + attr_ref.attr_count_ = 1; + attr_ref.attr_data_ = raw_attrs; } return Status::OK(); } - } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotHandlers.h b/core/src/db/SnapshotHandlers.h index 56afae3164e5..13c4ac458e89 100644 --- a/core/src/db/SnapshotHandlers.h +++ b/core/src/db/SnapshotHandlers.h @@ -11,13 +11,15 @@ #pragma once +#include "db/Types.h" #include "db/meta/FilesHolder.h" #include "db/snapshot/Snapshot.h" -#include "db/Types.h" #include "server/context/Context.h" #include "utils/Log.h" #include +#include +#include namespace milvus { namespace engine { @@ -25,7 +27,7 @@ namespace engine { struct LoadVectorFieldElementHandler : public snapshot::IterateHandler { using ResourceT = snapshot::FieldElement; using BaseT = snapshot::IterateHandler; - LoadVectorFieldElementHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss, + LoadVectorFieldElementHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const snapshot::FieldPtr& field); Status @@ -38,7 +40,7 @@ struct LoadVectorFieldElementHandler : public snapshot::IterateHandler { using ResourceT = snapshot::Field; using BaseT = snapshot::IterateHandler; - LoadVectorFieldHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss); + LoadVectorFieldHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss); Status Handle(const typename ResourceT::Ptr&) override; @@ -58,18 +60,21 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler { +struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler { using ResourceT = snapshot::Segment; using BaseT = snapshot::IterateHandler; - GetVectorByIdSegmentHandler(const std::shared_ptr& context, snapshot::ScopedSnapshotT ss, - const VectorIds& ids); + GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids, + const std::vector& field_names); Status Handle(const typename ResourceT::Ptr&) override; const server::ContextPtr context_; - const engine::VectorIds vector_ids_; - std::vector data_; + const engine::IDNumbers ids_; + const std::vector field_names_; + std::vector vector_data_; + std::vector attr_type_; + std::vector attr_data_; }; } // namespace engine diff --git a/core/src/db/Types.h b/core/src/db/Types.h index d48d996b3cb4..d2f66b6cf8bb 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -33,8 +33,6 @@ typedef segment::doc_id_t IDNumber; typedef IDNumber* IDNumberPtr; typedef std::vector IDNumbers; -typedef faiss::Index::idx_t VectorId; -typedef std::vector VectorIds; typedef faiss::Index::distance_t VectorDistance; typedef std::vector VectorDistances; diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h index e98876903072..81a358328f75 100644 --- a/core/src/db/meta/MetaTypes.h +++ b/core/src/db/meta/MetaTypes.h @@ -102,20 +102,21 @@ using Table2FileRef = std::map; namespace hybrid { enum class DataType { - INT8 = 1, - INT16 = 2, - INT32 = 3, - INT64 = 4, + NONE = 0, + BOOL = 1, + INT8 = 2, + INT16 = 3, + INT32 = 4, + INT64 = 5, - STRING = 20, - - BOOL = 30, + FLOAT = 10, + DOUBLE = 11, - FLOAT = 40, - DOUBLE = 41, + STRING = 20, - VECTOR = 100, - UNKNOWN = 9999, + VECTOR_BINARY = 100, + VECTOR_FLOAT = 101, + VECTOR = 200, }; struct VectorFieldSchema { diff --git a/core/src/db/snapshot/Resources.h b/core/src/db/snapshot/Resources.h index 9da7fc59cc1b..1b182e53f248 100644 --- a/core/src/db/snapshot/Resources.h +++ b/core/src/db/snapshot/Resources.h @@ -296,7 +296,7 @@ class NameField { class ParamsField { public: - explicit ParamsField(std::string params) : params_(std::move(params)), json_params_(json::parse(params_)) { + explicit ParamsField(std::string params) : params_(std::move(params)), params_json_(json::parse(params_)) { } const std::string& @@ -306,12 +306,12 @@ class ParamsField { const json& GetParamsJson() const { - return json_params_; + return params_json_; } protected: std::string params_; - json json_params_; + json params_json_; }; class SizeField { diff --git a/core/src/db/snapshot/Snapshot.h b/core/src/db/snapshot/Snapshot.h index a4d2fd87a8d3..0866f51e1680 100644 --- a/core/src/db/snapshot/Snapshot.h +++ b/core/src/db/snapshot/Snapshot.h @@ -182,7 +182,6 @@ class Snapshot : public ReferenceProxy { return it != field_names_map_.end(); } - bool FieldPtr GetField(const std::string& name) const; From 84f55488c8fc40feaf2e585e85c6b6c8f8dcf18f Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Tue, 7 Jul 2020 00:00:24 +0800 Subject: [PATCH 10/11] update DataType Signed-off-by: yudong.cai --- core/src/db/meta/MetaTypes.h | 19 +-- core/src/grpc/gen-milvus/milvus.pb.cc | 182 +++++++++++++------------- core/src/grpc/gen-milvus/milvus.pb.h | 25 ++-- core/src/grpc/milvus.proto | 24 ++-- 4 files changed, 118 insertions(+), 132 deletions(-) diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h index 81a358328f75..b926c2d789fc 100644 --- a/core/src/db/meta/MetaTypes.h +++ b/core/src/db/meta/MetaTypes.h @@ -134,27 +134,10 @@ struct VectorFieldsSchema { using VectorFieldSchemaPtr = std::shared_ptr; struct FieldSchema { - typedef enum { - INT8 = 1, - INT16 = 2, - INT32 = 3, - INT64 = 4, - - STRING = 20, - - BOOL = 30, - - FLOAT = 40, - DOUBLE = 41, - - VECTOR = 100, - UNKNOWN = 9999, - } FIELD_TYPE; - // TODO(yukun): need field_id? std::string collection_id_; std::string field_name_; - int32_t field_type_ = (int)INT8; + int32_t field_type_; std::string field_params_; }; diff --git a/core/src/grpc/gen-milvus/milvus.pb.cc b/core/src/grpc/gen-milvus/milvus.pb.cc index 9503eedfe588..ad2262c1d907 100644 --- a/core/src/grpc/gen-milvus/milvus.pb.cc +++ b/core/src/grpc/gen-milvus/milvus.pb.cc @@ -1673,92 +1673,93 @@ const char descriptor_table_protodef_milvus_2eproto[] PROTOBUF_SECTION_VARIABLE( "y\030\002 \003(\003\"\221\001\n\013HIndexParam\022#\n\006status\030\001 \001(\0132" "\023.milvus.grpc.Status\022\027\n\017collection_name\030" "\002 \001(\t\022\023\n\013field_names\030\003 \003(\t\022/\n\014extra_para" - "ms\030\004 \003(\0132\031.milvus.grpc.KeyValuePair*\206\001\n\010" - "DataType\022\010\n\004NULL\020\000\022\010\n\004INT8\020\001\022\t\n\005INT16\020\002\022" - "\t\n\005INT32\020\003\022\t\n\005INT64\020\004\022\n\n\006STRING\020\024\022\010\n\004BOO" - "L\020\036\022\t\n\005FLOAT\020(\022\n\n\006DOUBLE\020)\022\n\n\006VECTOR\020d\022\014" - "\n\007UNKNOWN\020\217N*C\n\017CompareOperator\022\006\n\002LT\020\000\022" - "\007\n\003LTE\020\001\022\006\n\002EQ\020\002\022\006\n\002GT\020\003\022\007\n\003GTE\020\004\022\006\n\002NE\020" - "\005*8\n\005Occur\022\013\n\007INVALID\020\000\022\010\n\004MUST\020\001\022\n\n\006SHO" - "ULD\020\002\022\014\n\010MUST_NOT\020\0032\256\030\n\rMilvusService\022H\n" - "\020CreateCollection\022\035.milvus.grpc.Collecti" - "onSchema\032\023.milvus.grpc.Status\"\000\022F\n\rHasCo" - "llection\022\033.milvus.grpc.CollectionName\032\026." - "milvus.grpc.BoolReply\"\000\022R\n\022DescribeColle" - "ction\022\033.milvus.grpc.CollectionName\032\035.mil" - "vus.grpc.CollectionSchema\"\000\022Q\n\017CountColl" - "ection\022\033.milvus.grpc.CollectionName\032\037.mi" - "lvus.grpc.CollectionRowCount\"\000\022J\n\017ShowCo" - "llections\022\024.milvus.grpc.Command\032\037.milvus" - ".grpc.CollectionNameList\"\000\022P\n\022ShowCollec" - "tionInfo\022\033.milvus.grpc.CollectionName\032\033." - "milvus.grpc.CollectionInfo\"\000\022D\n\016DropColl" - "ection\022\033.milvus.grpc.CollectionName\032\023.mi" - "lvus.grpc.Status\"\000\022=\n\013CreateIndex\022\027.milv" - "us.grpc.IndexParam\032\023.milvus.grpc.Status\"" - "\000\022G\n\rDescribeIndex\022\033.milvus.grpc.Collect" - "ionName\032\027.milvus.grpc.IndexParam\"\000\022\?\n\tDr" - "opIndex\022\033.milvus.grpc.CollectionName\032\023.m" - "ilvus.grpc.Status\"\000\022E\n\017CreatePartition\022\033" - ".milvus.grpc.PartitionParam\032\023.milvus.grp" - "c.Status\"\000\022E\n\014HasPartition\022\033.milvus.grpc" - ".PartitionParam\032\026.milvus.grpc.BoolReply\"" - "\000\022K\n\016ShowPartitions\022\033.milvus.grpc.Collec" - "tionName\032\032.milvus.grpc.PartitionList\"\000\022C" - "\n\rDropPartition\022\033.milvus.grpc.PartitionP" - "aram\032\023.milvus.grpc.Status\"\000\022<\n\006Insert\022\030." - "milvus.grpc.InsertParam\032\026.milvus.grpc.Ve" - "ctorIds\"\000\022J\n\016GetVectorsByID\022\034.milvus.grp" - "c.VectorsIdentity\032\030.milvus.grpc.VectorsD" - "ata\"\000\022H\n\014GetVectorIDs\022\036.milvus.grpc.GetV" - "ectorIDsParam\032\026.milvus.grpc.VectorIds\"\000\022" - "B\n\006Search\022\030.milvus.grpc.SearchParam\032\034.mi" - "lvus.grpc.TopKQueryResult\"\000\022J\n\nSearchByI" - "D\022\034.milvus.grpc.SearchByIDParam\032\034.milvus" - ".grpc.TopKQueryResult\"\000\022P\n\rSearchInFiles" - "\022\037.milvus.grpc.SearchInFilesParam\032\034.milv" - "us.grpc.TopKQueryResult\"\000\0227\n\003Cmd\022\024.milvu" - "s.grpc.Command\032\030.milvus.grpc.StringReply" - "\"\000\022A\n\nDeleteByID\022\034.milvus.grpc.DeleteByI" - "DParam\032\023.milvus.grpc.Status\"\000\022G\n\021Preload" - "Collection\022\033.milvus.grpc.CollectionName\032" - "\023.milvus.grpc.Status\"\000\022I\n\016ReloadSegments" - "\022 .milvus.grpc.ReLoadSegmentsParam\032\023.mil" - "vus.grpc.Status\"\000\0227\n\005Flush\022\027.milvus.grpc" - ".FlushParam\032\023.milvus.grpc.Status\"\000\022=\n\007Co" - "mpact\022\033.milvus.grpc.CollectionName\032\023.mil" - "vus.grpc.Status\"\000\022E\n\026CreateHybridCollect" - "ion\022\024.milvus.grpc.Mapping\032\023.milvus.grpc." - "Status\"\000\022L\n\023HasHybridCollection\022\033.milvus" - ".grpc.CollectionName\032\026.milvus.grpc.BoolR" - "eply\"\000\022J\n\024DropHybridCollection\022\033.milvus." + "ms\030\004 \003(\0132\031.milvus.grpc.KeyValuePair*\236\001\n\010" + "DataType\022\010\n\004NONE\020\000\022\010\n\004BOOL\020\001\022\010\n\004INT8\020\002\022\t" + "\n\005INT16\020\003\022\t\n\005INT32\020\004\022\t\n\005INT64\020\005\022\t\n\005FLOAT" + "\020\n\022\n\n\006DOUBLE\020\013\022\n\n\006STRING\020\024\022\021\n\rVECTOR_BIN" + "ARY\020d\022\020\n\014VECTOR_FLOAT\020e\022\013\n\006VECTOR\020\310\001*C\n\017" + "CompareOperator\022\006\n\002LT\020\000\022\007\n\003LTE\020\001\022\006\n\002EQ\020\002" + "\022\006\n\002GT\020\003\022\007\n\003GTE\020\004\022\006\n\002NE\020\005*8\n\005Occur\022\013\n\007IN" + "VALID\020\000\022\010\n\004MUST\020\001\022\n\n\006SHOULD\020\002\022\014\n\010MUST_NO" + "T\020\0032\256\030\n\rMilvusService\022H\n\020CreateCollectio" + "n\022\035.milvus.grpc.CollectionSchema\032\023.milvu" + "s.grpc.Status\"\000\022F\n\rHasCollection\022\033.milvu" + "s.grpc.CollectionName\032\026.milvus.grpc.Bool" + "Reply\"\000\022R\n\022DescribeCollection\022\033.milvus.g" + "rpc.CollectionName\032\035.milvus.grpc.Collect" + "ionSchema\"\000\022Q\n\017CountCollection\022\033.milvus." + "grpc.CollectionName\032\037.milvus.grpc.Collec" + "tionRowCount\"\000\022J\n\017ShowCollections\022\024.milv" + "us.grpc.Command\032\037.milvus.grpc.Collection" + "NameList\"\000\022P\n\022ShowCollectionInfo\022\033.milvu" + "s.grpc.CollectionName\032\033.milvus.grpc.Coll" + "ectionInfo\"\000\022D\n\016DropCollection\022\033.milvus." "grpc.CollectionName\032\023.milvus.grpc.Status" - "\"\000\022O\n\030DescribeHybridCollection\022\033.milvus." - "grpc.CollectionName\032\024.milvus.grpc.Mappin" - "g\"\000\022W\n\025CountHybridCollection\022\033.milvus.gr" - "pc.CollectionName\032\037.milvus.grpc.Collecti" - "onRowCount\"\000\022I\n\025ShowHybridCollections\022\024." - "milvus.grpc.Command\032\030.milvus.grpc.Mappin" - "gList\"\000\022V\n\030ShowHybridCollectionInfo\022\033.mi" - "lvus.grpc.CollectionName\032\033.milvus.grpc.C" - "ollectionInfo\"\000\022M\n\027PreloadHybridCollecti" - "on\022\033.milvus.grpc.CollectionName\032\023.milvus" - ".grpc.Status\"\000\022D\n\021CreateHybridIndex\022\030.mi" - "lvus.grpc.HIndexParam\032\023.milvus.grpc.Stat" - "us\"\000\022D\n\014InsertEntity\022\031.milvus.grpc.HInse" - "rtParam\032\027.milvus.grpc.HEntityIDs\"\000\022J\n\016Hy" - "bridSearchPB\022\033.milvus.grpc.HSearchParamP" - "B\032\031.milvus.grpc.HQueryResult\"\000\022F\n\014Hybrid" - "Search\022\031.milvus.grpc.HSearchParam\032\031.milv" - "us.grpc.HQueryResult\"\000\022]\n\026HybridSearchIn" - "Segments\022#.milvus.grpc.HSearchInSegments" - "Param\032\034.milvus.grpc.TopKQueryResult\"\000\022E\n" - "\rGetEntityByID\022\034.milvus.grpc.VectorsIden" - "tity\032\024.milvus.grpc.HEntity\"\000\022J\n\014GetEntit" - "yIDs\022\037.milvus.grpc.HGetEntityIDsParam\032\027." - "milvus.grpc.HEntityIDs\"\000\022J\n\022DeleteEntiti" - "esByID\022\035.milvus.grpc.HDeleteByIDParam\032\023." - "milvus.grpc.Status\"\000b\006proto3" + "\"\000\022=\n\013CreateIndex\022\027.milvus.grpc.IndexPar" + "am\032\023.milvus.grpc.Status\"\000\022G\n\rDescribeInd" + "ex\022\033.milvus.grpc.CollectionName\032\027.milvus" + ".grpc.IndexParam\"\000\022\?\n\tDropIndex\022\033.milvus" + ".grpc.CollectionName\032\023.milvus.grpc.Statu" + "s\"\000\022E\n\017CreatePartition\022\033.milvus.grpc.Par" + "titionParam\032\023.milvus.grpc.Status\"\000\022E\n\014Ha" + "sPartition\022\033.milvus.grpc.PartitionParam\032" + "\026.milvus.grpc.BoolReply\"\000\022K\n\016ShowPartiti" + "ons\022\033.milvus.grpc.CollectionName\032\032.milvu" + "s.grpc.PartitionList\"\000\022C\n\rDropPartition\022" + "\033.milvus.grpc.PartitionParam\032\023.milvus.gr" + "pc.Status\"\000\022<\n\006Insert\022\030.milvus.grpc.Inse" + "rtParam\032\026.milvus.grpc.VectorIds\"\000\022J\n\016Get" + "VectorsByID\022\034.milvus.grpc.VectorsIdentit" + "y\032\030.milvus.grpc.VectorsData\"\000\022H\n\014GetVect" + "orIDs\022\036.milvus.grpc.GetVectorIDsParam\032\026." + "milvus.grpc.VectorIds\"\000\022B\n\006Search\022\030.milv" + "us.grpc.SearchParam\032\034.milvus.grpc.TopKQu" + "eryResult\"\000\022J\n\nSearchByID\022\034.milvus.grpc." + "SearchByIDParam\032\034.milvus.grpc.TopKQueryR" + "esult\"\000\022P\n\rSearchInFiles\022\037.milvus.grpc.S" + "earchInFilesParam\032\034.milvus.grpc.TopKQuer" + "yResult\"\000\0227\n\003Cmd\022\024.milvus.grpc.Command\032\030" + ".milvus.grpc.StringReply\"\000\022A\n\nDeleteByID" + "\022\034.milvus.grpc.DeleteByIDParam\032\023.milvus." + "grpc.Status\"\000\022G\n\021PreloadCollection\022\033.mil" + "vus.grpc.CollectionName\032\023.milvus.grpc.St" + "atus\"\000\022I\n\016ReloadSegments\022 .milvus.grpc.R" + "eLoadSegmentsParam\032\023.milvus.grpc.Status\"" + "\000\0227\n\005Flush\022\027.milvus.grpc.FlushParam\032\023.mi" + "lvus.grpc.Status\"\000\022=\n\007Compact\022\033.milvus.g" + "rpc.CollectionName\032\023.milvus.grpc.Status\"" + "\000\022E\n\026CreateHybridCollection\022\024.milvus.grp" + "c.Mapping\032\023.milvus.grpc.Status\"\000\022L\n\023HasH" + "ybridCollection\022\033.milvus.grpc.Collection" + "Name\032\026.milvus.grpc.BoolReply\"\000\022J\n\024DropHy" + "bridCollection\022\033.milvus.grpc.CollectionN" + "ame\032\023.milvus.grpc.Status\"\000\022O\n\030DescribeHy" + "bridCollection\022\033.milvus.grpc.CollectionN" + "ame\032\024.milvus.grpc.Mapping\"\000\022W\n\025CountHybr" + "idCollection\022\033.milvus.grpc.CollectionNam" + "e\032\037.milvus.grpc.CollectionRowCount\"\000\022I\n\025" + "ShowHybridCollections\022\024.milvus.grpc.Comm" + "and\032\030.milvus.grpc.MappingList\"\000\022V\n\030ShowH" + "ybridCollectionInfo\022\033.milvus.grpc.Collec" + "tionName\032\033.milvus.grpc.CollectionInfo\"\000\022" + "M\n\027PreloadHybridCollection\022\033.milvus.grpc" + ".CollectionName\032\023.milvus.grpc.Status\"\000\022D" + "\n\021CreateHybridIndex\022\030.milvus.grpc.HIndex" + "Param\032\023.milvus.grpc.Status\"\000\022D\n\014InsertEn" + "tity\022\031.milvus.grpc.HInsertParam\032\027.milvus" + ".grpc.HEntityIDs\"\000\022J\n\016HybridSearchPB\022\033.m" + "ilvus.grpc.HSearchParamPB\032\031.milvus.grpc." + "HQueryResult\"\000\022F\n\014HybridSearch\022\031.milvus." + "grpc.HSearchParam\032\031.milvus.grpc.HQueryRe" + "sult\"\000\022]\n\026HybridSearchInSegments\022#.milvu" + "s.grpc.HSearchInSegmentsParam\032\034.milvus.g" + "rpc.TopKQueryResult\"\000\022E\n\rGetEntityByID\022\034" + ".milvus.grpc.VectorsIdentity\032\024.milvus.gr" + "pc.HEntity\"\000\022J\n\014GetEntityIDs\022\037.milvus.gr" + "pc.HGetEntityIDsParam\032\027.milvus.grpc.HEnt" + "ityIDs\"\000\022J\n\022DeleteEntitiesByID\022\035.milvus." + "grpc.HDeleteByIDParam\032\023.milvus.grpc.Stat" + "us\"\000b\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_milvus_2eproto_deps[1] = { &::descriptor_table_status_2eproto, @@ -1818,7 +1819,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_mil static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_milvus_2eproto_once; static bool descriptor_table_milvus_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_milvus_2eproto = { - &descriptor_table_milvus_2eproto_initialized, descriptor_table_protodef_milvus_2eproto, "milvus.proto", 8908, + &descriptor_table_milvus_2eproto_initialized, descriptor_table_protodef_milvus_2eproto, "milvus.proto", 8932, &descriptor_table_milvus_2eproto_once, descriptor_table_milvus_2eproto_sccs, descriptor_table_milvus_2eproto_deps, 50, 1, schemas, file_default_instances, TableStruct_milvus_2eproto::offsets, file_level_metadata_milvus_2eproto, 51, file_level_enum_descriptors_milvus_2eproto, file_level_service_descriptors_milvus_2eproto, @@ -1839,12 +1840,13 @@ bool DataType_IsValid(int value) { case 2: case 3: case 4: + case 5: + case 10: + case 11: case 20: - case 30: - case 40: - case 41: case 100: - case 9999: + case 101: + case 200: return true; default: return false; diff --git a/core/src/grpc/gen-milvus/milvus.pb.h b/core/src/grpc/gen-milvus/milvus.pb.h index 4ae825adb299..6d072f0ba142 100644 --- a/core/src/grpc/gen-milvus/milvus.pb.h +++ b/core/src/grpc/gen-milvus/milvus.pb.h @@ -270,23 +270,24 @@ namespace milvus { namespace grpc { enum DataType : int { - NULL_ = 0, - INT8 = 1, - INT16 = 2, - INT32 = 3, - INT64 = 4, + NONE = 0, + BOOL = 1, + INT8 = 2, + INT16 = 3, + INT32 = 4, + INT64 = 5, + FLOAT = 10, + DOUBLE = 11, STRING = 20, - BOOL = 30, - FLOAT = 40, - DOUBLE = 41, - VECTOR = 100, - UNKNOWN = 9999, + VECTOR_BINARY = 100, + VECTOR_FLOAT = 101, + VECTOR = 200, DataType_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), DataType_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max() }; bool DataType_IsValid(int value); -constexpr DataType DataType_MIN = NULL_; -constexpr DataType DataType_MAX = UNKNOWN; +constexpr DataType DataType_MIN = NONE; +constexpr DataType DataType_MAX = VECTOR; constexpr int DataType_ARRAYSIZE = DataType_MAX + 1; const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* DataType_descriptor(); diff --git a/core/src/grpc/milvus.proto b/core/src/grpc/milvus.proto index 330ab454f0a1..f8c6e894c08a 100644 --- a/core/src/grpc/milvus.proto +++ b/core/src/grpc/milvus.proto @@ -224,21 +224,21 @@ message GetVectorIDsParam { /********************************************************************************************************************/ enum DataType { - NULL = 0; - INT8 = 1; - INT16 = 2; - INT32 = 3; - INT64 = 4; + NONE = 0; + BOOL = 1; + INT8 = 2; + INT16 = 3; + INT32 = 4; + INT64 = 5; - STRING = 20; - - BOOL = 30; + FLOAT = 10; + DOUBLE = 11; - FLOAT = 40; - DOUBLE = 41; + STRING = 20; - VECTOR = 100; - UNKNOWN = 9999; + VECTOR_BINARY = 100; + VECTOR_FLOAT = 101; + VECTOR = 200; } /////////////////////////////////////////////////////////////////// From db3269274a3b43dc6558b5bfc81b54f1d1a2d575 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Tue, 7 Jul 2020 00:44:15 +0800 Subject: [PATCH 11/11] update ParamField Signed-off-by: yudong.cai --- core/src/db/SnapshotHandlers.cpp | 2 +- core/src/db/snapshot/Resources.cpp | 8 ++++---- core/src/db/snapshot/Resources.h | 20 +++++++------------- core/unittest/ssdb/test_snapshot.cpp | 7 +++---- 4 files changed, 15 insertions(+), 22 deletions(-) diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index 592cd0f97e83..396ba9e37f24 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -153,7 +153,7 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { std::unordered_map> raw_attrs; for (auto& field_name : field_names_) { auto field_ptr = ss_->GetField(field_name); - auto field_params = field_ptr->GetParamsJson(); + auto field_params = field_ptr->GetParams(); auto dim = field_params[knowhere::meta::DIM].get(); auto field_type = field_ptr->GetFtype(); diff --git a/core/src/db/snapshot/Resources.cpp b/core/src/db/snapshot/Resources.cpp index 0a24822d421e..5867b63b0502 100644 --- a/core/src/db/snapshot/Resources.cpp +++ b/core/src/db/snapshot/Resources.cpp @@ -16,7 +16,7 @@ namespace milvus::engine::snapshot { -Collection::Collection(const std::string& name, const std::string& params, ID_TYPE id, LSN_TYPE lsn, State state, +Collection::Collection(const std::string& name, const json& params, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : NameField(name), ParamsField(params), @@ -159,8 +159,8 @@ SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_T UpdatedOnField(updated_on) { } -Field::Field(const std::string& name, NUM_TYPE num, FTYPE_TYPE ftype, const std::string& params, ID_TYPE id, - LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) +Field::Field(const std::string& name, NUM_TYPE num, FTYPE_TYPE ftype, const json& params, ID_TYPE id, LSN_TYPE lsn, + State state, TS_TYPE created_on, TS_TYPE updated_on) : NameField(name), NumField(num), FtypeField(ftype), @@ -185,7 +185,7 @@ FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT } FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, - const std::string& params, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, + const json& params, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), FieldIdField(field_id), diff --git a/core/src/db/snapshot/Resources.h b/core/src/db/snapshot/Resources.h index 1b182e53f248..9ef67a42e0ff 100644 --- a/core/src/db/snapshot/Resources.h +++ b/core/src/db/snapshot/Resources.h @@ -29,7 +29,7 @@ using milvus::engine::utils::GetMicroSecTimeStamp; namespace milvus::engine::snapshot { -static constexpr const char* JEmpty = "{}"; +static const json JEmpty = {}; class MappingsField { public: @@ -296,22 +296,16 @@ class NameField { class ParamsField { public: - explicit ParamsField(std::string params) : params_(std::move(params)), params_json_(json::parse(params_)) { + explicit ParamsField(const json& params) : params_(params) { } - const std::string& + const json& GetParams() const { return params_; } - const json& - GetParamsJson() const { - return params_json_; - } - protected: - std::string params_; - json params_json_; + json params_; }; class SizeField { @@ -364,7 +358,7 @@ class Collection : public BaseResource, using VecT = std::vector; static constexpr const char* Name = "Collection"; - explicit Collection(const std::string& name, const std::string& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0, + explicit Collection(const std::string& name, const json& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); }; @@ -574,7 +568,7 @@ class Field : public BaseResource, using VecT = std::vector; static constexpr const char* Name = "Field"; - Field(const std::string& name, NUM_TYPE num, FTYPE_TYPE ftype, const std::string& params = JEmpty, ID_TYPE id = 0, + Field(const std::string& name, NUM_TYPE num, FTYPE_TYPE ftype, const json& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); }; @@ -624,7 +618,7 @@ class FieldElement : public BaseResource, using VecT = std::vector; static constexpr const char* Name = "FieldElement"; FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, - const std::string& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, + const json& params = JEmpty, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); }; diff --git a/core/unittest/ssdb/test_snapshot.cpp b/core/unittest/ssdb/test_snapshot.cpp index 82f16230561e..f549033978d5 100644 --- a/core/unittest/ssdb/test_snapshot.cpp +++ b/core/unittest/ssdb/test_snapshot.cpp @@ -22,11 +22,10 @@ TEST_F(SnapshotTest, ResourcesTest) { int nprobe = 16; milvus::json params = {{"nprobe", nprobe}}; - ParamsField p_field(params.dump()); - ASSERT_EQ(params.dump(), p_field.GetParams()); - ASSERT_EQ(params, p_field.GetParamsJson()); + ParamsField p_field(params); + ASSERT_EQ(params, p_field.GetParams()); - auto nprobe_real = p_field.GetParamsJson().at("nprobe").get(); + auto nprobe_real = p_field.GetParams().at("nprobe").get(); ASSERT_EQ(nprobe, nprobe_real); }