From 43f213113661a3fedd9ee4cbf8e8059fbddb4dbc Mon Sep 17 00:00:00 2001 From: yaphet <4414314+darionyaphet@users.noreply.github.com> Date: Fri, 25 Feb 2022 14:36:11 +0800 Subject: [PATCH] Merge locks into one (#3904) * fix lock * increase heartbeat lock to avoid blocking --- src/clients/meta/MetaClient.h | 2 +- src/meta/ActiveHostsMan.cpp | 2 - src/meta/http/MetaHttpReplaceHostHandler.cpp | 4 +- src/meta/processors/BaseProcessor-inl.h | 6 -- src/meta/processors/Common.h | 37 ++++------ .../processors/admin/AgentHBProcessor.cpp | 1 + .../admin/CreateBackupProcessor.cpp | 10 +-- .../admin/CreateSnapshotProcessor.cpp | 2 +- .../admin/DropSnapshotProcessor.cpp | 10 +-- src/meta/processors/admin/HBProcessor.cpp | 2 +- .../admin/ListClusterInfoProcessor.cpp | 3 +- .../admin/ListSnapshotsProcessor.cpp | 3 +- .../processors/admin/RestoreProcessor.cpp | 4 +- src/meta/processors/admin/SnapShot.cpp | 2 +- .../processors/config/GetConfigProcessor.cpp | 2 +- .../config/ListConfigsProcessor.cpp | 4 +- .../processors/config/RegConfigProcessor.cpp | 2 +- .../processors/config/SetConfigProcessor.cpp | 4 +- .../processors/id/GetWorkerIdProcessor.cpp | 20 ++---- src/meta/processors/id/GetWorkerIdProcessor.h | 2 - .../index/CreateEdgeIndexProcessor.cpp | 2 +- .../index/CreateTagIndexProcessor.cpp | 2 +- .../index/DropEdgeIndexProcessor.cpp | 2 +- .../index/DropTagIndexProcessor.cpp | 2 +- .../processors/index/FTIndexProcessor.cpp | 7 +- .../index/GetEdgeIndexProcessor.cpp | 2 +- .../processors/index/GetTagIndexProcessor.cpp | 2 +- .../index/ListEdgeIndexesProcessor.cpp | 2 +- .../index/ListTagIndexesProcessor.cpp | 2 +- src/meta/processors/job/AdminJobProcessor.cpp | 1 + .../processors/job/BalanceJobExecutor.cpp | 1 + .../processors/job/DataBalanceJobExecutor.cpp | 1 + .../job/LeaderBalanceJobExecutor.cpp | 6 +- .../processors/job/ZoneBalanceJobExecutor.cpp | 1 + .../processors/listener/ListenerProcessor.cpp | 7 +- .../processors/parts/AlterSpaceProcessor.cpp | 15 ++-- .../parts/CreateSpaceAsProcessor.cpp | 5 +- .../processors/parts/CreateSpaceProcessor.cpp | 3 +- .../processors/parts/DropSpaceProcessor.cpp | 2 +- .../parts/GetPartsAllocProcessor.cpp | 2 +- .../processors/parts/GetSpaceProcessor.cpp | 2 +- .../processors/parts/ListHostsProcessor.cpp | 2 +- .../processors/parts/ListPartsProcessor.cpp | 10 +-- .../processors/parts/ListSpacesProcessor.cpp | 2 +- .../processors/schema/AlterEdgeProcessor.cpp | 2 +- .../processors/schema/AlterTagProcessor.cpp | 2 +- .../processors/schema/CreateEdgeProcessor.cpp | 2 +- .../processors/schema/CreateTagProcessor.cpp | 2 +- .../processors/schema/DropEdgeProcessor.cpp | 2 +- .../processors/schema/DropTagProcessor.cpp | 2 +- .../processors/schema/GetEdgeProcessor.cpp | 2 +- .../processors/schema/GetTagProcessor.cpp | 2 +- .../processors/schema/ListEdgesProcessor.cpp | 2 +- .../processors/schema/ListTagsProcessor.cpp | 2 +- .../processors/service/ServiceProcessor.cpp | 6 +- .../session/SessionManagerProcessor.cpp | 12 ++-- .../user/AuthenticationProcessor.cpp | 23 +++--- .../zone/AddHostsIntoZoneProcessor.cpp | 3 +- .../processors/zone/AddHostsProcessor.cpp | 3 +- .../processors/zone/DivideZoneProcessor.cpp | 3 +- .../processors/zone/DropHostsProcessor.cpp | 3 +- .../processors/zone/DropZoneProcessor.cpp | 2 +- src/meta/processors/zone/GetZoneProcessor.cpp | 2 +- .../processors/zone/ListZonesProcessor.cpp | 2 +- .../processors/zone/MergeZoneProcessor.cpp | 3 +- .../processors/zone/RenameZoneProcessor.cpp | 3 +- src/meta/test/CreateBackupProcessorTest.cpp | 22 +++--- src/meta/test/IndexProcessorTest.cpp | 10 +-- src/meta/test/ProcessorTest.cpp | 1 - src/meta/test/RestoreProcessorTest.cpp | 27 +++---- src/meta/test/TestUtils.h | 72 ++++++++++--------- src/storage/test/ChainAddEdgesTest.cpp | 1 - src/storage/test/ChainDeleteEdgesTest.cpp | 1 - src/storage/test/ChainResumeEdgeTest.cpp | 1 - src/storage/test/ChainUpdateEdgeTest.cpp | 1 - 75 files changed, 183 insertions(+), 243 deletions(-) diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 8e4ebd41de0..01584a974f7 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -220,8 +220,8 @@ class MetaClient { FRIEND_TEST(MetaClientTest, RetryUntilLimitTest); FRIEND_TEST(MetaClientTest, RocksdbOptionsTest); FRIEND_TEST(MetaClientTest, VerifyClientTest); - friend class KillQueryMetaWrapper; FRIEND_TEST(ChainAddEdgesTest, AddEdgesLocalTest); + friend class KillQueryMetaWrapper; friend class storage::MetaClientTestUpdater; public: diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 05c11df24dc..8e81f0c28d5 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -64,7 +64,6 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv, bool hasUpdate = !data.empty(); data.emplace_back(MetaKeyUtils::hostKey(hostAddr.host, hostAddr.port), HostInfo::encodeV2(info)); - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); folly::Baton baton; nebula::cpp2::ErrorCode ret; kv->asyncMultiPut( @@ -266,7 +265,6 @@ nebula::cpp2::ErrorCode LastUpdateTimeMan::update(kvstore::KVStore* kv, data.emplace_back(MetaKeyUtils::lastUpdateTimeKey(), MetaKeyUtils::lastUpdateTimeVal(timeInMilliSec)); - folly::SharedMutex::WriteHolder wHolder(LockUtils::lastUpdateTimeLock()); folly::Baton baton; nebula::cpp2::ErrorCode ret; kv->asyncMultiPut( diff --git a/src/meta/http/MetaHttpReplaceHostHandler.cpp b/src/meta/http/MetaHttpReplaceHostHandler.cpp index edc602a1094..6ddf01dcb76 100644 --- a/src/meta/http/MetaHttpReplaceHostHandler.cpp +++ b/src/meta/http/MetaHttpReplaceHostHandler.cpp @@ -106,7 +106,7 @@ void MetaHttpReplaceHostHandler::onError(ProxygenError error) noexcept { } bool MetaHttpReplaceHostHandler::replaceHostInPart(std::string ipv4From, std::string ipv4To) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& spacePrefix = MetaKeyUtils::spacePrefix(); std::unique_ptr iter; auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, spacePrefix, &iter); @@ -163,7 +163,7 @@ bool MetaHttpReplaceHostHandler::replaceHostInPart(std::string ipv4From, std::st } bool MetaHttpReplaceHostHandler::replaceHostInZone(std::string ipv4From, std::string ipv4To) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& zonePrefix = MetaKeyUtils::zonePrefix(); std::unique_ptr iter; auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter); diff --git a/src/meta/processors/BaseProcessor-inl.h b/src/meta/processors/BaseProcessor-inl.h index 69598871b95..f90446cb482 100644 --- a/src/meta/processors/BaseProcessor-inl.h +++ b/src/meta/processors/BaseProcessor-inl.h @@ -134,7 +134,6 @@ ErrorOr> BaseProcessor:: template ErrorOr BaseProcessor::autoIncrementId() { - folly::SharedMutex::WriteHolder holder(LockUtils::idLock()); const std::string kIdKey = MetaKeyUtils::idKey(); int32_t id; std::string val; @@ -186,9 +185,6 @@ ErrorOr BaseProcessor::getAvailableGloba template ErrorOr BaseProcessor::autoIncrementIdInSpace( GraphSpaceID spaceId) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::localIdLock()); - folly::SharedMutex::ReadHolder rHolder(LockUtils::idLock()); - auto localIdkey = MetaKeyUtils::localIdKey(spaceId); int32_t id; std::string val; @@ -228,7 +224,6 @@ ErrorOr BaseProcessor::autoIncrementIdIn template nebula::cpp2::ErrorCode BaseProcessor::spaceExist(GraphSpaceID spaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); auto spaceKey = MetaKeyUtils::spaceKey(spaceId); auto ret = doGet(std::move(spaceKey)); if (nebula::ok(ret)) { @@ -617,7 +612,6 @@ ErrorOr BaseProcessor::getZoneId( template nebula::cpp2::ErrorCode BaseProcessor::listenerExist(GraphSpaceID space, cpp2::ListenerType type) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::listenerLock()); const auto& prefix = MetaKeyUtils::listenerPrefix(space, type); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index 1c7ab8c1a1d..2aff0f5b932 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -14,32 +14,21 @@ namespace meta { class LockUtils { public: LockUtils() = delete; -#define GENERATE_LOCK(Entry) \ - static folly::SharedMutex& Entry##Lock() { \ - static folly::SharedMutex l; \ - return l; \ + + static folly::SharedMutex& lock() { + static folly::SharedMutex lock; + return lock; + } + + static folly::SharedMutex& snapshotLock() { + static folly::SharedMutex snapshotLock; + return snapshotLock; } - GENERATE_LOCK(lastUpdateTime); - GENERATE_LOCK(space); - GENERATE_LOCK(id); - GENERATE_LOCK(workerId); - GENERATE_LOCK(localId); - GENERATE_LOCK(tagAndEdge); - GENERATE_LOCK(tagIndex); - GENERATE_LOCK(edgeIndex); - GENERATE_LOCK(service); - GENERATE_LOCK(fulltextIndex); - GENERATE_LOCK(user); - GENERATE_LOCK(config); - GENERATE_LOCK(snapshot); - GENERATE_LOCK(group); - GENERATE_LOCK(zone); - GENERATE_LOCK(listener); - GENERATE_LOCK(session); - GENERATE_LOCK(machine); - -#undef GENERATE_LOCK + static folly::SharedMutex& sessionLock() { + static folly::SharedMutex sessionLock; + return sessionLock; + } }; } // namespace meta diff --git a/src/meta/processors/admin/AgentHBProcessor.cpp b/src/meta/processors/admin/AgentHBProcessor.cpp index 8f6b42181d4..ac9c818588d 100644 --- a/src/meta/processors/admin/AgentHBProcessor.cpp +++ b/src/meta/processors/admin/AgentHBProcessor.cpp @@ -28,6 +28,7 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) { HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port); LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT"; + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; do { // update agent host info diff --git a/src/meta/processors/admin/CreateBackupProcessor.cpp b/src/meta/processors/admin/CreateBackupProcessor.cpp index 85d74c7001d..b345a9f0e3a 100644 --- a/src/meta/processors/admin/CreateBackupProcessor.cpp +++ b/src/meta/processors/admin/CreateBackupProcessor.cpp @@ -15,7 +15,6 @@ namespace meta { ErrorOr> CreateBackupProcessor::spaceNameToId(const std::vector* backupSpaces) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); std::unordered_set spaces; bool allSpaces = backupSpaces == nullptr || backupSpaces->empty(); @@ -98,8 +97,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { return; } - folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock()); - + folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock()); // get active storage host list auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_); if (!nebula::ok(activeHostsRet)) { @@ -239,11 +237,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { backup.backup_name_ref() = std::move(backupName); backup.full_ref() = true; bool allSpaces = backupSpaces == nullptr || backupSpaces->empty(); - if (allSpaces) { - backup.all_spaces_ref() = true; - } else { - backup.all_spaces_ref() = false; - } + backup.all_spaces_ref() = allSpaces; backup.create_time_ref() = time::WallClock::fastNowInMilliSec(); handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); diff --git a/src/meta/processors/admin/CreateSnapshotProcessor.cpp b/src/meta/processors/admin/CreateSnapshotProcessor.cpp index d38ae92a688..90c7bbd89c6 100644 --- a/src/meta/processors/admin/CreateSnapshotProcessor.cpp +++ b/src/meta/processors/admin/CreateSnapshotProcessor.cpp @@ -31,7 +31,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) { } auto snapshot = folly::sformat("SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock()); auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_); if (!nebula::ok(activeHostsRet)) { diff --git a/src/meta/processors/admin/DropSnapshotProcessor.cpp b/src/meta/processors/admin/DropSnapshotProcessor.cpp index e93e72a61f6..641c03633fd 100644 --- a/src/meta/processors/admin/DropSnapshotProcessor.cpp +++ b/src/meta/processors/admin/DropSnapshotProcessor.cpp @@ -13,7 +13,7 @@ namespace meta { void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { auto& snapshot = req.get_name(); - folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock()); // Check snapshot is exists auto key = MetaKeyUtils::snapshotKey(snapshot); @@ -53,9 +53,7 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts)); auto putRet = doSyncPut(std::move(data)); if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Update snapshot status error. " - "snapshot : " - << snapshot; + LOG(INFO) << "Update snapshot status error. snapshot: " << snapshot; } handleErrorCode(putRet); onFinished(); @@ -72,9 +70,7 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts)); auto putRet = doSyncPut(std::move(data)); if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Update snapshot status error. " - "snapshot : " - << snapshot; + LOG(INFO) << "Update snapshot status error. snapshot: " << snapshot; } handleErrorCode(putRet); onFinished(); diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index db61e2f8bce..e699352c053 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -32,7 +32,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { auto role = req.get_role(); LOG(INFO) << "Receive heartbeat from " << host << ", role = " << apache::thrift::util::enumNameSafe(role); - + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); if (role == cpp2::HostRole::STORAGE) { if (!ActiveHostsMan::machineRegisted(kvstore_, host)) { LOG(INFO) << "Machine " << host << " is not registed"; diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.cpp b/src/meta/processors/admin/ListClusterInfoProcessor.cpp index dbbbcfbe987..caceef2bf96 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.cpp +++ b/src/meta/processors/admin/ListClusterInfoProcessor.cpp @@ -26,10 +26,11 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { std::unordered_map> hostServices; // non-meta services, may include inactive services + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& hostPrefix = MetaKeyUtils::hostPrefix(); auto iterRet = doPrefix(hostPrefix); if (!nebula::ok(iterRet)) { - LOG(INFO) << "get host prefix failed:" + LOG(INFO) << "get host prefix failed: " << apache::thrift::util::enumNameSafe(nebula::error(iterRet)); handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); onFinished(); diff --git a/src/meta/processors/admin/ListSnapshotsProcessor.cpp b/src/meta/processors/admin/ListSnapshotsProcessor.cpp index aa4b808360a..8d37194154e 100644 --- a/src/meta/processors/admin/ListSnapshotsProcessor.cpp +++ b/src/meta/processors/admin/ListSnapshotsProcessor.cpp @@ -11,6 +11,7 @@ namespace nebula { namespace meta { void ListSnapshotsProcessor::process(const cpp2::ListSnapshotsReq&) { + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::snapshotPrefix(); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { @@ -20,8 +21,8 @@ void ListSnapshotsProcessor::process(const cpp2::ListSnapshotsReq&) { onFinished(); return; } - auto iter = nebula::value(iterRet).get(); + auto iter = nebula::value(iterRet).get(); std::vector snapshots; while (iter->valid()) { auto val = iter->val(); diff --git a/src/meta/processors/admin/RestoreProcessor.cpp b/src/meta/processors/admin/RestoreProcessor.cpp index 264d9315e0e..311d3996f42 100644 --- a/src/meta/processors/admin/RestoreProcessor.cpp +++ b/src/meta/processors/admin/RestoreProcessor.cpp @@ -13,7 +13,7 @@ namespace meta { nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& ipv4From, const HostAddr& ipv4To, bool direct) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; const auto& spacePrefix = MetaKeyUtils::spacePrefix(); auto iterRet = doPrefix(spacePrefix, direct); @@ -83,7 +83,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4From, const HostAddr& ipv4To, bool direct) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; const auto& zonePrefix = MetaKeyUtils::zonePrefix(); auto iterRet = doPrefix(zonePrefix, direct); diff --git a/src/meta/processors/admin/SnapShot.cpp b/src/meta/processors/admin/SnapShot.cpp index 29c5827f4b5..ccdcf0d65ff 100644 --- a/src/meta/processors/admin/SnapShot.cpp +++ b/src/meta/processors/admin/SnapShot.cpp @@ -106,6 +106,7 @@ nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType s for (const auto& [host, spaces] : hostSpaces) { LOG(INFO) << "will block write host: " << host; auto result = client_->blockingWrites(spaces, sign, host).get(); + LOG(INFO) << "after block write host"; if (!result.ok()) { LOG(INFO) << "Send blocking sign error on host " << host << ", errorcode: " << result.status().toString(); @@ -120,7 +121,6 @@ nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType s ErrorOr>> Snapshot::getHostSpaces() { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); const auto& prefix = MetaKeyUtils::partPrefix(); std::unique_ptr iter; auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); diff --git a/src/meta/processors/config/GetConfigProcessor.cpp b/src/meta/processors/config/GetConfigProcessor.cpp index 47a37da6b6c..4ed4237b86d 100644 --- a/src/meta/processors/config/GetConfigProcessor.cpp +++ b/src/meta/processors/config/GetConfigProcessor.cpp @@ -15,7 +15,7 @@ void GetConfigProcessor::process(const cpp2::GetConfigReq& req) { auto code = nebula::cpp2::ErrorCode::SUCCEEDED; do { - folly::SharedMutex::ReadHolder rHolder(LockUtils::configLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); if (module != cpp2::ConfigModule::ALL) { code = getOneConfig(module, name, items); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { diff --git a/src/meta/processors/config/ListConfigsProcessor.cpp b/src/meta/processors/config/ListConfigsProcessor.cpp index 788efce778b..364e8f66811 100644 --- a/src/meta/processors/config/ListConfigsProcessor.cpp +++ b/src/meta/processors/config/ListConfigsProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void ListConfigsProcessor::process(const cpp2::ListConfigsReq& req) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::configLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::configKeyPrefix(req.get_module()); auto iterRet = doPrefix(prefix); @@ -20,8 +20,8 @@ void ListConfigsProcessor::process(const cpp2::ListConfigsReq& req) { onFinished(); return; } - auto iter = nebula::value(iterRet).get(); + auto iter = nebula::value(iterRet).get(); std::vector items; while (iter->valid()) { auto key = iter->key(); diff --git a/src/meta/processors/config/RegConfigProcessor.cpp b/src/meta/processors/config/RegConfigProcessor.cpp index cf70c7321e9..f55cb17f474 100644 --- a/src/meta/processors/config/RegConfigProcessor.cpp +++ b/src/meta/processors/config/RegConfigProcessor.cpp @@ -11,7 +11,7 @@ namespace meta { void RegConfigProcessor::process(const cpp2::RegConfigReq& req) { std::vector data; { - folly::SharedMutex::WriteHolder wHolder(LockUtils::configLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); for (const auto& item : req.get_items()) { auto module = item.get_module(); auto name = item.get_name(); diff --git a/src/meta/processors/config/SetConfigProcessor.cpp b/src/meta/processors/config/SetConfigProcessor.cpp index 53a92636d97..d7ab907aa3f 100644 --- a/src/meta/processors/config/SetConfigProcessor.cpp +++ b/src/meta/processors/config/SetConfigProcessor.cpp @@ -16,8 +16,8 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) { auto name = req.get_item().get_name(); auto value = req.get_item().get_value(); - folly::SharedMutex::WriteHolder wHolder(LockUtils::configLock()); - auto code = nebula::cpp2::ErrorCode::SUCCEEDED; + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); + nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; do { if (module != cpp2::ConfigModule::ALL) { // When we set config of a specified module, check if it exists. diff --git a/src/meta/processors/id/GetWorkerIdProcessor.cpp b/src/meta/processors/id/GetWorkerIdProcessor.cpp index 798240ab9a0..a714a4942f4 100644 --- a/src/meta/processors/id/GetWorkerIdProcessor.cpp +++ b/src/meta/processors/id/GetWorkerIdProcessor.cpp @@ -20,7 +20,7 @@ void GetWorkerIdProcessor::process(const cpp2::GetWorkerIdReq& req) { return; } - folly::SharedMutex::WriteHolder wHolder(LockUtils::workerIdLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto newResult = doGet(kIdKey); if (!nebula::ok(newResult)) { handleErrorCode(nebula::cpp2::ErrorCode::E_WORKER_ID_FAILED); @@ -29,25 +29,13 @@ void GetWorkerIdProcessor::process(const cpp2::GetWorkerIdReq& req) { } int64_t workerId = std::stoi(std::move(nebula::value(newResult))); + resp_.workerid_ref() = workerId; + handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); + // TODO: (jackwener) limit worker, add LOG ERROR doPut(std::vector{{ipAddr, std::to_string(workerId + 1)}}); - - handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); - resp_.workerid_ref() = workerId; onFinished(); } -void GetWorkerIdProcessor::doPut(std::vector data) { - folly::Baton baton; - kvstore_->asyncMultiPut(kDefaultSpaceId, - kDefaultPartId, - std::move(data), - [this, &baton](nebula::cpp2::ErrorCode code) { - this->handleErrorCode(code); - baton.post(); - }); - baton.wait(); -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/id/GetWorkerIdProcessor.h b/src/meta/processors/id/GetWorkerIdProcessor.h index f2db79836f2..c63a4a09048 100644 --- a/src/meta/processors/id/GetWorkerIdProcessor.h +++ b/src/meta/processors/id/GetWorkerIdProcessor.h @@ -36,8 +36,6 @@ class GetWorkerIdProcessor : public BaseProcessor { UNUSED(once); } - void doPut(std::vector data); - inline static const string kIdKey = "snowflake_worker_id"; }; diff --git a/src/meta/processors/index/CreateEdgeIndexProcessor.cpp b/src/meta/processors/index/CreateEdgeIndexProcessor.cpp index b38626ccf7f..ab5e97f06e9 100644 --- a/src/meta/processors/index/CreateEdgeIndexProcessor.cpp +++ b/src/meta/processors/index/CreateEdgeIndexProcessor.cpp @@ -37,7 +37,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { } folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto ret = getIndexID(space, indexName); if (nebula::ok(ret)) { if (req.get_if_not_exists()) { diff --git a/src/meta/processors/index/CreateTagIndexProcessor.cpp b/src/meta/processors/index/CreateTagIndexProcessor.cpp index 68edfdbbfc5..e0ab420d686 100644 --- a/src/meta/processors/index/CreateTagIndexProcessor.cpp +++ b/src/meta/processors/index/CreateTagIndexProcessor.cpp @@ -37,7 +37,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) { } folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagIndexLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto ret = getIndexID(space, indexName); if (nebula::ok(ret)) { if (req.get_if_not_exists()) { diff --git a/src/meta/processors/index/DropEdgeIndexProcessor.cpp b/src/meta/processors/index/DropEdgeIndexProcessor.cpp index 580969f9c08..6154bb5ff24 100644 --- a/src/meta/processors/index/DropEdgeIndexProcessor.cpp +++ b/src/meta/processors/index/DropEdgeIndexProcessor.cpp @@ -12,7 +12,7 @@ void DropEdgeIndexProcessor::process(const cpp2::DropEdgeIndexReq& req) { auto spaceID = req.get_space_id(); const auto& indexName = req.get_index_name(); CHECK_SPACE_ID_AND_RETURN(spaceID); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto edgeIndexIDRet = getIndexID(spaceID, indexName); if (!nebula::ok(edgeIndexIDRet)) { diff --git a/src/meta/processors/index/DropTagIndexProcessor.cpp b/src/meta/processors/index/DropTagIndexProcessor.cpp index 0f9755b0411..46a8b98e6d5 100644 --- a/src/meta/processors/index/DropTagIndexProcessor.cpp +++ b/src/meta/processors/index/DropTagIndexProcessor.cpp @@ -12,7 +12,7 @@ void DropTagIndexProcessor::process(const cpp2::DropTagIndexReq& req) { auto spaceID = req.get_space_id(); const auto& indexName = req.get_index_name(); CHECK_SPACE_ID_AND_RETURN(spaceID); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagIndexLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto tagIndexIDRet = getIndexID(spaceID, indexName); if (!nebula::ok(tagIndexIDRet)) { diff --git a/src/meta/processors/index/FTIndexProcessor.cpp b/src/meta/processors/index/FTIndexProcessor.cpp index 3bf899d6280..cfbddb96805 100644 --- a/src/meta/processors/index/FTIndexProcessor.cpp +++ b/src/meta/processors/index/FTIndexProcessor.cpp @@ -11,11 +11,10 @@ namespace nebula { namespace meta { void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::fulltextIndexLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& index = req.get_index(); const std::string& name = req.get_fulltext_index_name(); CHECK_SPACE_ID_AND_RETURN(index.get_space_id()); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto isEdge = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type; auto schemaPrefix = isEdge ? MetaKeyUtils::schemaEdgePrefix( index.get_space_id(), index.get_depend_schema().get_edge_type()) @@ -117,7 +116,7 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) { void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) { CHECK_SPACE_ID_AND_RETURN(req.get_space_id()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::fulltextIndexLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto indexKey = MetaKeyUtils::fulltextIndexKey(req.get_fulltext_index_name()); auto ret = doGet(indexKey); if (!nebula::ok(ret)) { @@ -138,7 +137,7 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) { } void ListFTIndexesProcessor::process(const cpp2::ListFTIndexesReq&) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::fulltextIndexLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::fulltextIndexPrefix(); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { diff --git a/src/meta/processors/index/GetEdgeIndexProcessor.cpp b/src/meta/processors/index/GetEdgeIndexProcessor.cpp index ede1aa7353e..c7e09d70b20 100644 --- a/src/meta/processors/index/GetEdgeIndexProcessor.cpp +++ b/src/meta/processors/index/GetEdgeIndexProcessor.cpp @@ -12,7 +12,7 @@ void GetEdgeIndexProcessor::process(const cpp2::GetEdgeIndexReq& req) { auto spaceID = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceID); auto indexName = req.get_index_name(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeIndexLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto edgeIndexIDRet = getIndexID(spaceID, indexName); if (!nebula::ok(edgeIndexIDRet)) { auto retCode = nebula::error(edgeIndexIDRet); diff --git a/src/meta/processors/index/GetTagIndexProcessor.cpp b/src/meta/processors/index/GetTagIndexProcessor.cpp index 63b98de6457..34c34df5a8d 100644 --- a/src/meta/processors/index/GetTagIndexProcessor.cpp +++ b/src/meta/processors/index/GetTagIndexProcessor.cpp @@ -12,7 +12,7 @@ void GetTagIndexProcessor::process(const cpp2::GetTagIndexReq& req) { auto spaceID = req.get_space_id(); const auto& indexName = req.get_index_name(); CHECK_SPACE_ID_AND_RETURN(spaceID); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagIndexLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto tagIndexIDRet = getIndexID(spaceID, indexName); if (!nebula::ok(tagIndexIDRet)) { diff --git a/src/meta/processors/index/ListEdgeIndexesProcessor.cpp b/src/meta/processors/index/ListEdgeIndexesProcessor.cpp index a1865b6671d..4731783839f 100644 --- a/src/meta/processors/index/ListEdgeIndexesProcessor.cpp +++ b/src/meta/processors/index/ListEdgeIndexesProcessor.cpp @@ -12,7 +12,7 @@ void ListEdgeIndexesProcessor::process(const cpp2::ListEdgeIndexesReq& req) { auto space = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(space); - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeIndexLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::indexPrefix(space); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { diff --git a/src/meta/processors/index/ListTagIndexesProcessor.cpp b/src/meta/processors/index/ListTagIndexesProcessor.cpp index 2cbebf9b709..5647ad5c890 100644 --- a/src/meta/processors/index/ListTagIndexesProcessor.cpp +++ b/src/meta/processors/index/ListTagIndexesProcessor.cpp @@ -12,7 +12,7 @@ void ListTagIndexesProcessor::process(const cpp2::ListTagIndexesReq& req) { auto space = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(space); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagIndexLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::indexPrefix(space); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 5e68ea361c4..6b3bf5cc7b4 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -49,6 +49,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { break; } + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto jobId = autoIncrementId(); // check if Job not exists if (!nebula::ok(jobId)) { diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index 3c4aab94c61..030d7329136 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -54,6 +54,7 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() { auto optJob = nebula::value(optJobRet); plan_.reset(new BalancePlan(optJob, kvstore_, adminClient_)); plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); if (LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()) != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed"; diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp index c38007b5709..8169d612c62 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.cpp +++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp @@ -22,6 +22,7 @@ folly::Future DataBalanceJobExecutor::executeInternal() { } } plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); if (LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()) != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed"; diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp index 6f950ada72f..d039f41b179 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp @@ -22,7 +22,7 @@ namespace meta { nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::getAllSpaces( std::vector>& spaces) { // Get all spaces - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::spacePrefix(); std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); @@ -45,7 +45,7 @@ ErrorOr LeaderBalanceJobExecutor::getHostParts(Gr bool dependentOnZone, HostParts& hostParts, int32_t& totalParts) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::partPrefix(spaceId); std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); @@ -307,7 +307,7 @@ ErrorOr LeaderBalanceJobExecutor::buildLeaderBala HostParts leaderHostParts; size_t leaderParts = 0; // store peers of all paritions in peerMap - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::partPrefix(spaceId); std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index 33b557ccc55..52a43a0c1ea 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -45,6 +45,7 @@ folly::Future ZoneBalanceJobExecutor::executeInternal() { } } plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); if (LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()) != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed"; diff --git a/src/meta/processors/listener/ListenerProcessor.cpp b/src/meta/processors/listener/ListenerProcessor.cpp index 36966be8cb9..7134c1a0433 100644 --- a/src/meta/processors/listener/ListenerProcessor.cpp +++ b/src/meta/processors/listener/ListenerProcessor.cpp @@ -16,6 +16,7 @@ namespace meta { void AddListenerProcessor::process(const cpp2::AddListenerReq& req) { auto space = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(space); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto type = req.get_type(); const auto& hosts = req.get_hosts(); auto ret = listenerExist(space, type); @@ -32,8 +33,6 @@ void AddListenerProcessor::process(const cpp2::AddListenerReq& req) { } // TODO : (sky) if type is elasticsearch, need check text search service. - folly::SharedMutex::WriteHolder wHolder(LockUtils::listenerLock()); - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); const auto& prefix = MetaKeyUtils::partPrefix(space); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { @@ -61,6 +60,7 @@ void AddListenerProcessor::process(const cpp2::AddListenerReq& req) { void RemoveListenerProcessor::process(const cpp2::RemoveListenerReq& req) { auto space = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(space); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto type = req.get_type(); auto ret = listenerExist(space, type); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -74,7 +74,6 @@ void RemoveListenerProcessor::process(const cpp2::RemoveListenerReq& req) { return; } - folly::SharedMutex::WriteHolder wHolder(LockUtils::listenerLock()); std::vector keys; const auto& prefix = MetaKeyUtils::listenerPrefix(space, type); auto iterRet = doPrefix(prefix); @@ -97,7 +96,7 @@ void RemoveListenerProcessor::process(const cpp2::RemoveListenerReq& req) { void ListListenerProcessor::process(const cpp2::ListListenerReq& req) { auto space = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(space); - folly::SharedMutex::ReadHolder rHolder(LockUtils::listenerLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::listenerPrefix(space); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { diff --git a/src/meta/processors/parts/AlterSpaceProcessor.cpp b/src/meta/processors/parts/AlterSpaceProcessor.cpp index d89f1590f68..56d061f169f 100644 --- a/src/meta/processors/parts/AlterSpaceProcessor.cpp +++ b/src/meta/processors/parts/AlterSpaceProcessor.cpp @@ -8,7 +8,7 @@ namespace nebula { namespace meta { void AlterSpaceProcessor::process(const cpp2::AlterSpaceReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const std::vector& zones = req.get_paras(); const std::string& spaceName = req.get_space_name(); cpp2::AlterSpaceOp op = req.get_op(); @@ -39,8 +39,7 @@ nebula::cpp2::ErrorCode AlterSpaceProcessor::addZones(const std::string& spaceNa auto spaceId = nebula::value(spaceRet); std::string spaceKey = MetaKeyUtils::spaceKey(spaceId); std::string spaceVal; - nebula::cpp2::ErrorCode retCode = - kvstore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceVal); + auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceVal); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { return retCode; } @@ -54,8 +53,7 @@ nebula::cpp2::ErrorCode AlterSpaceProcessor::addZones(const std::string& spaceNa for (const std::string& z : distinctZones) { std::string zoneKey = MetaKeyUtils::zoneKey(z); std::string zoneVal; - nebula::cpp2::ErrorCode zoneRet = - kvstore_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneVal); + auto zoneRet = kvstore_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneVal); if (zoneRet != nebula::cpp2::ErrorCode::SUCCEEDED) { return zoneRet == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND ? nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND @@ -73,7 +71,7 @@ nebula::cpp2::ErrorCode AlterSpaceProcessor::addZones(const std::string& spaceNa std::vector data; data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties)); folly::Baton baton; - auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; + nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), @@ -85,10 +83,7 @@ nebula::cpp2::ErrorCode AlterSpaceProcessor::addZones(const std::string& spaceNa baton.post(); }); baton.wait(); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - return ret; - } - return nebula::cpp2::ErrorCode::SUCCEEDED; + return ret; } } // namespace meta diff --git a/src/meta/processors/parts/CreateSpaceAsProcessor.cpp b/src/meta/processors/parts/CreateSpaceAsProcessor.cpp index 5a7f1dd03f4..4470595b3af 100644 --- a/src/meta/processors/parts/CreateSpaceAsProcessor.cpp +++ b/src/meta/processors/parts/CreateSpaceAsProcessor.cpp @@ -18,7 +18,7 @@ void CreateSpaceAsProcessor::process(const cpp2::CreateSpaceAsReq &req) { onFinished(); }; - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto oldSpaceName = req.get_old_space_name(); auto newSpaceName = req.get_new_space_name(); auto oldSpaceId = getSpaceId(oldSpaceName); @@ -132,7 +132,6 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewTags( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaTagsPrefix(oldSpaceId); auto tagPrefix = doPrefix(prefix); if (!nebula::ok(tagPrefix)) { @@ -164,7 +163,6 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewEdges( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaEdgesPrefix(oldSpaceId); auto edgePrefix = doPrefix(prefix); if (!nebula::ok(edgePrefix)) { @@ -196,7 +194,6 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewIndexes( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::indexPrefix(oldSpaceId); auto indexPrefix = doPrefix(prefix); if (!nebula::ok(indexPrefix)) { diff --git a/src/meta/processors/parts/CreateSpaceProcessor.cpp b/src/meta/processors/parts/CreateSpaceProcessor.cpp index 75c985b0a91..76bd2ddc3af 100644 --- a/src/meta/processors/parts/CreateSpaceProcessor.cpp +++ b/src/meta/processors/parts/CreateSpaceProcessor.cpp @@ -15,7 +15,7 @@ namespace nebula { namespace meta { void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto properties = req.get_properties(); auto spaceName = properties.get_space_name(); auto spaceRet = getSpaceId(spaceName); @@ -154,7 +154,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { data.emplace_back(MetaKeyUtils::indexSpaceKey(spaceName), std::string(reinterpret_cast(&spaceId), sizeof(spaceId))); data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties)); - folly::SharedMutex::ReadHolder zHolder(LockUtils::zoneLock()); for (auto& zone : zones) { auto zoneKey = MetaKeyUtils::zoneKey(zone); auto ret = doGet(zoneKey); diff --git a/src/meta/processors/parts/DropSpaceProcessor.cpp b/src/meta/processors/parts/DropSpaceProcessor.cpp index b36f3b0c4b4..aeeaaff8543 100644 --- a/src/meta/processors/parts/DropSpaceProcessor.cpp +++ b/src/meta/processors/parts/DropSpaceProcessor.cpp @@ -10,7 +10,7 @@ namespace meta { void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& spaceName = req.get_space_name(); auto spaceRet = getSpaceId(spaceName); diff --git a/src/meta/processors/parts/GetPartsAllocProcessor.cpp b/src/meta/processors/parts/GetPartsAllocProcessor.cpp index 2a5d81e2a20..4a9b7eea5f9 100644 --- a/src/meta/processors/parts/GetPartsAllocProcessor.cpp +++ b/src/meta/processors/parts/GetPartsAllocProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void GetPartsAllocProcessor::process(const cpp2::GetPartsAllocReq& req) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto spaceId = req.get_space_id(); auto prefix = MetaKeyUtils::partPrefix(spaceId); auto iterRet = doPrefix(prefix); diff --git a/src/meta/processors/parts/GetSpaceProcessor.cpp b/src/meta/processors/parts/GetSpaceProcessor.cpp index 72ba8b8ee5c..f9671146984 100644 --- a/src/meta/processors/parts/GetSpaceProcessor.cpp +++ b/src/meta/processors/parts/GetSpaceProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void GetSpaceProcessor::process(const cpp2::GetSpaceReq& req) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& spaceName = req.get_space_name(); auto spaceRet = getSpaceId(spaceName); diff --git a/src/meta/processors/parts/ListHostsProcessor.cpp b/src/meta/processors/parts/ListHostsProcessor.cpp index f472d75c9e7..fe1c76998ed 100644 --- a/src/meta/processors/parts/ListHostsProcessor.cpp +++ b/src/meta/processors/parts/ListHostsProcessor.cpp @@ -37,7 +37,7 @@ static cpp2::HostRole toHostRole(cpp2::ListHostType type) { void ListHostsProcessor::process(const cpp2::ListHostsReq& req) { nebula::cpp2::ErrorCode retCode; { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); retCode = getSpaceIdNameMap(); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { handleErrorCode(retCode); diff --git a/src/meta/processors/parts/ListPartsProcessor.cpp b/src/meta/processors/parts/ListPartsProcessor.cpp index 5ebbaa84377..d3dbc4d572a 100644 --- a/src/meta/processors/parts/ListPartsProcessor.cpp +++ b/src/meta/processors/parts/ListPartsProcessor.cpp @@ -18,10 +18,10 @@ void ListPartsProcessor::process(const cpp2::ListPartsReq& req) { partIds_ = req.get_part_ids(); std::unordered_map> partHostsMap; + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); if (!partIds_.empty()) { // Only show the specified parts showAllParts_ = false; - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); for (const auto& partId : partIds_) { auto partKey = MetaKeyUtils::partKey(spaceId_, partId); auto ret = doGet(std::move(partKey)); @@ -37,7 +37,6 @@ void ListPartsProcessor::process(const cpp2::ListPartsReq& req) { } } else { // Show all parts - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); auto ret = getAllParts(); if (!nebula::ok(ret)) { handleErrorCode(nebula::error(ret)); @@ -54,8 +53,8 @@ void ListPartsProcessor::process(const cpp2::ListPartsReq& req) { onFinished(); return; } - auto activeHosts = std::move(nebula::value(activeHostsRet)); + auto activeHosts = std::move(nebula::value(activeHostsRet)); for (auto& partEntry : partHostsMap) { cpp2::PartItem partItem; partItem.part_id_ref() = partEntry.first; @@ -85,7 +84,6 @@ ErrorOr> partHostsMap; - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); const auto& prefix = MetaKeyUtils::partPrefix(spaceId_); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { @@ -121,10 +119,8 @@ nebula::cpp2::ErrorCode ListPartsProcessor::getLeaderDist(std::vector statuses; std::vector values; - std::tie(rc, statuses) = + auto [rc, statuses] = kvstore_->multiGet(kDefaultSpaceId, kDefaultPartId, std::move(leaderKeys), &values); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED && rc != nebula::cpp2::ErrorCode::E_PARTIAL_RESULT) { return rc; diff --git a/src/meta/processors/parts/ListSpacesProcessor.cpp b/src/meta/processors/parts/ListSpacesProcessor.cpp index 5abbb367079..8c7d19c30e1 100644 --- a/src/meta/processors/parts/ListSpacesProcessor.cpp +++ b/src/meta/processors/parts/ListSpacesProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void ListSpacesProcessor::process(const cpp2::ListSpacesReq&) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::spacePrefix(); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/schema/AlterEdgeProcessor.cpp b/src/meta/processors/schema/AlterEdgeProcessor.cpp index c2b6031cf53..6850e6fa914 100644 --- a/src/meta/processors/schema/AlterEdgeProcessor.cpp +++ b/src/meta/processors/schema/AlterEdgeProcessor.cpp @@ -16,7 +16,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) { const auto& edgeName = req.get_edge_name(); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto ret = getEdgeType(spaceId, edgeName); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); diff --git a/src/meta/processors/schema/AlterTagProcessor.cpp b/src/meta/processors/schema/AlterTagProcessor.cpp index 2284710ec92..c741b37fd6d 100644 --- a/src/meta/processors/schema/AlterTagProcessor.cpp +++ b/src/meta/processors/schema/AlterTagProcessor.cpp @@ -16,7 +16,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) { const auto& tagName = req.get_tag_name(); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto ret = getTagId(spaceId, tagName); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); diff --git a/src/meta/processors/schema/CreateEdgeProcessor.cpp b/src/meta/processors/schema/CreateEdgeProcessor.cpp index 3b9a65fe7a5..18e9ef95108 100644 --- a/src/meta/processors/schema/CreateEdgeProcessor.cpp +++ b/src/meta/processors/schema/CreateEdgeProcessor.cpp @@ -14,7 +14,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); const auto& edgeName = req.get_edge_name(); - folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); // Check if the tag with same name exists auto conflictRet = getTagId(spaceId, edgeName); if (nebula::ok(conflictRet)) { diff --git a/src/meta/processors/schema/CreateTagProcessor.cpp b/src/meta/processors/schema/CreateTagProcessor.cpp index dd1cd80f37b..4f6fa460696 100644 --- a/src/meta/processors/schema/CreateTagProcessor.cpp +++ b/src/meta/processors/schema/CreateTagProcessor.cpp @@ -14,7 +14,7 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); const auto& tagName = req.get_tag_name(); - folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); // Check if the edge with same name exists auto conflictRet = getEdgeType(spaceId, tagName); diff --git a/src/meta/processors/schema/DropEdgeProcessor.cpp b/src/meta/processors/schema/DropEdgeProcessor.cpp index ca42af470ab..b098bc14aeb 100644 --- a/src/meta/processors/schema/DropEdgeProcessor.cpp +++ b/src/meta/processors/schema/DropEdgeProcessor.cpp @@ -13,7 +13,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) { CHECK_SPACE_ID_AND_RETURN(spaceId); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& edgeName = req.get_edge_name(); EdgeType edgeType; diff --git a/src/meta/processors/schema/DropTagProcessor.cpp b/src/meta/processors/schema/DropTagProcessor.cpp index 8fcf9cf74d3..bc37bc2b06b 100644 --- a/src/meta/processors/schema/DropTagProcessor.cpp +++ b/src/meta/processors/schema/DropTagProcessor.cpp @@ -13,7 +13,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) { CHECK_SPACE_ID_AND_RETURN(spaceId); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& tagName = req.get_tag_name(); TagID tagId; diff --git a/src/meta/processors/schema/GetEdgeProcessor.cpp b/src/meta/processors/schema/GetEdgeProcessor.cpp index b8162cdcd58..0cf6c90be3f 100644 --- a/src/meta/processors/schema/GetEdgeProcessor.cpp +++ b/src/meta/processors/schema/GetEdgeProcessor.cpp @@ -14,7 +14,7 @@ void GetEdgeProcessor::process(const cpp2::GetEdgeReq& req) { const auto& edgeName = req.get_edge_name(); auto ver = req.get_version(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto edgeTypeRet = getEdgeType(spaceId, edgeName); if (!nebula::ok(edgeTypeRet)) { LOG(ERROR) << "Get edge " << edgeName << " failed."; diff --git a/src/meta/processors/schema/GetTagProcessor.cpp b/src/meta/processors/schema/GetTagProcessor.cpp index e085d2b9d9f..4b121457f40 100644 --- a/src/meta/processors/schema/GetTagProcessor.cpp +++ b/src/meta/processors/schema/GetTagProcessor.cpp @@ -14,7 +14,7 @@ void GetTagProcessor::process(const cpp2::GetTagReq& req) { const auto& tagName = req.get_tag_name(); auto ver = req.get_version(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto tagIdRet = getTagId(spaceId, tagName); if (!nebula::ok(tagIdRet)) { LOG(ERROR) << "Get tag " << tagName << " failed."; diff --git a/src/meta/processors/schema/ListEdgesProcessor.cpp b/src/meta/processors/schema/ListEdgesProcessor.cpp index ebe41692dae..91323be5ad5 100644 --- a/src/meta/processors/schema/ListEdgesProcessor.cpp +++ b/src/meta/processors/schema/ListEdgesProcessor.cpp @@ -12,7 +12,7 @@ void ListEdgesProcessor::process(const cpp2::ListEdgesReq &req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto prefix = MetaKeyUtils::schemaEdgesPrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/schema/ListTagsProcessor.cpp b/src/meta/processors/schema/ListTagsProcessor.cpp index ee4d3eff9d7..c4b71a53274 100644 --- a/src/meta/processors/schema/ListTagsProcessor.cpp +++ b/src/meta/processors/schema/ListTagsProcessor.cpp @@ -12,7 +12,7 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq &req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto prefix = MetaKeyUtils::schemaTagsPrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/service/ServiceProcessor.cpp b/src/meta/processors/service/ServiceProcessor.cpp index 5e3c7efe44c..61d6cb7fb21 100644 --- a/src/meta/processors/service/ServiceProcessor.cpp +++ b/src/meta/processors/service/ServiceProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void SignInServiceProcessor::process(const cpp2::SignInServiceReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::serviceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto type = req.get_type(); auto serviceKey = MetaKeyUtils::serviceKey(type); @@ -35,7 +35,7 @@ void SignInServiceProcessor::process(const cpp2::SignInServiceReq& req) { } void SignOutServiceProcessor::process(const cpp2::SignOutServiceReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::serviceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto type = req.get_type(); auto serviceKey = MetaKeyUtils::serviceKey(type); @@ -57,7 +57,7 @@ void SignOutServiceProcessor::process(const cpp2::SignOutServiceReq& req) { } void ListServiceClientsProcessor::process(const cpp2::ListServiceClientsReq& req) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::serviceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto type = req.get_type(); std::unordered_map> serviceClients; diff --git a/src/meta/processors/session/SessionManagerProcessor.cpp b/src/meta/processors/session/SessionManagerProcessor.cpp index bb91bfda288..5c741a5e174 100644 --- a/src/meta/processors/session/SessionManagerProcessor.cpp +++ b/src/meta/processors/session/SessionManagerProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock()); const auto& user = req.get_user(); auto ret = userExist(user); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -42,7 +42,7 @@ void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) { } void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock()); std::vector data; std::unordered_map> @@ -108,7 +108,7 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) { } void ListSessionsProcessor::process(const cpp2::ListSessionsReq&) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::sessionLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::sessionLock()); auto& prefix = MetaKeyUtils::sessionPrefix(); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { @@ -134,7 +134,7 @@ void ListSessionsProcessor::process(const cpp2::ListSessionsReq&) { } void GetSessionProcessor::process(const cpp2::GetSessionReq& req) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::sessionLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::sessionLock()); auto sessionId = req.get_session_id(); auto sessionKey = MetaKeyUtils::sessionKey(sessionId); auto ret = doGet(sessionKey); @@ -156,7 +156,7 @@ void GetSessionProcessor::process(const cpp2::GetSessionReq& req) { } void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock()); auto sessionId = req.get_session_id(); auto sessionKey = MetaKeyUtils::sessionKey(sessionId); auto ret = doGet(sessionKey); @@ -176,7 +176,7 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) { } void KillQueryProcessor::process(const cpp2::KillQueryReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::sessionLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock()); auto& killQueries = req.get_kill_queries(); std::vector data; diff --git a/src/meta/processors/user/AuthenticationProcessor.cpp b/src/meta/processors/user/AuthenticationProcessor.cpp index 9185324b6f6..6c2bd99b975 100644 --- a/src/meta/processors/user/AuthenticationProcessor.cpp +++ b/src/meta/processors/user/AuthenticationProcessor.cpp @@ -11,7 +11,7 @@ namespace nebula { namespace meta { void CreateUserProcessor::process(const cpp2::CreateUserReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::userLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& account = req.get_account(); const auto& password = req.get_encoded_pwd(); @@ -38,7 +38,7 @@ void CreateUserProcessor::process(const cpp2::CreateUserReq& req) { } void AlterUserProcessor::process(const cpp2::AlterUserReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::userLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& account = req.get_account(); const auto& password = req.get_encoded_pwd(); auto userKey = MetaKeyUtils::userKey(account); @@ -64,7 +64,7 @@ void AlterUserProcessor::process(const cpp2::AlterUserReq& req) { } void DropUserProcessor::process(const cpp2::DropUserReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::userLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& account = req.get_account(); auto retCode = userExist(account); @@ -115,8 +115,7 @@ void DropUserProcessor::process(const cpp2::DropUserReq& req) { } void GrantProcessor::process(const cpp2::GrantRoleReq& req) { - folly::SharedMutex::WriteHolder userHolder(LockUtils::userLock()); - folly::SharedMutex::ReadHolder spaceHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& roleItem = req.get_role_item(); auto spaceId = roleItem.get_space_id(); const auto& account = roleItem.get_user_id(); @@ -148,8 +147,7 @@ void GrantProcessor::process(const cpp2::GrantRoleReq& req) { } void RevokeProcessor::process(const cpp2::RevokeRoleReq& req) { - folly::SharedMutex::WriteHolder userHolder(LockUtils::userLock()); - folly::SharedMutex::ReadHolder spaceHolder(LockUtils::spaceLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& roleItem = req.get_role_item(); auto spaceId = roleItem.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); @@ -193,7 +191,7 @@ void RevokeProcessor::process(const cpp2::RevokeRoleReq& req) { } void ChangePasswordProcessor::process(const cpp2::ChangePasswordReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::userLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& account = req.get_account(); auto userRet = userExist(account); if (userRet != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -234,9 +232,8 @@ void ChangePasswordProcessor::process(const cpp2::ChangePasswordReq& req) { doSyncPutAndUpdate(std::move(data)); } -void ListUsersProcessor::process(const cpp2::ListUsersReq& req) { - UNUSED(req); - folly::SharedMutex::ReadHolder rHolder(LockUtils::userLock()); +void ListUsersProcessor::process(const cpp2::ListUsersReq&) { + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); std::string prefix = MetaKeyUtils::userPrefix(); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { @@ -266,7 +263,7 @@ void ListRolesProcessor::process(const cpp2::ListRolesReq& req) { auto spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::userLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto prefix = MetaKeyUtils::roleSpacePrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { @@ -297,7 +294,7 @@ void ListRolesProcessor::process(const cpp2::ListRolesReq& req) { } void GetUserRolesProcessor::process(const cpp2::GetUserRolesReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::userLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); const auto& act = req.get_account(); auto prefix = MetaKeyUtils::rolesPrefix(); diff --git a/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp b/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp index b5274dd332f..2131417fe76 100644 --- a/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp +++ b/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp @@ -9,8 +9,7 @@ namespace nebula { namespace meta { void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) { - folly::SharedMutex::WriteHolder zHolder(LockUtils::zoneLock()); - folly::SharedMutex::WriteHolder mHolder(LockUtils::machineLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto hosts = req.get_hosts(); // Confirm that there are no duplicates in the parameters. diff --git a/src/meta/processors/zone/AddHostsProcessor.cpp b/src/meta/processors/zone/AddHostsProcessor.cpp index dc4096c7322..c0d1184f470 100644 --- a/src/meta/processors/zone/AddHostsProcessor.cpp +++ b/src/meta/processors/zone/AddHostsProcessor.cpp @@ -14,8 +14,7 @@ namespace nebula { namespace meta { void AddHostsProcessor::process(const cpp2::AddHostsReq& req) { - folly::SharedMutex::WriteHolder zHolder(LockUtils::zoneLock()); - folly::SharedMutex::WriteHolder mHolder(LockUtils::machineLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto hosts = req.get_hosts(); // Confirm that there are no duplicates in the parameters. if (std::unique(hosts.begin(), hosts.end()) != hosts.end()) { diff --git a/src/meta/processors/zone/DivideZoneProcessor.cpp b/src/meta/processors/zone/DivideZoneProcessor.cpp index 1badc76ec58..0342d79f05f 100644 --- a/src/meta/processors/zone/DivideZoneProcessor.cpp +++ b/src/meta/processors/zone/DivideZoneProcessor.cpp @@ -9,8 +9,7 @@ namespace nebula { namespace meta { void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { - folly::SharedMutex::WriteHolder zHolder(LockUtils::zoneLock()); - folly::SharedMutex::WriteHolder sHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto zoneName = req.get_zone_name(); auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto zoneValueRet = doGet(zoneKey); diff --git a/src/meta/processors/zone/DropHostsProcessor.cpp b/src/meta/processors/zone/DropHostsProcessor.cpp index 4f11f6f0922..7b1298cd926 100644 --- a/src/meta/processors/zone/DropHostsProcessor.cpp +++ b/src/meta/processors/zone/DropHostsProcessor.cpp @@ -11,8 +11,7 @@ namespace nebula { namespace meta { void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { - folly::SharedMutex::WriteHolder zHolder(LockUtils::zoneLock()); - folly::SharedMutex::WriteHolder mHolder(LockUtils::machineLock()); + folly::SharedMutex::WriteHolder lockHolder(LockUtils::lock()); auto hosts = req.get_hosts(); if (std::unique(hosts.begin(), hosts.end()) != hosts.end()) { LOG(INFO) << "Hosts have duplicated element"; diff --git a/src/meta/processors/zone/DropZoneProcessor.cpp b/src/meta/processors/zone/DropZoneProcessor.cpp index aee55e19e19..3bf2836f73d 100644 --- a/src/meta/processors/zone/DropZoneProcessor.cpp +++ b/src/meta/processors/zone/DropZoneProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void DropZoneProcessor::process(const cpp2::DropZoneReq& req) { - folly::SharedMutex::WriteHolder wHolder(LockUtils::zoneLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto zoneName = req.get_zone_name(); auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto zoneValueRet = doGet(std::move(zoneKey)); diff --git a/src/meta/processors/zone/GetZoneProcessor.cpp b/src/meta/processors/zone/GetZoneProcessor.cpp index 7de7965de53..ec62418f274 100644 --- a/src/meta/processors/zone/GetZoneProcessor.cpp +++ b/src/meta/processors/zone/GetZoneProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void GetZoneProcessor::process(const cpp2::GetZoneReq& req) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::zoneLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); auto zoneName = req.get_zone_name(); auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto zoneValueRet = doGet(std::move(zoneKey)); diff --git a/src/meta/processors/zone/ListZonesProcessor.cpp b/src/meta/processors/zone/ListZonesProcessor.cpp index 3bc2efab614..f7fe9f67a06 100644 --- a/src/meta/processors/zone/ListZonesProcessor.cpp +++ b/src/meta/processors/zone/ListZonesProcessor.cpp @@ -9,7 +9,7 @@ namespace nebula { namespace meta { void ListZonesProcessor::process(const cpp2::ListZonesReq&) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::zoneLock()); + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); const auto& prefix = MetaKeyUtils::zonePrefix(); auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { diff --git a/src/meta/processors/zone/MergeZoneProcessor.cpp b/src/meta/processors/zone/MergeZoneProcessor.cpp index 9d4117a4e4e..50a7a0e9472 100644 --- a/src/meta/processors/zone/MergeZoneProcessor.cpp +++ b/src/meta/processors/zone/MergeZoneProcessor.cpp @@ -11,8 +11,7 @@ namespace nebula { namespace meta { void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { - folly::SharedMutex::WriteHolder zHolder(LockUtils::zoneLock()); - folly::SharedMutex::WriteHolder sHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto zones = req.get_zones(); diff --git a/src/meta/processors/zone/RenameZoneProcessor.cpp b/src/meta/processors/zone/RenameZoneProcessor.cpp index 0d785e0b4bc..47925a63d7a 100644 --- a/src/meta/processors/zone/RenameZoneProcessor.cpp +++ b/src/meta/processors/zone/RenameZoneProcessor.cpp @@ -11,8 +11,7 @@ namespace nebula { namespace meta { void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) { - folly::SharedMutex::WriteHolder zHolder(LockUtils::zoneLock()); - folly::SharedMutex::WriteHolder sHolder(LockUtils::spaceLock()); + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto originalZoneName = req.get_original_zone_name(); auto originalZoneKey = MetaKeyUtils::zoneKey(originalZoneName); diff --git a/src/meta/test/CreateBackupProcessorTest.cpp b/src/meta/test/CreateBackupProcessorTest.cpp index 413b99bf5bb..8e09a675e56 100644 --- a/src/meta/test/CreateBackupProcessorTest.cpp +++ b/src/meta/test/CreateBackupProcessorTest.cpp @@ -159,11 +159,13 @@ TEST(ProcessorTest, CreateBackupTest) { data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(hosts2)); data.emplace_back(MetaKeyUtils::partKey(id2, partId), MetaKeyUtils::partVal(hosts2)); } + folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); + baton.post(); + }); baton.wait(); auto client = std::make_unique(kv.get()); @@ -188,22 +190,14 @@ TEST(ProcessorTest, CreateBackupTest) { auto it = std::find_if(metaFiles.cbegin(), metaFiles.cend(), [](auto const& m) { auto name = m.substr(m.size() - sizeof("__indexes__.sst") + 1); - - if (name == "__indexes__.sst") { - return true; - } - return false; + return name == "__indexes__.sst"; }); ASSERT_NE(it, metaFiles.cend()); it = std::find_if(metaFiles.cbegin(), metaFiles.cend(), [](auto const& m) { auto name = m.substr(m.size() - sizeof("__users__.sst") + 1); - - if (name == "__users__.sst") { - return true; - } - return false; + return name == "__users__.sst"; }); ASSERT_EQ(it, metaFiles.cend()); diff --git a/src/meta/test/IndexProcessorTest.cpp b/src/meta/test/IndexProcessorTest.cpp index 8fb05308872..ad07bda6d2c 100644 --- a/src/meta/test/IndexProcessorTest.cpp +++ b/src/meta/test/IndexProcessorTest.cpp @@ -1568,10 +1568,11 @@ void mockSchemas(kvstore::KVStore* kv) { MetaKeyUtils::schemaVal("test_edge", srcsch)); folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(schemas), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(schemas), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -2163,7 +2164,6 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) { ASSERT_EQ(4, count); // modify id to 5 for mock some schema - folly::SharedMutex::WriteHolder holder(LockUtils::idLock()); std::string kId = "__id__"; int32_t id = 5; std::vector data; diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index cd8b26d099d..40e4ffadc41 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -2497,7 +2497,6 @@ TEST(ProcessorTest, TagIdAndEdgeTypeInSpaceRangeTest) { ASSERT_EQ(20, count); // modify id to 21 for mock some schema - folly::SharedMutex::WriteHolder holder(LockUtils::idLock()); std::string kId = "__id__"; int32_t id = 21; std::vector data; diff --git a/src/meta/test/RestoreProcessorTest.cpp b/src/meta/test/RestoreProcessorTest.cpp index ee5dc821dc2..d92244107a0 100644 --- a/src/meta/test/RestoreProcessorTest.cpp +++ b/src/meta/test/RestoreProcessorTest.cpp @@ -90,10 +90,11 @@ TEST(RestoreProcessorTest, RestoreTest) { MetaKeyUtils::lastUpdateTimeVal(lastUpdateTime)); folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); + baton.post(); + }); baton.wait(); std::unordered_set spaces = {id}; @@ -133,10 +134,11 @@ TEST(RestoreProcessorTest, RestoreTest) { restoreData.emplace_back(MetaKeyUtils::userKey("root"), MetaKeyUtils::userVal("password")); folly::Baton restoreBaton; - kvRestore->asyncMultiPut(0, 0, std::move(restoreData), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - restoreBaton.post(); - }); + kvRestore->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(restoreData), [&](nebula::cpp2::ErrorCode code) { + ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); + restoreBaton.post(); + }); restoreBaton.wait(); auto* processor = RestoreProcessor::instance(kvRestore.get()); @@ -294,10 +296,11 @@ TEST(RestoreProcessorTest, RestoreFullTest) { data.emplace_back(MetaKeyUtils::zoneKey(zoneName), MetaKeyUtils::zoneVal(hosts)); folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); + baton.post(); + }); baton.wait(); std::unordered_set spaces = {id}; diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 02db28347c3..40dc1d350b8 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -123,10 +123,11 @@ class TestUtils { } folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -163,10 +164,11 @@ class TestUtils { bool ret = false; folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); return ret; } @@ -211,10 +213,11 @@ class TestUtils { data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(hosts)); } folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -294,10 +297,11 @@ class TestUtils { data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(hosts)); } folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -333,10 +337,11 @@ class TestUtils { MetaKeyUtils::schemaVal(tagName, srcsch)); } folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(tags), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(tags), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -361,10 +366,11 @@ class TestUtils { std::string(reinterpret_cast(&indexID), sizeof(IndexID))); data.emplace_back(MetaKeyUtils::indexKey(space, indexID), MetaKeyUtils::indexVal(item)); folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -389,10 +395,11 @@ class TestUtils { std::string(reinterpret_cast(&indexID), sizeof(IndexID))); data.emplace_back(MetaKeyUtils::indexKey(space, indexID), MetaKeyUtils::indexVal(item)); folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } @@ -429,10 +436,11 @@ class TestUtils { } folly::Baton baton; - kv->asyncMultiPut(0, 0, std::move(edges), [&](nebula::cpp2::ErrorCode code) { - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(edges), [&](nebula::cpp2::ErrorCode code) { + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); baton.wait(); } diff --git a/src/storage/test/ChainAddEdgesTest.cpp b/src/storage/test/ChainAddEdgesTest.cpp index a8d0d7cbb26..67b5be80c6e 100644 --- a/src/storage/test/ChainAddEdgesTest.cpp +++ b/src/storage/test/ChainAddEdgesTest.cpp @@ -3,7 +3,6 @@ * This source code is licensed under Apache 2.0 License. */ -#include #include #include #include diff --git a/src/storage/test/ChainDeleteEdgesTest.cpp b/src/storage/test/ChainDeleteEdgesTest.cpp index ee4316d380d..1aa7148cca6 100644 --- a/src/storage/test/ChainDeleteEdgesTest.cpp +++ b/src/storage/test/ChainDeleteEdgesTest.cpp @@ -3,7 +3,6 @@ * This source code is licensed under Apache 2.0 License. */ -#include #include #include #include diff --git a/src/storage/test/ChainResumeEdgeTest.cpp b/src/storage/test/ChainResumeEdgeTest.cpp index 716b8263672..7c1018c7874 100644 --- a/src/storage/test/ChainResumeEdgeTest.cpp +++ b/src/storage/test/ChainResumeEdgeTest.cpp @@ -3,7 +3,6 @@ * This source code is licensed under Apache 2.0 License. */ -#include #include #include #include diff --git a/src/storage/test/ChainUpdateEdgeTest.cpp b/src/storage/test/ChainUpdateEdgeTest.cpp index ec8e219f883..32c5d48bc49 100644 --- a/src/storage/test/ChainUpdateEdgeTest.cpp +++ b/src/storage/test/ChainUpdateEdgeTest.cpp @@ -3,7 +3,6 @@ * This source code is licensed under Apache 2.0 License. */ -#include #include #include #include