Skip to content

Commit

Permalink
(db/snapshot): LSN logic change and design EventHandlerFactory (#2815)
Browse files Browse the repository at this point in the history
* (db/snapshot): add more visitors

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add some Iterators

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update visitors

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add event handler factory

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): process lsn

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): precheck duplicate collection in create collection operation

Signed-off-by: peng.xu <peng.xu@zilliz.com>
  • Loading branch information
XuPeng-SH committed Jul 12, 2020
1 parent 141835a commit 6c5f42f
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 41 deletions.
4 changes: 0 additions & 4 deletions core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@ SSDBImpl::CreatePartition(const std::string& collection_name, const std::string&
if (options_.wal_enable_) {
// SS TODO
/* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */
} else {
lsn = ss->GetCollection()->GetLsn();
}

snapshot::OperationContext context;
Expand Down Expand Up @@ -316,8 +314,6 @@ SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field
// SS TODO: Check Index Type

snapshot::OperationContext context;
// SS TODO: no lsn for drop index
context.lsn = ss->GetCollectionCommit()->GetLsn();
STATUS_CHECK(ss->GetFieldElement(field_name, field_element_name, context.stale_field_element));
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
STATUS_CHECK(op->Push());
Expand Down
8 changes: 8 additions & 0 deletions core/src/db/snapshot/CompoundOperations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,14 @@ Status
CreateCollectionOperation::DoExecute(Store& store) {
// TODO: Do some checks
CollectionPtr collection;
ScopedSnapshotT ss;
Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection->GetName());
if (ss) {
std::stringstream emsg;
emsg << GetRepr() << ". Duplicated collection " << c_context_.collection->GetName();
return Status(SS_DUPLICATED_ERROR, emsg.str());
}

auto status = store.CreateCollection(Collection(c_context_.collection->GetName()), collection);
if (!status.ok()) {
std::cerr << status.ToString() << std::endl;
Expand Down
8 changes: 5 additions & 3 deletions core/src/db/snapshot/CompoundOperations.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ class CompoundBaseOperation : public Operations {
Status
PreCheck() override {
// TODO
/* if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) { */
/* return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation"); */
/* } */
if (GetContextLsn() == 0) {
SetContextLsn(GetStartedSS()->GetMaxLsn());
} else if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
}
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/db/snapshot/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class ResourceGCEvent : public Event {

/* TODO: physically clean resource */
std::string res_path = GetResPath<ResourceT>(res_);
if (!boost::filesystem::exists(res_path)) {
return Status::OK();
}
/* if (!boost::filesystem::exists(res_path)) { */
/* return Status::OK(); */
/* } */
if (boost::filesystem::is_directory(res_path)) {
boost::filesystem::remove_all(res_path);
} else {
Expand Down
101 changes: 101 additions & 0 deletions core/src/db/snapshot/HandlerFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <map>
#include <memory>
#include <string>

#include "db/snapshot/Event.h"

namespace milvus {
namespace engine {
namespace snapshot {

class IEventHandler {
public:
using Ptr = std::shared_ptr<IEventHandler>;
static constexpr const char* EventName = "";
virtual const char*
GetEventName() const {
return EventName;
}
};

class IEventHandlerRegistrar {
public:
using Ptr = std::shared_ptr<IEventHandlerRegistrar>;

virtual IEventHandler::Ptr
GetHandler() = 0;
};

template <typename T>
class HandlerFactory {
public:
using ThisT = HandlerFactory<T>;

static ThisT&
GetInstance() {
static ThisT factory;
return factory;
}

IEventHandler::Ptr
GetHandler(const std::string& event_name) {
auto it = registry_.find(event_name);
if (it == registry_.end()) {
return nullptr;
}
return it->second->GetHandler();
}

void
Register(IEventHandlerRegistrar* registrar, const std::string& event_name) {
auto it = registry_.find(event_name);
if (it == registry_.end()) {
registry_[event_name] = registrar;
}
}

private:
std::map<std::string, IEventHandlerRegistrar*> registry_;
};

template <typename T, typename HandlerT>
class EventHandlerRegistrar : public IEventHandlerRegistrar {
public:
using FactoryT = HandlerFactory<T>;
using HandlerPtr = typename HandlerT::Ptr;
explicit EventHandlerRegistrar(const std::string& event_name) : event_name_(event_name) {
auto& factory = FactoryT::GetInstance();
factory.Register(this, event_name_);
}

HandlerPtr
GetHandler() {
return std::make_shared<HandlerT>();
}

protected:
std::string event_name_;
};

#define REGISTER_HANDLER(EXECUTOR, HANDLER) \
namespace { \
static milvus::engine::snapshot::EventHandlerRegistrar<EXECUTOR, HANDLER> EXECUTOR##HANDLER##_registrar( \
HANDLER ::EventName); \
}

} // namespace snapshot
} // namespace engine
} // namespace milvus
5 changes: 5 additions & 0 deletions core/src/db/snapshot/Operations.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class Operations : public std::enable_shared_from_this<Operations> {
return context_.lsn;
}

void
SetContextLsn(LSN_TYPE lsn) {
context_.lsn = lsn;
}

virtual Status
CheckStale(const CheckStaleFunc& checker = nullptr) const;
virtual Status
Expand Down
4 changes: 0 additions & 4 deletions core/src/db/snapshot/Store.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ class Store {
CreateCollection(Collection&& collection, CollectionPtr& return_v) {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<Collection::MapT>(resources_);
if (!collection.HasAssigned() && (name_ids_.find(collection.GetName()) != name_ids_.end()) &&
(resources[name_ids_[collection.GetName()]]->IsActive()) && !collection.IsDeactive()) {
return Status(SS_DUPLICATED_ERROR, "Duplicated");
}
auto c = std::make_shared<Collection>(collection);
auto& id = std::get<Index<Collection::MapT, MockResourcesT>::value>(ids_);
c->SetID(++id);
Expand Down
98 changes: 71 additions & 27 deletions core/unittest/ssdb/test_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <algorithm>

#include "ssdb/utils.h"
#include "db/snapshot/HandlerFactory.h"

TEST_F(SnapshotTest, ResourcesTest) {
int nprobe = 16;
Expand Down Expand Up @@ -416,33 +417,32 @@ TEST_F(SnapshotTest, PartitionTest) {
}
}

// TODO: Open this test later
/* TEST_F(SnapshotTest, PartitionTest2) { */
/* std::string collection_name("c1"); */
/* LSN_TYPE lsn = 1; */
/* milvus::Status status; */

/* auto ss = CreateCollection(collection_name, ++lsn); */
/* ASSERT_TRUE(ss); */
/* ASSERT_EQ(lsn, ss->GetMaxLsn()); */

/* OperationContext context; */
/* context.lsn = lsn; */
/* auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss); */
/* std::string partition_name("p1"); */
/* PartitionContext p_ctx; */
/* p_ctx.name = partition_name; */
/* PartitionPtr partition; */
/* status = cp_op->CommitNewPartition(p_ctx, partition); */
/* ASSERT_TRUE(status.ok()); */
/* ASSERT_TRUE(partition); */
/* ASSERT_EQ(partition->GetName(), partition_name); */
/* ASSERT_FALSE(partition->IsActive()); */
/* ASSERT_TRUE(partition->HasAssigned()); */

/* status = cp_op->Push(); */
/* ASSERT_FALSE(status.ok()); */
/* } */
TEST_F(SnapshotTest, PartitionTest2) {
std::string collection_name("c1");
LSN_TYPE lsn = 1;
milvus::Status status;

auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(lsn, ss->GetMaxLsn());

OperationContext context;
context.lsn = lsn;
auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss);
std::string partition_name("p1");
PartitionContext p_ctx;
p_ctx.name = partition_name;
PartitionPtr partition;
status = cp_op->CommitNewPartition(p_ctx, partition);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(partition);
ASSERT_EQ(partition->GetName(), partition_name);
ASSERT_FALSE(partition->IsActive());
ASSERT_TRUE(partition->HasAssigned());

status = cp_op->Push();
ASSERT_FALSE(status.ok());
}

TEST_F(SnapshotTest, IndexTest) {
LSN_TYPE lsn = 0;
Expand Down Expand Up @@ -1550,3 +1550,47 @@ TEST_F(SnapshotTest, CompoundTest2) {
ASSERT_EQ(final_segments, expect_segments);
// TODO: Check Total Segment Files Cnt
}

struct GCSchedule {
static constexpr const char* Name = "GCSchedule";
};

struct FlushSchedule {
static constexpr const char* Name = "FlushSchedule";
};

using IEventHandler = milvus::engine::snapshot::IEventHandler;
/* struct SampleHandler : public IEventHandler { */
/* static constexpr const char* EventName = "SampleHandler"; */
/* const char* */
/* GetEventName() const override { */
/* return EventName; */
/* } */
/* }; */

REGISTER_HANDLER(GCSchedule, IEventHandler);
/* REGISTER_HANDLER(GCSchedule, SampleHandler); */
REGISTER_HANDLER(FlushSchedule, IEventHandler);
/* REGISTER_HANDLER(FlushSchedule, SampleHandler); */

using GCScheduleFactory = milvus::engine::snapshot::HandlerFactory<GCSchedule>;
using FlushScheduleFactory = milvus::engine::snapshot::HandlerFactory<GCSchedule>;

TEST_F(SnapshotTest, RegistryTest) {
{
auto& factory = GCScheduleFactory::GetInstance();
auto ihandler = factory.GetHandler(IEventHandler::EventName);
ASSERT_TRUE(ihandler);
/* auto sihandler = factory.GetHandler(SampleHandler::EventName); */
/* ASSERT_TRUE(sihandler); */
/* ASSERT_EQ(SampleHandler::EventName, sihandler->GetEventName()); */
}
{
/* auto& factory = FlushScheduleFactory::GetInstance(); */
/* auto ihandler = factory.GetHandler(IEventHandler::EventName); */
/* ASSERT_TRUE(ihandler); */
/* auto sihandler = factory.GetHandler(SampleHandler::EventName); */
/* ASSERT_TRUE(sihandler); */
/* ASSERT_EQ(SampleHandler::EventName, sihandler->GetEventName()); */
}
}

0 comments on commit 6c5f42f

Please sign in to comment.