Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added refresh callback to address provider #167

Merged
merged 2 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/tarantool/driver/api/TarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
*
* @return returns true if the establishing process has been started, else false
*/
boolean establishLackingConnections();
boolean refresh();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@
*/
public interface TarantoolClusterAddressProvider extends AutoCloseable {
/**
* The the collection of Tarantool server nodes which belong to the same cluster
* The collection of Tarantool server nodes which belong to the same cluster
*
* @return collection of {@link TarantoolServerAddress}
*/
Collection<TarantoolServerAddress> getAddresses();

/**
* Specify callback for refreshing connections to addresses.
* <p>
* For example: you can run it when you want saying about your Tarantool server addresses is changed
vrogach2020 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param runnable callback for running refresh connections
*/
default void setRefreshCallback(Runnable runnable) {
}

@Override
default void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,28 @@ public abstract class AbstractDiscoveryClusterAddressProvider implements Taranto
private final ScheduledExecutorService scheduledExecutorService;
private final CountDownLatch initLatch = new CountDownLatch(1);
private final AtomicReference<Collection<TarantoolServerAddress>> addressesHolder = new AtomicReference<>();
private final AtomicReference<Runnable> refreshCallback;

public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig discoveryConfig) {
this.discoveryConfig = discoveryConfig;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new TarantoolDaemonThreadFactory("tarantool-discovery"));
this.refreshCallback = new AtomicReference<>(() -> {
});
}

protected void startDiscoveryTask() throws TarantoolClientException {
Runnable discoveryTask = () -> {
try {
Collection<TarantoolServerAddress> currentAddresses = this.addressesHolder.get();
Collection<TarantoolServerAddress> addresses = discoverAddresses();
setAddresses(addresses);

if (currentAddresses != null
&& addresses.size() != currentAddresses.size()
|| !addresses.equals(currentAddresses)) {
this.refreshCallback.get().run();
}
} finally {
if (initLatch.getCount() > 0) {
initLatch.countDown();
Expand Down Expand Up @@ -74,6 +84,11 @@ public Collection<TarantoolServerAddress> getAddresses() {
return addressesHolder.get();
}

@Override
public void setRefreshCallback(Runnable runnable) {
this.refreshCallback.set(runnable);
}

@Override
public void close() {
if (scheduledExecutorService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public BinaryClusterDiscoveryEndpoint() {

/**
* Get service discovery endpoint provider
*
* @return tarantool server address provider instance
*/
public TarantoolClusterAddressProvider getEndpointProvider() {
Expand All @@ -33,6 +34,7 @@ public TarantoolClusterAddressProvider getEndpointProvider() {

/**
* Set service discovery endpoint provider
*
* @param endpointProvider a tarantool address provider instance
*/
public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider) {
Expand All @@ -41,6 +43,7 @@ public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider

/**
* Get client configuration for connecting to the set of the discovery endpoints
*
* @return tarantool client configuration
*/
public TarantoolClientConfig getClientConfig() {
Expand All @@ -50,6 +53,7 @@ public TarantoolClientConfig getClientConfig() {
/**
* Set client configuration for connecting to the set of the discovery endpoints. The same configuration will be
* used for each endpoint.
*
* @param clientConfig tarantool client configuration
*/
public void setClientConfig(TarantoolClientConfig clientConfig) {
Expand All @@ -58,6 +62,7 @@ public void setClientConfig(TarantoolClientConfig clientConfig) {

/**
* Get discovery function name
*
* @return discovery function name
*/
public String getDiscoveryFunction() {
Expand All @@ -66,17 +71,25 @@ public String getDiscoveryFunction() {

/**
* Set discovery function name
*
* @param discoveryFunction discovery function name
*/
public void setDiscoveryFunction(String discoveryFunction) {
this.discoveryFunction = discoveryFunction;
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static class Builder {
private BinaryClusterDiscoveryEndpoint endpoint;
private final BinaryClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -89,6 +102,7 @@ public Builder() {
* Specify the function name to invoke in the discovery endpoint for getting the list of nodes. The function
* should not require any parameters and must be exposed as API function. Also the user which is connecting the
* endpoint must have the appropriate permission for this function.
*
* @param discoveryFunction the function name, should not be null
* @return this builder instance
*/
Expand All @@ -100,6 +114,7 @@ public Builder withEntryFunction(String discoveryFunction) {

/**
* Specify address provider for the discovery endpoints
*
* @param endpointProvider discovery endpoint address privider, should not be null
* @return this builder instance
* @see BinaryClusterDiscoveryEndpoint#setEndpointProvider(TarantoolClusterAddressProvider)
Expand All @@ -113,6 +128,7 @@ public Builder withEndpointProvider(TarantoolClusterAddressProvider endpointProv
/**
* Specify the client configuration for connecting to the discovery endpoints. The same configuration will be
* used for all endpoints
*
* @param clientConfig tarantool client configuration
* @return this builder instance
*/
Expand All @@ -124,6 +140,7 @@ public Builder withClientConfig(TarantoolClientConfig clientConfig) {

/**
* Build the discovery endpoint configuration
*
* @return {@link BinaryClusterDiscoveryEndpoint} instance
*/
public BinaryClusterDiscoveryEndpoint build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public HTTPClusterDiscoveryEndpoint() {

/**
* Create an instance, specifying URI for connection
*
* @param uri discovery endpoint URI
*/
public HTTPClusterDiscoveryEndpoint(String uri) {
Expand All @@ -29,6 +30,7 @@ public HTTPClusterDiscoveryEndpoint(String uri) {

/**
* Get discovery endpoint URI
*
* @return discovery endpoint URI
*/
public String getUri() {
Expand All @@ -37,6 +39,7 @@ public String getUri() {

/**
* Set discovery endpoint URI
*
* @param uri discovery endpoint URI
*/
public void setUri(String uri) {
Expand All @@ -45,6 +48,7 @@ public void setUri(String uri) {

/**
* Get cluster discovery endpoint connection timeout
*
* @return connection timeout, in milliseconds
*/
public int getConnectTimeout() {
Expand All @@ -53,6 +57,7 @@ public int getConnectTimeout() {

/**
* Set cluster discovery endpoint connection timeout
*
* @param connectTimeout connection timeout, in milliseconds
*/
public void setConnectTimeout(int connectTimeout) {
Expand All @@ -61,6 +66,7 @@ public void setConnectTimeout(int connectTimeout) {

/**
* Get response timeout for cluster discovery request
*
* @return request timeout, in milliseconds
*/
public int getReadTimeout() {
Expand All @@ -69,17 +75,25 @@ public int getReadTimeout() {

/**
* Set response timeout for cluster discovery request
*
* @param readTimeout request timeout, in milliseconds
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static class Builder {
private HTTPClusterDiscoveryEndpoint endpoint;
private final HTTPClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -90,6 +104,7 @@ public Builder() {

/**
* Specify the cluster discovery endpoint URI
*
* @param uri discovery endpoint URI, should not be null or empty
* @return this builder instance
*/
Expand All @@ -101,6 +116,7 @@ public Builder withURI(String uri) {

/**
* Specify the connection timeout for discovery endpoint
*
* @param connectTimeout connection timeout, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setConnectTimeout(int)
Expand All @@ -115,6 +131,7 @@ public Builder withConnectTimeout(int connectTimeout) {

/**
* Specify the read timeout for discovery endpoint connection
*
* @param readTimeout timeout of receiving response in the connection, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setReadTimeout(int)
Expand All @@ -129,6 +146,7 @@ public Builder withReadTimeout(int readTimeout) {

/**
* Build the discovery endpoint configuration
*
* @return a {@link HTTPClusterDiscoveryEndpoint} instance
*/
public HTTPClusterDiscoveryEndpoint build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ private TarantoolConnectionManager connectionManager() {
}

@Override
public boolean establishLackingConnections() {
return connectionManager().establishLackingConnections();
public boolean refresh() {
return connectionManager().refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ public CompletableFuture<List<?>> eval(String expression,
}

@Override
public boolean establishLackingConnections() {
return this.client.establishLackingConnections();
public boolean refresh() {
return this.client.refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ public CompletableFuture<List<?>> eval(String expression, List<?> arguments,
}

@Override
public boolean establishLackingConnections() {
return this.client.establishLackingConnections();
public boolean refresh() {
return this.client.refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,6 @@ public abstract class AbstractTarantoolConnectionManager implements TarantoolCon
private final Phaser initPhaser = new Phaser(0);
private final Logger logger = LoggerFactory.getLogger(getClass().getName());

/**
* Constructor
*
* @param config Tarantool client config
* @param connectionFactory connection factory
* @param selectionStrategyFactory connection selection strategy factory
* @param connectionListeners connection listeners
* @deprecated
*/
protected AbstractTarantoolConnectionManager(TarantoolClientConfig config,
TarantoolConnectionFactory connectionFactory,
ConnectionSelectionStrategyFactory selectionStrategyFactory,
TarantoolConnectionListeners connectionListeners) {
this(config, connectionFactory, connectionListeners);
}

/**
* Basic constructor
*
Expand Down Expand Up @@ -105,7 +89,7 @@ public CompletableFuture<TarantoolConnection> getConnection() {
}

@Override
public boolean establishLackingConnections() {
public boolean refresh() {
return connectionMode.compareAndSet(ConnectionMode.OFF, ConnectionMode.PARTIAL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ public TarantoolClusterConnectionManager(TarantoolClientConfig config,
TarantoolClusterAddressProvider addressProvider) {
super(config, connectionFactory, listeners);
this.addressProvider = addressProvider;
this.addressProvider.setRefreshCallback(super::refresh);
}

@Override
protected Collection<TarantoolServerAddress> getAddresses() {
return addressProvider.getAddresses();
}

@Override
public void close() {
addressProvider.setRefreshCallback(() -> {
});
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface TarantoolConnectionManager extends AutoCloseable {
*
* @return returns true if the establishing process has been started, else false
*/
boolean establishLackingConnections();
boolean refresh();
}
Loading