Skip to content

Commit

Permalink
Added refresh callback to address provider (#167)
Browse files Browse the repository at this point in the history
* Added refresh callback to address provider and use it in connection manager to update the list of routers.
  • Loading branch information
wey1and committed Feb 17, 2022
1 parent 5adf1b0 commit cb2ddca
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/tarantool/driver/api/TarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
*
* @return returns true if the establishing process has been started, else false
*/
boolean establishLackingConnections();
boolean refresh();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@
*/
public interface TarantoolClusterAddressProvider extends AutoCloseable {
/**
* The the collection of Tarantool server nodes which belong to the same cluster
* The collection of Tarantool server nodes which belong to the same cluster
*
* @return collection of {@link TarantoolServerAddress}
*/
Collection<TarantoolServerAddress> getAddresses();

/**
* Specify callback for refreshing connections to addresses.
* <p>
* For example: can be executed when provider detects changes in the list of server addresses.
*
* @param runnable callback for running refresh connections
*/
default void setRefreshCallback(Runnable runnable) {
}

@Override
default void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,28 @@ public abstract class AbstractDiscoveryClusterAddressProvider implements Taranto
private final ScheduledExecutorService scheduledExecutorService;
private final CountDownLatch initLatch = new CountDownLatch(1);
private final AtomicReference<Collection<TarantoolServerAddress>> addressesHolder = new AtomicReference<>();
private final AtomicReference<Runnable> refreshCallback;

public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig discoveryConfig) {
this.discoveryConfig = discoveryConfig;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new TarantoolDaemonThreadFactory("tarantool-discovery"));
this.refreshCallback = new AtomicReference<>(() -> {
});
}

protected void startDiscoveryTask() throws TarantoolClientException {
Runnable discoveryTask = () -> {
try {
Collection<TarantoolServerAddress> currentAddresses = this.addressesHolder.get();
Collection<TarantoolServerAddress> addresses = discoverAddresses();
setAddresses(addresses);

if (currentAddresses != null
&& addresses.size() != currentAddresses.size()
|| !addresses.equals(currentAddresses)) {
this.refreshCallback.get().run();
}
} finally {
if (initLatch.getCount() > 0) {
initLatch.countDown();
Expand Down Expand Up @@ -74,6 +84,11 @@ public Collection<TarantoolServerAddress> getAddresses() {
return addressesHolder.get();
}

@Override
public void setRefreshCallback(Runnable runnable) {
this.refreshCallback.set(runnable);
}

@Override
public void close() {
if (scheduledExecutorService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public BinaryClusterDiscoveryEndpoint() {

/**
* Get service discovery endpoint provider
*
* @return tarantool server address provider instance
*/
public TarantoolClusterAddressProvider getEndpointProvider() {
Expand All @@ -33,6 +34,7 @@ public TarantoolClusterAddressProvider getEndpointProvider() {

/**
* Set service discovery endpoint provider
*
* @param endpointProvider a tarantool address provider instance
*/
public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider) {
Expand All @@ -41,6 +43,7 @@ public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider

/**
* Get client configuration for connecting to the set of the discovery endpoints
*
* @return tarantool client configuration
*/
public TarantoolClientConfig getClientConfig() {
Expand All @@ -50,6 +53,7 @@ public TarantoolClientConfig getClientConfig() {
/**
* Set client configuration for connecting to the set of the discovery endpoints. The same configuration will be
* used for each endpoint.
*
* @param clientConfig tarantool client configuration
*/
public void setClientConfig(TarantoolClientConfig clientConfig) {
Expand All @@ -58,6 +62,7 @@ public void setClientConfig(TarantoolClientConfig clientConfig) {

/**
* Get discovery function name
*
* @return discovery function name
*/
public String getDiscoveryFunction() {
Expand All @@ -66,17 +71,25 @@ public String getDiscoveryFunction() {

/**
* Set discovery function name
*
* @param discoveryFunction discovery function name
*/
public void setDiscoveryFunction(String discoveryFunction) {
this.discoveryFunction = discoveryFunction;
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static class Builder {
private BinaryClusterDiscoveryEndpoint endpoint;
private final BinaryClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -89,6 +102,7 @@ public Builder() {
* Specify the function name to invoke in the discovery endpoint for getting the list of nodes. The function
* should not require any parameters and must be exposed as API function. Also the user which is connecting the
* endpoint must have the appropriate permission for this function.
*
* @param discoveryFunction the function name, should not be null
* @return this builder instance
*/
Expand All @@ -100,6 +114,7 @@ public Builder withEntryFunction(String discoveryFunction) {

/**
* Specify address provider for the discovery endpoints
*
* @param endpointProvider discovery endpoint address privider, should not be null
* @return this builder instance
* @see BinaryClusterDiscoveryEndpoint#setEndpointProvider(TarantoolClusterAddressProvider)
Expand All @@ -113,6 +128,7 @@ public Builder withEndpointProvider(TarantoolClusterAddressProvider endpointProv
/**
* Specify the client configuration for connecting to the discovery endpoints. The same configuration will be
* used for all endpoints
*
* @param clientConfig tarantool client configuration
* @return this builder instance
*/
Expand All @@ -124,6 +140,7 @@ public Builder withClientConfig(TarantoolClientConfig clientConfig) {

/**
* Build the discovery endpoint configuration
*
* @return {@link BinaryClusterDiscoveryEndpoint} instance
*/
public BinaryClusterDiscoveryEndpoint build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public HTTPClusterDiscoveryEndpoint() {

/**
* Create an instance, specifying URI for connection
*
* @param uri discovery endpoint URI
*/
public HTTPClusterDiscoveryEndpoint(String uri) {
Expand All @@ -29,6 +30,7 @@ public HTTPClusterDiscoveryEndpoint(String uri) {

/**
* Get discovery endpoint URI
*
* @return discovery endpoint URI
*/
public String getUri() {
Expand All @@ -37,6 +39,7 @@ public String getUri() {

/**
* Set discovery endpoint URI
*
* @param uri discovery endpoint URI
*/
public void setUri(String uri) {
Expand All @@ -45,6 +48,7 @@ public void setUri(String uri) {

/**
* Get cluster discovery endpoint connection timeout
*
* @return connection timeout, in milliseconds
*/
public int getConnectTimeout() {
Expand All @@ -53,6 +57,7 @@ public int getConnectTimeout() {

/**
* Set cluster discovery endpoint connection timeout
*
* @param connectTimeout connection timeout, in milliseconds
*/
public void setConnectTimeout(int connectTimeout) {
Expand All @@ -61,6 +66,7 @@ public void setConnectTimeout(int connectTimeout) {

/**
* Get response timeout for cluster discovery request
*
* @return request timeout, in milliseconds
*/
public int getReadTimeout() {
Expand All @@ -69,17 +75,25 @@ public int getReadTimeout() {

/**
* Set response timeout for cluster discovery request
*
* @param readTimeout request timeout, in milliseconds
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static class Builder {
private HTTPClusterDiscoveryEndpoint endpoint;
private final HTTPClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -90,6 +104,7 @@ public Builder() {

/**
* Specify the cluster discovery endpoint URI
*
* @param uri discovery endpoint URI, should not be null or empty
* @return this builder instance
*/
Expand All @@ -101,6 +116,7 @@ public Builder withURI(String uri) {

/**
* Specify the connection timeout for discovery endpoint
*
* @param connectTimeout connection timeout, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setConnectTimeout(int)
Expand All @@ -115,6 +131,7 @@ public Builder withConnectTimeout(int connectTimeout) {

/**
* Specify the read timeout for discovery endpoint connection
*
* @param readTimeout timeout of receiving response in the connection, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setReadTimeout(int)
Expand All @@ -129,6 +146,7 @@ public Builder withReadTimeout(int readTimeout) {

/**
* Build the discovery endpoint configuration
*
* @return a {@link HTTPClusterDiscoveryEndpoint} instance
*/
public HTTPClusterDiscoveryEndpoint build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ private TarantoolConnectionManager connectionManager() {
}

@Override
public boolean establishLackingConnections() {
return connectionManager().establishLackingConnections();
public boolean refresh() {
return connectionManager().refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ public CompletableFuture<List<?>> eval(String expression,
}

@Override
public boolean establishLackingConnections() {
return this.client.establishLackingConnections();
public boolean refresh() {
return this.client.refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ public CompletableFuture<List<?>> eval(String expression, List<?> arguments,
}

@Override
public boolean establishLackingConnections() {
return this.client.establishLackingConnections();
public boolean refresh() {
return this.client.refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,6 @@ public abstract class AbstractTarantoolConnectionManager implements TarantoolCon
private final Phaser initPhaser = new Phaser(0);
private final Logger logger = LoggerFactory.getLogger(getClass().getName());

/**
* Constructor
*
* @param config Tarantool client config
* @param connectionFactory connection factory
* @param selectionStrategyFactory connection selection strategy factory
* @param connectionListeners connection listeners
* @deprecated
*/
protected AbstractTarantoolConnectionManager(TarantoolClientConfig config,
TarantoolConnectionFactory connectionFactory,
ConnectionSelectionStrategyFactory selectionStrategyFactory,
TarantoolConnectionListeners connectionListeners) {
this(config, connectionFactory, connectionListeners);
}

/**
* Basic constructor
*
Expand Down Expand Up @@ -105,7 +89,7 @@ public CompletableFuture<TarantoolConnection> getConnection() {
}

@Override
public boolean establishLackingConnections() {
public boolean refresh() {
return connectionMode.compareAndSet(ConnectionMode.OFF, ConnectionMode.PARTIAL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ public TarantoolClusterConnectionManager(TarantoolClientConfig config,
TarantoolClusterAddressProvider addressProvider) {
super(config, connectionFactory, listeners);
this.addressProvider = addressProvider;
this.addressProvider.setRefreshCallback(super::refresh);
}

@Override
protected Collection<TarantoolServerAddress> getAddresses() {
return addressProvider.getAddresses();
}

@Override
public void close() {
addressProvider.setRefreshCallback(() -> {
});
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface TarantoolConnectionManager extends AutoCloseable {
*
* @return returns true if the establishing process has been started, else false
*/
boolean establishLackingConnections();
boolean refresh();
}
Loading

0 comments on commit cb2ddca

Please sign in to comment.