diff --git a/docker/README.md b/docker/README.md index 92365b474ac..6cd7391c46e 100644 --- a/docker/README.md +++ b/docker/README.md @@ -5,4 +5,4 @@ Following docker images will be ready in production. - [vesoft/nebula-graphd](https://hub.docker.com/r/vesoft/nebula-graphd): nebula-graphd service built with `Dockerfile.graphd` - [vesoft/nebula-metad](https://hub.docker.com/r/vesoft/nebula-metad): nebula-metad service built with `Dockerfile.metad` - [vesoft/nebula-storaged](https://hub.docker.com/r/vesoft/nebula-storaged): nebula-storaged service built with `Dockerfile.storaged` -- [vesoft/nebula-tools](https://hub.docker.com/r/vesoft/nebula-tools): nebula tools built with `Dockerfile.tools`, including db_dump and meta_dump +- [vesoft/nebula-tools](https://hub.docker.com/r/vesoft/nebula-tools): nebula tools built with `Dockerfile.tools`, including db_dump, meta_dump and db_upgrader diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index d026f3efed0..479ad2c939c 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -253,6 +253,7 @@ std::vector NebulaKeyUtils::snapshotPrefix(PartitionID partId) { if (partId == 0) { result.emplace_back(""); } else { + result.emplace_back(vertexPrefix(partId)); result.emplace_back(tagPrefix(partId)); result.emplace_back(edgePrefix(partId)); result.emplace_back(IndexKeyUtils::indexPrefix(partId)); diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index 839d918ef4a..16b507d3bcc 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -79,9 +79,14 @@ StatusOr SubmitJobExecutor::buildResult(meta::cpp2::JobOp jobOp, } const auto &jobsDesc = *resp.job_desc_ref(); for (const auto &jobDesc : jobsDesc) { + // show zone balance as data balance + auto type = jobDesc.get_type(); + if (type == meta::cpp2::JobType::ZONE_BALANCE) { + type = meta::cpp2::JobType::DATA_BALANCE; + } v.emplace_back(nebula::Row({ jobDesc.get_job_id(), - apache::thrift::util::enumNameSafe(jobDesc.get_type()), + apache::thrift::util::enumNameSafe(type), apache::thrift::util::enumNameSafe(jobDesc.get_status()), convertJobTimestampToDateTime(jobDesc.get_start_time()), convertJobTimestampToDateTime(jobDesc.get_stop_time()), @@ -124,7 +129,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0, invalid = 0; v.emplace_back(Row({jd.get_job_id(), - apache::thrift::util::enumNameSafe(meta::cpp2::JobType::ZONE_BALANCE), + apache::thrift::util::enumNameSafe(meta::cpp2::JobType::DATA_BALANCE), apache::thrift::util::enumNameSafe(jd.get_status()), convertJobTimestampToDateTime(jd.get_start_time()).toString(), convertJobTimestampToDateTime(jd.get_stop_time()).toString(), diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index eecddc5bbd9..5ea5fbc1ad3 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -103,5 +103,3 @@ DEFINE_uint32( gc_worker_size, 0, "Background garbage clean workers, default number is 0 which means using hardware core size."); - -DEFINE_bool(graph_use_vertex_key, false, "whether allow insert or query the vertex key"); diff --git a/src/graph/service/GraphFlags.h b/src/graph/service/GraphFlags.h index 07e7bbdb3ff..f2c7ecf9c75 100644 --- a/src/graph/service/GraphFlags.h +++ b/src/graph/service/GraphFlags.h @@ -65,7 +65,4 @@ DECLARE_int32(max_job_size); 
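Aside on the two GraphFlags hunks here: gflags wires a flag through a `DEFINE_bool` in one .cpp plus a `DECLARE_bool` in a header, and deleting both halves, as this patch does, removes `FLAGS_graph_use_vertex_key` entirely. A minimal sketch of the pattern being retired (flag name and default taken from this diff):

```cpp
#include <gflags/gflags.h>

// In GraphFlags.cpp: the definition owns the storage and the default value.
DEFINE_bool(graph_use_vertex_key, false, "whether to allow inserting or querying the vertex key");

// In GraphFlags.h: the declaration other translation units include.
// Removing both lines deletes FLAGS_graph_use_vertex_key, leaving the
// storage-side FLAGS_use_vertex_key as the only switch.
DECLARE_bool(graph_use_vertex_key);
```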
DECLARE_bool(enable_async_gc); DECLARE_uint32(gc_worker_size); - -DECLARE_bool(graph_use_vertex_key); - #endif // GRAPH_GRAPHFLAGS_H_ diff --git a/src/graph/validator/MutateValidator.cpp b/src/graph/validator/MutateValidator.cpp index d59072d83a9..41f1f6fb14f 100644 --- a/src/graph/validator/MutateValidator.cpp +++ b/src/graph/validator/MutateValidator.cpp @@ -46,9 +46,6 @@ Status InsertVerticesValidator::check() { } auto tagItems = sentence->tagItems(); - if (!FLAGS_graph_use_vertex_key && tagItems.empty()) { - return Status::SemanticError("Insert vertex is forbidden, please speicify the tag"); - } schemas_.reserve(tagItems.size()); diff --git a/src/meta/processors/index/CreateEdgeIndexProcessor.cpp b/src/meta/processors/index/CreateEdgeIndexProcessor.cpp index 7fe16120005..85928287333 100644 --- a/src/meta/processors/index/CreateEdgeIndexProcessor.cpp +++ b/src/meta/processors/index/CreateEdgeIndexProcessor.cpp @@ -16,6 +16,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { const auto& indexName = req.get_index_name(); auto& edgeName = req.get_edge_name(); const auto& fields = req.get_fields(); + auto ifNotExists = req.get_if_not_exists(); std::set columnSet; for (const auto& field : fields) { @@ -41,7 +42,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { // check if the space already exist index has the same index name auto ret = getIndexID(space, indexName); if (nebula::ok(ret)) { - if (req.get_if_not_exists()) { + if (ifNotExists) { handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); } else { LOG(INFO) << "Create Edge Index Failed: " << indexName << " has existed"; @@ -96,7 +97,15 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { } if (checkIndexExist(fields, item)) { - resp_.code_ref() = nebula::cpp2::ErrorCode::E_EXISTED; + if (ifNotExists) { + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + cpp2::ID thriftID; + // Fill index id to avoid broken promise + thriftID.index_id_ref() = item.get_index_id(); + resp_.id_ref() = thriftID; + } else { + resp_.code_ref() = nebula::cpp2::ErrorCode::E_EXISTED; + } onFinished(); return; } diff --git a/src/meta/processors/index/CreateTagIndexProcessor.cpp b/src/meta/processors/index/CreateTagIndexProcessor.cpp index d4c12254902..23ee267c09c 100644 --- a/src/meta/processors/index/CreateTagIndexProcessor.cpp +++ b/src/meta/processors/index/CreateTagIndexProcessor.cpp @@ -16,6 +16,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) { const auto& indexName = req.get_index_name(); auto& tagName = req.get_tag_name(); const auto& fields = req.get_fields(); + auto ifNotExists = req.get_if_not_exists(); std::set columnSet; for (const auto& field : fields) { @@ -42,7 +43,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) { // check if the space has the index with the same name auto ret = getIndexID(space, indexName); if (nebula::ok(ret)) { - if (req.get_if_not_exists()) { + if (ifNotExists) { handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); } else { LOG(INFO) << "Create Tag Index Failed: " << indexName << " has existed"; @@ -96,7 +97,15 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) { } if (checkIndexExist(fields, item)) { - resp_.code_ref() = nebula::cpp2::ErrorCode::E_EXISTED; + if (ifNotExists) { + resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + cpp2::ID thriftID; + // Fill index id to avoid broken promise + thriftID.index_id_ref() = item.get_index_id(); + 
resp_.id_ref() = thriftID; + } else { + resp_.code_ref() = nebula::cpp2::ErrorCode::E_EXISTED; + } onFinished(); return; } diff --git a/src/storage/exec/GetPropNode.h b/src/storage/exec/GetPropNode.h index d5560f8f361..4f2bf6a495f 100644 --- a/src/storage/exec/GetPropNode.h +++ b/src/storage/exec/GetPropNode.h @@ -42,8 +42,10 @@ class GetTagPropNode : public QueryNode { return ret; } - // If none of the tag node valid, will check vertex key if use_vertex_key is true, - // do not emplace the row if the flag is false + // If none of the tag nodes is valid, check whether the vertex exists: + // 1. if use_vertex_key is true, check by vertex key + // 2. if use_vertex_key is false, check by scanning the vertex's tag prefix + // If the vertex does not exist, do not emplace the row. if (!std::any_of(tagNodes_.begin(), tagNodes_.end(), [](const auto& tagNode) { return tagNode->valid(); })) { @@ -58,7 +60,16 @@ class GetTagPropNode : public QueryNode { return ret; } } else { - return nebula::cpp2::ErrorCode::SUCCEEDED; + // check if vId has any valid tag by prefix scan + std::unique_ptr iter; + auto tagPrefix = NebulaKeyUtils::tagPrefix(context_->vIdLen(), partId, vId); + ret = context_->env()->kvstore_->prefix(context_->spaceId(), partId, tagPrefix, &iter); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } else if (!iter->valid()) { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + // if the vertex has any tag, emplace a row with vId } } diff --git a/src/storage/exec/UpdateNode.h b/src/storage/exec/UpdateNode.h index 0c643803729..eef3ab90e58 100644 --- a/src/storage/exec/UpdateNode.h +++ b/src/storage/exec/UpdateNode.h @@ -488,6 +488,9 @@ class UpdateTagNode : public UpdateNode { } } // step 3, insert new vertex data + if (FLAGS_use_vertex_key) { + batchHolder->put(NebulaKeyUtils::vertexKey(context_->vIdLen(), partId, vId), ""); + } batchHolder->put(std::move(key_), std::move(nVal)); return encodeBatchValue(batchHolder->getBatch()); } diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 7900cd21220..3b6b4675623 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -62,7 +62,6 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { const auto& partVertices = req.get_parts(); const auto& propNamesMap = req.get_prop_names(); - bool onlyVertex = propNamesMap.empty(); for (auto& part : partVertices) { auto partId = part.first; const auto& vertices = part.second; @@ -82,7 +81,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - if (onlyVertex && FLAGS_use_vertex_key) { + if (FLAGS_use_vertex_key) { data.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), ""); } for (auto& newTag : newTags) { @@ -140,7 +139,6 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) { const auto& partVertices = req.get_parts(); const auto& propNamesMap = req.get_prop_names(); - bool onlyVertex = propNamesMap.empty(); for (const auto& part : partVertices) { auto partId = part.first; @@ -163,7 +161,8 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - if (onlyVertex && FLAGS_use_vertex_key) { + + if (FLAGS_use_vertex_key) {
verticeData.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid)); } for (const auto& newTag : newTags) { diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 22fc01c9461..e8ca43a022c 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -69,3 +69,4 @@ nebula_add_subdirectory(simple-kv-verify) endif() nebula_add_subdirectory(meta-dump) nebula_add_subdirectory(db-dump) +nebula_add_subdirectory(db-upgrade) diff --git a/src/tools/db-upgrade/CMakeLists.txt b/src/tools/db-upgrade/CMakeLists.txt new file mode 100644 index 00000000000..e76f008c810 --- /dev/null +++ b/src/tools/db-upgrade/CMakeLists.txt @@ -0,0 +1,33 @@ +# Copyright (c) 2022 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. + +nebula_add_executable( + NAME + db_upgrader + SOURCES + DbUpgraderTool.cpp + NebulaKeyUtilsV1.cpp + NebulaKeyUtilsV2.cpp + NebulaKeyUtilsV3.cpp + DbUpgrader.cpp + OBJECTS + ${tools_test_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} +) + +install( + TARGETS + db_upgrader + PERMISSIONS + OWNER_EXECUTE OWNER_WRITE OWNER_READ + GROUP_EXECUTE GROUP_READ + WORLD_EXECUTE WORLD_READ + DESTINATION + bin + COMPONENT + tool +) diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp new file mode 100644 index 00000000000..c0834d87339 --- /dev/null +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -0,0 +1,1253 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "tools/db-upgrade/DbUpgrader.h" + +#include "common/datatypes/Value.h" +#include "common/fs/FileUtils.h" +#include "common/utils/IndexKeyUtils.h" +#include "common/utils/NebulaKeyUtils.h" +#include "rocksdb/sst_file_writer.h" +#include "tools/db-upgrade/NebulaKeyUtilsV1.h" +#include "tools/db-upgrade/NebulaKeyUtilsV2.h" +#include "tools/db-upgrade/NebulaKeyUtilsV3.h" + +DEFINE_string(src_db_path, + "", + "Source data path(data_path in storage 1.x conf), " + "multi paths should be split by comma"); +DEFINE_string(dst_db_path, + "", + "Destination data path(data_path in storage 2.0 conf), " + "multi paths should be split by comma"); +DEFINE_string(upgrade_meta_server, "127.0.0.1:45500", "Meta servers' address."); +DEFINE_uint32(write_batch_num, 100, "The size of the batch written to rocksdb"); +DEFINE_string(upgrade_version, + "", + "When the value is 1:2, upgrade the data from 1.x to 2.0 GA. " + "When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA." 
+ "When the value is 2:3, upgrade the data from 2.0 GA to 3.0 ."); +DEFINE_bool(compactions, + true, + "When the upgrade of the space is completed, " + "whether to compact data"); +DEFINE_uint32(max_concurrent_parts, 10, "The parts could be processed simultaneously"); +DEFINE_uint32(max_concurrent_spaces, 5, "The spaces could be processed simultaneously"); + +namespace nebula { +namespace storage { + +using nebula::cpp2::PropertyType; + +Status UpgraderSpace::init(meta::MetaClient* mclient, + meta::ServerBasedSchemaManager* sMan, + meta::IndexManager* iMan, + const std::string& srcPath, + const std::string& dstPath, + const std::string& entry) { + metaClient_ = mclient; + schemaMan_ = sMan; + indexMan_ = iMan; + srcPath_ = srcPath; + dstPath_ = dstPath; + entry_ = entry; + + auto ret = initSpace(entry_); + if (!ret.ok()) { + LOG(ERROR) << "Init " << srcPath << " space id " << entry_ << " failed"; + return ret; + } + + return Status::OK(); +} + +Status UpgraderSpace::initSpace(const std::string& sId) { + try { + spaceId_ = folly::to(sId); + } catch (const std::exception& ex) { + LOG(ERROR) << "Cannot convert space id " << sId; + return Status::Error("Cannot convert space id %s", sId.c_str()); + } + + auto sRet = schemaMan_->toGraphSpaceName(spaceId_); + if (!sRet.ok()) { + LOG(ERROR) << "Space id " << spaceId_ << " no found"; + return sRet.status(); + } + spaceName_ = sRet.value(); + + auto spaceVidLen = metaClient_->getSpaceVidLen(spaceId_); + if (!spaceVidLen.ok()) { + return spaceVidLen.status(); + } + spaceVidLen_ = spaceVidLen.value(); + + // Use readonly rocksdb + readEngine_.reset(new nebula::kvstore::RocksEngine( + spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, false)); + writeEngine_.reset(new nebula::kvstore::RocksEngine(spaceId_, spaceVidLen_, dstPath_)); + + parts_.clear(); + parts_ = readEngine_->allParts(); + LOG(INFO) << "Src data path: " << srcPath_ << " space id " << spaceId_ << " has " << parts_.size() + << " parts"; + + tagSchemas_.clear(); + tagFieldName_.clear(); + tagIndexes_.clear(); + edgeSchemas_.clear(); + edgeFieldName_.clear(); + edgeIndexes_.clear(); + + auto ret = buildSchemaAndIndex(); + if (!ret.ok()) { + LOG(ERROR) << "Build schema and index in space id " << spaceId_ << " failed"; + return ret; + } + + pool_ = std::make_unique(FLAGS_max_concurrent_parts); + // Parallel process part + for (auto& partId : parts_) { + partQueue_.add(partId); + } + + return Status::OK(); +} + +Status UpgraderSpace::buildSchemaAndIndex() { + // Get all tag in space + auto tags = schemaMan_->getAllVerTagSchema(spaceId_); + if (!tags.ok()) { + LOG(ERROR) << "Space id " << spaceId_ << " no found"; + return tags.status(); + } + tagSchemas_ = std::move(tags).value(); + + for (auto& tag : tagSchemas_) { + auto tagId = tag.first; + auto newestTagschema = tag.second.back(); + auto fields = newestTagschema->getNumFields(); + for (size_t i = 0; i < fields; i++) { + tagFieldName_[tagId].emplace_back(newestTagschema->getFieldName(i)); + } + if (fields == 0) { + tagFieldName_[tagId] = {}; + } + LOG(INFO) << "Tag id " << tagId << " has " << tagFieldName_[tagId].size() << " fields!"; + } + + // Get all tag index in space + std::vector> tagIndexes; + auto iRet = indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << "Space id " << spaceId_ << " no found"; + return iRet.status(); + } + tagIndexes = std::move(iRet).value(); + + // Handle tag index + for (auto& tagIndex : tagIndexes) { + auto tagId = tagIndex->get_schema_id().get_tag_id(); + 
tagIndexes_[tagId].emplace(tagIndex); + } + + for (auto& tagindexes : tagIndexes_) { + LOG(INFO) << "Tag id " << tagindexes.first << " has " << tagindexes.second.size() << " indexes"; + } + + // Get all edge in space + auto edges = schemaMan_->getAllVerEdgeSchema(spaceId_); + if (!edges.ok()) { + LOG(ERROR) << "Space id " << spaceId_ << " no found"; + return edges.status(); + } + edgeSchemas_ = std::move(edges).value(); + + for (auto& edge : edgeSchemas_) { + auto edgetype = edge.first; + auto newestEdgeSchema = edge.second.back(); + auto fields = newestEdgeSchema->getNumFields(); + for (size_t i = 0; i < fields; i++) { + edgeFieldName_[edgetype].emplace_back(newestEdgeSchema->getFieldName(i)); + } + if (fields == 0) { + edgeFieldName_[edgetype] = {}; + } + LOG(INFO) << "Edgetype " << edgetype << " has " << edgeFieldName_[edgetype].size() + << " fields!"; + } + + // Get all edge index in space + std::vector> edgeIndexes; + iRet = indexMan_->getEdgeIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << "Space id " << spaceId_ << " no found"; + return iRet.status(); + } + edgeIndexes = std::move(iRet).value(); + + // Handle edge index + for (auto& edgeIndex : edgeIndexes) { + auto edgetype = edgeIndex->get_schema_id().get_edge_type(); + edgeIndexes_[edgetype].emplace(edgeIndex); + } + + for (auto& edgeindexes : edgeIndexes_) { + LOG(INFO) << "EdgeType " << edgeindexes.first << " has " << edgeindexes.second.size() + << " indexes"; + } + return Status::OK(); +} + +bool UpgraderSpace::isValidVidLen(VertexID srcVId, VertexID dstVId) { + if (!NebulaKeyUtils::isValidVidLen(spaceVidLen_, srcVId, dstVId)) { + LOG(ERROR) << "Vertex id length is illegal, expect: " << spaceVidLen_ << " result: " << srcVId + << " " << dstVId; + return false; + } + return true; +} + +void UpgraderSpace::runPartV1() { + std::chrono::milliseconds take_dura{10}; + if (auto pId = partQueue_.try_take_for(take_dura)) { + PartitionID partId = *pId; + // Handle vertex and edge, if there is an index, generate index data + LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId; + const auto& prefix = NebulaKeyUtilsV1::prefix(partId); + std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; + LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId << " failed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished"; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV1, this)); + } + return; + } + + std::vector data; + TagID lastTagId = 0; + int64_t lastVertexId = 0; + + int64_t lastSrcVertexId = 0; + EdgeType lastEdgeType = 0; + int64_t lastDstVertexId = 0; + EdgeRanking lastRank = 0; + + while (iter && iter->valid()) { + auto key = iter->key(); + if (NebulaKeyUtilsV1::isVertex(key)) { + auto vId = NebulaKeyUtilsV1::getVertexId(key); + auto tagId = NebulaKeyUtilsV1::getTagId(key); + + auto it = tagSchemas_.find(tagId); + if (it == tagSchemas_.end()) { + // Invalid data + iter->next(); + continue; + } + auto iterField = tagFieldName_.find(tagId); + if (iterField == tagFieldName_.end()) { + // Invalid data + iter->next(); + continue; + } + if (vId == lastVertexId && tagId == lastTagId) { + // Multi version + iter->next(); + 
continue; + } + + auto strVid = std::string(reinterpret_cast(&vId), sizeof(vId)); + auto newTagSchema = it->second.back().get(); + // Generate 2.0 key + auto newKey = NebulaKeyUtils::tagKey(spaceVidLen_, partId, strVid, tagId); + auto val = iter->val(); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, val); + if (!reader) { + LOG(ERROR) << "Can't get tag reader of " << tagId; + iter->next(); + continue; + } + // Generate 2.0 value and index records + encodeVertexValue(partId, reader.get(), newTagSchema, newKey, strVid, tagId, data); + + lastTagId = tagId; + lastVertexId = vId; + } else if (NebulaKeyUtilsV1::isEdge(key)) { + auto svId = NebulaKeyUtilsV1::getSrcId(key); + auto edgetype = NebulaKeyUtilsV1::getEdgeType(key); + auto ranking = NebulaKeyUtilsV1::getRank(key); + auto dvId = NebulaKeyUtilsV1::getDstId(key); + + auto it = edgeSchemas_.find(std::abs(edgetype)); + if (it == edgeSchemas_.end()) { + // Invalid data + iter->next(); + continue; + } + auto iterField = edgeFieldName_.find(std::abs(edgetype)); + if (iterField == edgeFieldName_.end()) { + // Invalid data + iter->next(); + continue; + } + if (svId == lastSrcVertexId && edgetype == lastEdgeType && ranking == lastRank && + dvId == lastDstVertexId) { + // Multi version + iter->next(); + continue; + } + + auto strsvId = std::string(reinterpret_cast(&svId), sizeof(svId)); + auto strdvId = std::string(reinterpret_cast(&dvId), sizeof(dvId)); + + auto newEdgeSchema = it->second.back().get(); + + // Generate 2.0 key + auto newKey = + NebulaKeyUtils::edgeKey(spaceVidLen_, partId, strsvId, edgetype, ranking, strdvId); + auto val = iter->val(); + auto reader = + RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, std::abs(edgetype), val); + if (!reader) { + LOG(ERROR) << "Can't get edge reader of " << edgetype; + iter->next(); + continue; + } + // Generate 2.0 value and index records + encodeEdgeValue( + partId, reader.get(), newEdgeSchema, newKey, strsvId, edgetype, ranking, strdvId, data); + lastSrcVertexId = svId; + lastEdgeType = edgetype; + lastRank = ranking; + lastDstVertexId = dvId; + } + + if (data.size() >= FLAGS_write_batch_num) { + VLOG(2) << "Send record total rows " << data.size(); + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " part id " << partId + << " failed."; + } + data.clear(); + } + + iter->next(); + } + + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " part id " << partId + << " failed."; + } + data.clear(); + LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId + << " finished"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished"; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV1, this)); + } + } else { + LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished"; + } +} + +void UpgraderSpace::doProcessV1() { + LOG(INFO) << "Start to handle data in space id " << spaceId_; + + // Parallel process part + auto partConcurrency = std::min(static_cast(FLAGS_max_concurrent_parts), parts_.size()); + LOG(INFO) << "Max concurrent parts: " << partConcurrency; + + unFinishedPart_ = parts_.size(); + + LOG(INFO) << "Start to 
handle vertex/edge/index of parts data in space id " << spaceId_; + for (size_t i = 0; i < partConcurrency; ++i) { + pool_->add(std::bind(&UpgraderSpace::runPartV1, this)); + } + + while (unFinishedPart_ != 0) { + sleep(10); + } + + // handle system data + { + LOG(INFO) << "Start to handle system data in space id " << spaceId_; + auto prefix = NebulaKeyUtilsV1::systemPrefix(); + std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " get system data failed"; + LOG(ERROR) << "Handle system data in space id " << spaceId_ << " failed"; + return; + } + std::vector data; + while (iter && iter->valid()) { + auto key = iter->key(); + auto val = iter->val(); + data.emplace_back(std::move(key), std::move(val)); + if (data.size() >= FLAGS_write_batch_num) { + VLOG(2) << "Send system data total rows " << data.size(); + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " failed."; + } + data.clear(); + } + iter->next(); + } + + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " failed."; + } + LOG(INFO) << "Handle system data in space id " << spaceId_ << " success"; + LOG(INFO) << "Handle data in space id " << spaceId_ << " success"; + } +} + +void UpgraderSpace::runPartV2() { + std::chrono::milliseconds take_dura{10}; + if (auto pId = partQueue_.try_take_for(take_dura)) { + PartitionID partId = *pId; + // Handle vertex and edge, if there is an index, generate index data + LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId; + auto prefix = NebulaKeyUtilsV2::partPrefix(partId); + std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; + LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId << " failed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished"; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV2, this)); + } + return; + } + + std::vector data; + TagID lastTagId = 0; + VertexID lastVertexId = ""; + + VertexID lastSrcVertexId = ""; + EdgeType lastEdgeType = 0; + VertexID lastDstVertexId = ""; + EdgeRanking lastRank = 0; + + while (iter && iter->valid()) { + auto key = iter->key(); + if (NebulaKeyUtilsV2::isVertex(spaceVidLen_, key)) { + auto vId = NebulaKeyUtilsV2::getVertexId(spaceVidLen_, key).str(); + auto tagId = NebulaKeyUtilsV2::getTagId(spaceVidLen_, key); + + auto it = tagSchemas_.find(tagId); + if (it == tagSchemas_.end()) { + // Invalid data + iter->next(); + continue; + } + auto iterField = tagFieldName_.find(tagId); + if (iterField == tagFieldName_.end()) { + // Invalid data + iter->next(); + continue; + } + if (vId == lastVertexId && tagId == lastTagId) { + // Multi version + iter->next(); + continue; + } + + auto newTagSchema = it->second.back().get(); + // Generate 2.0 key + auto newKey = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vId, tagId); + auto val = iter->val(); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, 
spaceId_, tagId, val); + if (!reader) { + LOG(ERROR) << "Can't get tag reader of " << tagId; + iter->next(); + continue; + } + // Generate 2.0 value and index records + encodeVertexValue(partId, reader.get(), newTagSchema, newKey, vId, tagId, data); + + lastTagId = tagId; + lastVertexId = vId; + } else if (NebulaKeyUtilsV2::isEdge(spaceVidLen_, key)) { + auto svId = NebulaKeyUtilsV2::getSrcId(spaceVidLen_, key).str(); + auto edgetype = NebulaKeyUtilsV2::getEdgeType(spaceVidLen_, key); + auto ranking = NebulaKeyUtilsV2::getRank(spaceVidLen_, key); + auto dvId = NebulaKeyUtilsV2::getDstId(spaceVidLen_, key).str(); + + auto it = edgeSchemas_.find(std::abs(edgetype)); + if (it == edgeSchemas_.end()) { + // Invalid data + iter->next(); + continue; + } + auto iterField = edgeFieldName_.find(std::abs(edgetype)); + if (iterField == edgeFieldName_.end()) { + // Invalid data + iter->next(); + continue; + } + if (svId == lastSrcVertexId && edgetype == lastEdgeType && ranking == lastRank && + dvId == lastDstVertexId) { + // Multi version + iter->next(); + continue; + } + + auto newEdgeSchema = it->second.back().get(); + + // Generate 2.0 key + auto newKey = NebulaKeyUtils::edgeKey(spaceVidLen_, partId, svId, edgetype, ranking, dvId); + auto val = iter->val(); + auto reader = + RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, std::abs(edgetype), val); + if (!reader) { + LOG(ERROR) << "Can't get edge reader of " << edgetype; + iter->next(); + continue; + } + // Generate 2.0 value and index records + encodeEdgeValue( + partId, reader.get(), newEdgeSchema, newKey, svId, edgetype, ranking, dvId, data); + lastSrcVertexId = svId; + lastEdgeType = edgetype; + lastRank = ranking; + lastDstVertexId = dvId; + } + + if (data.size() >= FLAGS_write_batch_num) { + VLOG(2) << "Send record total rows " << data.size(); + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " part id " << partId + << " failed."; + } + data.clear(); + } + + iter->next(); + } + + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " part id " << partId + << " failed."; + } + data.clear(); + LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId + << " succeed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished."; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV2, this)); + } + } else { + LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished"; + } +} + +void UpgraderSpace::doProcessV2() { + LOG(INFO) << "Start to handle data in space id " << spaceId_; + + // Parallel process part + auto partConcurrency = std::min(static_cast(FLAGS_max_concurrent_parts), parts_.size()); + LOG(INFO) << "Max concurrent parts: " << partConcurrency; + unFinishedPart_ = parts_.size(); + + LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_; + for (size_t i = 0; i < partConcurrency; ++i) { + pool_->add(std::bind(&UpgraderSpace::runPartV2, this)); + } + + while (unFinishedPart_ != 0) { + sleep(10); + } + + // handle system data + { + LOG(INFO) << "Start to handle system data in space id " << spaceId_; + auto prefix = NebulaKeyUtilsV2::systemPrefix(); + 
std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " get system data failed."; + LOG(ERROR) << "Handle system data in space id " << spaceId_ << " failed."; + return; + } + std::vector data; + while (iter && iter->valid()) { + auto key = iter->key(); + auto val = iter->val(); + data.emplace_back(std::move(key), std::move(val)); + if (data.size() >= FLAGS_write_batch_num) { + VLOG(2) << "Send system data total rows " << data.size(); + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " failed."; + } + data.clear(); + } + iter->next(); + } + + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " failed."; + } + LOG(INFO) << "Handle system data in space id " << spaceId_ << " success"; + LOG(INFO) << "Handle data in space id " << spaceId_ << " success"; + } +} + +void UpgraderSpace::encodeVertexValue(PartitionID partId, + RowReader* reader, + const meta::NebulaSchemaProvider* schema, + std::string& newkey, + VertexID& strVid, + TagID tagId, + std::vector& data) { + // Get all returned field name + auto& fieldNames = tagFieldName_[tagId]; + + auto ret = encodeRowVal(reader, schema, fieldNames); + if (ret.empty()) { + LOG(ERROR) << "Vertex or edge value is empty"; + return; + } + data.emplace_back(std::move(newkey), ret); + + // encode v2 index value + auto it = tagIndexes_.find(tagId); + if (it != tagIndexes_.end()) { + // Use new RowReader + auto nReader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, ret); + if (nReader == nullptr) { + LOG(ERROR) << "Bad format row: space id " << spaceId_ << " tag id " << tagId << " vertex id " + << strVid; + return; + } + for (auto& index : it->second) { + auto newIndexKeys = indexVertexKeys(partId, strVid, nReader.get(), index, schema); + for (auto& newIndexKey : newIndexKeys) { + data.emplace_back(std::move(newIndexKey), ""); + } + } + } +} + +// If the field types are inconsistent, can be converted +WriteResult UpgraderSpace::convertValue(const meta::NebulaSchemaProvider* nSchema, + const meta::SchemaProviderIf* oSchema, + std::string& name, + Value& val) { + auto newpropType = nSchema->getFieldType(name); + auto oldpropType = oSchema->getFieldType(name); + if (newpropType == oldpropType) { + return WriteResult::SUCCEEDED; + } + + bool bval; + double fval; + int64_t ival; + std::string sval; + + // need convert + switch (val.type()) { + case Value::Type::NULLVALUE: + return WriteResult::SUCCEEDED; + case Value::Type::BOOL: { + switch (newpropType) { + case PropertyType::INT8: + case PropertyType::INT16: + case PropertyType::INT32: + case PropertyType::INT64: + case PropertyType::TIMESTAMP: + case PropertyType::VID: { + bval = val.getBool(); + if (bval) { + val.setInt(1); + } else { + val.setInt(0); + } + return WriteResult::SUCCEEDED; + } + case PropertyType::STRING: + case PropertyType::FIXED_STRING: { + try { + bval = val.getBool(); + sval = folly::to(bval); + val.setStr(sval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + case PropertyType::FLOAT: + case PropertyType::DOUBLE: { + try { + bval = val.getBool(); + fval = folly::to(bval); + val.setFloat(fval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return 
WriteResult::TYPE_MISMATCH; + } + } + // other not need convert + default: + return WriteResult::SUCCEEDED; + } + } + case Value::Type::INT: { + switch (newpropType) { + case PropertyType::STRING: + case PropertyType::FIXED_STRING: { + try { + ival = val.getInt(); + sval = folly::to(ival); + val.setStr(sval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + // other not need convert + default: + return WriteResult::SUCCEEDED; + } + } + case Value::Type::FLOAT: { + switch (newpropType) { + case PropertyType::STRING: + case PropertyType::FIXED_STRING: { + try { + fval = val.getFloat(); + sval = folly::to(fval); + val.setStr(sval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + case PropertyType::BOOL: { + try { + fval = val.getFloat(); + bval = folly::to(fval); + val.setBool(bval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + // other not need convert + default: + return WriteResult::SUCCEEDED; + } + } + case Value::Type::STRING: { + switch (newpropType) { + case PropertyType::INT8: + case PropertyType::INT16: + case PropertyType::INT32: + case PropertyType::INT64: + case PropertyType::TIMESTAMP: + case PropertyType::VID: { + try { + sval = val.getStr(); + ival = folly::to(sval); + val.setInt(ival); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + case PropertyType::BOOL: { + try { + sval = val.getStr(); + bval = folly::to(sval); + val.setBool(bval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + case PropertyType::FLOAT: + case PropertyType::DOUBLE: { + try { + sval = val.getStr(); + fval = folly::to(sval); + val.setFloat(fval); + return WriteResult::SUCCEEDED; + } catch (const std::exception& e) { + return WriteResult::TYPE_MISMATCH; + } + } + // other not need convert + default: + return WriteResult::SUCCEEDED; + } + } + // other not need convert + default: + return WriteResult::SUCCEEDED; + } +} + +// Used for vertex and edge +std::string UpgraderSpace::encodeRowVal(const RowReader* reader, + const meta::NebulaSchemaProvider* schema, + std::vector& fieldName) { + auto oldSchema = reader->getSchema(); + if (oldSchema == nullptr) { + LOG(ERROR) << "Schema not found from RowReader."; + return ""; + } + + // encode v2 value, use new schema + WriteResult wRet; + RowWriterV2 rowWrite(schema); + + // fieldName contains all the fields of the latest schema. + // The data reader may not use the latest schema, + // If it does not contain the field, the default value or null value will be + // used. 
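The loop that follows leans on convertValue() above for cross-type schema changes. A hedged usage sketch; `newSchema`, `oldSchema`, and the property `age` are hypothetical, but the behavior follows the switch above:

```cpp
// Suppose the old schema typed `age` as INT64 and the new one as STRING.
std::string name = "age";
Value val(int64_t(42));  // value read via the old schema
auto r = convertValue(newSchema, oldSchema, name, val);
// convertValue() rewrites the Value in place via folly::to<std::string>:
// r == WriteResult::SUCCEEDED and val.getStr() == "42".
// An impossible conversion (e.g. "abc" -> INT64) returns TYPE_MISMATCH instead.
```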
+ for (auto& name : fieldName) { + auto val = reader->getValueByName(name); + if (val.type() != Value::Type::NULLVALUE) { + // If the field types are inconsistent, can be converted + wRet = convertValue(schema, oldSchema, name, val); + if (wRet != WriteResult::SUCCEEDED) { + LOG(ERROR) << "Convert value failed"; + return ""; + } + wRet = rowWrite.setValue(name, val); + if (wRet != WriteResult::SUCCEEDED) { + LOG(ERROR) << "Write rowWriterV2 failed"; + return ""; + } + } else { + // read null value + auto nullType = val.getNull(); + if (nullType == NullType::__NULL__) { + wRet = rowWrite.setValue(name, val); + if (wRet != WriteResult::SUCCEEDED) { + LOG(ERROR) << "Write rowWriterV2 failed"; + return ""; + } + } else if (nullType != NullType::UNKNOWN_PROP) { + // nullType == NullType::kNullUnknownProp, indicates that the field is + // only in the latest schema, maybe use default value or null value. + LOG(ERROR) << "Data is illegal in " << name << " field"; + return ""; + } + } + } + + wRet = rowWrite.finish(); + if (wRet != WriteResult::SUCCEEDED) { + LOG(ERROR) << "Write rowWriterV2 failed"; + return ""; + } + + return std::move(rowWrite).moveEncodedStr(); +} + +void UpgraderSpace::runPartV3() { + std::chrono::milliseconds take_dura{10}; + if (auto pId = partQueue_.try_take_for(take_dura)) { + PartitionID partId = *pId; + // Handle vertex and edge, if there is an index, generate index data + LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId; + auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId); + std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; + LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId << " failed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished"; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + return; + } + int64_t ingestFileCount = 0; + auto write_sst = [&, this](const std::vector& data) { + ::rocksdb::Options option; + option.create_if_missing = true; + option.compression = ::rocksdb::CompressionType::kNoCompression; + ::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option); + std::string file = ::fmt::format(".nebula_upgrade.space-{}.part-{}-{}-{}.sst", + spaceId_, + partId, + ingestFileCount++, + std::time(nullptr)); + ::rocksdb::Status s = sst_file_writer.Open(file); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + for (auto item : data) { + s = sst_file_writer.Put(item.first, item.second); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + } + s = sst_file_writer.Finish(); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + std::lock_guard lck(this->ingest_sst_file_mut_); + ingest_sst_file_.push_back(file); + }; + std::vector data; + std::string lastVertexKey = ""; + while (iter && iter->valid()) { + auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key()); + if (vertex == lastVertexKey) { + iter->next(); + continue; + } + data.emplace_back(vertex, ""); + lastVertexKey = vertex; 
+ if (data.size() >= 100000) { + write_sst(data); + data.clear(); + } + iter->next(); + } + if (!data.empty()) { + write_sst(data); + data.clear(); + } + LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId + << " succeeded"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts have finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished."; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + } else { + LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished"; + } +} +void UpgraderSpace::doProcessV3() { + LOG(INFO) << "Start to handle data in space id " << spaceId_; + // Parallel process part + auto partConcurrency = std::min(static_cast(FLAGS_max_concurrent_parts), parts_.size()); + LOG(INFO) << "Max concurrent parts: " << partConcurrency; + unFinishedPart_ = parts_.size(); + + LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_; + for (size_t i = 0; i < partConcurrency; ++i) { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + + while (unFinishedPart_ != 0) { + sleep(10); + } + + if (!ingest_sst_file_.empty()) { + auto code = readEngine_->ingest(ingest_sst_file_, true); + if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Failed 2:3 upgrade when ingesting sst files: " << static_cast(code); + } + } + readEngine_->put(NebulaKeyUtils::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue()); +} +std::vector UpgraderSpace::indexVertexKeys( + PartitionID partId, + VertexID& vId, + RowReader* reader, + std::shared_ptr index, + const meta::SchemaProviderIf* latestSchema) { + auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema); + if (!values.ok()) { + return {}; + } + return IndexKeyUtils::vertexIndexKeys( + spaceVidLen_, partId, index->get_index_id(), vId, std::move(values).value()); +} + +void UpgraderSpace::encodeEdgeValue(PartitionID partId, + RowReader* reader, + const meta::NebulaSchemaProvider* schema, + std::string& newkey, + VertexID& svId, + EdgeType type, + EdgeRanking rank, + VertexID& dstId, + std::vector& data) { + // Get all returned field names + auto& fieldNames = edgeFieldName_[std::abs(type)]; + + auto ret = encodeRowVal(reader, schema, fieldNames); + if (ret.empty()) { + return; + } + data.emplace_back(std::move(newkey), ret); + + if (type <= 0) { + return; + } + + // encode v2 index value + auto it = edgeIndexes_.find(type); + if (it != edgeIndexes_.end()) { + // Use new RowReader + auto nReader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, type, ret); + if (nReader == nullptr) { + LOG(ERROR) << "Bad format row: space id " << spaceId_ << " edgetype " << type + << " srcVertexId " << svId << " rank " << rank << " dstVertexId " << dstId; + return; + } + for (auto& index : it->second) { + auto newIndexKeys = indexEdgeKeys(partId, nReader.get(), svId, rank, dstId, index, schema); + for (auto& newIndexKey : newIndexKeys) { + data.emplace_back(std::move(newIndexKey), ""); + } + } + } +} + +std::vector UpgraderSpace::indexEdgeKeys( + PartitionID partId, + RowReader* reader, + VertexID& svId, + EdgeRanking rank, + VertexID& dstId, + std::shared_ptr index, + const meta::SchemaProviderIf* latestSchema) { + auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema); + if (!values.ok()) { + return {}; + } + return IndexKeyUtils::edgeIndexKeys( spaceVidLen_,
partId, index->get_index_id(), svId, rank, dstId, std::move(values).value()); +} + +void UpgraderSpace::doCompaction() { + LOG(INFO) << "Path " << dstPath_ << " space id " << spaceId_ << " compaction begin"; + auto ret = writeEngine_->compact(); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Path " << dstPath_ << " space id " << spaceId_ << " compaction failed!"; + } + LOG(INFO) << "Path " << dstPath_ << " space id " << spaceId_ << " compaction success!"; +} + +bool UpgraderSpace::copyWal() { + LOG(INFO) << "Copy space id " << entry_ << " wal file begin"; + // Get source wal directory + auto srcPath = + folly::stringPrintf("%s/%s/%s/%s", srcPath_.c_str(), "nebula", entry_.c_str(), "wal"); + if (!fs::FileUtils::exist(srcPath)) { + LOG(ERROR) << "Source data wal path " << srcPath << " not exists!"; + return false; + } + // Get destination wal directory + auto dstPath = folly::stringPrintf("%s/%s/%s", dstPath_.c_str(), "nebula", entry_.c_str()); + if (!fs::FileUtils::exist(dstPath)) { + LOG(ERROR) << "Destination data wal path " << dstPath << " not exists!"; + return false; + } + dstPath = fs::FileUtils::joinPath(dstPath, "wal"); + if (!fs::FileUtils::makeDir(dstPath)) { + LOG(FATAL) << "makeDir " << dstPath << " failed"; + } + + auto partDirs = fs::FileUtils::listAllDirsInDir(srcPath.c_str()); + for (size_t i = 0; i < partDirs.size(); i++) { + // In general, there are only two wal files left for each part + auto files = fs::FileUtils::listAllFilesInDir( + folly::stringPrintf("%s/%s", srcPath.c_str(), partDirs[i].c_str()).c_str()); + // If the number of wal files is greater than two, find the latest two wal + // files + auto walNum = files.size(); + if (walNum > 2) { + std::sort(files.begin(), files.end()); + auto newestFile = files[walNum - 2]; + auto latestFile = files[walNum - 1]; + files.resize(2); + files[0] = newestFile; + files[1] = latestFile; + } + + for (const auto& file : files) { + std::fstream srcF( + folly::stringPrintf("%s/%s/%s", srcPath.c_str(), partDirs[i].c_str(), file.c_str()), + std::ios::binary | std::ios::in); + auto dstwalpart = folly::stringPrintf("%s/%s", dstPath.c_str(), partDirs[i].c_str()); + if (!fs::FileUtils::makeDir(dstwalpart)) { + LOG(FATAL) << "makeDir " << dstwalpart << " failed"; + } + std::fstream destF(folly::stringPrintf("%s/%s", dstwalpart.c_str(), file.c_str()), + std::ios::binary | std::ios::out); + destF << srcF.rdbuf(); + destF.close(); + srcF.close(); + } + } + return true; +} + +Status DbUpgrader::init(meta::MetaClient* mclient, + meta::ServerBasedSchemaManager* sMan, + meta::IndexManager* iMan, + const std::string& srcPath, + const std::string& dstPath) { + metaClient_ = mclient; + schemaMan_ = sMan; + indexMan_ = iMan; + srcPath_ = srcPath; + dstPath_ = dstPath; + pool_ = std::make_unique(FLAGS_max_concurrent_spaces); + return listSpace(); +} + +Status DbUpgrader::listSpace() { + // from srcPath_ to srcPath_/nebula + auto path = fs::FileUtils::joinPath(srcPath_, "nebula"); + if (!fs::FileUtils::exist(path)) { + LOG(ERROR) << "Source data path " << srcPath_ << " not exists!"; + return Status::Error("Db path '%s' not exists.", srcPath_.c_str()); + } + + if (!fs::FileUtils::exist(dstPath_)) { + LOG(ERROR) << "Destination data path " << dstPath_ << " not exists!"; + return Status::Error("Db path '%s' not exists.", dstPath_.c_str()); + } + subDirs_ = fs::FileUtils::listAllDirsInDir(path.c_str()); + return Status::OK(); +} + +void DbUpgrader::run() { + LOG(INFO) << "Upgrade from path " << srcPath_ << " to path " << dstPath_ + << 
" in DbUpgrader run begin"; + // Get all the directories in the data directory, + // each directory name is spaceId, traverse each directory + std::vector> upgraderSpaces; + for (auto& entry : subDirs_) { + auto it = std::make_unique(); + + // When the init space fails, ignore to upgrade this space + auto ret = it->init(metaClient_, schemaMan_, indexMan_, srcPath_, dstPath_, entry); + if (!ret.ok()) { + LOG(WARNING) << "Upgrade from path " << srcPath_ << " space id " << entry << " to path " + << dstPath_ << " init failed"; + LOG(WARNING) << "Ignore upgrade " << srcPath_ << " space id " << entry; + } else { + upgraderSpaces.emplace_back(std::move(it)); + } + } + + unFinishedSpace_ = upgraderSpaces.size(); + for (size_t i = 0; i < upgraderSpaces.size(); i++) { + spaceQueue_.add(upgraderSpaces[i].get()); + } + + // Parallel process space + auto spaceConcurrency = + std::min(static_cast(FLAGS_max_concurrent_spaces), upgraderSpaces.size()); + LOG(INFO) << "Max concurrent spaces: " << spaceConcurrency; + + for (size_t i = 0; i < spaceConcurrency; ++i) { + pool_->add(std::bind(&DbUpgrader::doSpace, this)); + } + + while (unFinishedSpace_ != 0) { + sleep(10); + } + + LOG(INFO) << "Upgrade from path " << srcPath_ << " to path " << dstPath_ + << " in DbUpgrader run end"; +} + +void DbUpgrader::doSpace() { + std::chrono::milliseconds take_dura{10}; + if (auto UpSpace = spaceQueue_.try_take_for(take_dura)) { + auto upgraderSpaceIter = *UpSpace; + LOG(INFO) << "Upgrade from path " << upgraderSpaceIter->srcPath_ << " space id " + << upgraderSpaceIter->entry_ << " to path " << upgraderSpaceIter->dstPath_ + << " begin"; + if (FLAGS_upgrade_version == "1:2") { + upgraderSpaceIter->doProcessV1(); + } else if (FLAGS_upgrade_version == "2RC:2") { + upgraderSpaceIter->doProcessV2(); + } else if (FLAGS_upgrade_version == "2:3") { + upgraderSpaceIter->doProcessV3(); + } else { + LOG(FATAL) << "error upgrade version " << FLAGS_upgrade_version; + } + + auto ret = upgraderSpaceIter->copyWal(); + if (!ret) { + LOG(ERROR) << "Copy space id " << upgraderSpaceIter->entry_ << " wal file failed"; + } else { + LOG(INFO) << "Copy space id " << upgraderSpaceIter->entry_ << " wal file success"; + } + + if (FLAGS_compactions) { + upgraderSpaceIter->doCompaction(); + } + + auto unFinishedSpace = --unFinishedSpace_; + if (unFinishedSpace == 0) { + // all spaces has finished + LOG(INFO) << "Upgrade last space: " << upgraderSpaceIter->entry_ << " from " + << upgraderSpaceIter->srcPath_ << " to path " << upgraderSpaceIter->dstPath_ + << " end"; + } else { + pool_->add(std::bind(&DbUpgrader::doSpace, this)); + } + } else { + LOG(INFO) << "Upgrade from path " << srcPath_ << " to path " << dstPath_ << " end"; + } +} + +} // namespace storage +} // namespace nebula diff --git a/src/tools/db-upgrade/DbUpgrader.h b/src/tools/db-upgrade/DbUpgrader.h new file mode 100644 index 00000000000..b66543aa882 --- /dev/null +++ b/src/tools/db-upgrade/DbUpgrader.h @@ -0,0 +1,223 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#ifndef TOOLS_DBUPGRADE_DBUPGRADER_H_ +#define TOOLS_DBUPGRADE_DBUPGRADER_H_ + +#include +#include +#include + +#include "clients/meta/MetaClient.h" +#include "codec/RowReaderWrapper.h" +#include "codec/RowWriterV2.h" +#include "common/base/Base.h" +#include "common/base/Status.h" +#include "common/meta/ServerBasedIndexManager.h" +#include "common/meta/ServerBasedSchemaManager.h" +#include "kvstore/RocksEngine.h" + +DECLARE_string(src_db_path); +DECLARE_string(dst_db_path); +DECLARE_string(upgrade_meta_server); +DECLARE_uint32(write_batch_num); +DECLARE_string(upgrade_version); +DECLARE_bool(compactions); +DECLARE_uint32(max_concurrent_parts); +DECLARE_uint32(max_concurrent_spaces); + +namespace nebula { +namespace storage { + +// Upgrade a space of data path in storage conf +class UpgraderSpace { + public: + UpgraderSpace() = default; + + ~UpgraderSpace() { + if (pool_) { + pool_->join(); + } + } + + Status init(meta::MetaClient* mclient, + meta::ServerBasedSchemaManager* sMan, + meta::IndexManager* iMan, + const std::string& srcPath, + const std::string& dstPath, + const std::string& entry); + + // Process v1 data and upgrade to v2 Ga + void doProcessV1(); + + // Processing v2 Rc data upgrade to v2 Ga + void doProcessV2(); + + // Processing v2 Ga data upgrade to v3 + void doProcessV3(); + + // Perform manual compact + void doCompaction(); + + // Copy the latest wal file under each part, two at most + bool copyWal(); + + private: + Status initSpace(const std::string& spaceId); + + Status buildSchemaAndIndex(); + + bool isValidVidLen(VertexID srcVId, VertexID dstVId = ""); + + void encodeVertexValue(PartitionID partId, + RowReader* reader, + const meta::NebulaSchemaProvider* schema, + std::string& newkey, + VertexID& strVid, + TagID tagId, + std::vector& data); + + // Used for vertex and edge + std::string encodeRowVal(const RowReader* reader, + const meta::NebulaSchemaProvider* schema, + std::vector& fieldName); + + std::vector indexVertexKeys(PartitionID partId, + VertexID& vId, + RowReader* reader, + std::shared_ptr index, + const meta::SchemaProviderIf* latestSchema); + + void encodeEdgeValue(PartitionID partId, + RowReader* reader, + const meta::NebulaSchemaProvider* schema, + std::string& newkey, + VertexID& svId, + EdgeType type, + EdgeRanking rank, + VertexID& dstId, + std::vector& data); + + std::vector indexEdgeKeys(PartitionID partId, + RowReader* reader, + VertexID& svId, + EdgeRanking rank, + VertexID& dstId, + std::shared_ptr index, + const meta::SchemaProviderIf* latestSchema); + + WriteResult convertValue(const meta::NebulaSchemaProvider* newSchema, + const meta::SchemaProviderIf* oldSchema, + std::string& name, + Value& val); + void runPartV1(); + + void runPartV2(); + + void runPartV3(); + + public: + // Source data path + std::string srcPath_; + // Destination data path + std::string dstPath_; + std::string entry_; + + private: + meta::MetaClient* metaClient_; + meta::ServerBasedSchemaManager* schemaMan_; + meta::IndexManager* indexMan_; + + // The following variables are space level + GraphSpaceID spaceId_; + int32_t spaceVidLen_; + std::string spaceName_; + std::vector parts_; + std::unique_ptr readEngine_; + std::unique_ptr writeEngine_; + + // Get all tag newest schema in space + std::unordered_map>> + tagSchemas_; + + // tagId -> All field names in newest schema + std::unordered_map> tagFieldName_; + + // tagId -> all indexes of this tag + std::unordered_map>> + tagIndexes_; + + // Get all edge newest schema in space + std::unordered_map>> + edgeSchemas_; + 
// edgetype -> all field names in the newest schema + std::unordered_map> edgeFieldName_; + + // edgetype -> all indexes of this edgetype + std::unordered_map>> + edgeIndexes_; + + // for parallel parts + std::unique_ptr pool_{nullptr}; + + folly::UnboundedBlockingQueue partQueue_; + + std::atomic unFinishedPart_; + + std::mutex ingest_sst_file_mut_; + std::vector ingest_sst_file_; +}; + +// Upgrade one data path in storage conf +class DbUpgrader { + public: + DbUpgrader() = default; + + ~DbUpgrader() { + if (pool_) { + pool_->join(); + } + } + + Status init(meta::MetaClient* mclient, + meta::ServerBasedSchemaManager* sMan, + meta::IndexManager* iMan, + const std::string& srcPath, + const std::string& dstPath); + + void run(); + + private: + // Get all space ids as strings + Status listSpace(); + + void doProcessAllTagsAndEdges(); + + void doSpace(); + + private: + meta::MetaClient* metaClient_; + meta::ServerBasedSchemaManager* schemaMan_; + meta::IndexManager* indexMan_; + // Source data path + std::string srcPath_; + + // Destination data path + std::string dstPath_; + std::vector subDirs_; + + // for parallel spaces + std::unique_ptr pool_{nullptr}; + + folly::UnboundedBlockingQueue spaceQueue_; + + std::atomic unFinishedSpace_; +}; + +} // namespace storage +} // namespace nebula + +#endif // TOOLS_DBUPGRADE_DBUPGRADER_H_ diff --git a/src/tools/db-upgrade/DbUpgraderTool.cpp b/src/tools/db-upgrade/DbUpgraderTool.cpp new file mode 100644 index 00000000000..b4e05cd143b --- /dev/null +++ b/src/tools/db-upgrade/DbUpgraderTool.cpp @@ -0,0 +1,204 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "common/base/Base.h" +#include "kvstore/RocksEngineConfig.h" +#include "tools/db-upgrade/DbUpgrader.h" + +void printHelp() { + fprintf( + stderr, + R"( ./db_upgrade --src_db_path= --dst_db_path= --upgrade_meta_server= --upgrade_version=2:3 + +desc: + This tool is used to upgrade data from nebula 2.0 GA to 3.0 + +required: + --src_db_path= + Source data path to the rocksdb data directory. + This is an absolute path; multiple paths should be separated by commas. + If old nebula was installed in /usr/local/nebula, + the db_path would be /usr/local/nebula/data/storage + Default: "" + + --dst_db_path= + Destination data path to the rocksdb data directory. + This is an absolute path; multiple paths should be separated by commas. + If new nebula was installed in /usr/local/nebula_new, + the db_path would be /usr/local/nebula_new/data/storage + Default: "" + + note: + The number of paths in src_db_path must equal the number of paths in dst_db_path, and + src_db_path and dst_db_path must be different. + For the 2.0 GA to 3.0 upgrade, dst_db_path is ignored. + + --upgrade_meta_server= + A list of meta servers' ip:port, separated by commas. + Default: 127.0.0.1:45500 + + --upgrade_version=<2:3> + This tool can only upgrade 2.0 GA. + 2:3 upgrades the data from 2.0 GA to 3.0 + Default: "" + + optional: + --write_batch_num= + The size of the batch written to rocksdb. + Default: 100 + + --compactions= + When the data upgrade finishes, whether to compact all data. + Default: true + + --max_concurrent_parts + Maximum number of concurrent parts allowed. + Default: 10 + + --max_concurrent_spaces + Maximum number of concurrent spaces allowed.
+
+void printParams() {
+  std::cout << "===========================PARAMS============================\n";
+  std::cout << "meta server: " << FLAGS_upgrade_meta_server << "\n";
+  std::cout << "source data path: " << FLAGS_src_db_path << "\n";
+  std::cout << "destination data path: " << FLAGS_dst_db_path << "\n";
+  std::cout << "The size of the batch written: " << FLAGS_write_batch_num << "\n";
+  std::cout << "upgrade data from version: " << FLAGS_upgrade_version << "\n";
+  std::cout << "whether to compact all data: " << (FLAGS_compactions ? "true" : "false") << "\n";
+  std::cout << "maximum number of concurrent parts allowed: " << FLAGS_max_concurrent_parts
+            << "\n";
+  std::cout << "maximum number of concurrent spaces allowed: " << FLAGS_max_concurrent_spaces
+            << "\n";
+  std::cout << "===========================PARAMS============================\n\n";
+}
+
+int main(int argc, char* argv[]) {
+  // Disable auto compaction while the data is being upgraded; a manual
+  // compaction is performed once the upgrade finishes.
+  FLAGS_rocksdb_column_family_options = R"({
+          "disable_auto_compactions":"true",
+          "write_buffer_size":"134217728",
+          "max_write_buffer_number":"12",
+          "max_bytes_for_level_base":"268435456",
+          "level0_slowdown_writes_trigger":"999999",
+          "level0_stop_writes_trigger":"999999",
+          "soft_pending_compaction_bytes_limit":"137438953472",
+          "hard_pending_compaction_bytes_limit":"274877906944"
+      })";
+
+  FLAGS_rocksdb_db_options = R"({
+          "max_background_jobs":"10",
+          "max_subcompactions":"10"
+      })";
+
+  if (argc == 1) {
+    printHelp();
+    return EXIT_FAILURE;
+  } else {
+    folly::init(&argc, &argv, true);
+  }
+
+  google::SetStderrLogging(google::INFO);
+
+  printParams();
+
+  // Handle arguments
+  LOG(INFO) << "Prepare phase begin";
+  if (FLAGS_src_db_path.empty() || FLAGS_dst_db_path.empty()) {
+    LOG(ERROR) << "Source data path or destination data path should not be empty.";
+    return EXIT_FAILURE;
+  }
+
+  std::vector<std::string> srcPaths;
+  folly::split(",", FLAGS_src_db_path, srcPaths, true);
+  std::transform(srcPaths.begin(), srcPaths.end(), srcPaths.begin(), [](auto& p) {
+    return folly::trimWhitespace(p).str();
+  });
+  if (srcPaths.empty()) {
+    LOG(ERROR) << "Bad source data path format: " << FLAGS_src_db_path;
+    return EXIT_FAILURE;
+  }
+
+  std::vector<std::string> dstPaths;
+  folly::split(",", FLAGS_dst_db_path, dstPaths, true);
+  std::transform(dstPaths.begin(), dstPaths.end(), dstPaths.begin(), [](auto& p) {
+    return folly::trimWhitespace(p).str();
+  });
+  if (dstPaths.empty()) {
+    LOG(ERROR) << "Bad destination data path format: " << FLAGS_dst_db_path;
+    return EXIT_FAILURE;
+  }
+
+  if (srcPaths.size() != dstPaths.size()) {
+    LOG(ERROR) << "The number of source data paths is not equal to the "
+               << "number of destination data paths.";
+    return EXIT_FAILURE;
+  }
+
+  auto addrs = nebula::network::NetworkUtils::toHosts(FLAGS_upgrade_meta_server);
+  if (!addrs.ok()) {
+    LOG(ERROR) << "Get meta host address failed " << FLAGS_upgrade_meta_server;
+    return EXIT_FAILURE;
+  }
+
+  auto ioExecutor = std::make_shared<folly::IOThreadPoolExecutor>(1);
+  nebula::meta::MetaClientOptions options;
+  options.skipConfig_ = true;
+  auto metaClient =
+      std::make_unique<nebula::meta::MetaClient>(ioExecutor, std::move(addrs.value()), options);
+  CHECK_NOTNULL(metaClient);
+  if (!metaClient->waitForMetadReady(1)) {
+    LOG(ERROR) << "Meta is not ready: " << FLAGS_upgrade_meta_server;
+    return EXIT_FAILURE;
+  }
+
+  auto schemaMan = nebula::meta::ServerBasedSchemaManager::create(metaClient.get());
+  auto indexMan = nebula::meta::ServerBasedIndexManager::create(metaClient.get());
+  CHECK_NOTNULL(schemaMan);
+  CHECK_NOTNULL(indexMan);
+
+  std::vector<std::string> versions = {"2:3"};
+  if (std::find(versions.begin(), versions.end(), FLAGS_upgrade_version) == versions.end()) {
+    LOG(ERROR) << "Unsupported upgrade_version: " << FLAGS_upgrade_version;
+    return EXIT_FAILURE;
+  }
+  LOG(INFO) << "Prepare phase end";
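+
+  // (Each source/destination path pair below gets its own upgrade thread; a
+  //  DbUpgrader additionally fans out across spaces and parts through the
+  //  folly thread pools declared in DbUpgrader.h, capped by
+  //  --max_concurrent_spaces and --max_concurrent_parts.)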
+
+  // Upgrade data
+  LOG(INFO) << "Upgrade phase begin";
+
+  // The data path in the storage conf is generally just one or a few paths,
+  // so there is no need to limit the number of threads here.
+  std::vector<std::thread> threads;
+  for (size_t i = 0; i < srcPaths.size(); i++) {
+    threads.emplace_back(std::thread([mclient = metaClient.get(),
+                                      sMan = schemaMan.get(),
+                                      iMan = indexMan.get(),
+                                      srcPath = srcPaths[i],
+                                      dstPath = dstPaths[i]] {
+      LOG(INFO) << "Upgrade from path " << srcPath << " to path " << dstPath << " begin";
+      nebula::storage::DbUpgrader upgrader;
+      auto ret = upgrader.init(mclient, sMan, iMan, srcPath, dstPath);
+      if (!ret.ok()) {
+        LOG(ERROR) << "Upgrader from path " << srcPath << " to path " << dstPath
+                   << " init failed.";
+        return;
+      }
+      upgrader.run();
+      LOG(INFO) << "Upgrade from path " << srcPath << " to path " << dstPath << " end";
+    }));
+  }
+
+  // Wait for all threads to finish
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  LOG(INFO) << "Upgrade phase end";
+  return 0;
+}
diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV1.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV1.cpp
new file mode 100644
index 00000000000..9948422a709
--- /dev/null
+++ b/src/tools/db-upgrade/NebulaKeyUtilsV1.cpp
@@ -0,0 +1,109 @@
+/* Copyright (c) 2018 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "tools/db-upgrade/NebulaKeyUtilsV1.h"
+
+namespace nebula {
+
+// static
+std::string NebulaKeyUtilsV1::indexPrefix(PartitionID partId, IndexID indexId) {
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kIndex);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + sizeof(IndexID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&indexId), sizeof(IndexID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV1::vertexPrefix(PartitionID partId, VertexID vId, TagID tagId) {
+  tagId &= kTagMaskSet;
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kData);
+
+  std::string key;
+  key.reserve(kVertexLen);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&vId), sizeof(VertexID))
+      .append(reinterpret_cast<const char*>(&tagId), sizeof(TagID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV1::edgePrefix(PartitionID partId, VertexID srcId, EdgeType type) {
+  type |= kEdgeMaskSet;
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kData);
+
+  std::string key;
+  key.reserve(sizeof(PartitionID) + sizeof(VertexID) + sizeof(EdgeType));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&srcId), sizeof(VertexID))
+      .append(reinterpret_cast<const char*>(&type), sizeof(EdgeType));
+  return key;
+}
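+
+// (Tag/edge discrimination in v1 keys: edges set bit 30 via kEdgeMaskSet
+//  (type |= 0x40000000) while tags clear it via kTagMaskSet
+//  (tagId &= 0xbfffffff); isVertex()/isEdge() in NebulaKeyUtilsV1.h test
+//  that bit.)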
+
+// static
+std::string NebulaKeyUtilsV1::edgePrefix(
+    PartitionID partId, VertexID srcId, EdgeType type, EdgeRanking rank, VertexID dstId) {
+  type |= kEdgeMaskSet;
+  int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + sizeof(VertexID) + sizeof(EdgeType) + sizeof(VertexID) +
+              sizeof(EdgeRanking));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&srcId), sizeof(VertexID))
+      .append(reinterpret_cast<const char*>(&type), sizeof(EdgeType))
+      .append(reinterpret_cast<const char*>(&rank), sizeof(EdgeRanking))
+      .append(reinterpret_cast<const char*>(&dstId), sizeof(VertexID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV1::prefix(PartitionID partId) {
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV1::snapshotPrefix(PartitionID partId) {
+  // snapshot of meta would be all key-value pairs
+  if (partId == 0) {
+    return "";
+  }
+  return prefix(partId);
+}
+
+// static
+std::string NebulaKeyUtilsV1::vertexPrefix(PartitionID partId, VertexID vId) {
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + sizeof(VertexID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&vId), sizeof(VertexID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV1::edgePrefix(PartitionID partId, VertexID vId) {
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV1::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + sizeof(VertexID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&vId), sizeof(VertexID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV1::systemPrefix() {
+  int8_t type = static_cast<int8_t>(NebulaKeyTypeV1::kSystem);
+  std::string key;
+  key.reserve(sizeof(int8_t));
+  key.append(reinterpret_cast<const char*>(&type), sizeof(int8_t));
+  return key;
+}
+
+}  // namespace nebula
diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV1.h b/src/tools/db-upgrade/NebulaKeyUtilsV1.h
new file mode 100644
index 00000000000..3226f34b626
--- /dev/null
+++ b/src/tools/db-upgrade/NebulaKeyUtilsV1.h
@@ -0,0 +1,252 @@
+/* Copyright (c) 2018 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#ifndef TOOLS_DBUPGRADE_NEBULAKEYUTILSV1_H_
+#define TOOLS_DBUPGRADE_NEBULAKEYUTILSV1_H_
+
+#include "common/utils/Types.h"
+
+namespace nebula {
+
+enum class NebulaKeyTypeV1 : uint32_t {
+  kData = 0x00000001,
+  kIndex = 0x00000002,
+  kUUID = 0x00000003,
+  kSystem = 0x00000004,
+};
+
+/**
+ * This class supplies some utils for transitions between Vertex/Edge and keys
+ * in kvstore.
+ */
+class NebulaKeyUtilsV1 final {
+ public:
+  ~NebulaKeyUtilsV1() = default;
+
+  using VertexID = int64_t;
+
+  static std::string indexPrefix(PartitionID partId, IndexID indexId);
+
+  static std::string vertexPrefix(PartitionID partId, VertexID vId, TagID tagId);
+
+  static std::string edgePrefix(PartitionID partId, VertexID srcId, EdgeType type);
+
+  static std::string vertexPrefix(PartitionID partId, VertexID vId);
+
+  static std::string edgePrefix(PartitionID partId, VertexID vId);
+
+  static std::string edgePrefix(
+      PartitionID partId, VertexID srcId, EdgeType type, EdgeRanking rank, VertexID dstId);
+
+  static std::string systemPrefix();
+
+  static std::string prefix(PartitionID partId);
+
+  static std::string snapshotPrefix(PartitionID partId);
+
+  static PartitionID getPart(const folly::StringPiece& rawKey) {
+    return readInt<PartitionID>(rawKey.data(), sizeof(PartitionID)) >> 8;
+  }
+
+  static bool isVertex(const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kVertexLen) {
+      return false;
+    }
+    constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyTypeV1));
+    auto type = readInt<uint32_t>(rawKey.data(), len) & kTypeMask;
+    if (static_cast<uint32_t>(NebulaKeyTypeV1::kData) != type) {
+      return false;
+    }
+    auto offset = sizeof(PartitionID) + sizeof(VertexID);
+    TagID tagId = readInt<TagID>(rawKey.data() + offset, sizeof(TagID));
+    return !(tagId & kTagEdgeMask);
+  }
+
+  static VertexID getVertexId(const folly::StringPiece& rawKey) {
+    CHECK_EQ(rawKey.size(), kVertexLen);
+    auto offset = sizeof(PartitionID);
+    return readInt<VertexID>(rawKey.data() + offset, sizeof(VertexID));
+  }
+ << "\nrawKey.size()=" << rawKey.size() << "\nkVertexLen=" << kVertexLen << "\nhexDump:\n" + << folly::hexDump(rawKey.data(), rawKey.size()); + LOG(FATAL) << msg.str(); + } + auto offset = sizeof(PartitionID) + sizeof(VertexID); + return readInt(rawKey.data() + offset, sizeof(TagID)); + } + + static bool isEdge(const folly::StringPiece& rawKey) { + if (rawKey.size() != kEdgeLen) { + return false; + } + constexpr int32_t len = static_cast(sizeof(NebulaKeyTypeV1)); + auto type = readInt(rawKey.data(), len) & kTypeMask; + if (static_cast(NebulaKeyTypeV1::kData) != type) { + return false; + } + auto offset = sizeof(PartitionID) + sizeof(VertexID); + EdgeType etype = readInt(rawKey.data() + offset, sizeof(EdgeType)); + return etype & kTagEdgeMask; + } + + static bool isSystemCommit(const folly::StringPiece& rawKey) { + if (rawKey.size() != kSystemLen) { + return false; + } + auto position = rawKey.data() + sizeof(PartitionID); + auto len = sizeof(NebulaSystemKeyType); + auto type = readInt(position, len); + return static_cast(NebulaSystemKeyType::kSystemCommit) == type; + } + + static bool isSystemPart(const folly::StringPiece& rawKey) { + if (rawKey.size() != kSystemLen) { + return false; + } + auto position = rawKey.data() + sizeof(PartitionID); + auto len = sizeof(NebulaSystemKeyType); + auto type = readInt(position, len); + return static_cast(NebulaSystemKeyType::kSystemPart) == type; + } + + static VertexID getSrcId(const folly::StringPiece& rawKey) { + CHECK_EQ(rawKey.size(), kEdgeLen); + return readInt(rawKey.data() + sizeof(PartitionID), sizeof(VertexID)); + } + + static VertexID getDstId(const folly::StringPiece& rawKey) { + CHECK_EQ(rawKey.size(), kEdgeLen); + auto offset = sizeof(PartitionID) + sizeof(VertexID) + sizeof(EdgeType) + sizeof(EdgeRanking); + return readInt(rawKey.data() + offset, sizeof(VertexID)); + } + + static EdgeType getEdgeType(const folly::StringPiece& rawKey) { + CHECK_EQ(rawKey.size(), kEdgeLen); + auto offset = sizeof(PartitionID) + sizeof(VertexID); + EdgeType type = readInt(rawKey.data() + offset, sizeof(EdgeType)); + return type > 0 ? 
+
+  static EdgeType getEdgeType(const folly::StringPiece& rawKey) {
+    CHECK_EQ(rawKey.size(), kEdgeLen);
+    auto offset = sizeof(PartitionID) + sizeof(VertexID);
+    EdgeType type = readInt<EdgeType>(rawKey.data() + offset, sizeof(EdgeType));
+    return type > 0 ? type & kTagEdgeValueMask : type;
+  }
+
+  static EdgeRanking getRank(const folly::StringPiece& rawKey) {
+    CHECK_EQ(rawKey.size(), kEdgeLen);
+    auto offset = sizeof(PartitionID) + sizeof(VertexID) + sizeof(EdgeType);
+    return readInt<EdgeRanking>(rawKey.data() + offset, sizeof(EdgeRanking));
+  }
+
+  static int64_t getVersion(const folly::StringPiece& rawKey) {
+    CHECK(isVertex(rawKey) || isEdge(rawKey));
+    auto offset = rawKey.size() - sizeof(int64_t);
+    return readInt<int64_t>(rawKey.data() + offset, sizeof(int64_t));
+  }
+
+  static IndexID getIndexId(const folly::StringPiece& rawKey) {
+    CHECK_GT(rawKey.size(), kIndexLen);
+    auto offset = sizeof(PartitionID);
+    return readInt<IndexID>(rawKey.data() + offset, sizeof(IndexID));
+  }
+
+  template <typename T>
+  static typename std::enable_if<std::is_integral<T>::value, T>::type readInt(const char* data,
+                                                                              int32_t len) {
+    CHECK_GE(len, sizeof(T));
+    return *reinterpret_cast<const T*>(data);
+  }
+
+  static bool isDataKey(const folly::StringPiece& key) {
+    constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyTypeV1));
+    auto type = readInt<uint32_t>(key.data(), len) & kTypeMask;
+    return static_cast<uint32_t>(NebulaKeyTypeV1::kData) == type;
+  }
+
+  static bool isIndexKey(const folly::StringPiece& key) {
+    if (key.size() < kIndexLen) {
+      return false;
+    }
+    constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyTypeV1));
+    auto type = readInt<uint32_t>(key.data(), len) & kTypeMask;
+    return static_cast<uint32_t>(NebulaKeyTypeV1::kIndex) == type;
+  }
+
+  static bool isUUIDKey(const folly::StringPiece& key) {
+    auto type = readInt<uint32_t>(key.data(), sizeof(int32_t)) & kTypeMask;
+    return static_cast<uint32_t>(NebulaKeyTypeV1::kUUID) == type;
+  }
+
+  static folly::StringPiece keyWithNoVersion(const folly::StringPiece& rawKey) {
+    // TODO(heng) We should change the method if varint data version supported.
+    return rawKey.subpiece(0, rawKey.size() - sizeof(int64_t));
+  }
+
+  static VertexID getIndexVertexID(const folly::StringPiece& rawKey) {
+    CHECK_GE(rawKey.size(), kVertexIndexLen);
+    auto offset = rawKey.size() - sizeof(VertexID);
+    return *reinterpret_cast<const VertexID*>(rawKey.data() + offset);
+  }
+
+  static VertexID getIndexSrcId(const folly::StringPiece& rawKey) {
+    CHECK_GE(rawKey.size(), kEdgeIndexLen);
+    auto offset = rawKey.size() - sizeof(VertexID) * 2 - sizeof(EdgeRanking);
+    return readInt<VertexID>(rawKey.data() + offset, sizeof(VertexID));
+  }
+
+  static VertexID getIndexDstId(const folly::StringPiece& rawKey) {
+    CHECK_GE(rawKey.size(), kEdgeIndexLen);
+    auto offset = rawKey.size() - sizeof(VertexID);
+    return readInt<VertexID>(rawKey.data() + offset, sizeof(VertexID));
+  }
+
+  static EdgeRanking getIndexRank(const folly::StringPiece& rawKey) {
+    CHECK_GE(rawKey.size(), kEdgeIndexLen);
+    auto offset = rawKey.size() - sizeof(VertexID) - sizeof(EdgeRanking);
+    return readInt<EdgeRanking>(rawKey.data() + offset, sizeof(EdgeRanking));
+  }
+
+ private:
+  NebulaKeyUtilsV1() = delete;
+
+ private:
+  static constexpr int32_t kVertexLen =
+      sizeof(PartitionID) + sizeof(VertexID) + sizeof(TagID) + sizeof(TagVersion);
+
+  static constexpr int32_t kEdgeLen = sizeof(PartitionID) + sizeof(VertexID) + sizeof(EdgeType) +
+                                      sizeof(VertexID) + sizeof(EdgeRanking) + sizeof(EdgeVersion);
+
+  static constexpr int32_t kVertexIndexLen =
+      sizeof(PartitionID) + sizeof(IndexID) + sizeof(VertexID);
+
+  static constexpr int32_t kEdgeIndexLen =
+      sizeof(PartitionID) + sizeof(IndexID) + sizeof(VertexID) * 2 + sizeof(EdgeRanking);
+
+  static constexpr int32_t kIndexLen = std::min(kVertexIndexLen, kEdgeIndexLen);
+
+  static constexpr int32_t kSystemLen = sizeof(PartitionID) + sizeof(NebulaSystemKeyType);
+
+  // The partition id offset in 4 Bytes
+  static constexpr uint8_t kPartitionOffset = 8;
+
+  // The key type bits Mask
+  // See KeyType enum
+  static constexpr uint32_t kTypeMask = 0x000000FF;
+
+  // The most significant bit is the sign bit, tag is always 0
+  // The second most significant bit is the tag/edge type bit Mask
+  // 0 for Tag, 1 for Edge
+  static constexpr uint32_t kTagEdgeMask = 0x40000000;
+  // For extracting the Tag/Edge value
+  static constexpr uint32_t kTagEdgeValueMask = ~kTagEdgeMask;
+  // Write edge by |= 0x40000000
+  static constexpr uint32_t kEdgeMaskSet = kTagEdgeMask;
+  // Write Tag by &= 0xbfffffff
+  static constexpr uint32_t kTagMaskSet = ~kTagEdgeMask;
+};
+
+}  // namespace nebula
+#endif  // TOOLS_DBUPGRADE_NEBULAKEYUTILSV1_H_
diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV2.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV2.cpp
new file mode 100644
index 00000000000..8a1a0f429c6
--- /dev/null
+++ b/src/tools/db-upgrade/NebulaKeyUtilsV2.cpp
@@ -0,0 +1,225 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "tools/db-upgrade/NebulaKeyUtilsV2.h"
+
+namespace nebula {
+
+// static
+bool NebulaKeyUtilsV2::isValidVidLen(size_t vIdLen, VertexID srcVId, VertexID dstVId) {
+  if (srcVId.size() > vIdLen || dstVId.size() > vIdLen) {
+    return false;
+  }
+  return true;
+}
+
+// static
+std::string NebulaKeyUtilsV2::tagKey(
+    size_t vIdLen, PartitionID partId, VertexID vId, TagID tagId, TagVersion tv) {
+  CHECK_GE(vIdLen, vId.size());
+  tagId &= kTagMaskSet;
+  int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+
+  std::string key;
+  key.reserve(kVertexLen + vIdLen);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(int32_t))
+      .append(vId.data(), vId.size())
+      .append(vIdLen - vId.size(), '\0')
+      .append(reinterpret_cast<const char*>(&tagId), sizeof(TagID))
+      .append(reinterpret_cast<const char*>(&tv), sizeof(TagVersion));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::edgeKey(size_t vIdLen,
+                                      PartitionID partId,
+                                      VertexID srcId,
+                                      EdgeType type,
+                                      EdgeRanking rank,
+                                      VertexID dstId,
+                                      EdgeVersion ev) {
+  CHECK_GE(vIdLen, srcId.size());
+  CHECK_GE(vIdLen, dstId.size());
+  type |= kEdgeMaskSet;
+  int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+
+  std::string key;
+  key.reserve(kEdgeLen + (vIdLen << 1));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(srcId.data(), srcId.size())
+      .append(vIdLen - srcId.size(), '\0')
+      .append(reinterpret_cast<const char*>(&type), sizeof(EdgeType))
+      .append(reinterpret_cast<const char*>(&rank), sizeof(EdgeRanking))
+      .append(dstId.data(), dstId.size())
+      .append(vIdLen - dstId.size(), '\0')
+      .append(reinterpret_cast<const char*>(&ev), sizeof(EdgeVersion));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::systemCommitKey(PartitionID partId) {
+  int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kSystem);
+  uint32_t type = static_cast<uint32_t>(NebulaSystemKeyType::kSystemCommit);
+  std::string key;
+  key.reserve(kSystemLen);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&type), sizeof(NebulaSystemKeyType));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::systemPartKey(PartitionID partId) {
+  uint32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kSystem);
+  uint32_t type = static_cast<uint32_t>(NebulaSystemKeyType::kSystemPart);
+  std::string key;
+  key.reserve(kSystemLen);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(reinterpret_cast<const char*>(&type), sizeof(NebulaSystemKeyType));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::kvKey(PartitionID partId, const folly::StringPiece& name) {
+  std::string key;
+  key.reserve(sizeof(PartitionID) + name.size());
+  int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(int32_t))
+      .append(name.data(), name.size());
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::vertexPrefix(size_t vIdLen,
+                                           PartitionID partId,
+                                           VertexID vId,
+                                           TagID tagId) {
+  CHECK_GE(vIdLen, vId.size());
+  tagId &= kTagMaskSet;
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+
+  std::string key;
+  key.reserve(sizeof(PartitionID) + vIdLen + sizeof(TagID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(vId.data(), vId.size())
+      .append(vIdLen - vId.size(), '\0')
+      .append(reinterpret_cast<const char*>(&tagId), sizeof(TagID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::vertexPrefix(size_t vIdLen, PartitionID partId, VertexID vId) {
+  CHECK_GE(vIdLen, vId.size());
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + vIdLen);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(vId.data(), vId.size())
+      .append(vIdLen - vId.size(), '\0');
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::edgePrefix(size_t vIdLen,
+                                         PartitionID partId,
+                                         VertexID srcId,
+                                         EdgeType type) {
+  CHECK_GE(vIdLen, srcId.size());
+  type |= kEdgeMaskSet;
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+
+  std::string key;
+  key.reserve(sizeof(PartitionID) + vIdLen + sizeof(EdgeType));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(srcId.data(), srcId.size())
+      .append(vIdLen - srcId.size(), '\0')
+      .append(reinterpret_cast<const char*>(&type), sizeof(EdgeType));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::edgePrefix(size_t vIdLen, PartitionID partId, VertexID srcId) {
+  CHECK_GE(vIdLen, srcId.size());
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + vIdLen);
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(srcId.data(), srcId.size())
+      .append(vIdLen - srcId.size(), '\0');
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::edgePrefix(size_t vIdLen,
+                                         PartitionID partId,
+                                         VertexID srcId,
+                                         EdgeType type,
+                                         EdgeRanking rank,
+                                         VertexID dstId) {
+  CHECK_GE(vIdLen, srcId.size());
+  CHECK_GE(vIdLen, dstId.size());
+  type |= kEdgeMaskSet;
+  int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID) + (vIdLen << 1) + sizeof(EdgeType) + sizeof(EdgeRanking));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID))
+      .append(srcId.data(), srcId.size())
+      .append(vIdLen - srcId.size(), '\0')
+      .append(reinterpret_cast<const char*>(&type), sizeof(EdgeType))
+      .append(reinterpret_cast<const char*>(&rank), sizeof(EdgeRanking))
+      .append(dstId.data(), dstId.size())
+      .append(vIdLen - dstId.size(), '\0');
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::partPrefix(PartitionID partId) {
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyTypeV2::kData);
+  std::string key;
+  key.reserve(sizeof(PartitionID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID));
+  return key;
+}
+
+// static
+std::string NebulaKeyUtilsV2::snapshotPrefix(PartitionID partId) {
+  // snapshot of meta would be all key-value pairs
+  if (partId == 0) {
+    return "";
+  }
+  return partPrefix(partId);
+}
+
+std::string NebulaKeyUtilsV2::systemPrefix() {
+  int8_t type = static_cast<int8_t>(NebulaKeyTypeV2::kSystem);
+  std::string key;
+  key.reserve(sizeof(int8_t));
+  key.append(reinterpret_cast<const char*>(&type), sizeof(int8_t));
+  return key;
+}
+
+std::string NebulaKeyUtilsV2::toLockKey(const folly::StringPiece& rawKey, bool enableMvcc) {
+  EdgeVersion verPlaceHolder = 0;
+  EdgeVersion ver = 0;
+  if (enableMvcc) {
+    auto offset = rawKey.size() - sizeof(EdgeVersion);
+    ver = readInt<EdgeVersion>(rawKey.data() + offset, sizeof(EdgeVersion));
+  }
+
+  auto lockKey = NebulaKeyUtilsV2::keyWithNoVersion(rawKey).str();
+  lockKey.append(reinterpret_cast<const char*>(&verPlaceHolder), sizeof(EdgeVersion));
+  lockKey.append(reinterpret_cast<const char*>(&ver), sizeof(EdgeVersion));
+  return lockKey + kLockSuffix;
+}
+
+std::string NebulaKeyUtilsV2::toEdgeKey(const folly::StringPiece& lockKey, bool enableMvcc) {
+  // edges in toss space must have an edge version greater than 0.
+  EdgeVersion ver = enableMvcc ? NebulaKeyUtilsV2::getLockVersion(lockKey) : 1;
+
+  auto rawKey = NebulaKeyUtilsV2::lockWithNoVersion(lockKey).str();
+  rawKey.append(reinterpret_cast<const char*>(&ver), sizeof(EdgeVersion));
+  return rawKey;
+}
+
+}  // namespace nebula
diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV2.h b/src/tools/db-upgrade/NebulaKeyUtilsV2.h
new file mode 100644
index 00000000000..ad35cb5ae26
--- /dev/null
+++ b/src/tools/db-upgrade/NebulaKeyUtilsV2.h
@@ -0,0 +1,304 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#ifndef TOOLS_DBUPGRADE_NEBULAKEYUTILSV2_H_
+#define TOOLS_DBUPGRADE_NEBULAKEYUTILSV2_H_
+
+#include "common/utils/Types.h"
+
+namespace nebula {
+
+enum class NebulaKeyTypeV2 : uint32_t {
+  kData = 0x00000001,
+  kIndex = 0x00000002,
+  kUUID = 0x00000003,
+  kSystem = 0x00000004,
+  kOperation = 0x00000005,
+};
+
+/**
+ * VertexKeyUtils:
+ * type(1) + partId(3) + vertexId(*) + tagId(4) + version(8)
+ *
+ * EdgeKeyUtils:
+ * type(1) + partId(3) + srcId(*) + edgeType(4) + edgeRank(8) + dstId(*) +
+ * version(8)
+ *
+ * For data in Nebula 1.0, every vertexId is an int64_t, so its size is 8.
+ * For data in Nebula 2.0, every vertexId is a fixed-length string whose
+ * length is a space property.
+ *
+ * LockKeyUtils:
+ * EdgeKeyWithNoVersion + placeHolder(8) + version(8) + suffix(2)
+ * */
+
+const std::string kLockSuffix = "lk";  // NOLINT
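+
+// (Worked size example, assuming an 8-byte vertex id: an edge key takes
+//  4 (type+partId) + 8 (srcId) + 4 (edgeType) + 8 (rank) + 8 (dstId) +
+//  8 (version) = 40 bytes, i.e. kEdgeLen + 2 * vIdLen with kEdgeLen as
+//  defined in NebulaKeyUtilsV2 below.)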
+
+/**
+ * This class supplies some utils for transitions between Vertex/Edge and keys
+ * in kvstore for v2.0 RC.
+ * */
+class NebulaKeyUtilsV2 final {
+ public:
+  ~NebulaKeyUtilsV2() = default;
+
+  /*
+   * Check the validity of vid length
+   */
+  static bool isValidVidLen(size_t vIdLen, VertexID srcvId, VertexID dstvId = "");
+
+  /**
+   * Generate tag key for kv store
+   * */
+  static std::string tagKey(
+      size_t vIdLen, PartitionID partId, VertexID vId, TagID tagId, TagVersion tv);
+
+  /**
+   * Generate edge key for kv store
+   * */
+  static std::string edgeKey(size_t vIdLen,
+                             PartitionID partId,
+                             VertexID srcId,
+                             EdgeType type,
+                             EdgeRanking rank,
+                             VertexID dstId,
+                             EdgeVersion ev);
+
+  static std::string systemCommitKey(PartitionID partId);
+
+  static std::string systemPartKey(PartitionID partId);
+
+  static std::string kvKey(PartitionID partId, const folly::StringPiece& name);
+
+  /**
+   * Prefix for vertex
+   * */
+  static std::string vertexPrefix(size_t vIdLen, PartitionID partId, VertexID vId, TagID tagId);
+
+  static std::string vertexPrefix(size_t vIdLen, PartitionID partId, VertexID vId);
+
+  /**
+   * Prefix for edge
+   * */
+  static std::string edgePrefix(size_t vIdLen, PartitionID partId, VertexID srcId, EdgeType type);
+
+  static std::string edgePrefix(size_t vIdLen, PartitionID partId, VertexID srcId);
+
+  static std::string edgePrefix(size_t vIdLen,
+                                PartitionID partId,
+                                VertexID srcId,
+                                EdgeType type,
+                                EdgeRanking rank,
+                                VertexID dstId);
+
+  static std::string systemPrefix();
+
+  static std::string partPrefix(PartitionID partId);
+
+  static std::string snapshotPrefix(PartitionID partId);
+
+  static PartitionID getPart(const folly::StringPiece& rawKey) {
+    return readInt<PartitionID>(rawKey.data(), sizeof(PartitionID)) >> 8;
+  }
+
+  static bool isVertex(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kVertexLen + vIdLen) {
+      return false;
+    }
+    constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyTypeV2));
+    auto type = readInt<uint32_t>(rawKey.data(), len) & kTypeMask;
+    if (static_cast<uint32_t>(NebulaKeyTypeV2::kData) != type) {
+      return false;
+    }
+    auto offset = sizeof(PartitionID) + vIdLen;
+    TagID tagId = readInt<TagID>(rawKey.data() + offset, sizeof(TagID));
+    return !(tagId & kTagEdgeMask);
+  }
+
+  static VertexIDSlice getVertexId(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kVertexLen + vIdLen) {
+      dumpBadKey(rawKey, kVertexLen + vIdLen, vIdLen);
+    }
+    auto offset = sizeof(PartitionID);
+    return rawKey.subpiece(offset, vIdLen);
+  }
+
+  static TagID getTagId(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kVertexLen + vIdLen) {
+      dumpBadKey(rawKey, kVertexLen + vIdLen, vIdLen);
+    }
+    auto offset = sizeof(PartitionID) + vIdLen;
+    return readInt<TagID>(rawKey.data() + offset, sizeof(TagID));
+  }
+
+  static bool isEdge(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kEdgeLen + (vIdLen << 1)) {
+      return false;
+    }
+    constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyTypeV2));
+    auto type = readInt<uint32_t>(rawKey.data(), len) & kTypeMask;
+    if (static_cast<uint32_t>(NebulaKeyTypeV2::kData) != type) {
+      return false;
+    }
+    auto offset = sizeof(PartitionID) + vIdLen;
+    EdgeType etype = readInt<EdgeType>(rawKey.data() + offset, sizeof(EdgeType));
+    return etype & kTagEdgeMask;
+  }
+
+  static bool isLock(size_t vIdLen, const folly::StringPiece& rawKey) {
+    auto len = rawKey.size() - sizeof(EdgeVersion) - kLockSuffix.size();
+    return isEdge(vIdLen, folly::StringPiece(rawKey.begin(), len));
+  }
+
+  static bool isSystemCommit(const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kSystemLen) {
+      return false;
+    }
+    auto position = rawKey.data() + sizeof(PartitionID);
+    auto len = sizeof(NebulaSystemKeyType);
+    auto type = readInt<uint32_t>(position, len);
+    return static_cast<uint32_t>(NebulaSystemKeyType::kSystemCommit) == type;
+  }
+
+  static bool isSystemPart(const folly::StringPiece& rawKey) {
+    if (rawKey.size() != kSystemLen) {
+      return false;
+    }
+    auto position = rawKey.data() + sizeof(PartitionID);
+    auto len = sizeof(NebulaSystemKeyType);
+    auto type = readInt<uint32_t>(position, len);
+    return static_cast<uint32_t>(NebulaSystemKeyType::kSystemPart) == type;
+  }
+
+  static VertexIDSlice getSrcId(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() < kEdgeLen + (vIdLen << 1)) {
+      dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen);
+    }
+    auto offset = sizeof(PartitionID);
+    return rawKey.subpiece(offset, vIdLen);
+  }
+
+  static VertexIDSlice getDstId(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() < kEdgeLen + (vIdLen << 1)) {
+      dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen);
+    }
+    auto offset = sizeof(PartitionID) + vIdLen + sizeof(EdgeType) + sizeof(EdgeRanking);
+    return rawKey.subpiece(offset, vIdLen);
+  }
+
+  static EdgeType getEdgeType(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() < kEdgeLen + (vIdLen << 1)) {
+      dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen);
+    }
+    auto offset = sizeof(PartitionID) + vIdLen;
+    EdgeType type = readInt<EdgeType>(rawKey.data() + offset, sizeof(EdgeType));
+    return type > 0 ? type & kTagEdgeValueMask : type;
+  }
+
+  static EdgeRanking getRank(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (rawKey.size() < kEdgeLen + (vIdLen << 1)) {
+      dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen);
+    }
+    auto offset = sizeof(PartitionID) + vIdLen + sizeof(EdgeType);
+    return readInt<EdgeRanking>(rawKey.data() + offset, sizeof(EdgeRanking));
+  }
+
+  static int64_t getVersion(size_t vIdLen, const folly::StringPiece& rawKey) {
+    if (isVertex(vIdLen, rawKey) || isEdge(vIdLen, rawKey)) {
+      auto offset = rawKey.size() - sizeof(int64_t);
+      return readInt<int64_t>(rawKey.data() + offset, sizeof(int64_t));
+    } else if (isLock(vIdLen, rawKey)) {
+      return getLockVersion(rawKey);
+    } else {
+      LOG(FATAL) << "key is not one of vertex, edge or lock";
+    }
+    return 0;  // never reached, only to satisfy the compiler
+  }
+
+  static bool isDataKey(const folly::StringPiece& key) {
+    constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyTypeV2));
+    auto type = readInt<uint32_t>(key.data(), len) & kTypeMask;
+    return static_cast<uint32_t>(NebulaKeyTypeV2::kData) == type;
+  }
+
+  static folly::StringPiece keyWithNoVersion(const folly::StringPiece& rawKey) {
+    // TODO(heng) We should change the method if varint data version supported.
+    return rawKey.subpiece(0, rawKey.size() - sizeof(int64_t));
+  }
+
+  /**
+   * @brief Generate an edge key from a lock key; used on resume.
+   *        If enableMvcc is set, the edge and the lock share the same version;
+   *        otherwise the lock version is 0 and the edge version is 1.
+   */
+  static std::string toEdgeKey(const folly::StringPiece& lockKey, bool enableMvcc = false);
+
+  /**
+   * @brief Generate a lock key from an edge key.
+   *        If enableMvcc is set, the edge and the lock share the same version;
+   *        otherwise the lock version is 0 and the edge version is 1.
+   */
+  static std::string toLockKey(const folly::StringPiece& rawKey, bool enableMvcc = false);
+
+  static EdgeVersion getLockVersion(const folly::StringPiece& rawKey) {
+    // TODO(liuyu) We should change the method if varint data version
+    // supported.
+    auto offset = rawKey.size() - sizeof(int64_t) * 2 - kLockSuffix.size();
+    return readInt<EdgeVersion>(rawKey.data() + offset, sizeof(int64_t));
+  }
+
+  static folly::StringPiece lockWithNoVersion(const folly::StringPiece& rawKey) {
+    // TODO(liuyu) We should change the method if varint data version
+    // supported.
+    return rawKey.subpiece(0, rawKey.size() - sizeof(int64_t) * 2 - kLockSuffix.size());
+  }
+
+  static void dumpBadKey(const folly::StringPiece& rawKey, size_t expect, size_t vIdLen) {
+    std::stringstream msg;
+    msg << "rawKey.size() != expect size"
+        << ", rawKey.size() = " << rawKey.size() << ", expect = " << expect
+        << ", vIdLen = " << vIdLen << ", rawkey hex format:\n";
+    msg << folly::hexDump(rawKey.data(), rawKey.size());
+    LOG(FATAL) << msg.str();
+  }
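+
+  // (Layout note: in the leading int32, partId occupies the upper 24 bits
+  //  (kPartitionOffset = 8) and the key type the low byte (kTypeMask = 0xFF);
+  //  getPart() shifts the type byte away to recover the part id.)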
+
+ private:
+  NebulaKeyUtilsV2() = delete;
+
+ private:
+  // size of vertex key except vertexId
+  static constexpr int32_t kVertexLen = sizeof(PartitionID) + sizeof(TagID) + sizeof(TagVersion);
+
+  // size of edge key except srcId and dstId
+  static constexpr int32_t kEdgeLen =
+      sizeof(PartitionID) + sizeof(EdgeType) + sizeof(EdgeRanking) + sizeof(EdgeVersion);
+
+  static constexpr int32_t kSystemLen = sizeof(PartitionID) + sizeof(NebulaSystemKeyType);
+
+  // The partition id offset in 4 Bytes
+  static constexpr uint8_t kPartitionOffset = 8;
+
+  // The key type bits Mask
+  // See KeyType enum
+  static constexpr uint32_t kTypeMask = 0x000000FF;
+
+  // The Tag/Edge type bit Mask: the most significant bit indicates the sign,
+  // the next bit indicates tag or edge, 0 for Tag, 1 for Edge
+  static constexpr uint32_t kTagEdgeMask = 0x40000000;
+  // For extracting the Tag/Edge value
+  static constexpr uint32_t kTagEdgeValueMask = ~kTagEdgeMask;
+  // Write edge by |=
+  static constexpr uint32_t kEdgeMaskSet = kTagEdgeMask;
+  // Write Tag by &=
+  static constexpr uint32_t kTagMaskSet = ~kTagEdgeMask;
+
+  static constexpr int32_t kVertexIndexLen = sizeof(PartitionID) + sizeof(IndexID);
+
+  static constexpr int32_t kEdgeIndexLen =
+      sizeof(PartitionID) + sizeof(IndexID) + sizeof(EdgeRanking);
+};
+
+}  // namespace nebula
+#endif  // TOOLS_DBUPGRADE_NEBULAKEYUTILSV2_H_
diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp
new file mode 100644
index 00000000000..e7e7033e389
--- /dev/null
+++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp
@@ -0,0 +1,26 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "tools/db-upgrade/NebulaKeyUtilsV3.h"
+
+namespace nebula {
+std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) {
+  PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(kTag_);
+  std::string key;
+  key.reserve(sizeof(PartitionID));
+  key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID));
+  return key;
+}
+std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) {
+  // A v3 vertex key is the tag key with the type byte rewritten and the
+  // trailing TagID dropped.
+  std::string key = tagKey.toString();
+  key[0] = static_cast<char>(kVertex);
+  key.resize(key.size() - sizeof(TagID));
+  return key;
+}
+std::string NebulaKeyUtilsV3::dataVersionValue() {
+  return "3.0";
+}
+
+}  // namespace nebula
diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.h b/src/tools/db-upgrade/NebulaKeyUtilsV3.h
new file mode 100644
index 00000000000..b486690067f
--- /dev/null
+++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.h
@@ -0,0 +1,20 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */ +#ifndef TOOLS_DB_UPGRADE_NEBULAKEYUTILSV3_H +#define TOOLS_DB_UPGRADE_NEBULAKEYUTILSV3_H +#include "common/utils/Types.h" +namespace nebula { +class NebulaKeyUtilsV3 { + public: + static std::string partTagPrefix(PartitionID partId); + static std::string getVertexKey(folly::StringPiece tagKey); + static std::string dataVersionValue(); + + private: + enum NebulaKeyTypeV3 : uint32_t { kTag_ = 0x00000001, kVertex = 0x00000007 }; +}; + +} // namespace nebula +#endif diff --git a/tests/tck/features/delete/DeleteTag.IntVid.feature b/tests/tck/features/delete/DeleteTag.IntVid.feature index af279dd4701..70eadfc8626 100644 --- a/tests/tck/features/delete/DeleteTag.IntVid.feature +++ b/tests/tck/features/delete/DeleteTag.IntVid.feature @@ -41,6 +41,7 @@ Feature: Delete int vid of tag """ Then the result should be, in any order: | player.name | player.age | + | EMPTY | EMPTY | When executing query: """ FETCH PROP ON bachelor hash("Tim Duncan") YIELD bachelor.name, bachelor.speciality @@ -160,6 +161,7 @@ Feature: Delete int vid of tag | id | Then drop the used space + @wtf Scenario: delete int vid multiple vertex one tag Given an empty graph And load "nba_int_vid" csv data to a new space @@ -205,6 +207,7 @@ Feature: Delete int vid of tag """ Then the result should be, in any order: | player.name | player.age | + | EMPTY | EMPTY | When executing query: """ FETCH PROP ON player hash("Tony Parker") YIELD player.name, player.age diff --git a/tests/tck/features/delete/DeleteTag.feature b/tests/tck/features/delete/DeleteTag.feature index 4e01b0cdeff..7a4fe136e94 100644 --- a/tests/tck/features/delete/DeleteTag.feature +++ b/tests/tck/features/delete/DeleteTag.feature @@ -41,6 +41,7 @@ Feature: Delete string vid of tag """ Then the result should be, in any order: | player.name | player.age | + | EMPTY | EMPTY | When executing query: """ FETCH PROP ON bachelor "Tim Duncan" YIELD bachelor.name, bachelor.speciality @@ -205,6 +206,7 @@ Feature: Delete string vid of tag """ Then the result should be, in any order: | player.name | player.age | + | EMPTY | EMPTY | When executing query: """ FETCH PROP ON player "Tony Parker" YIELD player.name, player.age diff --git a/tests/tck/features/go/GO.IntVid.feature b/tests/tck/features/go/GO.IntVid.feature index 27eb032b59d..757b31efc45 100644 --- a/tests/tck/features/go/GO.IntVid.feature +++ b/tests/tck/features/go/GO.IntVid.feature @@ -1383,14 +1383,14 @@ Feature: IntegerVid Go Sentence """ Then the result should be, in any order, with relax comparison, and the columns 0,1 should be hashed: | serve._dst | like._dst | serve.start_year | like.likeness | $$.player.name | - | "Thunders" | EMPTY | 2008 | EMPTY | NULL | + | "Thunders" | EMPTY | 2008 | EMPTY | EMPTY | | EMPTY | "Paul George" | EMPTY | 90 | "Paul George" | | EMPTY | "James Harden" | EMPTY | 90 | "James Harden" | - | "Pacers" | EMPTY | 2010 | EMPTY | NULL | - | "Thunders" | EMPTY | 2017 | EMPTY | NULL | + | "Pacers" | EMPTY | 2010 | EMPTY | EMPTY | + | "Thunders" | EMPTY | 2017 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 95 | "Russell Westbrook" | - | "Thunders" | EMPTY | 2009 | EMPTY | NULL | - | "Rockets" | EMPTY | 2012 | EMPTY | NULL | + | "Thunders" | EMPTY | 2009 | EMPTY | EMPTY | + | "Rockets" | EMPTY | 2012 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 80 | "Russell Westbrook" | When executing query: """ @@ -1399,14 +1399,14 @@ Feature: IntegerVid Go Sentence """ Then the result should be, in any order, with relax comparison, and the columns 0,1 should be hashed: | serve._dst 
| like._dst | serve.start_year | like.likeness | $$.player.name | - | "Thunders" | EMPTY | 2008 | EMPTY | NULL | + | "Thunders" | EMPTY | 2008 | EMPTY | EMPTY | | EMPTY | "Paul George" | EMPTY | 90 | "Paul George" | | EMPTY | "James Harden" | EMPTY | 90 | "James Harden" | - | "Pacers" | EMPTY | 2010 | EMPTY | NULL | - | "Thunders" | EMPTY | 2017 | EMPTY | NULL | + | "Pacers" | EMPTY | 2010 | EMPTY | EMPTY | + | "Thunders" | EMPTY | 2017 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 95 | "Russell Westbrook" | - | "Thunders" | EMPTY | 2009 | EMPTY | NULL | - | "Rockets" | EMPTY | 2012 | EMPTY | NULL | + | "Thunders" | EMPTY | 2009 | EMPTY | EMPTY | + | "Rockets" | EMPTY | 2012 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 80 | "Russell Westbrook" | When executing query: """ @@ -1480,8 +1480,8 @@ Feature: IntegerVid Go Sentence GO FROM hash('Tim Duncan') OVER serve YIELD $$.player.name as name """ Then the result should be, in any order, with relax comparison: - | name | - | NULL | + | name | + | EMPTY | Scenario: Integer Vid zero step When executing query: diff --git a/tests/tck/features/go/GO.feature b/tests/tck/features/go/GO.feature index 9dabe8c77e3..7374d9bcdaa 100644 --- a/tests/tck/features/go/GO.feature +++ b/tests/tck/features/go/GO.feature @@ -346,7 +346,7 @@ Feature: Go Sentence When executing query: """ GO FROM "Paul Gasol" OVER * - WHERE $$.player.name IS NOT NULL + WHERE $$.player.name IS NOT EMPTY YIELD like._dst """ Then the result should be, in any order, with relax comparison: @@ -356,7 +356,7 @@ Feature: Go Sentence When executing query: """ GO FROM "Paul Gasol" OVER * - WHERE $$.player.name IS NULL + WHERE $$.player.name IS EMPTY YIELD like._dst """ Then the result should be, in any order, with relax comparison: @@ -1474,14 +1474,14 @@ Feature: Go Sentence """ Then the result should be, in any order, with relax comparison: | serve._dst | like._dst | serve.start_year | like.likeness | $$.player.name | - | "Thunders" | EMPTY | 2008 | EMPTY | NULL | + | "Thunders" | EMPTY | 2008 | EMPTY | EMPTY | | EMPTY | "Paul George" | EMPTY | 90 | "Paul George" | | EMPTY | "James Harden" | EMPTY | 90 | "James Harden" | - | "Pacers" | EMPTY | 2010 | EMPTY | NULL | - | "Thunders" | EMPTY | 2017 | EMPTY | NULL | + | "Pacers" | EMPTY | 2010 | EMPTY | EMPTY | + | "Thunders" | EMPTY | 2017 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 95 | "Russell Westbrook" | - | "Thunders" | EMPTY | 2009 | EMPTY | NULL | - | "Rockets" | EMPTY | 2012 | EMPTY | NULL | + | "Thunders" | EMPTY | 2009 | EMPTY | EMPTY | + | "Rockets" | EMPTY | 2012 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 80 | "Russell Westbrook" | When executing query: """ @@ -1490,14 +1490,14 @@ Feature: Go Sentence """ Then the result should be, in any order, with relax comparison: | serve._dst | like._dst | serve.start_year | like.likeness | $$.player.name | - | "Thunders" | EMPTY | 2008 | EMPTY | NULL | + | "Thunders" | EMPTY | 2008 | EMPTY | EMPTY | | EMPTY | "Paul George" | EMPTY | 90 | "Paul George" | | EMPTY | "James Harden" | EMPTY | 90 | "James Harden" | - | "Pacers" | EMPTY | 2010 | EMPTY | NULL | - | "Thunders" | EMPTY | 2017 | EMPTY | NULL | + | "Pacers" | EMPTY | 2010 | EMPTY | EMPTY | + | "Thunders" | EMPTY | 2017 | EMPTY | EMPTY | | EMPTY | "Russell Westbrook" | EMPTY | 95 | "Russell Westbrook" | - | "Thunders" | EMPTY | 2009 | EMPTY | NULL | - | "Rockets" | EMPTY | 2012 | EMPTY | NULL | + | "Thunders" | EMPTY | 2009 | EMPTY | EMPTY | + | "Rockets" | EMPTY | 2012 | EMPTY | EMPTY 
| | EMPTY | "Russell Westbrook" | EMPTY | 80 | "Russell Westbrook" | When executing query: """ @@ -1571,8 +1571,8 @@ Feature: Go Sentence GO FROM 'Tim Duncan' OVER serve YIELD $$.player.name as name """ Then the result should be, in any order, with relax comparison: - | name | - | NULL | + | name | + | EMPTY | Scenario: zero step When executing query: diff --git a/tests/tck/features/go/GoYieldVertexEdge.feature b/tests/tck/features/go/GoYieldVertexEdge.feature index e32b5b96031..5fbba5aec71 100644 --- a/tests/tck/features/go/GoYieldVertexEdge.feature +++ b/tests/tck/features/go/GoYieldVertexEdge.feature @@ -345,7 +345,7 @@ Feature: Go Yield Vertex And Edge Sentence When executing query: """ GO FROM "Paul Gasol" OVER * - WHERE $$.player.name IS NOT NULL + WHERE $$.player.name IS NOT EMPTY YIELD edge as e """ Then the result should be, in any order, with relax comparison: @@ -355,7 +355,7 @@ Feature: Go Yield Vertex And Edge Sentence When executing query: """ GO FROM "Paul Gasol" OVER * - WHERE $$.player.name IS NULL + WHERE $$.player.name IS EMPTY YIELD type(edge) as type """ Then the result should be, in any order, with relax comparison: @@ -1383,13 +1383,13 @@ Feature: Go Yield Vertex And Edge Sentence | dst | serve.start_year | like.likeness | $$.player.name | | "James Harden" | EMPTY | 90 | "James Harden" | | "Paul George" | EMPTY | 90 | "Paul George" | - | "Thunders" | 2008 | EMPTY | NULL | + | "Thunders" | 2008 | EMPTY | EMPTY | | "Russell Westbrook" | EMPTY | 80 | "Russell Westbrook" | - | "Rockets" | 2012 | EMPTY | NULL | - | "Thunders" | 2009 | EMPTY | NULL | + | "Rockets" | 2012 | EMPTY | EMPTY | + | "Thunders" | 2009 | EMPTY | EMPTY | | "Russell Westbrook" | EMPTY | 95 | "Russell Westbrook" | - | "Pacers" | 2010 | EMPTY | NULL | - | "Thunders" | 2017 | EMPTY | NULL | + | "Pacers" | 2010 | EMPTY | EMPTY | + | "Thunders" | 2017 | EMPTY | EMPTY | When executing query: """ GO 1 TO 2 STEPS FROM 'Russell Westbrook' OVER * REVERSELY YIELD edge as e diff --git a/tests/tck/features/index/Index.feature b/tests/tck/features/index/Index.feature index 8fcc8763177..7d854d3370d 100644 --- a/tests/tck/features/index/Index.feature +++ b/tests/tck/features/index/Index.feature @@ -17,6 +17,11 @@ Feature: IndexTest_Vid_String CREATE TAG INDEX single_tag_index ON tag_1(col2); """ Then the execution should be successful + When executing query: + """ + CREATE TAG INDEX IF NOT EXISTS single_tag_index_1 ON tag_1(col2); + """ + Then the execution should be successful When executing query: """ CREATE TAG INDEX duplicate_tag_index_1 ON tag_1(col2); @@ -155,6 +160,11 @@ Feature: IndexTest_Vid_String CREATE EDGE INDEX single_edge_index ON edge_1(col2); """ Then the execution should be successful + When executing query: + """ + CREATE EDGE INDEX IF NOT EXISTS single_edge_index_1 ON edge_1(col2); + """ + Then the execution should be successful When executing query: """ CREATE EDGE INDEX duplicate_edge_1_index ON edge_1(col2) diff --git a/tests/tck/features/insert/insertVertexOnly.feature b/tests/tck/features/insert/insertVertexOnly.feature index 9419e29e54b..29ebff743d9 100644 --- a/tests/tck/features/insert/insertVertexOnly.feature +++ b/tests/tck/features/insert/insertVertexOnly.feature @@ -19,10 +19,6 @@ Feature: insert vertex without tag When executing query and retrying it on failure every 6 seconds for 3 times: """ INSERT VERTEX VALUES 1:(),2:(),3:(); - """ - Then a SemanticError should be raised at runtime: Insert vertex is forbidden, please speicify the tag - When executing query: - """ INSERT EDGE e() 
VALUES 1->2:(),2->3:(); """ Then the execution should be successful diff --git a/tests/tck/features/match/MatchById.IntVid.feature b/tests/tck/features/match/MatchById.IntVid.feature index bf35ec30285..1bf44590933 100644 --- a/tests/tck/features/match/MatchById.IntVid.feature +++ b/tests/tck/features/match/MatchById.IntVid.feature @@ -101,6 +101,7 @@ Feature: Integer Vid Match By Id """ Then the result should be, in any order, with relax comparison: | Type | Name | + | 'like' | NULL | | 'serve' | 'Cavaliers' | | 'serve' | 'Heat' | | 'serve' | 'Cavaliers' | @@ -113,6 +114,7 @@ Feature: Integer Vid Match By Id """ Then the result should be, in any order, with relax comparison: | Type | Name | + | 'like' | NULL | | 'serve' | 'Cavaliers' | | 'serve' | 'Heat' | | 'serve' | 'Cavaliers' | diff --git a/tests/tck/features/match/SeekByEdge.feature b/tests/tck/features/match/SeekByEdge.feature index dfdb3a50f4f..a333e520d94 100644 --- a/tests/tck/features/match/SeekByEdge.feature +++ b/tests/tck/features/match/SeekByEdge.feature @@ -333,6 +333,158 @@ Feature: Match seek by edge """ Then the result should be, in any order: | player.player.name | team.team.name | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL 
| NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | + | NULL | NULL | | "Vince Carter" | "Nets" | | "Jason Kidd" | "Nets" | | "Grant Hill" | "Pistons" | @@ -1139,7 +1291,6 @@ Feature: Match seek by edge Then the result should be, in any order: | p1.player.name | p2.team.name | - @wtf Scenario Outline: Seek by edge with properties When executing query: """ @@ -1165,6 +1316,8 @@ Feature: Match seek by edge | player | team | | "Paul Gasol" | "Grizzlies" | | "Jason Kidd" | "Nets" | + | NULL | NULL | + | NULL | NULL | When executing query: """ match (player)-[s:serve]->(team) where s.start_year == 2001 return player.player.name AS player, team.team.name AS team @@ -1189,6 +1342,8 @@ Feature: Match seek by edge | player | team | | "Paul Gasol" | "Grizzlies" | | "Jason Kidd" | "Nets" | + | NULL | NULL | + | NULL | NULL | Scenario Outline: Seek by edge with range with properties When executing query: diff --git a/tests/tck/features/match/With.feature b/tests/tck/features/match/With.feature index 022dbd4965c..d4cdca681c0 100644 --- a/tests/tck/features/match/With.feature +++ b/tests/tck/features/match/With.feature @@ -133,8 +133,8 @@ Feature: With clause RETURN collect(names) """ Then the result should be, in any order, with relax comparison: - | collect(names) | - | ["Tony Parker", "Tiago Splitter", "Shaquille O'Neal", "Marco Belinelli", "Manu Ginobili"] | + | collect(names) | + | ["Tony Parker", "Tiago Splitter", "Shaquille O'Neal", "Marco Belinelli"] | When profiling query: """ MATCH (v:player) diff --git a/tests/tck/features/ttl/TTL.feature b/tests/tck/features/ttl/TTL.feature index 1cb456d209d..f3f74c6c618 100644 --- a/tests/tck/features/ttl/TTL.feature +++ b/tests/tck/features/ttl/TTL.feature @@ -393,31 +393,36 @@ Feature: TTLTest FETCH PROP ON person "1" YIELD vertex as node; """ Then the result should be, in any order, with relax comparison: - | node | + | node | + | ("1") | When executing query: """ FETCH PROP ON person "1" YIELD person.id as id """ Then the result should be, in any order: - | id | + | id | + | EMPTY | When executing query: """ FETCH PROP ON * "1" YIELD person.id, career.id """ Then the result should be, in any order: | person.id | career.id | + | EMPTY | EMPTY | When executing query: """ FETCH PROP ON person "2" YIELD person.id """ Then the result should be, in any order: | person.id | + | EMPTY | When executing query: """ FETCH PROP ON person "2" YIELD person.id as id """ Then the result should be, in any order: - | id | + | id | + | EMPTY | When executing query: """ FETCH PROP ON career "2" YIELD career.id; @@ -486,5 +491,6 @@ Feature: TTLTest FETCH PROP ON person "1" YIELD person.age as age; """ Then the result should be, in any order: - | age | + | age | + | EMPTY | And drop the used space