Skip to content

Commit

Permalink
Added event listener for connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
wey1and committed Feb 15, 2022
1 parent 5ed7e3e commit 6e47d1b
Show file tree
Hide file tree
Showing 27 changed files with 390 additions and 78 deletions.
6 changes: 5 additions & 1 deletion src/main/java/io/tarantool/driver/api/TarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.tarantool.driver.api.metadata.TarantoolMetadataOperations;
import io.tarantool.driver.api.metadata.TarantoolMetadataProvider;
import io.tarantool.driver.api.space.TarantoolSpaceOperations;
import io.tarantool.driver.core.connection.events.ConnectionManagerEvent;
import io.tarantool.driver.core.connection.events.EventManager;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.protocol.Packable;

Expand Down Expand Up @@ -73,10 +75,12 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
*/
TarantoolConnectionListeners getConnectionListeners();

EventManager<ConnectionManagerEvent> getEventManager();

/**
* Starts the process of establishing lacking connections to each host
*
* @return returns true if the establishing process has been started, else false
*/
boolean establishLackingConnections();
boolean refresh();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.tarantool.driver.api.connection.TarantoolConnectionSelectionStrategyType;
import io.tarantool.driver.api.tuple.TarantoolTuple;
import io.tarantool.driver.auth.TarantoolCredentials;
import io.tarantool.driver.core.connection.events.ConnectionManagerEvent;
import io.tarantool.driver.core.connection.events.EventManager;
import io.tarantool.driver.mappers.MessagePackMapper;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -165,6 +167,8 @@ TarantoolClientBuilder withConnectionSelectionStrategy(
TarantoolClientBuilder withConnectionSelectionStrategy(
TarantoolConnectionSelectionStrategyType connectionSelectionStrategyType);

TarantoolClientBuilder withConnectionEventManager(EventManager<ConnectionManagerEvent> eventManager);

/**
* Build the configured Tarantool client instance. Call this when you have specified all necessary settings.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
*/
public interface TarantoolClusterAddressProvider extends AutoCloseable {
/**
* The the collection of Tarantool server nodes which belong to the same cluster
* The collection of Tarantool server nodes which belong to the same cluster
*
* @return collection of {@link TarantoolServerAddress}
*/
Collection<TarantoolServerAddress> getAddresses();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.core.TarantoolDaemonThreadFactory;
import io.tarantool.driver.core.connection.events.ConnectionManagerEvent;
import io.tarantool.driver.core.connection.events.EventManager;
import io.tarantool.driver.exceptions.TarantoolClientException;

import java.util.Collection;
Expand All @@ -23,18 +25,27 @@ public abstract class AbstractDiscoveryClusterAddressProvider implements Taranto
private final ScheduledExecutorService scheduledExecutorService;
private final CountDownLatch initLatch = new CountDownLatch(1);
private final AtomicReference<Collection<TarantoolServerAddress>> addressesHolder = new AtomicReference<>();
private final EventManager<ConnectionManagerEvent> eventManager;

public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig discoveryConfig) {
this.discoveryConfig = discoveryConfig;
this.eventManager = discoveryConfig.getEventManager();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new TarantoolDaemonThreadFactory("tarantool-discovery"));
}

protected void startDiscoveryTask() throws TarantoolClientException {
Runnable discoveryTask = () -> {
try {
Collection<TarantoolServerAddress> currentAddresses = this.addressesHolder.get();
Collection<TarantoolServerAddress> addresses = discoverAddresses();
setAddresses(addresses);

if (currentAddresses != null
&& addresses.size() != currentAddresses.size()
|| !addresses.equals(currentAddresses)) {
eventManager.notify(ConnectionManagerEvent.SERVER_ADDRESS_CHANGED);
}
} finally {
if (initLatch.getCount() > 0) {
initLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,18 @@ public void setDiscoveryFunction(String discoveryFunction) {
this.discoveryFunction = discoveryFunction;
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link BinaryClusterDiscoveryEndpoint}
*/
public static class Builder {
private BinaryClusterDiscoveryEndpoint endpoint;
private final BinaryClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public HTTPClusterDiscoveryEndpoint() {

/**
* Create an instance, specifying URI for connection
*
* @param uri discovery endpoint URI
*/
public HTTPClusterDiscoveryEndpoint(String uri) {
Expand All @@ -29,6 +30,7 @@ public HTTPClusterDiscoveryEndpoint(String uri) {

/**
* Get discovery endpoint URI
*
* @return discovery endpoint URI
*/
public String getUri() {
Expand All @@ -37,6 +39,7 @@ public String getUri() {

/**
* Set discovery endpoint URI
*
* @param uri discovery endpoint URI
*/
public void setUri(String uri) {
Expand All @@ -45,6 +48,7 @@ public void setUri(String uri) {

/**
* Get cluster discovery endpoint connection timeout
*
* @return connection timeout, in milliseconds
*/
public int getConnectTimeout() {
Expand All @@ -53,6 +57,7 @@ public int getConnectTimeout() {

/**
* Set cluster discovery endpoint connection timeout
*
* @param connectTimeout connection timeout, in milliseconds
*/
public void setConnectTimeout(int connectTimeout) {
Expand All @@ -61,6 +66,7 @@ public void setConnectTimeout(int connectTimeout) {

/**
* Get response timeout for cluster discovery request
*
* @return request timeout, in milliseconds
*/
public int getReadTimeout() {
Expand All @@ -69,17 +75,25 @@ public int getReadTimeout() {

/**
* Set response timeout for cluster discovery request
*
* @param readTimeout request timeout, in milliseconds
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link HTTPClusterDiscoveryEndpoint}
*/
public static class Builder {
private HTTPClusterDiscoveryEndpoint endpoint;
private final HTTPClusterDiscoveryEndpoint endpoint;

/**
* Basic constructor.
Expand All @@ -90,6 +104,7 @@ public Builder() {

/**
* Specify the cluster discovery endpoint URI
*
* @param uri discovery endpoint URI, should not be null or empty
* @return this builder instance
*/
Expand All @@ -101,6 +116,7 @@ public Builder withURI(String uri) {

/**
* Specify the connection timeout for discovery endpoint
*
* @param connectTimeout connection timeout, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setConnectTimeout(int)
Expand All @@ -115,6 +131,7 @@ public Builder withConnectTimeout(int connectTimeout) {

/**
* Specify the read timeout for discovery endpoint connection
*
* @param readTimeout timeout of receiving response in the connection, in milliseconds
* @return this builder instance
* @see HTTPClusterDiscoveryEndpoint#setReadTimeout(int)
Expand All @@ -129,6 +146,7 @@ public Builder withReadTimeout(int readTimeout) {

/**
* Build the discovery endpoint configuration
*
* @return a {@link HTTPClusterDiscoveryEndpoint} instance
*/
public HTTPClusterDiscoveryEndpoint build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
/**
* Tarantool server address provider with service discovery via HTTP.
* Gets list of nodes from API endpoint in json format.
*
* <p>
* Expected response format example:
* <pre>
* <code>
Expand All @@ -68,7 +68,7 @@
* }
* </code>
* </pre>
*
* <p>
* Tarantool cartridge application lua http endpoint example:
* <pre>
* <code>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.tarantool.driver.cluster;

import io.tarantool.driver.core.connection.events.ConnectionManagerEvent;
import io.tarantool.driver.core.connection.events.EventManager;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.utils.Assert;

Expand All @@ -14,9 +16,20 @@ public final class TarantoolClusterDiscoveryConfig {

private TarantoolClusterDiscoveryEndpoint endpoint;
private int serviceDiscoveryDelay = 60_000; // milliseconds
private EventManager<ConnectionManagerEvent> eventManager = new EventManager<>();


public void setEventManager(EventManager<ConnectionManagerEvent> eventManager) {
this.eventManager = eventManager;
}

public EventManager<ConnectionManagerEvent> getEventManager() {
return eventManager;
}

/**
* Get config of service discovery endpoint
*
* @return a {@link TarantoolClusterDiscoveryEndpoint} instance
*/
public TarantoolClusterDiscoveryEndpoint getEndpoint() {
Expand All @@ -25,6 +38,7 @@ public TarantoolClusterDiscoveryEndpoint getEndpoint() {

/**
* Set service discovery endpoint config and enable cluster connection
*
* @param endpoint a {@link TarantoolClusterDiscoveryEndpoint} instance
*/
public void setEndpoint(TarantoolClusterDiscoveryEndpoint endpoint) {
Expand All @@ -33,6 +47,7 @@ public void setEndpoint(TarantoolClusterDiscoveryEndpoint endpoint) {

/**
* Get cluster discovery delay
*
* @return cluster discovery delay, milliseconds
*/
public int getServiceDiscoveryDelay() {
Expand All @@ -41,6 +56,7 @@ public int getServiceDiscoveryDelay() {

/**
* Set scan period (in milliseconds) of receiving a new list of instances
*
* @param serviceDiscoveryDelay period of receiving a new list of instances
*/
public void setServiceDiscoveryDelay(int serviceDiscoveryDelay) {
Expand All @@ -49,6 +65,7 @@ public void setServiceDiscoveryDelay(int serviceDiscoveryDelay) {

/**
* Create a builder instance.
*
* @return a builder
*/
public static Builder builder() {
Expand All @@ -60,14 +77,15 @@ public static Builder builder() {
*/
public static class Builder {

private TarantoolClusterDiscoveryConfig config;
private final TarantoolClusterDiscoveryConfig config;

public Builder() {
this.config = new TarantoolClusterDiscoveryConfig();
}

/**
* Specify scan period of receiving a new list of instances
*
* @param delay period of receiving a new list of instances, in milliseconds
* @return this builder instance
* @see TarantoolClusterDiscoveryConfig#setServiceDiscoveryDelay(int)
Expand All @@ -82,6 +100,7 @@ public Builder withDelay(int delay) {

/**
* Specify service discovery config and enable using service discovery
*
* @param endpoint discovery endpoint config, should not be null
* @return this builder instance
* @see TarantoolClusterDiscoveryConfig#setEndpoint(TarantoolClusterDiscoveryEndpoint)
Expand All @@ -94,6 +113,7 @@ public Builder withEndpoint(TarantoolClusterDiscoveryEndpoint endpoint) {

/**
* Build a {@link TarantoolClusterDiscoveryConfig} instance
*
* @return configured instance
*/
public TarantoolClusterDiscoveryConfig build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ private TarantoolConnectionManager connectionManager() {
}

@Override
public boolean establishLackingConnections() {
return connectionManager().establishLackingConnections();
public boolean refresh() {
return connectionManager().refresh();
}

@Override
Expand Down
28 changes: 22 additions & 6 deletions src/main/java/io/tarantool/driver/core/ClusterTarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.tarantool.driver.core.connection.TarantoolClusterConnectionManager;
import io.tarantool.driver.core.connection.TarantoolConnectionFactory;
import io.tarantool.driver.core.connection.TarantoolConnectionManager;
import io.tarantool.driver.core.connection.events.ConnectionManagerEvent;
import io.tarantool.driver.core.connection.events.EventManager;
import io.tarantool.driver.protocol.Packable;
import io.tarantool.driver.utils.Assert;

Expand All @@ -25,6 +27,18 @@ public abstract class ClusterTarantoolClient<T extends Packable, R extends Colle
extends AbstractTarantoolClient<T, R> {

private final TarantoolClusterAddressProvider addressProvider;
private final EventManager<ConnectionManagerEvent> eventManager;

public ClusterTarantoolClient(TarantoolClientConfig config,
TarantoolClusterAddressProvider addressProvider,
EventManager<ConnectionManagerEvent> eventManager) {
super(config);

Assert.notNull(addressProvider, "Address provider must not be null");

this.addressProvider = addressProvider;
this.eventManager = eventManager;
}

/**
* Create a client. The server address for connecting to the server is specified by the passed address provider.
Expand All @@ -35,21 +49,23 @@ public abstract class ClusterTarantoolClient<T extends Packable, R extends Colle
*/
public ClusterTarantoolClient(TarantoolClientConfig config,
TarantoolClusterAddressProvider addressProvider) {
super(config);

Assert.notNull(addressProvider, "Address provider must not be null");

this.addressProvider = addressProvider;
this(config, addressProvider, new EventManager<>());
}

@Override
protected TarantoolConnectionManager connectionManager(TarantoolClientConfig config,
TarantoolConnectionFactory connectionFactory,
TarantoolConnectionListeners listeners) {
return new TarantoolClusterConnectionManager(config, connectionFactory, listeners, addressProvider);
return new TarantoolClusterConnectionManager(
config, connectionFactory, listeners, addressProvider, eventManager);
}

protected TarantoolClusterAddressProvider getAddressProvider() {
return addressProvider;
}

@Override
public EventManager<ConnectionManagerEvent> getEventManager() {
return eventManager;
}
}
Loading

0 comments on commit 6e47d1b

Please sign in to comment.