Skip to content

Commit

Permalink
fix handling with IO error (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
songqing committed Aug 24, 2023
1 parent 5c7cecc commit 1af0ff7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 33 deletions.
4 changes: 2 additions & 2 deletions include/nebula/client/SessionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ struct SessionPoolConfig {
std::string password_;
std::vector<std::string> addrs_; // the list of graph addresses
std::string spaceName_;
// Socket timeout and Socket connection timeout, unit: seconds
// Socket timeout and Socket connection timeout, unit: milliseconds
std::uint32_t timeout_{0};
// The idleTime of the connection, unit: seconds
// The idleTime of the connection, unit: milliseconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
std::uint32_t idleTime_{0};
Expand Down
44 changes: 19 additions & 25 deletions src/client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,7 @@ AuthResponse Connection::authenticate(const std::string &user, const std::string
}

ExecutionResponse Connection::execute(int64_t sessionId, const std::string &stmt) {
if (client_ == nullptr) {
return ExecutionResponse{ErrorCode::E_DISCONNECTED,
0,
nullptr,
nullptr,
std::make_unique<std::string>("Not open connection.")};
}

ExecutionResponse resp;
try {
resp = client_->future_execute(sessionId, stmt).get();
} catch (const std::exception &ex) {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(ex.what())};
}

return resp;
return executeWithParameter(sessionId, stmt, {});
}

void Connection::asyncExecute(int64_t sessionId, const std::string &stmt, ExecuteCallback cb) {
Expand Down Expand Up @@ -188,15 +172,26 @@ ExecutionResponse Connection::executeWithParameter(
std::make_unique<std::string>("Not open connection.")};
}

using TTransportException = apache::thrift::transport::TTransportException;
ExecutionResponse resp;
try {
resp = client_->future_executeWithParameter(sessionId, stmt, parameters).get();
} catch (const apache::thrift::transport::TTransportException &ex) {
resp = ExecutionResponse{ErrorCode::E_FAIL_TO_CONNECT,
0,
nullptr,
nullptr,
std::make_unique<std::string>(ex.what())};
} catch (const TTransportException &ex) {
auto errType = ex.getType();
std::string errMsg = ex.what();
if (errType == TTransportException::END_OF_FILE ||
(errType == TTransportException::INTERNAL_ERROR &&
errMsg.find("Connection reset by peer") != std::string::npos)) {
resp = ExecutionResponse{
ErrorCode::E_FAIL_TO_CONNECT, 0, nullptr, nullptr, std::make_unique<std::string>(errMsg)};
} else if (errType == TTransportException::TIMED_OUT) {
resp = ExecutionResponse{
ErrorCode::E_SESSION_TIMEOUT, 0, nullptr, nullptr, std::make_unique<std::string>(errMsg)};

} else {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(errMsg)};
}
} catch (const std::exception &ex) {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(ex.what())};
Expand Down Expand Up @@ -296,8 +291,7 @@ void Connection::close() {

bool Connection::ping() {
auto resp = execute(0 /*Only check connection*/, "YIELD 1");
if (resp.errorCode == ErrorCode::E_RPC_FAILURE ||
resp.errorCode == ErrorCode::E_FAIL_TO_CONNECT ||
if (resp.errorCode == ErrorCode::E_FAIL_TO_CONNECT ||
resp.errorCode == ErrorCode::E_DISCONNECTED) {
DLOG(ERROR) << "Ping failed: " << *resp.errorMsg;
return false;
Expand Down
4 changes: 1 addition & 3 deletions src/client/tests/ConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ TEST_F(ConnectionTest, Timeout) {
// execute
resp = c.execute(*authResp.sessionId,
"use conn_test;GO 100000 STEPS FROM 'Tim Duncan' OVER like YIELD like._dst;");
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_RPC_FAILURE ||
resp.errorCode == nebula::ErrorCode::E_FAIL_TO_CONNECT)
<< *resp.errorMsg;
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_SESSION_TIMEOUT) << *resp.errorMsg;

resp =
c.execute(*authResp.sessionId,
Expand Down
4 changes: 1 addition & 3 deletions src/client/tests/SessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,7 @@ TEST_F(SessionTest, Timeout) {
// execute
resp = session.execute(
"use session_test;GO 100000 STEPS FROM 'Tim Duncan' OVER like YIELD like._dst;");
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_FAIL_TO_CONNECT ||
resp.errorCode == nebula::ErrorCode::E_RPC_FAILURE)
<< *resp.errorMsg;
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_SESSION_TIMEOUT) << *resp.errorMsg;

resp = session.execute(
"SHOW QUERIES "
Expand Down

0 comments on commit 1af0ff7

Please sign in to comment.