diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/SessionPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/SessionPool.java index 803dada07..d9833eaf6 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/SessionPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/SessionPool.java @@ -55,6 +55,7 @@ public class SessionPool implements Serializable { private final int healthCheckTime; private final int retryTimes; private final int intervalTime; + private final boolean reconnect; private final String spaceName; private final String useSpace; @@ -66,6 +67,7 @@ public SessionPool(SessionPoolConfig poolConfig) { this.cleanTime = poolConfig.getCleanTime(); this.retryTimes = poolConfig.getRetryTimes(); this.intervalTime = poolConfig.getIntervalTime(); + this.reconnect = poolConfig.isReconnect(); this.healthCheckTime = poolConfig.getHealthCheckTime(); this.spaceName = poolConfig.getSpaceName(); useSpace = "USE `" + spaceName + "`;"; @@ -146,7 +148,7 @@ public ResultSet execute(String stmt) throws IOErrorException, NebulaSession nebulaSession = null; ResultSet resultSet = null; int tryTimes = 0; - while (tryTimes++ < retryTimes) { + while (tryTimes++ <= retryTimes) { try { nebulaSession = getSession(); resultSet = nebulaSession.execute(stmt); @@ -160,6 +162,11 @@ public ResultSet execute(String stmt) throws IOErrorException, resultSet.getErrorCode(), resultSet.getErrorMessage(), tryTimes)); nebulaSession.release(); sessionList.remove(nebulaSession); + try { + Thread.sleep(intervalTime); + } catch (InterruptedException interruptedException) { + // ignore + } } catch (ClientServerIncompatibleException e) { // will never get here. } catch (AuthFailedException | BindSpaceFailedException e) { @@ -330,11 +337,25 @@ private void updateSessionQueue() { * @param state {@link SessionState} * @return NebulaSession */ - private synchronized NebulaSession createSessionObject(SessionState state) + private NebulaSession createSessionObject(SessionState state) throws ClientServerIncompatibleException, AuthFailedException, IOErrorException, BindSpaceFailedException { SyncConnection connection = new SyncConnection(); - connection.open(getAddress(), sessionPoolConfig.getTimeout()); + int tryConnect = sessionPoolConfig.getGraphAddressList().size(); + // reconnect with all available address + while (tryConnect-- > 0) { + try { + connection.open(getAddress(), sessionPoolConfig.getTimeout()); + break; + } catch (Exception e) { + if (tryConnect == 0 || !reconnect) { + throw e; + } else { + log.warn("connect failed, " + e.getMessage()); + } + } + } + AuthResult authResult; try { authResult = connection.authenticate(sessionPoolConfig.getUsername(), diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/SessionPoolConfig.java b/client/src/main/java/com/vesoft/nebula/client/graph/SessionPoolConfig.java index fc41030c7..10346fa7f 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/SessionPoolConfig.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/SessionPoolConfig.java @@ -44,6 +44,9 @@ public class SessionPoolConfig implements Serializable { // interval time for retry, unit ms private int intervalTime = 0; + // whether reconnect when create session using a broken graphd server + private boolean reconnect = false; + public SessionPoolConfig(List addresses, String spaceName, @@ -180,6 +183,14 @@ public SessionPoolConfig setIntervalTime(int intervalTime) { return this; } + public boolean isReconnect() { + return reconnect; + } + + public SessionPoolConfig setReconnect(boolean reconnect) { + this.reconnect = reconnect; + return this; + } @Override public String toString() { @@ -195,6 +206,7 @@ public String toString() { + ", waitTime=" + waitTime + ", retryTimes=" + retryTimes + ", intervalTIme=" + intervalTime + + ", reconnect=" + reconnect + '}'; } }