Skip to content

Commit

Permalink
improve snapshots and add some tests (#5144)
Browse files Browse the repository at this point in the history
  • Loading branch information
pengweisong committed Dec 29, 2022
1 parent e287363 commit ffaa254
Show file tree
Hide file tree
Showing 20 changed files with 711 additions and 38 deletions.
7 changes: 6 additions & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Charset and collate not match!");
case nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE:
return Status::Error("Snapshot failure!");
case nebula::cpp2::ErrorCode::E_SNAPSHOT_RUNNING_JOBS:
return Status::Error("Snapshot failed encounter running jobs!");
case nebula::cpp2::ErrorCode::E_BLOCK_WRITE_FAILURE:
return Status::Error("Block write failure!");
case nebula::cpp2::ErrorCode::E_REBUILD_INDEX_FAILED:
Expand Down Expand Up @@ -2806,7 +2808,10 @@ folly::Future<StatusOr<bool>> MetaClient::createSnapshot() {
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
std::move(promise),
true,
0,
1);
return future;
}

Expand Down
1 change: 1 addition & 0 deletions src/common/datatypes/Value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2859,6 +2859,7 @@ std::size_t Value::hash() const {
}
case Type::DATASET: {
LOG(DFATAL) << "Hash for DATASET has not been implemented";
break;
}
default: {
LOG(DFATAL) << "Unknown type";
Expand Down
1 change: 1 addition & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
\
/* Admin Failure */ \
X(E_SNAPSHOT_FAILURE, -2040) \
X(E_SNAPSHOT_RUNNING_JOBS, -2056) \
X(E_BLOCK_WRITE_FAILURE, -2041) \
X(E_REBUILD_INDEX_FAILURE, -2042) \
X(E_INDEX_WITH_TTL, -2043) \
Expand Down
2 changes: 1 addition & 1 deletion src/common/thrift/ThriftClientManager-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(const HostAd
}
});
auto clientChannel = apache::thrift::RocketClientChannel::newChannel(std::move(socket));
if (timeout > 0) {
if (timeout >= 0) {
clientChannel->setTimeout(timeout);
}
if (compatibility) {
Expand Down
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ enum ErrorCode {

// Admin Failure
E_SNAPSHOT_FAILURE = -2040, // Failed to generate a snapshot
E_SNAPSHOT_RUNNING_JOBS = -2056, // Failed to generate a snapshot because encounter running jobs
E_BLOCK_WRITE_FAILURE = -2041, // Failed to write block data
E_REBUILD_INDEX_FAILURE = -2042,
E_INDEX_WITH_TTL = -2043,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

bool ActiveHostsMan::machineRegisted(kvstore::KVStore* kv, const HostAddr& hostAddr) {
bool ActiveHostsMan::machineRegistered(kvstore::KVStore* kv, const HostAddr& hostAddr) {
auto machineKey = MetaKeyUtils::machineKey(hostAddr.host, hostAddr.port);
std::string machineValue;
auto code = kv->get(kDefaultSpaceId, kDefaultPartId, machineKey, &machineValue);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class ActiveHostsMan final {
* @param hostAddr Which host to register
* @return
*/
static bool machineRegisted(kvstore::KVStore* kv, const HostAddr& hostAddr);
static bool machineRegistered(kvstore::KVStore* kv, const HostAddr& hostAddr);

/**
* @brief Get all registered host
Expand Down
20 changes: 20 additions & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,26 @@ BaseProcessor<RESP>::getAllParts(GraphSpaceID spaceId) {
return partHostsMap;
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::getAllMachines(
std::unordered_set<HostAddr>& machines) {
const auto& machinePrefix = MetaKeyUtils::machinePrefix();
std::unique_ptr<kvstore::KVIterator> machineIter;
auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, machinePrefix, &machineIter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

while (machineIter->valid()) {
auto machine = MetaKeyUtils::parseMachineKey(machineIter->key());
machines.emplace(std::move(machine));
machineIter->next();
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

} // namespace meta
} // namespace nebula
#endif
8 changes: 8 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,14 @@ class BaseProcessor {
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<PartitionID, std::vector<HostAddr>>>
getAllParts(GraphSpaceID spaceId);

/**
* @brief Get the all registered machines by command: ADD HOSTS
*
* @param machines
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode getAllMachines(std::unordered_set<HostAddr>& machines);

protected:
kvstore::KVStore* kvstore_ = nullptr;
RESP resp_;
Expand Down
42 changes: 34 additions & 8 deletions src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
// rebuilding.
JobManager* jobMgr = JobManager::getInstance();
std::unordered_set<cpp2::JobType> jobTypes{cpp2::JobType::REBUILD_TAG_INDEX,
cpp2::JobType::REBUILD_EDGE_INDEX};
cpp2::JobType::REBUILD_EDGE_INDEX,
cpp2::JobType::COMPACT,
cpp2::JobType::INGEST,
cpp2::JobType::DATA_BALANCE,
cpp2::JobType::LEADER_BALANCE};
auto result = jobMgr->checkTypeJobRunning(jobTypes);
if (!nebula::ok(result)) {
handleErrorCode(nebula::error(result));
handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
onFinished();
return;
}

if (nebula::value(result)) {
LOG(INFO) << "Index is rebuilding, not allowed to create snapshot.";
handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
LOG(INFO) << "Mutating data job is running, not allowed to create snapshot.";
handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_RUNNING_JOBS);
onFinished();
return;
}
Expand All @@ -42,20 +45,37 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
return;
}
auto hosts = std::move(nebula::value(activeHostsRet));

if (hosts.empty()) {
LOG(INFO) << "There is no active hosts";
handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS);
onFinished();
return;
}

std::unordered_set<HostAddr> machines;
auto ret = getAllMachines(machines);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ret);
onFinished();
return;
}
// since all active hosts are checked registered, then we only need check count here
if (hosts.size() != machines.size()) {
LOG(INFO) << "There are some hosts registered(by `ADD HOSTS`) but not active, please "
"ensure all storaged active and you haven't registered useless machine;"
<< "registered machines count=" << machines.size()
<< "; active hosts count=" << hosts.size();
handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
onFinished();
return;
}

std::vector<kvstore::KV> data;
cpp2::SnapshotStatus status = cpp2::SnapshotStatus::VALID;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
// Step 1 : Blocking all writes action for storage engines.
auto ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON);
ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Send blocking sign to storage engine error";
code = ret;
Expand All @@ -69,6 +89,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
if (!nebula::ok(csRet)) {
LOG(INFO) << "Checkpoint create error on storage engine";
code = nebula::error(csRet);
cancelWriteBlocking();
status = cpp2::SnapshotStatus::INVALID;
break;
}
Expand Down Expand Up @@ -105,7 +126,12 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}
}

LOG(INFO) << "Create snapshot " << snapshot << " successfully";
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Create snapshot " << snapshot
<< " failed: " << apache::thrift::util::enumNameSafe(code);
} else {
LOG(INFO) << "Create snapshot " << snapshot << " successfully";
}
handleErrorCode(code);
onFinished();
}
Expand Down
7 changes: 3 additions & 4 deletions src/meta/processors/admin/CreateSnapshotProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ class CreateSnapshotProcessor : public BaseProcessor<cpp2::ExecResp> {
}
void process(const cpp2::CreateSnapshotReq& req);

private:
CreateSnapshotProcessor(kvstore::KVStore* kvstore, AdminClient* client)
: BaseProcessor<cpp2::ExecResp>(kvstore), client_(client) {}
/**
* @brief Cancel write blocking when create snapshot failed
*
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode cancelWriteBlocking();

private:
CreateSnapshotProcessor(kvstore::KVStore* kvstore, AdminClient* client)
: BaseProcessor<cpp2::ExecResp>(kvstore), client_(client) {}

private:
AdminClient* client_;
};
Expand Down
32 changes: 17 additions & 15 deletions src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ namespace meta {

void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
auto& snapshots = req.get_names();
if (snapshots.empty()) {
LOG(INFO) << "The snapshots to remove must be given";
handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
onFinished();
return;
}

auto snapshot = snapshots[0];
if (snapshots.size() > 1) {
LOG(INFO) << "There are more than one snapshot are given"
<< "only the first one will be dropped, name=" << snapshot;
}
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());

// Check snapshot is exists
Expand All @@ -35,34 +46,25 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
}
auto val = nebula::value(ret);

auto hosts = MetaKeyUtils::parseSnapshotHosts(val);
auto peersRet = NetworkUtils::toHosts(hosts);
if (!peersRet.ok()) {
auto hostsStr = MetaKeyUtils::parseSnapshotHosts(val);
auto hostsRet = NetworkUtils::toHosts(hostsStr);
if (!hostsRet.ok()) {
LOG(INFO) << "Get checkpoint hosts error";
handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
onFinished();
return;
}

auto peers = peersRet.value();
auto hosts = hostsRet.value();
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
auto dsRet = Snapshot::instance(kvstore_, client_)->dropSnapshot(snapshot, std::move(peers));
auto dsRet = Snapshot::instance(kvstore_, client_)->dropSnapshot(snapshot, std::move(hosts));
if (dsRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Drop snapshot error on storage engine";
// Need update the snapshot status to invalid, maybe some storage engine
// drop done.
batchHolder->put(MetaKeyUtils::snapshotKey(snapshot),
MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts));
LOG(INFO) << "Drop snapshot error on some storage engine";
}

auto dmRet = kvstore_->dropCheckpoint(kDefaultSpaceId, snapshot);
// TODO sky : need remove meta checkpoint from slave hosts.
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Drop snapshot error on meta engine";
// Need update the snapshot status to invalid, maybe storage engines drop
// done.
batchHolder->put(MetaKeyUtils::snapshotKey(snapshot),
MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts));
}

// Delete metadata of checkpoint
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/admin/DropSnapshotProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ namespace meta {
/**
* @brief Drop snapshot for all spaces. It could drop snapshots
* created by CreateBackupProcessor or CreateCheckpointProcessor.
* It will drop the snapshots as possible as it can, but there maybe some snapshots left
* in some hosts since:
* 1. some storaged remove snapshot failed due to some reason.
* 2. the metad may change leader if it have multi-replications.
*
*/
class DropSnapshotProcessor : public BaseProcessor<cpp2::ExecResp> {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
if (role == cpp2::HostRole::STORAGE || role == cpp2::HostRole::STORAGE_LISTENER) {
if (role == cpp2::HostRole::STORAGE) {
if (!ActiveHostsMan::machineRegisted(kvstore_, host)) {
if (!ActiveHostsMan::machineRegistered(kvstore_, host)) {
LOG(INFO) << "Machine " << host << " is not registered";
handleErrorCode(nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND);
setLeaderInfo();
Expand Down
22 changes: 17 additions & 5 deletions src/meta/processors/admin/SnapShot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Snapshot::createSnapshot(const std::string& name) {
for (auto const& [host, spaces] : hostSpaces) {
auto snapshotRet = client_->createSnapshot(spaces, name, host).get();
if (!snapshotRet.ok()) {
LOG(INFO) << "create snapshot failed:" << snapshotRet.status().toString();
return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
}
auto backupInfo = snapshotRet.value();
Expand Down Expand Up @@ -83,9 +84,10 @@ nebula::cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name,

auto result = client_->dropSnapshot(spaces, name, host).get();
if (!result.ok()) {
auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s";
LOG(INFO) << folly::stringPrintf(
msg, name.c_str(), host.toString().c_str(), result.status().toString().c_str());
LOG(INFO) << folly::sformat("failed to drop checkpoint: {} on host: {}, error: {}",
name,
host.toString(),
result.status().toString());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand All @@ -104,16 +106,26 @@ nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType s
auto hostSpaces = nebula::value(hostSpacesRet);
auto ret = nebula::cpp2::ErrorCode::SUCCEEDED;
for (const auto& [host, spaces] : hostSpaces) {
LOG(INFO) << "will block write host: " << host;
if (sign == storage::cpp2::EngineSignType::BLOCK_ON) {
LOG(INFO) << "will block write host: " << host;
} else {
LOG(INFO) << "will unblock write host: " << host;
}

auto result = client_->blockingWrites(spaces, sign, host).get();
LOG(INFO) << "after block write host";
if (!result.ok()) {
LOG(INFO) << "Send blocking sign error on host " << host
<< ", errorcode: " << result.status().toString();
ret = nebula::cpp2::ErrorCode::E_BLOCK_WRITE_FAILURE;
if (sign == storage::cpp2::EngineSignType::BLOCK_ON) {
break;
}
} else {
if (sign == storage::cpp2::EngineSignType::BLOCK_ON) {
LOG(INFO) << "finished blocking write host: " << host;
} else {
LOG(INFO) << "finished unblocking write host: " << host;
}
}
}
return ret;
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
friend class JobManagerTest;
friend class GetStatsTest;
friend class CreateBackupProcessorTest;
friend class SnapshotProcessorsTest;
FRIEND_TEST(JobManagerTest, AddJob);
FRIEND_TEST(JobManagerTest, StatsJob);
FRIEND_TEST(JobManagerTest, JobPriority);
Expand Down
16 changes: 16 additions & 0 deletions src/meta/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,22 @@ nebula_add_test(
curl
)

nebula_add_test(
NAME
snapshot_processors_test
SOURCES
SnapshotProcessorsTest.cpp
OBJECTS
${meta_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
curl
)

nebula_add_test(
NAME
list_meta_cluster_info_test
Expand Down
Loading

0 comments on commit ffaa254

Please sign in to comment.