Close toss, and some other minor enhancement (#5119)
critical27 committed Dec 28, 2022
1 parent a0cfb0b commit 6cb56c3
Showing 20 changed files with 83 additions and 163 deletions.
3 changes: 0 additions & 3 deletions conf/nebula-graphd.conf.default
@@ -91,8 +91,5 @@
# if use experimental features
--enable_experimental_feature=false

# if use toss feature, only work if enable_experimental_feature is true
--enable_toss=false

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true
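
The same --enable_toss deletion repeats in the production and standalone conf files below. As the conf comment says, the flag only took effect together with the umbrella experimental switch; the following is a minimal gflags-style sketch of that gating, where the helper is hypothetical and only the two flag names come from the conf file above.

#include <gflags/gflags.h>

// Illustrative definitions; the real flags live in nebula's flag files, and
// enable_toss is the one this commit deletes.
DEFINE_bool(enable_experimental_feature, false, "if use experimental features");
DEFINE_bool(enable_toss, false, "toss switch, removed by this commit");

// Hypothetical helper mirroring the conf comment: the experimental flag only
// takes effect when enable_experimental_feature is also true.
static bool tossEnabled() {
  return FLAGS_enable_experimental_feature && FLAGS_enable_toss;
}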
3 changes: 0 additions & 3 deletions conf/nebula-graphd.conf.production
@@ -90,9 +90,6 @@
# if use experimental features
--enable_experimental_feature=false

# if use toss feature, only work if enable_experimental_feature is true
--enable_toss=false

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

3 changes: 0 additions & 3 deletions conf/nebula-standalone.conf.default
@@ -92,9 +92,6 @@
# if use experimental features
--enable_experimental_feature=false

# if use toss feature, only work if enable_experimental_feature is true
--enable_toss=false

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

8 changes: 4 additions & 4 deletions conf/nebula-storaged-listener.conf.production
@@ -3,13 +3,13 @@
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids_listener/nebula-storaged.pid
--pid_file=pids_listener/nebula-storaged-listener.pid
# Whether to use the configuration obtained from the configuration file
--local_config=true

########## logging ##########
# The directory to host logging files
--log_dir=logs_listener
--log_dir=logs_storage_listener
# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0
# Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
@@ -19,8 +19,8 @@
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=storaged-stdout.log
--stderr_log_file=storaged-stderr.log
--stdout_log_file=storaged-listener-stdout.log
--stderr_log_file=storaged-listener-stderr.log
# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
--stderrthreshold=2
# Wether logging files' name contain timestamp.
12 changes: 2 additions & 10 deletions conf/nebula-storaged.conf.default
@@ -95,16 +95,6 @@
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############## Key-Value separation ##############
# Whether or not to enable BlobDB (RocksDB key-value separation support)
--rocksdb_enable_kv_separation=false
# RocksDB key value separation threshold in bytes. Values at or above this threshold will be written to blob files during flush or compaction.
--rocksdb_kv_separation_threshold=100
# Compression algorithm for blobs, options: no,snappy,lz4,lz4hc,zlib,bzip2,zstd
--rocksdb_blob_compression=lz4
# Whether to garbage collect blobs during compaction
--rocksdb_enable_blob_garbage_collection=true

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
--rocksdb_db_options={}
@@ -114,6 +104,8 @@
--rocksdb_block_based_table_options={"block_size":"8192"}

############### misc ####################
# Whether turn on query in multiple thread
--query_concurrently=true
# Whether remove outdated space data
--auto_remove_invalid_space=true
# Network IO threads number
18 changes: 5 additions & 13 deletions conf/nebula-storaged.conf.production
@@ -2,7 +2,7 @@
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids/nebula-storaged.pid
--pid_file=pids/nebula-storaged-listener.pid
# Whether to use the configuration obtained from the configuration file
--local_config=true

@@ -18,8 +18,8 @@
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=storaged-stdout.log
--stderr_log_file=storaged-stderr.log
--stdout_log_file=storaged-listener-stdout.log
--stderr_log_file=storaged-listener-stderr.log
# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
--stderrthreshold=2
# Wether logging files' name contain timestamp.
@@ -104,17 +104,9 @@
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############## Key-Value separation ##############
# Whether or not to enable BlobDB (RocksDB key-value separation support)
--rocksdb_enable_kv_separation=false
# RocksDB key value separation threshold in bytes. Values at or above this threshold will be written to blob files during flush or compaction.
--rocksdb_kv_separation_threshold=100
# Compression algorithm for blobs, options: no,snappy,lz4,lz4hc,zlib,bzip2,zstd
--rocksdb_blob_compression=lz4
# Whether to garbage collect blobs during compaction
--rocksdb_enable_blob_garbage_collection=true

############### misc ####################
# Whether turn on query in multiple thread
--query_concurrently=true
# Whether remove outdated space data
--auto_remove_invalid_space=true
# Network IO threads number
14 changes: 2 additions & 12 deletions src/graph/executor/admin/SpaceExecutor.cpp
@@ -256,19 +256,11 @@ folly::Future<Status> ShowCreateSpaceExecutor::execute() {
DataSet dataSet({"Space", "Create Space"});
Row row;
row.values.emplace_back(properties.get_space_name());
std::string sAtomicEdge{"false"};
if (properties.isolation_level_ref().has_value() &&
(*properties.isolation_level_ref() == meta::cpp2::IsolationLevel::TOSS)) {
sAtomicEdge = "true";
}
auto fmt = properties.comment_ref().has_value()
? "CREATE SPACE `%s` (partition_num = %d, replica_factor = %d, "
"charset = %s, collate = %s, vid_type = %s, atomic_edge = %s"
") ON %s"
" comment = '%s'"
"charset = %s, collate = %s, vid_type = %s) ON %s comment = '%s'"
: "CREATE SPACE `%s` (partition_num = %d, replica_factor = %d, "
"charset = %s, collate = %s, vid_type = %s, atomic_edge = %s"
") ON %s";
"charset = %s, collate = %s, vid_type = %s) ON %s";
auto zoneNames = folly::join(",", properties.get_zone_names());
if (properties.comment_ref().has_value()) {
row.values.emplace_back(
@@ -279,7 +271,6 @@ folly::Future<Status> ShowCreateSpaceExecutor::execute() {
properties.get_charset_name().c_str(),
properties.get_collate_name().c_str(),
SchemaUtil::typeToString(properties.get_vid_type()).c_str(),
sAtomicEdge.c_str(),
zoneNames.c_str(),
properties.comment_ref()->c_str()));
} else {
@@ -291,7 +282,6 @@ folly::Future<Status> ShowCreateSpaceExecutor::execute() {
properties.get_charset_name().c_str(),
properties.get_collate_name().c_str(),
SchemaUtil::typeToString(properties.get_vid_type()).c_str(),
sAtomicEdge.c_str(),
zoneNames.c_str()));
}
dataSet.rows.emplace_back(std::move(row));
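
With the sAtomicEdge branch gone, SHOW CREATE SPACE no longer reports an atomic_edge option. Below is a small sketch of the trimmed format string in use; the space name, numbers, and zone are made up for illustration, and only the template itself comes from the hunk above.

#include <folly/String.h>

#include <cstdio>
#include <string>

int main() {
  // Same printf-style template as the no-comment branch above, now without
  // the "atomic_edge = %s" placeholder.
  std::string stmt = folly::stringPrintf(
      "CREATE SPACE `%s` (partition_num = %d, replica_factor = %d, "
      "charset = %s, collate = %s, vid_type = %s) ON %s",
      "example_space", 10, 3, "utf8", "utf8_bin", "FIXED_STRING(32)", "default_zone");
  std::printf("%s\n", stmt.c_str());
  return 0;
}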
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.cpp
@@ -30,7 +30,7 @@ DEFINE_string(meta_server_addrs,
"",
"list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
DEFINE_bool(local_config, false, "meta client will not retrieve latest configuration from meta");
DEFINE_bool(local_config, true, "meta client will not retrieve latest configuration from meta");

DEFINE_string(default_charset, "utf8", "The default charset when a space is created");
DEFINE_string(default_collate, "utf8_bin", "The default collate when a space is created");
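
Both graphd (here) and storaged (in StorageServer.cpp below) flip the local_config default from false to true, so a daemon started without the flag now keeps the values from its local .conf file rather than fetching configuration from the meta service. A hedged sketch of how a caller might read that, with a hypothetical helper name:

#include <gflags/gflags.h>

DECLARE_bool(local_config);  // defaults to true after this commit

// Hypothetical helper: meta-driven configuration would now apply only when an
// operator explicitly starts the daemon with --local_config=false.
bool shouldFetchConfigFromMeta() {
  return !FLAGS_local_config;
}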
3 changes: 0 additions & 3 deletions src/parser/parser.yy
@@ -3630,9 +3630,6 @@ space_opt_item
$$ = new SpaceOptItem(SpaceOptItem::VID_TYPE, *$3);
delete $3;
}
| KW_ATOMIC_EDGE ASSIGN BOOL {
$$ = new SpaceOptItem(SpaceOptItem::ATOMIC_EDGE, $3);
}
// TODO(YT) Create Spaces for different engines
// KW_ENGINE_TYPE ASSIGN name_label
;
4 changes: 2 additions & 2 deletions src/parser/test/ParserTest.cpp
@@ -283,14 +283,14 @@ TEST_F(ParserTest, SpaceOperation) {
"CREATE SPACE default_space(partition_num=9, replica_factor=3,"
"atomic_edge=true)";
auto result = parse(query);
EXPECT_TRUE(result.ok()) << result.status();
EXPECT_FALSE(result.ok()) << result.status();
}
{
std::string query =
"CREATE SPACE default_space(partition_num=9, replica_factor=3,"
"atomic_edge=FALSE)";
auto result = parse(query);
EXPECT_TRUE(result.ok()) << result.status();
EXPECT_FALSE(result.ok()) << result.status();
}
{
std::string query = "USE default_space";
50 changes: 3 additions & 47 deletions src/storage/StorageServer.cpp
@@ -9,7 +9,6 @@

#include <boost/filesystem.hpp>

#include "clients/storage/InternalStorageClient.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "common/memory/MemoryUtils.h"
#include "common/meta/ServerBasedIndexManager.h"
@@ -25,7 +24,6 @@
#include "storage/CompactionFilter.h"
#include "storage/GraphStorageLocalServer.h"
#include "storage/GraphStorageServiceHandler.h"
#include "storage/InternalStorageServiceHandler.h"
#include "storage/StorageAdminServiceHandler.h"
#include "storage/StorageFlags.h"
#include "storage/http/StorageHttpAdminHandler.h"
@@ -39,7 +37,7 @@
#ifndef BUILD_STANDALONE
DEFINE_int32(port, 44500, "Storage daemon listening port");
DEFINE_int32(num_worker_threads, 32, "Number of workers");
DEFINE_bool(local_config, false, "meta client will not retrieve latest configuration from meta");
DEFINE_bool(local_config, true, "meta client will not retrieve latest configuration from meta");
#else
DEFINE_int32(storage_port, 44501, "Storage daemon listening port");
DEFINE_int32(storage_num_worker_threads, 32, "Number of workers");
@@ -265,22 +263,12 @@ bool StorageServer::start() {
return false;
}

interClient_ = std::make_unique<InternalStorageClient>(ioThreadPool_, metaClient_.get());

env_ = std::make_unique<storage::StorageEnv>();
env_->kvstore_ = kvstore_.get();
env_->indexMan_ = indexMan_.get();
env_->schemaMan_ = schemaMan_.get();
env_->rebuildIndexGuard_ = std::make_unique<IndexGuard>();
env_->metaClient_ = metaClient_.get();
env_->interClient_ = interClient_.get();

txnMan_ = std::make_unique<TransactionManager>(env_.get());
if (!txnMan_->start()) {
LOG(ERROR) << "Start transaction manager failed!";
return false;
}
env_->txnMan_ = txnMan_.get();

env_->verticesML_ = std::make_unique<VerticesMemLock>();
env_->edgesML_ = std::make_unique<EdgesMemLock>();
Expand All @@ -299,8 +287,7 @@ bool StorageServer::start() {

storageServer_ = getStorageServer();
adminServer_ = getAdminServer();
internalStorageServer_ = getInternalServer();
if (!storageServer_ || !adminServer_ || !internalStorageServer_) {
if (!storageServer_ || !adminServer_) {
return false;
}

@@ -342,28 +329,21 @@ void StorageServer::stop() {
// Stop http service
webSvc_.reset();

// Stop all thrift server: raft/storage/admin/internal
// Stop all thrift server: raft/storage/admin
if (kvstore_) {
// stop kvstore background job and raft services
kvstore_->stop();
}
if (adminServer_) {
adminServer_->cleanUp();
}
if (internalStorageServer_) {
internalStorageServer_->cleanUp();
}
if (storageServer_) {
#ifndef BUILD_STANDALONE
storageServer_->cleanUp();
#endif
}

// Stop all interface related to kvstore
if (txnMan_) {
txnMan_->stop();
txnMan_->join();
}
if (taskMgr_) {
taskMgr_->shutdown();
}
@@ -434,29 +414,5 @@ std::unique_ptr<apache::thrift::ThriftServer> StorageServer::getAdminServer() {
}
}

std::unique_ptr<apache::thrift::ThriftServer> StorageServer::getInternalServer() {
try {
auto handler = std::make_shared<InternalStorageServiceHandler>(env_.get());
auto internalAddr = Utils::getInternalAddrFromStoreAddr(localHost_);
auto server = std::make_unique<apache::thrift::ThriftServer>();
server->setPort(internalAddr.port);
server->setIdleTimeout(std::chrono::seconds(0));
server->setIOThreadPool(ioThreadPool_);
server->setThreadManager(workers_);
if (FLAGS_enable_ssl) {
server->setSSLConfig(nebula::sslContextConfig());
}
server->setInterface(std::move(handler));
server->setup();
return server;
} catch (const std::exception& e) {
LOG(ERROR) << "Start internal storage server failed: " << e.what();
return nullptr;
} catch (...) {
LOG(ERROR) << "Start internal storage server failed";
return nullptr;
}
}

} // namespace storage
} // namespace nebula
4 changes: 0 additions & 4 deletions src/storage/StorageServer.h
@@ -100,7 +100,6 @@ class StorageServer final {
std::shared_ptr<GraphStorageLocalServer> storageServer_;
#endif
std::unique_ptr<apache::thrift::ThriftServer> adminServer_;
std::unique_ptr<apache::thrift::ThriftServer> internalStorageServer_;

std::unique_ptr<nebula::WebService> webSvc_;
std::unique_ptr<meta::MetaClient> metaClient_;
@@ -119,9 +118,6 @@
std::string listenerPath_;

AdminTaskManager* taskMgr_{nullptr};
std::unique_ptr<TransactionManager> txnMan_{nullptr};
// used for communicate between one storaged to another
std::unique_ptr<InternalStorageClient> interClient_;

std::unique_ptr<LogMonitor> logMonitor_;

1 change: 1 addition & 0 deletions src/storage/test/StorageHttpPropertyHandlerTest.cpp
@@ -83,6 +83,7 @@ TEST(StorageHttpPropertyHandlerTest, ValidRequest) {
])";
EXPECT_EQ(expect, request("/rocksdb_property?space=1&property=rocksdb.is-write-stopped"));
}
{ EXPECT_TRUE(!request("/rocksdb_property?space=1").empty()); }
}

} // namespace storage
7 changes: 7 additions & 0 deletions src/storage/transaction/ChainAddEdgesGroupProcessor.cpp
@@ -15,6 +15,12 @@ namespace nebula {
namespace storage {

void ChainAddEdgesGroupProcessor::process(const cpp2::AddEdgesRequest& req) {
// toss is turned off
for (const auto& partEntry : req.get_parts()) {
pushResultCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED, partEntry.first);
}
onFinished();
/*
auto space = req.get_space_id();
ShuffledReq shuffledReq;
shuffleRequest(req, shuffledReq);
@@ -35,6 +41,7 @@ void ChainAddEdgesGroupProcessor::process(const cpp2::AddEdgesRequest& req) {
};
std::for_each(shuffledReq.begin(), shuffledReq.end(), delegateProcess);
*/
}

void ChainAddEdgesGroupProcessor::shuffleRequest(const cpp2::AddEdgesRequest& req,
7 changes: 7 additions & 0 deletions src/storage/transaction/ChainDeleteEdgesGroupProcessor.cpp
@@ -16,6 +16,12 @@ using ChainID = std::pair<PartitionID, PartitionID>;
using SplitedRequest = std::unordered_map<ChainID, cpp2::DeleteEdgesRequest>;

void ChainDeleteEdgesGroupProcessor::process(const cpp2::DeleteEdgesRequest& req) {
// toss is turned off
for (const auto& partEntry : req.get_parts()) {
pushResultCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED, partEntry.first);
}
onFinished();
/*
auto spaceId = req.get_space_id();
auto localPartId = req.get_parts().begin()->first;
auto stSplitRequest = splitRequest(req);
@@ -41,6 +47,7 @@ void ChainDeleteEdgesGroupProcessor::process(const cpp2::DeleteEdgesRequest& req
};
std::for_each(splitedRequest.begin(), splitedRequest.end(), fnSplit);
*/
}

StatusOr<SplitedRequest> ChainDeleteEdgesGroupProcessor::splitRequest(
5 changes: 5 additions & 0 deletions src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp
@@ -16,12 +16,17 @@ namespace nebula {
namespace storage {

void ChainUpdateEdgeLocalProcessor::process(const cpp2::UpdateEdgeRequest& req) {
// toss is turned off
pushResultCode(Code::E_UNSUPPORTED, req.get_part_id());
onFinished();
/*
if (!prepareRequest(req)) {
pushResultCode(rcPrepare_, localPartId_);
onFinished();
}
env_->txnMan_->addChainTask(this);
*/
}

bool ChainUpdateEdgeLocalProcessor::prepareRequest(const cpp2::UpdateEdgeRequest& req) {
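
All three chain (TOSS) processors above now answer every partition with E_UNSUPPORTED and finish immediately, leaving the original chain logic behind block comments. The following is a condensed, self-contained illustration of that shared pattern; the request type and the printed output are simplified stand-ins, not the thrift-generated classes used in the real processors.

#include <iostream>
#include <map>

// Simplified stand-in for cpp2::AddEdgesRequest / cpp2::DeleteEdgesRequest.
struct FakeRequest {
  std::map<int, int> parts;  // partId -> payload (details elided)
  const std::map<int, int>& get_parts() const { return parts; }
};

// Mirrors the new process() bodies: report E_UNSUPPORTED for each part, then finish.
void process(const FakeRequest& req) {
  for (const auto& partEntry : req.get_parts()) {
    std::cout << "part " << partEntry.first << " -> E_UNSUPPORTED\n";  // pushResultCode(...)
  }
  // onFinished(); would run here in the real processor.
}

int main() {
  process(FakeRequest{{{1, 0}, {2, 0}}});
  return 0;
}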