Skip to content

Commit

Permalink
add db-upgrade V3 (#3417)
Browse files Browse the repository at this point in the history
* add db-upgrade V3

* use ingest

* modify upgrade args

* write data version key

* address wenhao's comment

* address some comment

* address some comment

* format

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
cangfengzhs and Sophie-Xie committed Dec 28, 2021
1 parent 2a4be17 commit a89e5b3
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 16 deletions.
4 changes: 4 additions & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,8 @@ std::string NebulaKeyUtils::adminTaskKey(int32_t seqId, JobID jobId, TaskID task
return key;
}

// Returns the fixed key under which the data version marker is stored.
// The key consists of four 0xFF bytes.
std::string NebulaKeyUtils::dataVersionKey() {
  static constexpr char kDataVersionKey[] = "\xFF\xFF\xFF\xFF";
  return std::string(kDataVersionKey);
}

} // namespace nebula
2 changes: 2 additions & 0 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class NebulaKeyUtils final {

static std::string adminTaskKey(int32_t seqId, JobID jobId, TaskID taskId);

static std::string dataVersionKey();

static_assert(sizeof(NebulaKeyType) == sizeof(PartitionID));

private:
Expand Down
1 change: 1 addition & 0 deletions src/tools/db-upgrade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ nebula_add_executable(
DbUpgraderTool.cpp
NebulaKeyUtilsV1.cpp
NebulaKeyUtilsV2.cpp
NebulaKeyUtilsV3.cpp
DbUpgrader.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
Expand Down
129 changes: 122 additions & 7 deletions src/tools/db-upgrade/DbUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#include "common/fs/FileUtils.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "rocksdb/sst_file_writer.h"
#include "tools/db-upgrade/NebulaKeyUtilsV1.h"
#include "tools/db-upgrade/NebulaKeyUtilsV2.h"
#include "tools/db-upgrade/NebulaKeyUtilsV3.h"

DEFINE_string(src_db_path,
"",
Expand All @@ -22,10 +24,11 @@ DEFINE_string(dst_db_path,
"multi paths should be split by comma");
DEFINE_string(upgrade_meta_server, "127.0.0.1:45500", "Meta servers' address.");
DEFINE_uint32(write_batch_num, 100, "The size of the batch written to rocksdb");
DEFINE_uint32(upgrade_version,
0,
"When the value is 1, upgrade the data from 1.x to 2.0 GA. "
"When the value is 2, upgrade the data from 2.0 RC to 2.0 GA.");
// Selects which upgrade path to run, written as "<from>:<to>".
// The adjacent string literals are concatenated by the compiler, so every
// sentence except the last must end with a trailing space.
DEFINE_string(upgrade_version,
              "",
              "When the value is 1:2, upgrade the data from 1.x to 2.0 GA. "
              "When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA. "
              "When the value is 2:3, upgrade the data from 2.0 GA to 3.0.");
DEFINE_bool(compactions,
true,
"When the upgrade of the space is completed, "
Expand Down Expand Up @@ -83,7 +86,7 @@ Status UpgraderSpace::initSpace(const std::string& sId) {

// Use readonly rocksdb
readEngine_.reset(new nebula::kvstore::RocksEngine(
spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, true));
spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, false));
writeEngine_.reset(new nebula::kvstore::RocksEngine(spaceId_, spaceVidLen_, dstPath_));

parts_.clear();
Expand Down Expand Up @@ -882,6 +885,114 @@ std::string UpgraderSpace::encodeRowVal(const RowReader* reader,
return std::move(rowWrite).moveEncodedStr();
}

// Worker task for the 2.0 GA -> 3.0 upgrade of one partition.
//
// Takes a partition id from partQueue_ (waiting at most 10 ms). For that partition it scans
// every key under the tag prefix in readEngine_ and, for each distinct vertex, emits a vertex
// key with an empty value. The keys are flushed in batches into standalone SST files whose
// paths are appended to ingest_sst_file_ (guarded by ingest_sst_file_mut_); doProcessV3()
// ingests those files after all parts are done.
//
// When the part is finished (or its iterator could not be created) unFinishedPart_ is
// decremented and, unless this was the last part, the task re-enqueues itself on pool_.
// Any SST write failure aborts the process via LOG(FATAL).
void UpgraderSpace::runPartV3() {
  // Upper bound on the number of keys written into a single SST file.
  constexpr size_t kMaxKeysPerSst = 100000;

  std::chrono::milliseconds take_dura{10};
  if (auto pId = partQueue_.try_take_for(take_dura)) {
    PartitionID partId = *pId;
    // Handle vertex and edge, if there is an index, generate index data
    LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id "
              << partId;
    auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId);
    std::unique_ptr<kvstore::KVIterator> iter;
    auto retCode = readEngine_->prefix(prefix, &iter);
    if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
      LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!";
      LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id "
                 << partId << " failed";

      auto unFinishedPart = --unFinishedPart_;
      if (unFinishedPart == 0) {
        // all parts has finished
        LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id "
                  << spaceId_ << " finished";
      } else {
        pool_->add(std::bind(&UpgraderSpace::runPartV3, this));
      }
      return;
    }

    // Per-part batch counter. std::time() only has one-second resolution, so without it two
    // batches of the same part flushed within the same second would share a file name: the
    // second Open() would overwrite the first file and the same path would be queued twice.
    size_t sstSeq = 0;
    auto write_sst = [&, this](const std::vector<kvstore::KV>& data) {
      ::rocksdb::Options option;
      option.create_if_missing = true;
      option.compression = ::rocksdb::CompressionType::kNoCompression;
      ::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option);
      std::string file = ::fmt::format(".nebula_upgrade.space-{}.part-{}.{}.{}.sst",
                                       spaceId_,
                                       partId,
                                       std::time(nullptr),
                                       sstSeq++);
      ::rocksdb::Status s = sst_file_writer.Open(file);
      if (!s.ok()) {
        // Status::code() is a char-sized enum and would be streamed as a raw character,
        // so log the human-readable form instead.
        LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":"
                   << s.ToString();
      }
      // Iterate by const reference: copying each key/value pair is unnecessary.
      for (const auto& item : data) {
        s = sst_file_writer.Put(item.first, item.second);
        if (!s.ok()) {
          LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":"
                     << s.ToString();
        }
      }
      s = sst_file_writer.Finish();
      if (!s.ok()) {
        LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":"
                   << s.ToString();
      }
      // ingest_sst_file_ is shared by all part workers of this space.
      std::lock_guard<std::mutex> lck(this->ingest_sst_file_mut_);
      ingest_sst_file_.push_back(file);
    };

    std::vector<kvstore::KV> data;
    std::string lastVertexKey = "";
    for (; iter && iter->valid(); iter->next()) {
      auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key());
      // Consecutive tag keys that map to the same vertex key produce a single vertex key.
      if (vertex == lastVertexKey) {
        continue;
      }
      data.emplace_back(vertex, "");
      lastVertexKey = std::move(vertex);
      if (data.size() >= kMaxKeysPerSst) {
        write_sst(data);
        data.clear();
      }
    }
    // Flush the final, partially filled batch.
    if (!data.empty()) {
      write_sst(data);
      data.clear();
    }
    LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId
              << " succeed";

    auto unFinishedPart = --unFinishedPart_;
    if (unFinishedPart == 0) {
      // all parts has finished
      LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id "
                << spaceId_ << " finished.";
    } else {
      pool_->add(std::bind(&UpgraderSpace::runPartV3, this));
    }
  } else {
    LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished";
  }
}
// Upgrades one space from 2.0 GA to 3.0.
//
// Schedules up to FLAGS_max_concurrent_parts runPartV3() workers on pool_, blocks until
// unFinishedPart_ drops to zero, ingests every SST file the workers recorded in
// ingest_sst_file_ into readEngine_, and finally writes the data version marker.
// Any failure of the ingest or of the version write aborts the process via LOG(FATAL).
void UpgraderSpace::doProcessV3() {
  LOG(INFO) << "Start to handle data in space id " << spaceId_;
  // Parallel process part
  auto partConcurrency = std::min(static_cast<size_t>(FLAGS_max_concurrent_parts), parts_.size());
  LOG(INFO) << "Max concurrent parts: " << partConcurrency;
  unFinishedPart_ = parts_.size();

  LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_;
  for (size_t i = 0; i < partConcurrency; ++i) {
    pool_->add(std::bind(&UpgraderSpace::runPartV3, this));
  }

  // Each worker decrements unFinishedPart_ when its part is done; poll until all are done.
  while (unFinishedPart_ != 0) {
    sleep(10);
  }

  // A space without any tag data produces no SST file. RocksDB reports an error when asked
  // to ingest an empty file list, which would turn a legitimately empty space into a fatal
  // failure, so skip the ingest in that case.
  // NOTE(review): assumes RocksEngine::ingest forwards the list unchanged - confirm.
  if (ingest_sst_file_.empty()) {
    LOG(INFO) << "No sst file generated in space id " << spaceId_ << ", skip ingest";
  } else {
    auto code = readEngine_->ingest(ingest_sst_file_, true);
    if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) {
      LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast<int>(code);
    }
  }

  // The version marker records that the upgrade completed; do not ignore a failed write.
  auto putCode =
      readEngine_->put(NebulaKeyUtils::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue());
  if (putCode != ::nebula::cpp2::ErrorCode::SUCCEEDED) {
    LOG(FATAL) << "Faild upgrade 2:3 when write data version key:" << static_cast<int>(putCode);
  }
}
std::vector<std::string> UpgraderSpace::indexVertexKeys(
PartitionID partId,
VertexID& vId,
Expand Down Expand Up @@ -1094,10 +1205,14 @@ void DbUpgrader::doSpace() {
LOG(INFO) << "Upgrade from path " << upgraderSpaceIter->srcPath_ << " space id "
<< upgraderSpaceIter->entry_ << " to path " << upgraderSpaceIter->dstPath_
<< " begin";
if (FLAGS_upgrade_version == 1) {
if (FLAGS_upgrade_version == "1:2") {
upgraderSpaceIter->doProcessV1();
} else {
} else if (FLAGS_upgrade_version == "2RC:2") {
upgraderSpaceIter->doProcessV2();
} else if (FLAGS_upgrade_version == "2:3") {
upgraderSpaceIter->doProcessV3();
} else {
LOG(FATAL) << "error upgrade version " << FLAGS_upgrade_version;
}

auto ret = upgraderSpaceIter->copyWal();
Expand Down
10 changes: 9 additions & 1 deletion src/tools/db-upgrade/DbUpgrader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ DECLARE_string(src_db_path);
DECLARE_string(dst_db_path);
DECLARE_string(upgrade_meta_server);
DECLARE_uint32(write_batch_num);
DECLARE_uint32(upgrade_version);
DECLARE_string(upgrade_version);
DECLARE_bool(compactions);
DECLARE_uint32(max_concurrent_parts);
DECLARE_uint32(max_concurrent_spaces);
Expand Down Expand Up @@ -55,6 +55,9 @@ class UpgraderSpace {
// Processing v2 Rc data upgrade to v2 Ga
void doProcessV2();

// Processing v2 Ga data upgrade to v3
void doProcessV3();

// Perform manual compact
void doCompaction();

Expand Down Expand Up @@ -111,6 +114,8 @@ class UpgraderSpace {

void runPartV2();

void runPartV3();

public:
// Source data path
std::string srcPath_;
Expand Down Expand Up @@ -159,6 +164,9 @@ class UpgraderSpace {
folly::UnboundedBlockingQueue<PartitionID> partQueue_;

std::atomic<size_t> unFinishedPart_;

std::mutex ingest_sst_file_mut_;
std::vector<std::string> ingest_sst_file_;
};

// Upgrade one data path in storage conf
Expand Down
17 changes: 9 additions & 8 deletions src/tools/db-upgrade/DbUpgraderTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ void printHelp() {
A list of meta severs' ip:port separated by comma.
Default: 127.0.0.1:45500
--upgrade_version=<1|2>
This tool can only upgrade 1.x data or 2.0 RC data.
When the value is 1, upgrade the data from 1.x to 2.0 GA.
When the value is 2, upgrade the data from 2.0 RC to 2.0 GA.
Default: 0
--upgrade_version=<1:2|2RC:2|2:3>
This tool can only upgrade 1.x data, 2.0 RC, or 2.0 GA data.
1:2 upgrade the data from 1.x to 2.0GA
2RC:2 upgrade the data from 2.0RC to 2.0GA
2:3 upgrade the data from 2.0GA to 3.0
Default: ""
optional:
--write_batch_num=<N>
Expand Down Expand Up @@ -164,9 +165,9 @@ int main(int argc, char* argv[]) {
CHECK_NOTNULL(schemaMan);
CHECK_NOTNULL(indexMan);

if (FLAGS_upgrade_version != 1 && FLAGS_upgrade_version != 2) {
LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version
<< " illegal, upgrade_version can only be 1 or 2";
std::vector<std::string> versions = {"1:2", "2RC:2", "2:3"};
if (std::find(versions.begin(), versions.end(), FLAGS_upgrade_version) == versions.end()) {
LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version;
return EXIT_FAILURE;
}
LOG(INFO) << "Prepare phase end";
Expand Down
26 changes: 26 additions & 0 deletions src/tools/db-upgrade/NebulaKeyUtilsV3.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "tools/db-upgrade/NebulaKeyUtilsV3.h"

namespace nebula {
// Builds the key prefix shared by all tag keys of `partId`: the partition id shifted left
// by kPartitionOffset, OR-ed with the kTag_ type, copied out as sizeof(PartitionID) raw
// bytes in host byte order.
std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) {
  const PartitionID header = (partId << kPartitionOffset) | static_cast<uint32_t>(kTag_);
  const char* headerBytes = reinterpret_cast<const char*>(&header);
  return std::string(headerBytes, sizeof(PartitionID));
}
// Derives the vertex key that corresponds to a tag key: the key type in the header is
// replaced by kVertex and the trailing TagID is dropped.
//
// partTagPrefix() stores the type in the low-order bits of the header and copies the header
// in host byte order, so on a little-endian host the type lives in the FIRST byte of the key.
// Writing to byte 3 instead would clobber the high byte of the partition id and leave the
// key typed as a tag.
// NOTE(review): assumes a little-endian host and that the type fits in one byte (both
// enum values used here do) - confirm against kPartitionOffset.
// Precondition: tagKey holds at least the header plus a TagID.
std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) {
  std::string key = tagKey.toString();
  key[0] = static_cast<char>(kVertex);
  key.resize(key.size() - sizeof(TagID));
  return key;
}
// Returns the version string written under the data version key after a 2:3 upgrade.
std::string NebulaKeyUtilsV3::dataVersionValue() {
  static constexpr char kDataVersion[] = "3.0";
  return std::string(kDataVersion);
}

} // namespace nebula
18 changes: 18 additions & 0 deletions src/tools/db-upgrade/NebulaKeyUtilsV3.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once
#include "common/utils/Types.h"
namespace nebula {
// Key helpers for the db-upgrade tool's 2.0 GA -> 3.0 upgrade path.
class NebulaKeyUtilsV3 {
 public:
  // Returns the raw prefix (sizeof(PartitionID) bytes) of the tag keys of `partId`.
  static std::string partTagPrefix(PartitionID partId);

  // Converts a tag key into a vertex key: the header is retagged with kVertex and the
  // trailing TagID is removed.
  static std::string getVertexKey(folly::StringPiece tagKey);

  // Returns the value stored under the data version key once the upgrade is done.
  static std::string dataVersionValue();

 private:
  // Key type tags used by this upgrade path.
  enum NebulaKeyTypeV3 : uint32_t {
    kTag_ = 0x00000001,
    kVertex = 0x00000007,
  };
};

} // namespace nebula

0 comments on commit a89e5b3

Please sign in to comment.