Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(db/snapshot): LSN logic change and design EventHandlerFactory #2815

Merged
merged 12 commits into from
Jul 12, 2020
4 changes: 0 additions & 4 deletions core/src/db/SSDBImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@ SSDBImpl::CreatePartition(const std::string& collection_name, const std::string&
if (options_.wal_enable_) {
// SS TODO
/* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */
} else {
lsn = ss->GetCollection()->GetLsn();
}

snapshot::OperationContext context;
Expand Down Expand Up @@ -316,8 +314,6 @@ SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field
// SS TODO: Check Index Type

snapshot::OperationContext context;
// SS TODO: no lsn for drop index
context.lsn = ss->GetCollectionCommit()->GetLsn();
STATUS_CHECK(ss->GetFieldElement(field_name, field_element_name, context.stale_field_element));
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
STATUS_CHECK(op->Push());
Expand Down
8 changes: 8 additions & 0 deletions core/src/db/snapshot/CompoundOperations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,14 @@ Status
CreateCollectionOperation::DoExecute(Store& store) {
// TODO: Do some checks
CollectionPtr collection;
ScopedSnapshotT ss;
Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection->GetName());
if (ss) {
std::stringstream emsg;
emsg << GetRepr() << ". Duplicated collection " << c_context_.collection->GetName();
return Status(SS_DUPLICATED_ERROR, emsg.str());
}

auto status = store.CreateCollection(Collection(c_context_.collection->GetName()), collection);
if (!status.ok()) {
std::cerr << status.ToString() << std::endl;
Expand Down
8 changes: 5 additions & 3 deletions core/src/db/snapshot/CompoundOperations.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ class CompoundBaseOperation : public Operations {
Status
PreCheck() override {
// TODO
/* if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) { */
/* return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation"); */
/* } */
if (GetContextLsn() == 0) {
SetContextLsn(GetStartedSS()->GetMaxLsn());
} else if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
}
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/db/snapshot/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class ResourceGCEvent : public Event {

/* TODO: physically clean resource */
std::string res_path = GetResPath<ResourceT>(res_);
if (!boost::filesystem::exists(res_path)) {
return Status::OK();
}
/* if (!boost::filesystem::exists(res_path)) { */
/* return Status::OK(); */
/* } */
if (boost::filesystem::is_directory(res_path)) {
boost::filesystem::remove_all(res_path);
} else {
Expand Down
101 changes: 101 additions & 0 deletions core/src/db/snapshot/HandlerFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <map>
#include <memory>
#include <string>

#include "db/snapshot/Event.h"

namespace milvus {
namespace engine {
namespace snapshot {

class IEventHandler {
public:
using Ptr = std::shared_ptr<IEventHandler>;
static constexpr const char* EventName = "";
virtual const char*
GetEventName() const {
return EventName;
}
};

class IEventHandlerRegistrar {
public:
using Ptr = std::shared_ptr<IEventHandlerRegistrar>;

virtual IEventHandler::Ptr
GetHandler() = 0;
};

template <typename T>
class HandlerFactory {
public:
using ThisT = HandlerFactory<T>;

static ThisT&
GetInstance() {
static ThisT factory;
return factory;
}

IEventHandler::Ptr
GetHandler(const std::string& event_name) {
auto it = registry_.find(event_name);
if (it == registry_.end()) {
return nullptr;
}
return it->second->GetHandler();
}

void
Register(IEventHandlerRegistrar* registrar, const std::string& event_name) {
auto it = registry_.find(event_name);
if (it == registry_.end()) {
registry_[event_name] = registrar;
}
}

private:
std::map<std::string, IEventHandlerRegistrar*> registry_;
};

template <typename T, typename HandlerT>
class EventHandlerRegistrar : public IEventHandlerRegistrar {
public:
using FactoryT = HandlerFactory<T>;
using HandlerPtr = typename HandlerT::Ptr;
explicit EventHandlerRegistrar(const std::string& event_name) : event_name_(event_name) {
auto& factory = FactoryT::GetInstance();
factory.Register(this, event_name_);
}

HandlerPtr
GetHandler() {
return std::make_shared<HandlerT>();
}

protected:
std::string event_name_;
};

#define REGISTER_HANDLER(EXECUTOR, HANDLER) \
namespace { \
static milvus::engine::snapshot::EventHandlerRegistrar<EXECUTOR, HANDLER> EXECUTOR##HANDLER##_registrar( \
HANDLER ::EventName); \
}

} // namespace snapshot
} // namespace engine
} // namespace milvus
5 changes: 5 additions & 0 deletions core/src/db/snapshot/Operations.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class Operations : public std::enable_shared_from_this<Operations> {
return context_.lsn;
}

void
SetContextLsn(LSN_TYPE lsn) {
context_.lsn = lsn;
}

virtual Status
CheckStale(const CheckStaleFunc& checker = nullptr) const;
virtual Status
Expand Down
4 changes: 0 additions & 4 deletions core/src/db/snapshot/Store.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ class Store {
CreateCollection(Collection&& collection, CollectionPtr& return_v) {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<Collection::MapT>(resources_);
if (!collection.HasAssigned() && (name_ids_.find(collection.GetName()) != name_ids_.end()) &&
(resources[name_ids_[collection.GetName()]]->IsActive()) && !collection.IsDeactive()) {
return Status(SS_DUPLICATED_ERROR, "Duplicated");
}
auto c = std::make_shared<Collection>(collection);
auto& id = std::get<Index<Collection::MapT, MockResourcesT>::value>(ids_);
c->SetID(++id);
Expand Down
98 changes: 71 additions & 27 deletions core/unittest/ssdb/test_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <algorithm>

#include "ssdb/utils.h"
#include "db/snapshot/HandlerFactory.h"

TEST_F(SnapshotTest, ResourcesTest) {
int nprobe = 16;
Expand Down Expand Up @@ -416,33 +417,32 @@ TEST_F(SnapshotTest, PartitionTest) {
}
}

// TODO: Open this test later
/* TEST_F(SnapshotTest, PartitionTest2) { */
/* std::string collection_name("c1"); */
/* LSN_TYPE lsn = 1; */
/* milvus::Status status; */

/* auto ss = CreateCollection(collection_name, ++lsn); */
/* ASSERT_TRUE(ss); */
/* ASSERT_EQ(lsn, ss->GetMaxLsn()); */

/* OperationContext context; */
/* context.lsn = lsn; */
/* auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss); */
/* std::string partition_name("p1"); */
/* PartitionContext p_ctx; */
/* p_ctx.name = partition_name; */
/* PartitionPtr partition; */
/* status = cp_op->CommitNewPartition(p_ctx, partition); */
/* ASSERT_TRUE(status.ok()); */
/* ASSERT_TRUE(partition); */
/* ASSERT_EQ(partition->GetName(), partition_name); */
/* ASSERT_FALSE(partition->IsActive()); */
/* ASSERT_TRUE(partition->HasAssigned()); */

/* status = cp_op->Push(); */
/* ASSERT_FALSE(status.ok()); */
/* } */
TEST_F(SnapshotTest, PartitionTest2) {
std::string collection_name("c1");
LSN_TYPE lsn = 1;
milvus::Status status;

auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(lsn, ss->GetMaxLsn());

OperationContext context;
context.lsn = lsn;
auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss);
std::string partition_name("p1");
PartitionContext p_ctx;
p_ctx.name = partition_name;
PartitionPtr partition;
status = cp_op->CommitNewPartition(p_ctx, partition);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(partition);
ASSERT_EQ(partition->GetName(), partition_name);
ASSERT_FALSE(partition->IsActive());
ASSERT_TRUE(partition->HasAssigned());

status = cp_op->Push();
ASSERT_FALSE(status.ok());
}

TEST_F(SnapshotTest, IndexTest) {
LSN_TYPE lsn = 0;
Expand Down Expand Up @@ -1550,3 +1550,47 @@ TEST_F(SnapshotTest, CompoundTest2) {
ASSERT_EQ(final_segments, expect_segments);
// TODO: Check Total Segment Files Cnt
}

struct GCSchedule {
static constexpr const char* Name = "GCSchedule";
};

struct FlushSchedule {
static constexpr const char* Name = "FlushSchedule";
};

using IEventHandler = milvus::engine::snapshot::IEventHandler;
/* struct SampleHandler : public IEventHandler { */
/* static constexpr const char* EventName = "SampleHandler"; */
/* const char* */
/* GetEventName() const override { */
/* return EventName; */
/* } */
/* }; */

REGISTER_HANDLER(GCSchedule, IEventHandler);
/* REGISTER_HANDLER(GCSchedule, SampleHandler); */
REGISTER_HANDLER(FlushSchedule, IEventHandler);
/* REGISTER_HANDLER(FlushSchedule, SampleHandler); */

using GCScheduleFactory = milvus::engine::snapshot::HandlerFactory<GCSchedule>;
using FlushScheduleFactory = milvus::engine::snapshot::HandlerFactory<GCSchedule>;

TEST_F(SnapshotTest, RegistryTest) {
{
auto& factory = GCScheduleFactory::GetInstance();
auto ihandler = factory.GetHandler(IEventHandler::EventName);
ASSERT_TRUE(ihandler);
/* auto sihandler = factory.GetHandler(SampleHandler::EventName); */
/* ASSERT_TRUE(sihandler); */
/* ASSERT_EQ(SampleHandler::EventName, sihandler->GetEventName()); */
}
{
/* auto& factory = FlushScheduleFactory::GetInstance(); */
/* auto ihandler = factory.GetHandler(IEventHandler::EventName); */
/* ASSERT_TRUE(ihandler); */
/* auto sihandler = factory.GetHandler(SampleHandler::EventName); */
/* ASSERT_TRUE(sihandler); */
/* ASSERT_EQ(SampleHandler::EventName, sihandler->GetEventName()); */
}
}