diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index ecc8ca83d81..a6c709deae6 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -77,14 +77,8 @@ bool MetaClient::isMetadReady() { } // ready_ will be set in loadData - bool ldRet = loadData(); - bool lcRet = true; - if (!options_.skipConfig_) { - lcRet = loadCfg(); - } - if (ldRet && lcRet) { - localLastUpdateTime_ = metadLastUpdateTime_; - } + loadData(); + loadCfg(); return ready_; } @@ -141,16 +135,8 @@ void MetaClient::heartBeatThreadFunc() { } // if MetaServer has some changes, refesh the localCache_ - if (localLastUpdateTime_ < metadLastUpdateTime_) { - bool ldRet = loadData(); - bool lcRet = true; - if (!options_.skipConfig_) { - lcRet = loadCfg(); - } - if (ldRet && lcRet) { - localLastUpdateTime_ = metadLastUpdateTime_; - } - } + loadData(); + loadCfg(); } bool MetaClient::loadUsersAndRoles() { @@ -179,6 +165,10 @@ bool MetaClient::loadUsersAndRoles() { } bool MetaClient::loadData() { + if (localDataLastUpdateTime_ == metadLastUpdateTime_) { + return true; + } + if (ioThreadPool_->numThreads() <= 0) { LOG(ERROR) << "The threads number in ioThreadPool should be greater than 0"; return false; @@ -305,6 +295,8 @@ bool MetaClient::loadData() { storageHosts_ = std::move(hosts); } + localDataLastUpdateTime_.store(metadLastUpdateTime_.load()); + diff(oldCache, localCache_); listenerDiff(oldCache, localCache_); loadRemoteListeners(); @@ -312,6 +304,78 @@ bool MetaClient::loadData() { return true; } +TagSchemas MetaClient::buildTagSchemas(std::vector tagItemVec, ObjectPool* pool) { + TagSchemas tagSchemas; + TagID lastTagId = -1; + for (auto& tagIt : tagItemVec) { + // meta will return the different version from new to old + auto schema = std::make_shared(tagIt.get_version()); + for (const auto& colIt : tagIt.get_schema().get_columns()) { + addSchemaField(schema.get(), colIt, pool); + } + // handle schema property + schema->setProp(tagIt.get_schema().get_schema_prop()); + if (tagIt.get_tag_id() != lastTagId) { + // init schema vector, since schema version is zero-based, need to add one + tagSchemas[tagIt.get_tag_id()].resize(schema->getVersion() + 1); + lastTagId = tagIt.get_tag_id(); + } + tagSchemas[tagIt.get_tag_id()][schema->getVersion()] = std::move(schema); + } + return tagSchemas; +} + +EdgeSchemas MetaClient::buildEdgeSchemas(std::vector edgeItemVec, + ObjectPool* pool) { + EdgeSchemas edgeSchemas; + std::unordered_set> edges; + EdgeType lastEdgeType = -1; + for (auto& edgeIt : edgeItemVec) { + // meta will return the different version from new to old + auto schema = std::make_shared(edgeIt.get_version()); + for (const auto& col : edgeIt.get_schema().get_columns()) { + MetaClient::addSchemaField(schema.get(), col, pool); + } + // handle shcem property + schema->setProp(edgeIt.get_schema().get_schema_prop()); + if (edgeIt.get_edge_type() != lastEdgeType) { + // init schema vector, since schema version is zero-based, need to add one + edgeSchemas[edgeIt.get_edge_type()].resize(schema->getVersion() + 1); + lastEdgeType = edgeIt.get_edge_type(); + } + edgeSchemas[edgeIt.get_edge_type()][schema->getVersion()] = std::move(schema); + } + return edgeSchemas; +} + +void MetaClient::addSchemaField(NebulaSchemaProvider* schema, + const cpp2::ColumnDef& col, + ObjectPool* pool) { + bool hasDef = col.default_value_ref().has_value(); + auto& colType = col.get_type(); + size_t len = colType.type_length_ref().has_value() ? *colType.get_type_length() : 0; + cpp2::GeoShape geoShape = + colType.geo_shape_ref().has_value() ? *colType.get_geo_shape() : cpp2::GeoShape::ANY; + bool nullable = col.nullable_ref().has_value() ? *col.get_nullable() : false; + Expression* defaultValueExpr = nullptr; + if (hasDef) { + auto encoded = *col.get_default_value(); + defaultValueExpr = Expression::decode(pool, folly::StringPiece(encoded.data(), encoded.size())); + + if (defaultValueExpr == nullptr) { + LOG(ERROR) << "Wrong expr default value for column name: " << col.get_name(); + hasDef = false; + } + } + + schema->addField(col.get_name(), + colType.get_type(), + len, + nullable, + hasDef ? defaultValueExpr : nullptr, + geoShape); +} + bool MetaClient::loadSchemas(GraphSpaceID spaceId, std::shared_ptr spaceInfoCache, SpaceTagNameIdMap& tagNameIdMap, @@ -336,52 +400,12 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, auto tagItemVec = tagRet.value(); auto edgeItemVec = edgeRet.value(); allEdgeMap[spaceId] = {}; - TagSchemas tagSchemas; - EdgeSchemas edgeSchemas; - TagID lastTagId = -1; - - auto addSchemaField = [&spaceInfoCache](NebulaSchemaProvider* schema, - const cpp2::ColumnDef& col) { - bool hasDef = col.default_value_ref().has_value(); - auto& colType = col.get_type(); - size_t len = colType.type_length_ref().has_value() ? *colType.get_type_length() : 0; - cpp2::GeoShape geoShape = - colType.geo_shape_ref().has_value() ? *colType.get_geo_shape() : cpp2::GeoShape::ANY; - bool nullable = col.nullable_ref().has_value() ? *col.get_nullable() : false; - Expression* defaultValueExpr = nullptr; - if (hasDef) { - auto encoded = *col.get_default_value(); - defaultValueExpr = Expression::decode(&(spaceInfoCache->pool_), - folly::StringPiece(encoded.data(), encoded.size())); - - if (defaultValueExpr == nullptr) { - LOG(ERROR) << "Wrong expr default value for column name: " << col.get_name(); - hasDef = false; - } - } - - schema->addField(col.get_name(), - colType.get_type(), - len, - nullable, - hasDef ? defaultValueExpr : nullptr, - geoShape); - }; + spaceInfoCache->tagItemVec_ = tagItemVec; + spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec, &spaceInfoCache->pool_); + spaceInfoCache->edgeItemVec_ = edgeItemVec; + spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec, &spaceInfoCache->pool_); for (auto& tagIt : tagItemVec) { - // meta will return the different version from new to old - auto schema = std::make_shared(tagIt.get_version()); - for (const auto& colIt : tagIt.get_schema().get_columns()) { - addSchemaField(schema.get(), colIt); - } - // handle schema property - schema->setProp(tagIt.get_schema().get_schema_prop()); - if (tagIt.get_tag_id() != lastTagId) { - // init schema vector, since schema version is zero-based, need to add one - tagSchemas[tagIt.get_tag_id()].resize(schema->getVersion() + 1); - lastTagId = tagIt.get_tag_id(); - } - tagSchemas[tagIt.get_tag_id()][schema->getVersion()] = std::move(schema); tagNameIdMap.emplace(std::make_pair(spaceId, tagIt.get_tag_name()), tagIt.get_tag_id()); tagIdNameMap.emplace(std::make_pair(spaceId, tagIt.get_tag_id()), tagIt.get_tag_name()); // get the latest tag version @@ -398,21 +422,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, } std::unordered_set> edges; - EdgeType lastEdgeType = -1; for (auto& edgeIt : edgeItemVec) { - // meta will return the different version from new to old - auto schema = std::make_shared(edgeIt.get_version()); - for (const auto& col : edgeIt.get_schema().get_columns()) { - addSchemaField(schema.get(), col); - } - // handle shcem property - schema->setProp(edgeIt.get_schema().get_schema_prop()); - if (edgeIt.get_edge_type() != lastEdgeType) { - // init schema vector, since schema version is zero-based, need to add one - edgeSchemas[edgeIt.get_edge_type()].resize(schema->getVersion() + 1); - lastEdgeType = edgeIt.get_edge_type(); - } - edgeSchemas[edgeIt.get_edge_type()][schema->getVersion()] = std::move(schema); edgeNameTypeMap.emplace(std::make_pair(spaceId, edgeIt.get_edge_name()), edgeIt.get_edge_type()); edgeTypeNameMap.emplace(std::make_pair(spaceId, edgeIt.get_edge_type()), @@ -437,11 +447,20 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, << " Successfully!"; } - spaceInfoCache->tagSchemas_ = std::move(tagSchemas); - spaceInfoCache->edgeSchemas_ = std::move(edgeSchemas); return true; } +static Indexes buildIndexes(std::vector indexItemVec) { + Indexes indexes; + for (auto index : indexItemVec) { + auto indexName = index.get_index_name(); + auto indexID = index.get_index_id(); + auto indexPtr = std::make_shared(index); + indexes.emplace(indexID, indexPtr); + } + return indexes; +} + bool MetaClient::loadIndexes(GraphSpaceID spaceId, std::shared_ptr cache) { auto tagIndexesRet = listTagIndexes(spaceId).get(); if (!tagIndexesRet.ok()) { @@ -457,27 +476,25 @@ bool MetaClient::loadIndexes(GraphSpaceID spaceId, std::shared_ptrtagIndexItemVec_ = tagIndexItemVec; + cache->tagIndexes_ = buildIndexes(tagIndexItemVec); + for (const auto& tagIndex : tagIndexItemVec) { auto indexName = tagIndex.get_index_name(); auto indexID = tagIndex.get_index_id(); std::pair pair(spaceId, indexName); tagNameIndexMap_[pair] = indexID; - auto tagIndexPtr = std::make_shared(tagIndex); - tagIndexes.emplace(indexID, tagIndexPtr); } - cache->tagIndexes_ = std::move(tagIndexes); - Indexes edgeIndexes; - for (auto& edgeIndex : edgeIndexesRet.value()) { + auto edgeIndexItemVec = edgeIndexesRet.value(); + cache->edgeIndexItemVec_ = edgeIndexItemVec; + cache->edgeIndexes_ = buildIndexes(edgeIndexItemVec); + for (auto& edgeIndex : edgeIndexItemVec) { auto indexName = edgeIndex.get_index_name(); auto indexID = edgeIndex.get_index_id(); std::pair pair(spaceId, indexName); edgeNameIndexMap_[pair] = indexID; - auto edgeIndexPtr = std::make_shared(edgeIndex); - edgeIndexes.emplace(indexID, edgeIndexPtr); } - cache->edgeIndexes_ = std::move(edgeIndexes); return true; } @@ -522,10 +539,46 @@ bool MetaClient::loadFulltextIndexes() { return true; } -Status MetaClient::checkTagIndexed(GraphSpaceID space, IndexID indexID) { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = localCache_.find(space); - if (it != localCache_.end()) { +const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { + ThreadLocalInfo& threadLocalInfo = folly::SingletonThreadLocal::get(); + + if (threadLocalInfo.localLastUpdateTime_ < localDataLastUpdateTime_) { + threadLocalInfo.localLastUpdateTime_ = localDataLastUpdateTime_; + + folly::RWSpinLock::ReadHolder holder(localCacheLock_); + for (auto& spaceInfo : localCache_) { + GraphSpaceID spaceId = spaceInfo.first; + std::shared_ptr info = spaceInfo.second; + std::shared_ptr infoDeepCopy = std::make_shared(*info); + infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->edgeSchemas_ = + buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); + infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); + threadLocalInfo.localCache_[spaceId] = infoDeepCopy; + } + threadLocalInfo.spaceIndexByName_ = spaceIndexByName_; + threadLocalInfo.spaceTagIndexByName_ = spaceTagIndexByName_; + threadLocalInfo.spaceEdgeIndexByName_ = spaceEdgeIndexByName_; + threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; + threadLocalInfo.spaceNewestTagVerMap_ = spaceNewestTagVerMap_; + threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; + threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; + threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; + + threadLocalInfo.userRolesMap_ = userRolesMap_; + threadLocalInfo.storageHosts_ = storageHosts_; + threadLocalInfo.fulltextIndexMap_ = fulltextIndexMap_; + threadLocalInfo.userPasswordMap_ = userPasswordMap_; + } + + return threadLocalInfo; +} + +Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.localCache_.find(spaceId); + if (it != threadLocalInfo.localCache_.end()) { auto indexIt = it->second->tagIndexes_.find(indexID); if (indexIt != it->second->tagIndexes_.end()) { return Status::OK(); @@ -537,9 +590,9 @@ Status MetaClient::checkTagIndexed(GraphSpaceID space, IndexID indexID) { } Status MetaClient::checkEdgeIndexed(GraphSpaceID space, IndexID indexID) { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = localCache_.find(space); - if (it != localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.localCache_.find(space); + if (it != threadLocalInfo.localCache_.end()) { auto indexIt = it->second->edgeIndexes_.find(indexID); if (indexIt != it->second->edgeIndexes_.end()) { return Status::OK(); @@ -830,9 +883,9 @@ Status MetaClient::handleResponse(const RESP& resp) { PartsMap MetaClient::doGetPartsMap(const HostAddr& host, const LocalCache& localCache) { PartsMap partMap; - for (auto it = localCache.begin(); it != localCache.end(); it++) { - auto spaceId = it->first; - auto& cache = it->second; + for (const auto& it : localCache) { + auto spaceId = it.first; + auto& cache = it.second; auto partsIt = cache->partsOnHost_.find(host); if (partsIt != cache->partsOnHost_.end()) { for (auto& partId : partsIt->second) { @@ -857,28 +910,28 @@ void MetaClient::diff(const LocalCache& oldCache, const LocalCache& newCache) { auto newPartsMap = doGetPartsMap(options_.localHost_, newCache); auto oldPartsMap = doGetPartsMap(options_.localHost_, oldCache); VLOG(1) << "Let's check if any new parts added/updated for " << options_.localHost_; - for (auto it = newPartsMap.begin(); it != newPartsMap.end(); it++) { - auto spaceId = it->first; - const auto& newParts = it->second; + for (auto& it : newPartsMap) { + auto spaceId = it.first; + const auto& newParts = it.second; auto oldIt = oldPartsMap.find(spaceId); if (oldIt == oldPartsMap.end()) { VLOG(1) << "SpaceId " << spaceId << " was added!"; listener_->onSpaceAdded(spaceId); - for (auto partIt = newParts.begin(); partIt != newParts.end(); partIt++) { - listener_->onPartAdded(partIt->second); + for (const auto& newPart : newParts) { + listener_->onPartAdded(newPart.second); } } else { const auto& oldParts = oldIt->second; - for (auto partIt = newParts.begin(); partIt != newParts.end(); partIt++) { - auto oldPartIt = oldParts.find(partIt->first); + for (const auto& newPart : newParts) { + auto oldPartIt = oldParts.find(newPart.first); if (oldPartIt == oldParts.end()) { - VLOG(1) << "SpaceId " << spaceId << ", partId " << partIt->first << " was added!"; - listener_->onPartAdded(partIt->second); + VLOG(1) << "SpaceId " << spaceId << ", partId " << newPart.first << " was added!"; + listener_->onPartAdded(newPart.second); } else { const auto& oldPartHosts = oldPartIt->second; - const auto& newPartHosts = partIt->second; + const auto& newPartHosts = newPart.second; if (oldPartHosts != newPartHosts) { - VLOG(1) << "SpaceId " << spaceId << ", partId " << partIt->first << " was updated!"; + VLOG(1) << "SpaceId " << spaceId << ", partId " << newPart.first << " was updated!"; listener_->onPartUpdated(newPartHosts); } } @@ -886,23 +939,23 @@ void MetaClient::diff(const LocalCache& oldCache, const LocalCache& newCache) { } } VLOG(1) << "Let's check if any old parts removed...."; - for (auto it = oldPartsMap.begin(); it != oldPartsMap.end(); it++) { - auto spaceId = it->first; - const auto& oldParts = it->second; + for (auto& it : oldPartsMap) { + auto spaceId = it.first; + const auto& oldParts = it.second; auto newIt = newPartsMap.find(spaceId); if (newIt == newPartsMap.end()) { VLOG(1) << "SpaceId " << spaceId << " was removed!"; - for (auto partIt = oldParts.begin(); partIt != oldParts.end(); partIt++) { - listener_->onPartRemoved(spaceId, partIt->first); + for (const auto& oldPart : oldParts) { + listener_->onPartRemoved(spaceId, oldPart.first); } listener_->onSpaceRemoved(spaceId); } else { const auto& newParts = newIt->second; - for (auto partIt = oldParts.begin(); partIt != oldParts.end(); partIt++) { - auto newPartIt = newParts.find(partIt->first); + for (const auto& oldPart : oldParts) { + auto newPartIt = newParts.find(oldPart.first); if (newPartIt == newParts.end()) { - VLOG(1) << "SpaceId " << spaceId << ", partId " << partIt->first << " was removed!"; - listener_->onPartRemoved(spaceId, partIt->first); + VLOG(1) << "SpaceId " << spaceId << ", partId " << oldPart.first << " was removed!"; + listener_->onPartRemoved(spaceId, oldPart.first); } } } @@ -1177,8 +1230,8 @@ MetaClient::getPartsAlloc(GraphSpaceID spaceId, PartTerms* partTerms) { [](auto client, auto request) { return client->future_getPartsAlloc(request); }, [=](cpp2::GetPartsAllocResp&& resp) -> decltype(auto) { std::unordered_map> parts; - for (auto it = resp.get_parts().begin(); it != resp.get_parts().end(); it++) { - parts.emplace(it->first, it->second); + for (const auto& it : resp.get_parts()) { + parts.emplace(it.first, it.second); } if (partTerms && resp.terms_ref().has_value()) { for (auto& termOfPart : resp.terms_ref().value()) { @@ -1195,9 +1248,9 @@ StatusOr MetaClient::getSpaceIdByNameFromCache(const std::string& if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceIndexByName_.find(name); - if (it != spaceIndexByName_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceIndexByName_.find(name); + if (it != threadLocalInfo.spaceIndexByName_.end()) { return it->second; } return Status::SpaceNotFound(); @@ -1207,9 +1260,9 @@ StatusOr MetaClient::getSpaceNameByIdFromCache(GraphSpaceID spaceId if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1221,9 +1274,9 @@ StatusOr MetaClient::getTagIDByNameFromCache(const GraphSpaceID& space, if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceTagIndexByName_.find(std::make_pair(space, name)); - if (it == spaceTagIndexByName_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceTagIndexByName_.find(std::make_pair(space, name)); + if (it == threadLocalInfo.spaceTagIndexByName_.end()) { return Status::Error("TagName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1234,9 +1287,9 @@ StatusOr MetaClient::getTagNameByIdFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceTagIndexById_.find(std::make_pair(space, tagId)); - if (it == spaceTagIndexById_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceTagIndexById_.find(std::make_pair(space, tagId)); + if (it == threadLocalInfo.spaceTagIndexById_.end()) { return Status::Error("TagID `%d' is nonexistent", tagId); } return it->second; @@ -1247,9 +1300,9 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceEdgeIndexByName_.find(std::make_pair(space, name)); - if (it == spaceEdgeIndexByName_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceEdgeIndexByName_.find(std::make_pair(space, name)); + if (it == threadLocalInfo.spaceEdgeIndexByName_.end()) { return Status::Error("EdgeName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1260,9 +1313,9 @@ StatusOr MetaClient::getEdgeNameByTypeFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); - if (it == spaceEdgeIndexByType_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); + if (it == threadLocalInfo.spaceEdgeIndexByType_.end()) { return Status::Error("EdgeType `%d' is nonexistent", edgeType); } return it->second; @@ -1272,9 +1325,9 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceAllEdgeMap_.find(space); - if (it == spaceAllEdgeMap_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceAllEdgeMap_.find(space); + if (it == threadLocalInfo.spaceAllEdgeMap_.end()) { return Status::Error("SpaceId `%d' is nonexistent", space); } return it->second; @@ -1407,14 +1460,14 @@ folly::Future> MetaClient::removeRange(std::string segment, } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - return doGetPartsMap(host, localCache_); + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + return doGetPartsMap(host, threadLocalInfo.localCache_); } StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, PartitionID partId) { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = localCache_.find(spaceId); - if (it == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.localCache_.find(spaceId); + if (it == threadLocalInfo.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } auto& cache = it->second; @@ -1432,9 +1485,9 @@ StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, Part Status MetaClient::checkPartExistInCache(const HostAddr& host, GraphSpaceID spaceId, PartitionID partId) { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = localCache_.find(spaceId); - if (it != localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.localCache_.find(spaceId); + if (it != threadLocalInfo.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end()) { for (auto& pId : partsIt->second) { @@ -1451,9 +1504,9 @@ Status MetaClient::checkPartExistInCache(const HostAddr& host, } Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId) { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = localCache_.find(spaceId); - if (it != localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.localCache_.find(spaceId); + if (it != threadLocalInfo.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end() && !partsIt->second.empty()) { return Status::OK(); @@ -1464,10 +1517,10 @@ Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spa return Status::SpaceNotFound(); } -StatusOr MetaClient::partsNum(GraphSpaceID spaceId) const { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = localCache_.find(spaceId); - if (it == localCache_.end()) { +StatusOr MetaClient::partsNum(GraphSpaceID spaceId) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.localCache_.find(spaceId); + if (it == threadLocalInfo.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } return it->second->partsAlloc_.size(); @@ -1850,9 +1903,9 @@ StatusOr MetaClient::getSpaceVidLen(const GraphSpaceID& spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1868,9 +1921,9 @@ StatusOr MetaClient::getSpaceVidType(const GraphSpac if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1889,9 +1942,9 @@ StatusOr MetaClient::getSpaceDesc(const GraphSpaceID& space) { if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(space); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(space); + if (spaceIt == threadLocalInfo.localCache_.end()) { LOG(ERROR) << "Space " << space << " not found!"; return Status::Error("Space %d not found", space); } @@ -1912,9 +1965,9 @@ StatusOr> MetaClient::getTagSchemaFr if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt != localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt != threadLocalInfo.localCache_.end()) { auto tagIt = spaceIt->second->tagSchemas_.find(tagID); if (tagIt != spaceIt->second->tagSchemas_.end() && !tagIt->second.empty()) { size_t vNum = tagIt->second.size(); @@ -1932,9 +1985,9 @@ StatusOr> MetaClient::getEdgeSchemaF if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt != localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt != threadLocalInfo.localCache_.end()) { auto edgeIt = spaceIt->second->edgeSchemas_.find(edgeType); if (edgeIt != spaceIt->second->edgeSchemas_.end() && !edgeIt->second.empty()) { size_t vNum = edgeIt->second.size(); @@ -1951,9 +2004,9 @@ StatusOr MetaClient::getAllVerTagSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = localCache_.find(spaceId); - if (iter == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.localCache_.find(spaceId); + if (iter == threadLocalInfo.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->tagSchemas_; @@ -1963,9 +2016,9 @@ StatusOr MetaClient::getAllLatestVerTagSchema(const GraphSpaceID& spa if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = localCache_.find(spaceId); - if (iter == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.localCache_.find(spaceId); + if (iter == threadLocalInfo.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } TagSchema tagsSchema; @@ -1981,9 +2034,9 @@ StatusOr MetaClient::getAllVerEdgeSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = localCache_.find(spaceId); - if (iter == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.localCache_.find(spaceId); + if (iter == threadLocalInfo.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->edgeSchemas_; @@ -1993,9 +2046,9 @@ StatusOr MetaClient::getAllLatestVerEdgeSchemaFromCache(const GraphS if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = localCache_.find(spaceId); - if (iter == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.localCache_.find(spaceId); + if (iter == threadLocalInfo.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } EdgeSchema edgesSchema; @@ -2083,9 +2136,9 @@ StatusOr> MetaClient::getTagIndexFromCache(Grap return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2120,9 +2173,9 @@ StatusOr> MetaClient::getEdgeIndexFromCache(Gra return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2157,9 +2210,9 @@ StatusOr>> MetaClient::getTagIndexe return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2180,9 +2233,9 @@ StatusOr>> MetaClient::getEdgeIndex return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2250,46 +2303,46 @@ StatusOr MetaClient::getLeaderInfo() { const std::vector& MetaClient::getAddresses() { return addrs_; } -std::vector MetaClient::getRolesByUserFromCache(const std::string& user) const { +std::vector MetaClient::getRolesByUserFromCache(const std::string& user) { if (!ready_) { return std::vector(0); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = userRolesMap_.find(user); - if (iter == userRolesMap_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.userRolesMap_.find(user); + if (iter == threadLocalInfo.userRolesMap_.end()) { return std::vector(0); } return iter->second; } -bool MetaClient::authCheckFromCache(const std::string& account, const std::string& password) const { +bool MetaClient::authCheckFromCache(const std::string& account, const std::string& password) { if (!ready_) { return false; } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = userPasswordMap_.find(account); - if (iter == userPasswordMap_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.userPasswordMap_.find(account); + if (iter == threadLocalInfo.userPasswordMap_.end()) { return false; } return iter->second == password; } -bool MetaClient::checkShadowAccountFromCache(const std::string& account) const { +bool MetaClient::checkShadowAccountFromCache(const std::string& account) { if (!ready_) { return false; } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto iter = userPasswordMap_.find(account); - if (iter != userPasswordMap_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto iter = threadLocalInfo.userPasswordMap_.find(account); + if (iter != threadLocalInfo.userPasswordMap_.end()) { return true; } return false; } -StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) const { - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceInfo = localCache_.find(spaceId); - if (spaceInfo == localCache_.end()) { +StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceInfo = threadLocalInfo.localCache_.find(spaceId); + if (spaceInfo == threadLocalInfo.localCache_.end()) { return Status::Error("Term not found!"); } @@ -2301,13 +2354,13 @@ StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID return termInfo->second; } -StatusOr> MetaClient::getStorageHosts() const { +StatusOr> MetaClient::getStorageHosts() { if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - return storageHosts_; + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + return threadLocalInfo.storageHosts_; } StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& space, @@ -2315,9 +2368,9 @@ StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); - if (it == spaceNewestTagVerMap_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); + if (it == threadLocalInfo.spaceNewestTagVerMap_.end()) { return Status::TagNotFound(); } return it->second; @@ -2328,9 +2381,9 @@ StatusOr MetaClient::getLatestEdgeVersionFromCache(const GraphSpaceID if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto it = spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); - if (it == spaceNewestEdgeVerMap_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); + if (it == threadLocalInfo.spaceNewestEdgeVerMap_.end()) { return Status::EdgeNotFound(); } return it->second; @@ -2801,9 +2854,9 @@ MetaClient::getListenersBySpaceHostFromCache(GraphSpaceID spaceId, const HostAdd if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2820,8 +2873,8 @@ StatusOr MetaClient::getListenersByHostFromCache(const HostAddr& h if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - return doGetListenersMap(host, localCache_); + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + return doGetListenersMap(host, threadLocalInfo.localCache_); } ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCache& localCache) { @@ -2857,9 +2910,9 @@ StatusOr MetaClient::getListenerHostsBySpacePartType(GraphSpaceID spac if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2878,9 +2931,9 @@ StatusOr> MetaClient::getListenerHostTypeBySpace if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - auto spaceIt = localCache_.find(spaceId); - if (spaceIt == localCache_.end()) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto spaceIt = threadLocalInfo.localCache_.find(spaceId); + if (spaceIt == threadLocalInfo.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2899,6 +2952,9 @@ StatusOr> MetaClient::getListenerHostTypeBySpace } bool MetaClient::loadCfg() { + if (options_.skipConfig_ || localCfgLastUpdateTime_ == metadLastUpdateTime_) { + return true; + } if (!configReady_ && !registerCfg()) { return false; } @@ -2929,6 +2985,7 @@ bool MetaClient::loadCfg() { LOG(ERROR) << "Load configs failed: " << ret.status(); return false; } + localCfgLastUpdateTime_.store(metadLastUpdateTime_.load()); return true; } @@ -2936,7 +2993,7 @@ void MetaClient::updateGflagsValue(const cpp2::ConfigItem& item) { if (item.get_mode() != cpp2::ConfigMode::MUTABLE) { return; } - auto value = item.get_value(); + const auto& value = item.get_value(); std::string curValue; if (!gflags::GetCommandLineOption(item.get_name().c_str(), &curValue)) { return; @@ -2965,8 +3022,8 @@ void MetaClient::updateNestedGflags(const std::unordered_map optionMap.emplace(value.first, value.second.toString()); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (const auto& spaceEntry : localCache_) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + for (const auto& spaceEntry : threadLocalInfo.localCache_) { listener_->onSpaceOptionUpdated(spaceEntry.first, optionMap); } } @@ -3352,8 +3409,8 @@ StatusOr> MetaClient::getFTIndexe if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - return fulltextIndexMap_; + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + return threadLocalInfo.fulltextIndexMap_; } StatusOr> MetaClient::getFTIndexBySpaceFromCache( @@ -3361,11 +3418,11 @@ StatusOr> MetaClient::getFTIndexB if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); std::unordered_map indexes; - for (auto it = fulltextIndexMap_.begin(); it != fulltextIndexMap_.end(); ++it) { - if (it->second.get_space_id() == spaceId) { - indexes[it->first] = it->second; + for (const auto& it : threadLocalInfo.fulltextIndexMap_) { + if (it.second.get_space_id() == spaceId) { + indexes[it.first] = it.second; } } return indexes; @@ -3376,13 +3433,13 @@ StatusOr> MetaClient::getFTIndexBySpaceSch if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (auto it = fulltextIndexMap_.begin(); it != fulltextIndexMap_.end(); ++it) { - auto id = it->second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type - ? it->second.get_depend_schema().get_edge_type() - : it->second.get_depend_schema().get_tag_id(); - if (it->second.get_space_id() == spaceId && id == schemaId) { - return std::make_pair(it->first, it->second); + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + for (auto& it : threadLocalInfo.fulltextIndexMap_) { + auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type + ? it.second.get_depend_schema().get_edge_type() + : it.second.get_depend_schema().get_tag_id(); + if (it.second.get_space_id() == spaceId && id == schemaId) { + return std::make_pair(it.first, it.second); } } return Status::IndexNotFound(); @@ -3393,12 +3450,12 @@ StatusOr MetaClient::getFTIndexByNameFromCache(GraphSpaceID space if (!ready_) { return Status::Error("Not ready!"); } - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - if (fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && - fulltextIndexMap_[name].get_space_id() != spaceId) { + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + if (threadLocalInfo.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && + threadLocalInfo.fulltextIndexMap_.at(name).get_space_id() != spaceId) { return Status::IndexNotFound(); } - return fulltextIndexMap_[name]; + return threadLocalInfo.fulltextIndexMap_.at(name); } folly::Future> MetaClient::createSession( diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 6605404ca53..445cabdebf0 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -16,6 +16,7 @@ #include #include "common/base/Base.h" +#include "common/base/ObjectPool.h" #include "common/base/Status.h" #include "common/base/StatusOr.h" #include "common/meta/Common.h" @@ -74,14 +75,36 @@ struct SpaceInfoCache { cpp2::SpaceDesc spaceDesc_; PartsAlloc partsAlloc_; std::unordered_map> partsOnHost_; + std::vector tagItemVec_; TagSchemas tagSchemas_; + std::vector edgeItemVec_; EdgeSchemas edgeSchemas_; + std::vector tagIndexItemVec_; Indexes tagIndexes_; + std::vector edgeIndexItemVec_; Indexes edgeIndexes_; Listeners listeners_; // objPool used to decode when adding field ObjectPool pool_; std::unordered_map termOfPartition_; + + SpaceInfoCache() = default; + SpaceInfoCache(const SpaceInfoCache& info) + : spaceDesc_(info.spaceDesc_), + partsAlloc_(info.partsAlloc_), + partsOnHost_(info.partsOnHost_), + tagItemVec_(info.tagItemVec_), + tagSchemas_(info.tagSchemas_), + edgeItemVec_(info.edgeItemVec_), + edgeSchemas_(info.edgeSchemas_), + tagIndexItemVec_(info.tagIndexItemVec_), + tagIndexes_(info.tagIndexes_), + edgeIndexItemVec_(info.edgeIndexItemVec_), + edgeIndexes_(info.edgeIndexes_), + listeners_(info.listeners_), + termOfPartition_(info.termOfPartition_) {} + + ~SpaceInfoCache() = default; }; using LocalCache = std::unordered_map>; @@ -501,7 +524,7 @@ class MetaClient { Status checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId); - StatusOr partsNum(GraphSpaceID spaceId) const; + StatusOr partsNum(GraphSpaceID spaceId); PartitionID partId(int32_t numParts, VertexID id) const; @@ -559,15 +582,15 @@ class MetaClient { EdgeType edgeType, const std::string& field); - std::vector getRolesByUserFromCache(const std::string& user) const; + std::vector getRolesByUserFromCache(const std::string& user); - bool authCheckFromCache(const std::string& account, const std::string& password) const; + bool authCheckFromCache(const std::string& account, const std::string& password); - StatusOr getTermFromCache(GraphSpaceID spaceId, PartitionID) const; + StatusOr getTermFromCache(GraphSpaceID spaceId, PartitionID); - bool checkShadowAccountFromCache(const std::string& account) const; + bool checkShadowAccountFromCache(const std::string& account); - StatusOr> getStorageHosts() const; + StatusOr> getStorageHosts(); StatusOr getSessionFromCache(const nebula::SessionID& session_id); @@ -719,8 +742,10 @@ class MetaClient { // leaderIdsLock_ is used to protect leaderIds_ std::unordered_map> leaderIds_; folly::RWSpinLock leaderIdsLock_; - int64_t localLastUpdateTime_{0}; - int64_t metadLastUpdateTime_{0}; + std::atomic localDataLastUpdateTime_{-1}; + std::atomic localCfgLastUpdateTime_{-1}; + std::atomic metadLastUpdateTime_{0}; + int64_t metaServerVersion_{-1}; static constexpr int64_t EXPECT_META_VERSION = 2; @@ -736,6 +761,31 @@ class MetaClient { HostAddr leader_; HostAddr localHost_; + struct ThreadLocalInfo { + int64_t localLastUpdateTime_{-2}; + LocalCache localCache_; + SpaceNameIdMap spaceIndexByName_; + SpaceTagNameIdMap spaceTagIndexByName_; + SpaceEdgeNameTypeMap spaceEdgeIndexByName_; + SpaceEdgeTypeNameMap spaceEdgeIndexByType_; + SpaceTagIdNameMap spaceTagIndexById_; + SpaceNewestTagVerMap spaceNewestTagVerMap_; + SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; + SpaceAllEdgeMap spaceAllEdgeMap_; + + UserRolesMap userRolesMap_; + std::vector storageHosts_; + FTIndexMap fulltextIndexMap_; + UserPasswordMap userPasswordMap_; + }; + + const ThreadLocalInfo& getThreadLocalInfo(); + + void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool); + + TagSchemas buildTagSchemas(std::vector tagItemVec, ObjectPool* pool); + EdgeSchemas buildEdgeSchemas(std::vector edgeItemVec, ObjectPool* pool); + std::unique_ptr bgThread_; SpaceNameIdMap spaceIndexByName_; SpaceTagNameIdMap spaceTagIndexByName_; diff --git a/src/graph/service/CloudAuthenticator.cpp b/src/graph/service/CloudAuthenticator.cpp index d69ea54c656..31830779882 100644 --- a/src/graph/service/CloudAuthenticator.cpp +++ b/src/graph/service/CloudAuthenticator.cpp @@ -12,7 +12,7 @@ namespace nebula { namespace graph { -CloudAuthenticator::CloudAuthenticator(const meta::MetaClient* client) { metaClient_ = client; } +CloudAuthenticator::CloudAuthenticator(meta::MetaClient* client) { metaClient_ = client; } bool CloudAuthenticator::auth(const std::string& user, const std::string& password) { // The shadow account on the nebula side has been created diff --git a/src/graph/service/CloudAuthenticator.h b/src/graph/service/CloudAuthenticator.h index 0b54d0d39eb..04e37b5ebde 100644 --- a/src/graph/service/CloudAuthenticator.h +++ b/src/graph/service/CloudAuthenticator.h @@ -15,12 +15,12 @@ namespace graph { class CloudAuthenticator final : public Authenticator { public: - explicit CloudAuthenticator(const meta::MetaClient* client); + explicit CloudAuthenticator(meta::MetaClient* client); bool auth(const std::string& user, const std::string& password) override; private: - const meta::MetaClient* metaClient_; + meta::MetaClient* metaClient_; }; } // namespace graph diff --git a/src/graph/service/PasswordAuthenticator.cpp b/src/graph/service/PasswordAuthenticator.cpp index 4326c7d5b74..833200dec53 100644 --- a/src/graph/service/PasswordAuthenticator.cpp +++ b/src/graph/service/PasswordAuthenticator.cpp @@ -8,9 +8,7 @@ namespace nebula { namespace graph { -PasswordAuthenticator::PasswordAuthenticator(const meta::MetaClient* client) { - metaClient_ = client; -} +PasswordAuthenticator::PasswordAuthenticator(meta::MetaClient* client) { metaClient_ = client; } bool PasswordAuthenticator::auth(const std::string& user, const std::string& password) { return metaClient_->authCheckFromCache(user, password); diff --git a/src/graph/service/PasswordAuthenticator.h b/src/graph/service/PasswordAuthenticator.h index 344ca2415af..3a8bbf5abd8 100644 --- a/src/graph/service/PasswordAuthenticator.h +++ b/src/graph/service/PasswordAuthenticator.h @@ -14,12 +14,12 @@ namespace graph { class PasswordAuthenticator final : public Authenticator { public: - explicit PasswordAuthenticator(const meta::MetaClient* client); + explicit PasswordAuthenticator(meta::MetaClient* client); bool auth(const std::string& user, const std::string& password) override; private: - const meta::MetaClient* metaClient_; + meta::MetaClient* metaClient_; }; } // namespace graph diff --git a/src/graph/service/QueryEngine.h b/src/graph/service/QueryEngine.h index 505a844373d..a0b777bc35b 100644 --- a/src/graph/service/QueryEngine.h +++ b/src/graph/service/QueryEngine.h @@ -38,7 +38,7 @@ class QueryEngine final : public cpp::NonCopyable, public cpp::NonMovable { using RequestContextPtr = std::unique_ptr>; void execute(RequestContextPtr rctx); - const meta::MetaClient* metaClient() const { return metaClient_; } + meta::MetaClient* metaClient() { return metaClient_; } private: Status setupMemoryMonitorThread();