Skip to content

Commit

Permalink
Added event listener for connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
wey1and committed Feb 15, 2022
1 parent 5ed7e3e commit be24055
Show file tree
Hide file tree
Showing 25 changed files with 403 additions and 67 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/tarantool/driver/api/TarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
*
* @return returns true if the establishing process has been started, else false
*/
boolean establishLackingConnections();
boolean refresh();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.tarantool.driver.api;

import io.tarantool.driver.cluster.DiscoveryServiceEventManager;
import io.tarantool.driver.core.connection.EventManager;

import java.util.Collection;

/**
Expand All @@ -10,11 +13,22 @@
*/
public interface TarantoolClusterAddressProvider extends AutoCloseable {
/**
* The the collection of Tarantool server nodes which belong to the same cluster
* The collection of Tarantool server nodes which belong to the same cluster
*
* @return collection of {@link TarantoolServerAddress}
*/
Collection<TarantoolServerAddress> getAddresses();

/**
* Returns event manager for notifying connection manager about external events
* e.g. server address changed
*
* @return event manager
*/
default EventManager getEventManager() {
return new DiscoveryServiceEventManager();
}

@Override
default void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.core.TarantoolDaemonThreadFactory;
import io.tarantool.driver.core.connection.ConnectionManagerEvent;
import io.tarantool.driver.core.connection.EventManager;
import io.tarantool.driver.exceptions.TarantoolClientException;

import java.util.Collection;
Expand All @@ -23,18 +25,27 @@ public abstract class AbstractDiscoveryClusterAddressProvider implements Taranto
private final ScheduledExecutorService scheduledExecutorService;
private final CountDownLatch initLatch = new CountDownLatch(1);
private final AtomicReference<Collection<TarantoolServerAddress>> addressesHolder = new AtomicReference<>();
private final DiscoveryServiceEventManager eventManager;

public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig discoveryConfig) {
this.discoveryConfig = discoveryConfig;
this.eventManager = new DiscoveryServiceEventManager();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new TarantoolDaemonThreadFactory("tarantool-discovery"));
}

protected void startDiscoveryTask() throws TarantoolClientException {
Runnable discoveryTask = () -> {
try {
Collection<TarantoolServerAddress> currentAddresses = this.addressesHolder.get();
Collection<TarantoolServerAddress> addresses = discoverAddresses();
setAddresses(addresses);

if (currentAddresses != null
&& addresses.size() != currentAddresses.size()
|| !addresses.equals(currentAddresses)) {
eventManager.notify(ConnectionManagerEvent.SERVER_ADDRESS_CHANGED);
}
} finally {
if (initLatch.getCount() > 0) {
initLatch.countDown();
Expand Down Expand Up @@ -74,6 +85,11 @@ public Collection<TarantoolServerAddress> getAddresses() {
return addressesHolder.get();
}

@Override
public EventManager getEventManager() {
return this.eventManager;
}

@Override
public void close() {
if (scheduledExecutorService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public BinaryClusterDiscoveryEndpoint() {

/**
* Get service discovery endpoint provider
*
* @return tarantool server address provider instance
*/
public TarantoolClusterAddressProvider getEndpointProvider() {
Expand All @@ -33,6 +34,7 @@ public TarantoolClusterAddressProvider getEndpointProvider() {

/**
* Set service discovery endpoint provider
*
* @param endpointProvider a tarantool address provider instance
*/
public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider) {
Expand All @@ -41,6 +43,7 @@ public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider

/**
* Get client configuration for connecting to the set of the discovery endpoints
*
* @return tarantool client configuration
*/
public TarantoolClientConfig getClientConfig() {
Expand All @@ -50,6 +53,7 @@ public TarantoolClientConfig getClientConfig() {
/**
* Set client configuration for connecting to the set of the discovery endpoints. The same configuration will be
* used for each endpoint.
*
* @param clientConfig tarantool client configuration
*/
public void setClientConfig(TarantoolClientConfig clientConfig) {
Expand All @@ -58,6 +62,7 @@ public void setClientConfig(TarantoolClientConfig clientConfig) {

/**
* Get discovery function name
*
* @return discovery function name
*/
public String getDiscoveryFunction() {
Expand All @@ -66,17 +71,25 @@ public String getDiscoveryFunction() {

/**
* Set discovery function name
*
* @param discoveryFunction discovery function name
*/
public void setDiscoveryFunction(String discoveryFunction) {
this.discoveryFunction = discoveryFunction;
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static class Builder {
private BinaryClusterDiscoveryEndpoint endpoint;
private final BinaryClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -89,6 +102,7 @@ public Builder() {
* Specify the function name to invoke in the discovery endpoint for getting the list of nodes. The function
* should not require any parameters and must be exposed as API function. Also the user which is connecting the
* endpoint must have the appropriate permission for this function.
*
* @param discoveryFunction the function name, should not be null
* @return this builder instance
*/
Expand All @@ -100,6 +114,7 @@ public Builder withEntryFunction(String discoveryFunction) {

/**
* Specify address provider for the discovery endpoints
*
* @param endpointProvider discovery endpoint address privider, should not be null
* @return this builder instance
* @see BinaryClusterDiscoveryEndpoint#setEndpointProvider(TarantoolClusterAddressProvider)
Expand All @@ -113,6 +128,7 @@ public Builder withEndpointProvider(TarantoolClusterAddressProvider endpointProv
/**
* Specify the client configuration for connecting to the discovery endpoints. The same configuration will be
* used for all endpoints
*
* @param clientConfig tarantool client configuration
* @return this builder instance
*/
Expand All @@ -124,6 +140,7 @@ public Builder withClientConfig(TarantoolClientConfig clientConfig) {

/**
* Build the discovery endpoint configuration
*
* @return {@link BinaryClusterDiscoveryEndpoint} instance
*/
public BinaryClusterDiscoveryEndpoint build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
import io.tarantool.driver.core.connection.EventManager;
import io.tarantool.driver.exceptions.TarantoolClientException;

import java.util.Collection;
Expand Down Expand Up @@ -90,6 +91,11 @@ public BinaryDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig dis
startDiscoveryTask();
}

@Override
public EventManager getEventManager() {
return super.getEventManager();
}

protected Collection<TarantoolServerAddress> discoverAddresses() {
try {
List<?> functionResult = client.call(endpoint.getDiscoveryFunction(), Collections.emptyList()).get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.tarantool.driver.cluster;

import io.tarantool.driver.core.connection.EventListener;
import io.tarantool.driver.core.connection.EventManager;
import io.tarantool.driver.core.connection.EventType;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Implementation of event manager which manage events for discovery service
*
* @author Oleg Kuznetsov
*/
public class DiscoveryServiceEventManager implements EventManager {

private final Map<EventType, Set<EventListener>> listeners;

public DiscoveryServiceEventManager() {
this.listeners = new HashMap<>();
}

@Override
public void subscribe(EventType type, EventListener listener) {
Set<EventListener> eventListeners = getEventListeners(type);
eventListeners.add(listener);
}

@Override
public void unsubscribe(EventType type, EventListener listener) {
final Set<EventListener> eventListeners = listeners.get(type);
if (eventListeners != null) {
eventListeners.remove(listener);
}
}

@Override
public void unsubscribe(EventType type) {
listeners.remove(type);
}

@Override
public void notify(EventType type) {
getEventListeners(type).forEach(EventListener::update);
}

private Set<EventListener> getEventListeners(EventType type) {
return listeners.computeIfAbsent(type, k -> new HashSet<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public HTTPClusterDiscoveryEndpoint() {

/**
* Create an instance, specifying URI for connection
*
* @param uri discovery endpoint URI
*/
public HTTPClusterDiscoveryEndpoint(String uri) {
Expand All @@ -29,6 +30,7 @@ public HTTPClusterDiscoveryEndpoint(String uri) {

/**
* Get discovery endpoint URI
*
* @return discovery endpoint URI
*/
public String getUri() {
Expand All @@ -37,6 +39,7 @@ public String getUri() {

/**
* Set discovery endpoint URI
*
* @param uri discovery endpoint URI
*/
public void setUri(String uri) {
Expand All @@ -45,6 +48,7 @@ public void setUri(String uri) {

/**
* Get cluster discovery endpoint connection timeout
*
* @return connection timeout, in milliseconds
*/
public int getConnectTimeout() {
Expand All @@ -53,6 +57,7 @@ public int getConnectTimeout() {

/**
* Set cluster discovery endpoint connection timeout
*
* @param connectTimeout connection timeout, in milliseconds
*/
public void setConnectTimeout(int connectTimeout) {
Expand All @@ -61,6 +66,7 @@ public void setConnectTimeout(int connectTimeout) {

/**
* Get response timeout for cluster discovery request
*
* @return request timeout, in milliseconds
*/
public int getReadTimeout() {
Expand All @@ -69,17 +75,25 @@ public int getReadTimeout() {

/**
* Set response timeout for cluster discovery request
*
* @param readTimeout request timeout, in milliseconds
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static class Builder {
private HTTPClusterDiscoveryEndpoint endpoint;
private final HTTPClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -90,6 +104,7 @@ public Builder() {

/**
* Specify the cluster discovery endpoint URI
*
* @param uri discovery endpoint URI, should not be null or empty
* @return this builder instance
*/
Expand All @@ -101,6 +116,7 @@ public Builder withURI(String uri) {

/**
* Specify the connection timeout for discovery endpoint
*
* @param connectTimeout connection timeout, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setConnectTimeout(int)
Expand All @@ -115,6 +131,7 @@ public Builder withConnectTimeout(int connectTimeout) {

/**
* Specify the read timeout for discovery endpoint connection
*
* @param readTimeout timeout of receiving response in the connection, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setReadTimeout(int)
Expand All @@ -129,6 +146,7 @@ public Builder withReadTimeout(int readTimeout) {

/**
* Build the discovery endpoint configuration
*
* @return a {@link HTTPClusterDiscoveryEndpoint} instance
*/
public HTTPClusterDiscoveryEndpoint build() {
Expand Down
Loading

0 comments on commit be24055

Please sign in to comment.