Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite mem manager 1 #2762

Merged
merged 10 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 182 additions & 1 deletion core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
#include "insert/MemManagerFactory.h"
#include "metrics/Metrics.h"
#include "metrics/SystemInfo.h"
#include "utils/Exception.h"
Expand Down Expand Up @@ -41,6 +42,8 @@ static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown

SSDBImpl::SSDBImpl(const DBOptions& options)
: options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
mem_mgr_ = MemManagerFactory::SSBuild(options_);

if (options_.wal_enable_) {
wal::MXLogConfiguration mxlog_config;
mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
Expand Down Expand Up @@ -204,6 +207,8 @@ SSDBImpl::DropCollection(const std::string& name) {
/* wal_mgr_->DropCollection(ss->GetCollectionId()); */
}

auto status = mem_mgr_->EraseMemVector(ss->GetCollectionId()); // not allow insert

return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}

Expand Down Expand Up @@ -597,7 +602,183 @@ SSDBImpl::WaitBuildIndexFinish() {

Status
SSDBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
// TODO:
// auto collections_flushed = [&](const std::string collection_id,
// const std::set<std::string>& target_collection_names) -> uint64_t {
// uint64_t max_lsn = 0;
// if (options_.wal_enable_ && !target_collection_names.empty()) {
// uint64_t lsn = 0;
// for (auto& collection_name : target_collection_names) {
// snapshot::ScopedSnapshotT ss;
// snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
// lsn = ss->GetMaxLsn();
// if (lsn > max_lsn) {
// max_lsn = lsn;
// }
// }
// wal_mgr_->CollectionFlushed(collection_id, lsn);
// }
//
// std::set<std::string> merge_collection_ids;
// for (auto& collection : target_collection_names) {
// merge_collection_ids.insert(collection);
// }
// StartMergeTask(merge_collection_ids);
// return max_lsn;
// };
//
// auto force_flush_if_mem_full = [&]() -> uint64_t {
// if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
// LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force
// flush"; InternalFlush();
// }
// };
//
// Status status;
//
// switch (record.type) {
// case wal::MXLogType::Entity: {
// snapshot::ScopedSnapshotT ss;
// status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
// if (!status.ok()) {
// LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
// return status;
// }
// snapshot::PartitionPtr part = ss->GetPartition(record.partition_tag);
// if (part == nullptr) {
// LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
// return status;
// }
//
// status = mem_mgr_->InsertEntities(
// target_collection_name, record.length, record.ids, (record.data_size / record.length /
// sizeof(float)), (const float*)record.data, record.attr_nbytes, record.attr_data_size,
// record.attr_data, record.lsn);
// force_flush_if_mem_full();
//
// // metrics
// milvus::server::CollectInsertMetrics metrics(record.length, status);
// break;
// }
// case wal::MXLogType::InsertBinary: {
// std::string target_collection_name;
// status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
// if (!status.ok()) {
// LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
// return status;
// }
//
// status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
// (record.data_size / record.length / sizeof(uint8_t)),
// (const u_int8_t*)record.data, record.lsn);
// force_flush_if_mem_full();
//
// // metrics
// milvus::server::CollectInsertMetrics metrics(record.length, status);
// break;
// }
//
// case wal::MXLogType::InsertVector: {
// std::string target_collection_name;
// status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
// if (!status.ok()) {
// LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
// return status;
// }
//
// status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
// (record.data_size / record.length / sizeof(float)),
// (const float*)record.data, record.lsn);
// force_flush_if_mem_full();
//
// // metrics
// milvus::server::CollectInsertMetrics metrics(record.length, status);
// break;
// }
//
// case wal::MXLogType::Delete: {
// std::vector<meta::CollectionSchema> partition_array;
// status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
// if (!status.ok()) {
// return status;
// }
//
// std::vector<std::string> collection_ids{record.collection_id};
// for (auto& partition : partition_array) {
// auto& partition_collection_id = partition.collection_id_;
// collection_ids.emplace_back(partition_collection_id);
// }
//
// if (record.length == 1) {
// for (auto& collection_id : collection_ids) {
// status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
// if (!status.ok()) {
// return status;
// }
// }
// } else {
// for (auto& collection_id : collection_ids) {
// status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
// if (!status.ok()) {
// return status;
// }
// }
// }
// break;
// }
//
// case wal::MXLogType::Flush: {
// if (!record.collection_id.empty()) {
// // flush one collection
// std::vector<meta::CollectionSchema> partition_array;
// status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
// if (!status.ok()) {
// return status;
// }
//
// std::vector<std::string> collection_ids{record.collection_id};
// for (auto& partition : partition_array) {
// auto& partition_collection_id = partition.collection_id_;
// collection_ids.emplace_back(partition_collection_id);
// }
//
// std::set<std::string> flushed_collections;
// for (auto& collection_id : collection_ids) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
// status = mem_mgr_->Flush(collection_id);
// if (!status.ok()) {
// break;
// }
// flushed_collections.insert(collection_id);
//
// status = FlushAttrsIndex(collection_id);
// if (!status.ok()) {
// return status;
// }
// }
//
// collections_flushed(record.collection_id, flushed_collections);
//
// } else {
// // flush all collections
// std::set<std::string> collection_ids;
// {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
// status = mem_mgr_->Flush(collection_ids);
// }
//
// uint64_t lsn = collections_flushed("", collection_ids);
// if (options_.wal_enable_) {
// wal_mgr_->RemoveOldFiles(lsn);
// }
// }
// break;
// }
//
// default:
// break;
// }
//
// return status;
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/db/SSDBImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "db/Options.h"
#include "db/SimpleWaitNotify.h"
#include "db/SnapshotHandlers.h"
#include "db/insert/MemManager.h"
#include "db/insert/SSMemManager.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Resources.h"
Expand Down Expand Up @@ -131,7 +131,7 @@ class SSDBImpl {
DBOptions options_;
std::atomic<bool> initialized_;

MemManagerPtr mem_mgr_;
SSMemManagerPtr mem_mgr_;

std::shared_ptr<wal::WalManager> wal_mgr_;
std::thread bg_wal_thread_;
Expand Down
6 changes: 6 additions & 0 deletions core/src/db/insert/MemManagerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "db/insert/MemManagerFactory.h"
#include "MemManagerImpl.h"
#include "SSMemManagerImpl.h"
#include "utils/Exception.h"
#include "utils/Log.h"

Expand All @@ -30,5 +31,10 @@ MemManagerFactory::Build(const std::shared_ptr<meta::Meta>& meta, const DBOption
return std::make_shared<MemManagerImpl>(meta, options);
}

SSMemManagerPtr
MemManagerFactory::SSBuild(const DBOptions& options) {
return std::make_shared<SSMemManagerImpl>(options);
}

} // namespace engine
} // namespace milvus
4 changes: 4 additions & 0 deletions core/src/db/insert/MemManagerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "MemManager.h"
#include "SSMemManager.h"
#include "db/meta/Meta.h"

#include <memory>
Expand All @@ -23,6 +24,9 @@ class MemManagerFactory {
public:
static MemManagerPtr
Build(const std::shared_ptr<meta::Meta>& meta, const DBOptions& options);

static SSMemManagerPtr
SSBuild(const DBOptions& options);
};

} // namespace engine
Expand Down
76 changes: 76 additions & 0 deletions core/src/db/insert/SSMemManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/Types.h"
#include "utils/Status.h"

namespace milvus {
namespace engine {

class SSMemManager {
public:
virtual Status
InsertVectors(int64_t collection_id, int64_t partition_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn) = 0;

virtual Status
InsertVectors(int64_t collection_id, int64_t partition_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn) = 0;

virtual Status
InsertEntities(int64_t collection_id, int64_t partition_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) = 0;

virtual Status
DeleteVector(int64_t collection_id, IDNumber vector_id, uint64_t lsn) = 0;

virtual Status
DeleteVectors(int64_t collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0;

virtual Status
Flush(int64_t collection_id) = 0;

virtual Status
Flush(std::set<int64_t>& collection_ids) = 0;

// virtual Status
// Serialize(std::set<std::string>& table_ids) = 0;

virtual Status
EraseMemVector(int64_t collection_id) = 0;

virtual Status
EraseMemVector(int64_t collection_id, int64_t partition_id) = 0;

virtual size_t
GetCurrentMutableMem() = 0;

virtual size_t
GetCurrentImmutableMem() = 0;

virtual size_t
GetCurrentMem() = 0;
}; // MemManagerAbstract

using SSMemManagerPtr = std::shared_ptr<SSMemManager>;

} // namespace engine
} // namespace milvus
Loading