Skip to content

Commit

Permalink
Add timeout for storage/meta clients and add some error logs (#1399)
Browse files Browse the repository at this point in the history
* Add timeout for storage/meta clients and add some eror logs

* address zlcook's comments
  • Loading branch information
dangleptr committed Dec 9, 2019
1 parent fa0cdf1 commit 628d069
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 10 deletions.
15 changes: 11 additions & 4 deletions src/common/filter/Expressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,18 @@ OptVariantType UUIDExpression::eval() const {
auto client = context_->storageClient();
auto space = context_->space();
auto uuidResult = client->getUUID(space, *field_).get();
if (!uuidResult.ok() ||
!uuidResult.value().get_result().get_failed_codes().empty()) {
return OptVariantType(Status::Error("Get UUID Failed"));
if (!uuidResult.ok()) {
LOG(ERROR) << "Get UUID failed for " << toString() << ", status " << uuidResult.status();
return OptVariantType(Status::Error("Get UUID Failed"));
}
return uuidResult.value().get_id();
auto v = std::move(uuidResult).value();
for (auto& rc : v.get_result().get_failed_codes()) {
LOG(ERROR) << "Get UUID failed, error " << static_cast<int32_t>(rc.get_code())
<< ", part " << rc.get_part_id() << ", str id " << toString();
return OptVariantType(Status::Error("Get UUID Failed"));
}
VLOG(3) << "Get UUID from " << *field_ << " to " << v.get_id();
return v.get_id();
}

Status UUIDExpression::prepare() {
Expand Down
8 changes: 7 additions & 1 deletion src/graph/InsertEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ void InsertEdgeExecutor::execute() {

auto result = prepareEdges();
if (!result.ok()) {
doError(std::move(status), ectx()->getGraphStats()->getInsertEdgeStats());
LOG(ERROR) << "Insert edge failed, error " << result.status();
doError(result.status(), ectx()->getGraphStats()->getInsertEdgeStats());
return;
}

Expand All @@ -233,6 +234,11 @@ void InsertEdgeExecutor::execute() {
// For insertion, we regard partial success as failure.
auto completeness = resp.completeness();
if (completeness != 100) {
const auto& failedCodes = resp.failedParts();
for (auto it = failedCodes.begin(); it != failedCodes.end(); it++) {
LOG(ERROR) << "Insert edge failed, error " << static_cast<int32_t>(it->second)
<< ", part " << it->first;
}
doError(Status::Error("Internal Error"), ectx()->getGraphStats()->getInsertEdgeStats());
return;
}
Expand Down
10 changes: 8 additions & 2 deletions src/graph/InsertVertexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status InsertVertexExecutor::check() {
auto props = item->properties();
if (props.size() > schema->getNumFields()) {
LOG(ERROR) << "Input props number " << props.size()
<< ", schema fields number " << schema->getNumFields();
<< ", schema fields number " << schema->getNumFields();
return Status::Error("Wrong number of props");
}

Expand Down Expand Up @@ -213,7 +213,8 @@ void InsertVertexExecutor::execute() {

auto result = prepareVertices();
if (!result.ok()) {
doError(std::move(status), ectx()->getGraphStats()->getInsertVertexStats());
LOG(ERROR) << "Insert vertices failed, error " << result.status().toString();
doError(result.status(), ectx()->getGraphStats()->getInsertVertexStats());
return;
}
auto future = ectx()->getStorageClient()->addVertices(spaceId_,
Expand All @@ -225,6 +226,11 @@ void InsertVertexExecutor::execute() {
// For insertion, we regard partial success as failure.
auto completeness = resp.completeness();
if (completeness != 100) {
const auto& failedCodes = resp.failedParts();
for (auto it = failedCodes.begin(); it != failedCodes.end(); it++) {
LOG(ERROR) << "Insert vertices failed, error " << static_cast<int32_t>(it->second)
<< ", part " << it->first;
}
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getInsertVertexStats());
return;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ DEFINE_int32(load_data_interval_secs, 1, "Load data interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval");
DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");
DECLARE_string(gflags_mode_json);


namespace nebula {
namespace meta {

Expand Down Expand Up @@ -314,7 +316,7 @@ void MetaClient::getResponse(Request req,
folly::via(evb, [host, evb, req = std::move(req), remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen), pro = std::move(pro),
toLeader, retry, retryLimit, duration, this] () mutable {
auto client = clientsMan_->client(host, evb);
auto client = clientsMan_->client(host, evb, false, FLAGS_meta_client_timeout_ms);
VLOG(1) << "Send request to meta " << host;
remoteFunc(client, req).via(evb)
.then([host, req = std::move(req), remoteFunc = std::move(remoteFunc),
Expand Down
3 changes: 3 additions & 0 deletions src/storage/client/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#define ID_HASH(id, numShards) \
((static_cast<uint64_t>(id)) % numShards + 1)


DEFINE_int32(storage_client_timeout_ms, 60 * 1000, "storage client timeout");

namespace nebula {
namespace storage {

Expand Down
6 changes: 4 additions & 2 deletions src/storage/client/StorageClient.inl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "time/Duration.h"
#include <folly/Try.h>

DECLARE_int32(storage_client_timeout_ms);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -93,7 +95,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClient::collectResponse(
DCHECK(res.second);
// Invoke the remote method
folly::via(evb, [this, evb, context, host, spaceId, res, duration] () mutable {
auto client = clientsMan_->client(host, evb);
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
// Result is a pair of <Request&, bool>
context->serverMethod(client.get(), *res.first)
// Future process code will be executed on the IO thread
Expand Down Expand Up @@ -185,7 +187,7 @@ folly::Future<StatusOr<Response>> StorageClient::getResponse(
folly::via(evb, [evb, request = std::move(request), remoteFunc = std::move(remoteFunc),
pro = std::move(pro), duration, this] () mutable {
auto host = request.first;
auto client = clientsMan_->client(host, evb);
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partId = request.second.get_part_id();
LOG(INFO) << "Send request to storage " << host;
Expand Down

0 comments on commit 628d069

Please sign in to comment.