Skip to content

Commit

Permalink
Apply connection timeout for each connect attempt
Browse files Browse the repository at this point in the history
Relax a socket provider contract. Now socket provider can throw a
transient error and the client will try to obtain a socket again instead
of being closed.

Make built-in socket providers configurable. Now the client can set
retries count and connection timeout for providers.

Update README doc im scope of new socket provider contract.

Closes: #167
Follows on: #144
  • Loading branch information
nicktorwald committed May 14, 2019
1 parent 3420575 commit fe6bdbb
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 145 deletions.
104 changes: 62 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ To get the Java connector for Tarantool 1.6.9, visit

## Getting started

1. Add a dependency to your `pom.xml` file.
1. Add a dependency to your `pom.xml` file:

```xml
<dependency>
Expand All @@ -32,75 +32,81 @@ To get the Java connector for Tarantool 1.6.9, visit
</dependency>
```

2. Configure `TarantoolClientConfig`.
2. Configure `TarantoolClientConfig`:

```java
TarantoolClientConfig config = new TarantoolClientConfig();
config.username = "test";
config.password = "test";
```

3. Implement your `SocketChannelProvider`.
It should return a connected `SocketChannel`.
3. Create a client:

```java
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
@Override
public SocketChannel get(int retryNumber, Throwable lastError) {
if (lastError != null) {
lastError.printStackTrace(System.out);
}
try {
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
};
TarantoolClient client = new TarantoolClientImpl("host:3301", config);
```

Here you could also implement some reconnection or fallback policy.
Remember that `TarantoolClient` adopts a
[fail-fast](https://en.wikipedia.org/wiki/Fail-fast) policy
when a client is not connected.
using `TarantoolClientImpl(String, TarantoolClientConfig)` is equivalent to:

The `TarantoolClient` will stop functioning if your implementation of a socket
channel provider raises an exception or returns a null. You will need a new
instance of client to recover. Hence, you should only throw in case you have
met unrecoverable error.
```java
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("host:3301")
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
```

Below is an example of `SocketChannelProvider` implementation that handles short
tarantool restarts.
You could implement your own `SocketChannelProvider`. It should return
a connected `SocketChannel`. Feel free to implement `get(int retryNumber, Throwable lastError)`
using your appropriate strategy to obtain the channel. The strategy can take into
account current attempt number (retryNumber) and the last transient error occurred on
the previous attempt.

The `TarantoolClient` will be closed if your implementation of a socket
channel provider raises any exceptions but not a `SocketProviderTransientException`.
Latter is handled by the client as a recoverable error. Otherwise, you will
need a new instance of client to recover. Hence, you should only throw an
error different to `SocketProviderTransientException` in case you have met
unrecoverable error.

Below is an example of `SocketChannelProvider` implementation that tries
to connect no more than 3 times, two seconds for each attempt at max.

```java
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
@Override
public SocketChannel get(int retryNumber, Throwable lastError) {
long deadline = System.currentTimeMillis() + RESTART_TIMEOUT;
while (!Thread.currentThread().isInterrupted()) {
try {
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
} catch (IOException e) {
if (deadline < System.currentTimeMillis())
throw new RuntimeException(e);
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
if (retryNumber > 3) {
throw new RuntimeException("Too many attempts");
}
SocketChannel channel = null;
try {
channel = SocketChannel.open();
channel.socket().connect(new InetSocketAddress("localhost", 3301), 2000);
return channel;
} catch (IOException e) {
if (channel != null) {
try {
channel.close();
} catch (IOException ignored) { }
}
throw new SocketProviderTransientException("Couldn't connect to server", e);
}
throw new RuntimeException(new TimeoutException("Connect timed out."));
}
};
```

4. Create a client.
Same behaviour can be achieved using built-in `SingleSocketChannelProviderImpl`:

```java
TarantoolClientConfig config = new TarantoolClientConfig();
config.connectionTimeout = 2_000; // two seconds timeout per attempt
config.retryCount = 3; // three attempts at max

SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("localhost:3301")
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
```

`SingleSocketChannelProviderImpl` implements `ConfigurableSocketChannelProvider` that
makes possible for the client to configure a socket provider.

> **Notes:**
> * `TarantoolClient` is thread-safe and asynchronous, so you should use one
> client inside the whole application.
Expand Down Expand Up @@ -168,6 +174,20 @@ a list of nodes which will be used by the cluster client to provide such
ability. Also you can prefer to use a [discovery mechanism](#auto-discovery)
in order to dynamically fetch and apply the node list.

### The RoundRobinSocketProviderImpl class

This cluster-aware provider uses addresses pool to connect to DB server.
The provider picks up next address in order the addresses were passed.

Similar to `SingleSocketChannelProviderImpl` this RR provider also
relies on two options from the config: `TarantoolClientConfig.connectionTimeout`
and `TarantoolClientConfig.retryCount` but in a bit different way.
The latter option says how many times the provider should try to establish a
connection to _one instance_ before failing an attempt. The provider requires
positive retry count to work properly. The socket timeout is used to limit
an interval between connections attempts per instance. In other words, the provider
follows a pattern _connection must succeed after N attempts with M interval between them_.

### Basic cluster client usage

1. Configure `TarantoolClusterClientConfig`:
Expand Down Expand Up @@ -198,7 +218,7 @@ client.syncOps().insert(23, Arrays.asList(1, 1));
Auto-discovery feature allows a cluster client to fetch addresses of
cluster nodes to reflect changes related to the cluster topology. To achieve
this you have to create a Lua function on the server side which returns
a single array result. Client periodically pools the server to obtain a
a single array result. Client periodically polls the server to obtain a
fresh list and apply it if its content changes.

1. On the server side create a function which returns nodes:
Expand Down
88 changes: 29 additions & 59 deletions src/main/java/org/tarantool/BaseSocketChannelProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
public abstract class BaseSocketChannelProvider implements ConfigurableSocketChannelProvider {

/**
* Limit of retries.
Expand All @@ -14,46 +14,25 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
/**
* Timeout to establish socket connection with an individual server.
*/
private int timeout = NO_TIMEOUT;
private int connectionTimeout = NO_TIMEOUT;

/**
* Tries to establish a new connection to the Tarantool instances.
*
* @param retryNumber number of current retry. Reset after successful connect.
* @param retryNumber number of current retry
* @param lastError the last error occurs when reconnecting
*
* @return connected socket channel
*
* @throws CommunicationException if any I/O errors happen or there are
* no addresses available
* @throws CommunicationException if number of retries or socket timeout are exceeded
* @throws SocketProviderTransientException if any I/O errors happen
*/
@Override
public final SocketChannel get(int retryNumber, Throwable lastError) {
if (areRetriesExhausted(retryNumber)) {
throw new CommunicationException("Connection retries exceeded.", lastError);
}

long deadline = System.currentTimeMillis() + timeout;
while (!Thread.currentThread().isInterrupted()) {
try {
InetSocketAddress address = getAddress(retryNumber, lastError);
return openChannel(address);
} catch (IOException e) {
checkTimeout(deadline, e);
}
}
throw new CommunicationException("Thread interrupted.", new InterruptedException());
}

private void checkTimeout(long deadline, Exception e) {
long timeLeft = deadline - System.currentTimeMillis();
if (timeLeft <= 0) {
throw new CommunicationException("Connection time out.", e);
}
try {
Thread.sleep(timeLeft / 10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return makeAttempt(retryNumber, lastError);
} catch (IOException e) {
throw new SocketProviderTransientException("Couldn't connect to the server", e);
}
}

Expand All @@ -68,7 +47,7 @@ private void checkTimeout(long deadline, Exception e) {
*
* @throws IOException if any I/O errors occur
*/
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
protected abstract SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException;

/**
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
Expand All @@ -79,7 +58,11 @@ private void checkTimeout(long deadline, Exception e) {
*
* @param retriesLimit Limit of retries to use.
*/
@Override
public void setRetriesLimit(int retriesLimit) {
if (retriesLimit < 0) {
throw new IllegalArgumentException("Retries count cannot be negative.");
}
this.retriesLimit = retriesLimit;
}

Expand Down Expand Up @@ -111,7 +94,7 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
SocketChannel channel = null;
try {
channel = SocketChannel.open();
channel.socket().connect(socketAddress, timeout);
channel.socket().connect(socketAddress, connectionTimeout);
return channel;
} catch (IOException e) {
if (channel != null) {
Expand All @@ -126,44 +109,31 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
}

/**
* Sets maximum amount of time to wait for a socket connection establishment
* with an individual server.
* <p>
* Zero means infinite timeout.
*
* @param timeout timeout value, ms.
*
* @throws IllegalArgumentException if timeout is negative.
*/
public void setTimeout(int timeout) {
if (timeout < 0) {
throw new IllegalArgumentException("timeout is negative.");
}
this.timeout = timeout;
}

/**
* Gest maximum amount of time to wait for a socket
* Gets maximum amount of time to wait for a socket
* connection establishment with an individual server.
*
* @return timeout
*/
public int getTimeout() {
return timeout;
public int getConnectionTimeout() {
return connectionTimeout;
}

/**
* Provides a decision on whether retries limit is hit.
* Sets maximum amount of time to wait for a socket connection establishment
* with an individual server.
* <p>
* Zero means infinite connectionTimeout.
*
* @param retries Current count of retries.
* @param connectionTimeout connectionTimeout value, ms.
*
* @return {@code true} if retries are exhausted.
* @throws IllegalArgumentException if connectionTimeout is negative.
*/
private boolean areRetriesExhausted(int retries) {
int limit = getRetriesLimit();
if (limit < 0) {
return false;
@Override
public void setConnectionTimeout(int connectionTimeout) {
if (connectionTimeout < 0) {
throw new IllegalArgumentException("Connection timeout cannot be negative.");
}
return retries >= limit;
this.connectionTimeout = connectionTimeout;
}

}
23 changes: 23 additions & 0 deletions src/main/java/org/tarantool/ConfigurableSocketChannelProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.tarantool;

public interface ConfigurableSocketChannelProvider extends SocketChannelProvider {

int RETRY_NO_LIMIT = 0;
int NO_TIMEOUT = 0;

/**
* Configures max count of retries.
*
* @param limit max attempts count
*/
void setRetriesLimit(int limit);

/**
* Configures max time to establish
* a connection per attempt.
*
* @param timeout connection timeout in millis
*/
void setConnectionTimeout(int timeout);

}
29 changes: 27 additions & 2 deletions src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -21,6 +22,7 @@
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {

private static final int UNSET_POSITION = -1;
private static final int DEFAULT_RETRIES_PER_CONNECTION = 3;

/**
* Socket addresses pool.
Expand Down Expand Up @@ -59,6 +61,7 @@ public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider impl
*/
public RoundRobinSocketProviderImpl(String... addresses) {
updateAddressList(Arrays.asList(addresses));
setRetriesLimit(DEFAULT_RETRIES_PER_CONNECTION);
}

private void updateAddressList(Collection<String> addresses) {
Expand Down Expand Up @@ -117,8 +120,26 @@ protected InetSocketAddress getLastObtainedAddress() {
}

@Override
protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException {
return getNextSocketAddress();
protected SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException {
if (retryNumber > getAddressCount()) {
throwFatalError("No more connection addresses are left.");
}

int retriesLimit = getRetriesLimit();
InetSocketAddress socketAddress = getNextSocketAddress();
if (retriesLimit < 1) {
throwFatalError("Retries count should be at least 1 or more");
}

IOException connectionError = null;
for (int i = 0; i < retriesLimit; i++) {
try {
return openChannel(socketAddress);
} catch (IOException e) {
connectionError = e;
}
}
throw connectionError;
}

/**
Expand Down Expand Up @@ -163,4 +184,8 @@ public void refreshAddresses(Collection<String> addresses) {
updateAddressList(addresses);
}

private void throwFatalError(String message) {
throw new CommunicationException(message);
}

}
Loading

0 comments on commit fe6bdbb

Please sign in to comment.