From be24055f97739e77175d694d205da834347c1914 Mon Sep 17 00:00:00 2001 From: Oleg Kuznetsov Date: Tue, 15 Feb 2022 23:57:23 +0300 Subject: [PATCH] Added event listener for connection manager --- .../tarantool/driver/api/TarantoolClient.java | 2 +- .../api/TarantoolClusterAddressProvider.java | 16 ++- ...stractDiscoveryClusterAddressProvider.java | 16 +++ .../BinaryClusterDiscoveryEndpoint.java | 19 +++- ...BinaryDiscoveryClusterAddressProvider.java | 6 ++ .../cluster/DiscoveryServiceEventManager.java | 52 +++++++++ .../cluster/HTTPClusterDiscoveryEndpoint.java | 20 +++- .../HTTPDiscoveryClusterAddressProvider.java | 10 +- .../driver/core/AbstractTarantoolClient.java | 4 +- .../driver/core/ProxyTarantoolClient.java | 4 +- .../driver/core/RetryingTarantoolClient.java | 4 +- .../AbstractTarantoolConnectionManager.java | 18 +--- .../connection/ConnectionManagerEvent.java | 12 +++ .../driver/core/connection/EventListener.java | 11 ++ .../driver/core/connection/EventManager.java | 39 +++++++ .../driver/core/connection/EventType.java | 9 ++ .../TarantoolClusterConnectionManager.java | 12 +++ .../TarantoolConnectionManager.java | 2 +- .../TestWrappedClusterAddressProvider.java | 19 +++- .../integration/ClusterConnectionIT.java | 44 ++++---- .../integration/ClusterDiscoveryIT.java | 12 ++- .../ProxyTarantoolClientMixedInstancesIT.java | 3 +- .../driver/integration/ReconnectIT.java | 100 +++++++++++++++++- .../cartridge/app/roles/api_router.lua | 19 ++++ .../cartridge/app/roles/api_storage.lua | 17 +++ 25 files changed, 403 insertions(+), 67 deletions(-) create mode 100644 src/main/java/io/tarantool/driver/cluster/DiscoveryServiceEventManager.java create mode 100644 src/main/java/io/tarantool/driver/core/connection/ConnectionManagerEvent.java create mode 100644 src/main/java/io/tarantool/driver/core/connection/EventListener.java create mode 100644 src/main/java/io/tarantool/driver/core/connection/EventManager.java create mode 100644 src/main/java/io/tarantool/driver/core/connection/EventType.java diff --git a/src/main/java/io/tarantool/driver/api/TarantoolClient.java b/src/main/java/io/tarantool/driver/api/TarantoolClient.java index 7e2833bac..def70053e 100644 --- a/src/main/java/io/tarantool/driver/api/TarantoolClient.java +++ b/src/main/java/io/tarantool/driver/api/TarantoolClient.java @@ -78,5 +78,5 @@ public interface TarantoolClient> * * @return returns true if the establishing process has been started, else false */ - boolean establishLackingConnections(); + boolean refresh(); } diff --git a/src/main/java/io/tarantool/driver/api/TarantoolClusterAddressProvider.java b/src/main/java/io/tarantool/driver/api/TarantoolClusterAddressProvider.java index 457e90706..c0ffe4161 100644 --- a/src/main/java/io/tarantool/driver/api/TarantoolClusterAddressProvider.java +++ b/src/main/java/io/tarantool/driver/api/TarantoolClusterAddressProvider.java @@ -1,5 +1,8 @@ package io.tarantool.driver.api; +import io.tarantool.driver.cluster.DiscoveryServiceEventManager; +import io.tarantool.driver.core.connection.EventManager; + import java.util.Collection; /** @@ -10,11 +13,22 @@ */ public interface TarantoolClusterAddressProvider extends AutoCloseable { /** - * The the collection of Tarantool server nodes which belong to the same cluster + * The collection of Tarantool server nodes which belong to the same cluster + * * @return collection of {@link TarantoolServerAddress} */ Collection getAddresses(); + /** + * Returns event manager for notifying connection manager about external events + * e.g. server address changed + * + * @return event manager + */ + default EventManager getEventManager() { + return new DiscoveryServiceEventManager(); + } + @Override default void close() { } diff --git a/src/main/java/io/tarantool/driver/cluster/AbstractDiscoveryClusterAddressProvider.java b/src/main/java/io/tarantool/driver/cluster/AbstractDiscoveryClusterAddressProvider.java index 481a2659d..434062831 100644 --- a/src/main/java/io/tarantool/driver/cluster/AbstractDiscoveryClusterAddressProvider.java +++ b/src/main/java/io/tarantool/driver/cluster/AbstractDiscoveryClusterAddressProvider.java @@ -3,6 +3,8 @@ import io.tarantool.driver.api.TarantoolClusterAddressProvider; import io.tarantool.driver.api.TarantoolServerAddress; import io.tarantool.driver.core.TarantoolDaemonThreadFactory; +import io.tarantool.driver.core.connection.ConnectionManagerEvent; +import io.tarantool.driver.core.connection.EventManager; import io.tarantool.driver.exceptions.TarantoolClientException; import java.util.Collection; @@ -23,9 +25,11 @@ public abstract class AbstractDiscoveryClusterAddressProvider implements Taranto private final ScheduledExecutorService scheduledExecutorService; private final CountDownLatch initLatch = new CountDownLatch(1); private final AtomicReference> addressesHolder = new AtomicReference<>(); + private final DiscoveryServiceEventManager eventManager; public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig discoveryConfig) { this.discoveryConfig = discoveryConfig; + this.eventManager = new DiscoveryServiceEventManager(); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new TarantoolDaemonThreadFactory("tarantool-discovery")); } @@ -33,8 +37,15 @@ public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig d protected void startDiscoveryTask() throws TarantoolClientException { Runnable discoveryTask = () -> { try { + Collection currentAddresses = this.addressesHolder.get(); Collection addresses = discoverAddresses(); setAddresses(addresses); + + if (currentAddresses != null + && addresses.size() != currentAddresses.size() + || !addresses.equals(currentAddresses)) { + eventManager.notify(ConnectionManagerEvent.SERVER_ADDRESS_CHANGED); + } } finally { if (initLatch.getCount() > 0) { initLatch.countDown(); @@ -74,6 +85,11 @@ public Collection getAddresses() { return addressesHolder.get(); } + @Override + public EventManager getEventManager() { + return this.eventManager; + } + @Override public void close() { if (scheduledExecutorService != null) { diff --git a/src/main/java/io/tarantool/driver/cluster/BinaryClusterDiscoveryEndpoint.java b/src/main/java/io/tarantool/driver/cluster/BinaryClusterDiscoveryEndpoint.java index cccaf7989..21e6c4ec2 100644 --- a/src/main/java/io/tarantool/driver/cluster/BinaryClusterDiscoveryEndpoint.java +++ b/src/main/java/io/tarantool/driver/cluster/BinaryClusterDiscoveryEndpoint.java @@ -25,6 +25,7 @@ public BinaryClusterDiscoveryEndpoint() { /** * Get service discovery endpoint provider + * * @return tarantool server address provider instance */ public TarantoolClusterAddressProvider getEndpointProvider() { @@ -33,6 +34,7 @@ public TarantoolClusterAddressProvider getEndpointProvider() { /** * Set service discovery endpoint provider + * * @param endpointProvider a tarantool address provider instance */ public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider) { @@ -41,6 +43,7 @@ public void setEndpointProvider(TarantoolClusterAddressProvider endpointProvider /** * Get client configuration for connecting to the set of the discovery endpoints + * * @return tarantool client configuration */ public TarantoolClientConfig getClientConfig() { @@ -50,6 +53,7 @@ public TarantoolClientConfig getClientConfig() { /** * Set client configuration for connecting to the set of the discovery endpoints. The same configuration will be * used for each endpoint. + * * @param clientConfig tarantool client configuration */ public void setClientConfig(TarantoolClientConfig clientConfig) { @@ -58,6 +62,7 @@ public void setClientConfig(TarantoolClientConfig clientConfig) { /** * Get discovery function name + * * @return discovery function name */ public String getDiscoveryFunction() { @@ -66,17 +71,25 @@ public String getDiscoveryFunction() { /** * Set discovery function name + * * @param discoveryFunction discovery function name */ public void setDiscoveryFunction(String discoveryFunction) { this.discoveryFunction = discoveryFunction; } + /** + * Builder for {@link BinaryClusterDiscoveryEndpoint} + */ + public static Builder builder() { + return new Builder(); + } + /** * Builder for {@link BinaryClusterDiscoveryEndpoint} */ public static class Builder { - private BinaryClusterDiscoveryEndpoint endpoint; + private final BinaryClusterDiscoveryEndpoint endpoint; /** * Basic constructor. @@ -89,6 +102,7 @@ public Builder() { * Specify the function name to invoke in the discovery endpoint for getting the list of nodes. The function * should not require any parameters and must be exposed as API function. Also the user which is connecting the * endpoint must have the appropriate permission for this function. + * * @param discoveryFunction the function name, should not be null * @return this builder instance */ @@ -100,6 +114,7 @@ public Builder withEntryFunction(String discoveryFunction) { /** * Specify address provider for the discovery endpoints + * * @param endpointProvider discovery endpoint address privider, should not be null * @return this builder instance * @see BinaryClusterDiscoveryEndpoint#setEndpointProvider(TarantoolClusterAddressProvider) @@ -113,6 +128,7 @@ public Builder withEndpointProvider(TarantoolClusterAddressProvider endpointProv /** * Specify the client configuration for connecting to the discovery endpoints. The same configuration will be * used for all endpoints + * * @param clientConfig tarantool client configuration * @return this builder instance */ @@ -124,6 +140,7 @@ public Builder withClientConfig(TarantoolClientConfig clientConfig) { /** * Build the discovery endpoint configuration + * * @return {@link BinaryClusterDiscoveryEndpoint} instance */ public BinaryClusterDiscoveryEndpoint build() { diff --git a/src/main/java/io/tarantool/driver/cluster/BinaryDiscoveryClusterAddressProvider.java b/src/main/java/io/tarantool/driver/cluster/BinaryDiscoveryClusterAddressProvider.java index af7ef92ee..b515e87bf 100644 --- a/src/main/java/io/tarantool/driver/cluster/BinaryDiscoveryClusterAddressProvider.java +++ b/src/main/java/io/tarantool/driver/cluster/BinaryDiscoveryClusterAddressProvider.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.tarantool.driver.api.TarantoolServerAddress; import io.tarantool.driver.core.ClusterTarantoolTupleClient; +import io.tarantool.driver.core.connection.EventManager; import io.tarantool.driver.exceptions.TarantoolClientException; import java.util.Collection; @@ -90,6 +91,11 @@ public BinaryDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig dis startDiscoveryTask(); } + @Override + public EventManager getEventManager() { + return super.getEventManager(); + } + protected Collection discoverAddresses() { try { List functionResult = client.call(endpoint.getDiscoveryFunction(), Collections.emptyList()).get(); diff --git a/src/main/java/io/tarantool/driver/cluster/DiscoveryServiceEventManager.java b/src/main/java/io/tarantool/driver/cluster/DiscoveryServiceEventManager.java new file mode 100644 index 000000000..bca795018 --- /dev/null +++ b/src/main/java/io/tarantool/driver/cluster/DiscoveryServiceEventManager.java @@ -0,0 +1,52 @@ +package io.tarantool.driver.cluster; + +import io.tarantool.driver.core.connection.EventListener; +import io.tarantool.driver.core.connection.EventManager; +import io.tarantool.driver.core.connection.EventType; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Implementation of event manager which manage events for discovery service + * + * @author Oleg Kuznetsov + */ +public class DiscoveryServiceEventManager implements EventManager { + + private final Map> listeners; + + public DiscoveryServiceEventManager() { + this.listeners = new HashMap<>(); + } + + @Override + public void subscribe(EventType type, EventListener listener) { + Set eventListeners = getEventListeners(type); + eventListeners.add(listener); + } + + @Override + public void unsubscribe(EventType type, EventListener listener) { + final Set eventListeners = listeners.get(type); + if (eventListeners != null) { + eventListeners.remove(listener); + } + } + + @Override + public void unsubscribe(EventType type) { + listeners.remove(type); + } + + @Override + public void notify(EventType type) { + getEventListeners(type).forEach(EventListener::update); + } + + private Set getEventListeners(EventType type) { + return listeners.computeIfAbsent(type, k -> new HashSet<>()); + } +} diff --git a/src/main/java/io/tarantool/driver/cluster/HTTPClusterDiscoveryEndpoint.java b/src/main/java/io/tarantool/driver/cluster/HTTPClusterDiscoveryEndpoint.java index 3ebf510ac..0a01d3911 100644 --- a/src/main/java/io/tarantool/driver/cluster/HTTPClusterDiscoveryEndpoint.java +++ b/src/main/java/io/tarantool/driver/cluster/HTTPClusterDiscoveryEndpoint.java @@ -21,6 +21,7 @@ public HTTPClusterDiscoveryEndpoint() { /** * Create an instance, specifying URI for connection + * * @param uri discovery endpoint URI */ public HTTPClusterDiscoveryEndpoint(String uri) { @@ -29,6 +30,7 @@ public HTTPClusterDiscoveryEndpoint(String uri) { /** * Get discovery endpoint URI + * * @return discovery endpoint URI */ public String getUri() { @@ -37,6 +39,7 @@ public String getUri() { /** * Set discovery endpoint URI + * * @param uri discovery endpoint URI */ public void setUri(String uri) { @@ -45,6 +48,7 @@ public void setUri(String uri) { /** * Get cluster discovery endpoint connection timeout + * * @return connection timeout, in milliseconds */ public int getConnectTimeout() { @@ -53,6 +57,7 @@ public int getConnectTimeout() { /** * Set cluster discovery endpoint connection timeout + * * @param connectTimeout connection timeout, in milliseconds */ public void setConnectTimeout(int connectTimeout) { @@ -61,6 +66,7 @@ public void setConnectTimeout(int connectTimeout) { /** * Get response timeout for cluster discovery request + * * @return request timeout, in milliseconds */ public int getReadTimeout() { @@ -69,17 +75,25 @@ public int getReadTimeout() { /** * Set response timeout for cluster discovery request + * * @param readTimeout request timeout, in milliseconds */ public void setReadTimeout(int readTimeout) { this.readTimeout = readTimeout; } + /** + * Builder for {@link HTTPClusterDiscoveryEndpoint} + */ + public static Builder builder() { + return new Builder(); + } + /** * Builder for {@link HTTPClusterDiscoveryEndpoint} */ public static class Builder { - private HTTPClusterDiscoveryEndpoint endpoint; + private final HTTPClusterDiscoveryEndpoint endpoint; /** * Basic constructor. @@ -90,6 +104,7 @@ public Builder() { /** * Specify the cluster discovery endpoint URI + * * @param uri discovery endpoint URI, should not be null or empty * @return this builder instance */ @@ -101,6 +116,7 @@ public Builder withURI(String uri) { /** * Specify the connection timeout for discovery endpoint + * * @param connectTimeout connection timeout, in milliseconds * @return this builder instance * @see HTTPClusterDiscoveryEndpoint#setConnectTimeout(int) @@ -115,6 +131,7 @@ public Builder withConnectTimeout(int connectTimeout) { /** * Specify the read timeout for discovery endpoint connection + * * @param readTimeout timeout of receiving response in the connection, in milliseconds * @return this builder instance * @see HTTPClusterDiscoveryEndpoint#setReadTimeout(int) @@ -129,6 +146,7 @@ public Builder withReadTimeout(int readTimeout) { /** * Build the discovery endpoint configuration + * * @return a {@link HTTPClusterDiscoveryEndpoint} instance */ public HTTPClusterDiscoveryEndpoint build() { diff --git a/src/main/java/io/tarantool/driver/cluster/HTTPDiscoveryClusterAddressProvider.java b/src/main/java/io/tarantool/driver/cluster/HTTPDiscoveryClusterAddressProvider.java index fb7368374..8a63021e1 100644 --- a/src/main/java/io/tarantool/driver/cluster/HTTPDiscoveryClusterAddressProvider.java +++ b/src/main/java/io/tarantool/driver/cluster/HTTPDiscoveryClusterAddressProvider.java @@ -31,6 +31,7 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.util.CharsetUtil; import io.tarantool.driver.api.TarantoolServerAddress; +import io.tarantool.driver.core.connection.EventManager; import io.tarantool.driver.exceptions.TarantoolClientException; import javax.net.ssl.SSLException; @@ -48,7 +49,7 @@ /** * Tarantool server address provider with service discovery via HTTP. * Gets list of nodes from API endpoint in json format. - * + *

* Expected response format example: *

  * 
@@ -68,7 +69,7 @@
  * }
  * 
  * 
- * + *

* Tarantool cartridge application lua http endpoint example: *

  * 
@@ -154,6 +155,11 @@ public HTTPDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig confi
         startDiscoveryTask();
     }
 
+    @Override
+    public EventManager getEventManager() {
+        return super.getEventManager();
+    }
+
     /*
      * Quick and dirty solution. TODO rewrite with using the standard URI class
      */
diff --git a/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java b/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java
index 79dfbe8f7..500f33a1f 100644
--- a/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java
+++ b/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java
@@ -149,8 +149,8 @@ private TarantoolConnectionManager connectionManager() {
     }
 
     @Override
-    public boolean establishLackingConnections() {
-        return connectionManager().establishLackingConnections();
+    public boolean refresh() {
+        return connectionManager().refresh();
     }
 
     @Override
diff --git a/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java b/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java
index 67d53efe0..eed0d9c64 100644
--- a/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java
+++ b/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java
@@ -417,8 +417,8 @@ public CompletableFuture> eval(String expression,
     }
 
     @Override
-    public boolean establishLackingConnections() {
-        return this.client.establishLackingConnections();
+    public boolean refresh() {
+        return this.client.refresh();
     }
 
     @Override
diff --git a/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java b/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java
index 5c50c51d0..4c3a15229 100644
--- a/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java
+++ b/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java
@@ -379,8 +379,8 @@ public CompletableFuture> eval(String expression, List arguments,
     }
 
     @Override
-    public boolean establishLackingConnections() {
-        return this.client.establishLackingConnections();
+    public boolean refresh() {
+        return this.client.refresh();
     }
 
     @Override
diff --git a/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java b/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java
index b6bb7f479..262c4ec03 100644
--- a/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java
+++ b/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java
@@ -46,22 +46,6 @@ public abstract class AbstractTarantoolConnectionManager implements TarantoolCon
     private final Phaser initPhaser = new Phaser(0);
     private final Logger logger = LoggerFactory.getLogger(getClass().getName());
 
-    /**
-     * Constructor
-     *
-     * @param config                   Tarantool client config
-     * @param connectionFactory        connection factory
-     * @param selectionStrategyFactory connection selection strategy factory
-     * @param connectionListeners      connection listeners
-     * @deprecated
-     */
-    protected AbstractTarantoolConnectionManager(TarantoolClientConfig config,
-                                                 TarantoolConnectionFactory connectionFactory,
-                                                 ConnectionSelectionStrategyFactory selectionStrategyFactory,
-                                                 TarantoolConnectionListeners connectionListeners) {
-        this(config, connectionFactory, connectionListeners);
-    }
-
     /**
      * Basic constructor
      *
@@ -105,7 +89,7 @@ public CompletableFuture getConnection() {
     }
 
     @Override
-    public boolean establishLackingConnections() {
+    public boolean refresh() {
         return connectionMode.compareAndSet(ConnectionMode.OFF, ConnectionMode.PARTIAL);
     }
 
diff --git a/src/main/java/io/tarantool/driver/core/connection/ConnectionManagerEvent.java b/src/main/java/io/tarantool/driver/core/connection/ConnectionManagerEvent.java
new file mode 100644
index 000000000..86e29c08b
--- /dev/null
+++ b/src/main/java/io/tarantool/driver/core/connection/ConnectionManagerEvent.java
@@ -0,0 +1,12 @@
+package io.tarantool.driver.core.connection;
+
+
+/**
+ * Event type for connection manager
+ *
+ * @author Oleg Kuznetsov
+ */
+public enum ConnectionManagerEvent implements EventType {
+
+    SERVER_ADDRESS_CHANGED
+}
diff --git a/src/main/java/io/tarantool/driver/core/connection/EventListener.java b/src/main/java/io/tarantool/driver/core/connection/EventListener.java
new file mode 100644
index 000000000..c3ba8e37a
--- /dev/null
+++ b/src/main/java/io/tarantool/driver/core/connection/EventListener.java
@@ -0,0 +1,11 @@
+package io.tarantool.driver.core.connection;
+
+/**
+ * Basic event listener interface
+ *
+ * @author Oleg Kuznetsov
+ */
+public interface EventListener {
+
+    void update();
+}
diff --git a/src/main/java/io/tarantool/driver/core/connection/EventManager.java b/src/main/java/io/tarantool/driver/core/connection/EventManager.java
new file mode 100644
index 000000000..b896317d1
--- /dev/null
+++ b/src/main/java/io/tarantool/driver/core/connection/EventManager.java
@@ -0,0 +1,39 @@
+package io.tarantool.driver.core.connection;
+
+/**
+ * Basic event manager interface
+ *
+ * @author Oleg Kuznetsov
+ */
+public interface EventManager {
+
+    /**
+     * Add a listener by type
+     *
+     * @param type     type of event
+     * @param listener listener to be added to all listeners by type
+     */
+    void subscribe(EventType type, EventListener listener);
+
+    /**
+     * Remove listener by type
+     *
+     * @param type     type of event
+     * @param listener listener to be added to all listeners by type
+     */
+    void unsubscribe(EventType type, EventListener listener);
+
+    /**
+     * Remove all listeners by type
+     *
+     * @param type type of event
+     */
+    void unsubscribe(EventType type);
+
+    /**
+     * Notify all subscribers about event
+     *
+     * @param type type of event
+     */
+    void notify(EventType type);
+}
diff --git a/src/main/java/io/tarantool/driver/core/connection/EventType.java b/src/main/java/io/tarantool/driver/core/connection/EventType.java
new file mode 100644
index 000000000..bb2fdca42
--- /dev/null
+++ b/src/main/java/io/tarantool/driver/core/connection/EventType.java
@@ -0,0 +1,9 @@
+package io.tarantool.driver.core.connection;
+
+/**
+ * Event type interface, using in {@link EventManager}
+ *
+ * @author Oleg Kuznetsov
+ */
+public interface EventType {
+}
diff --git a/src/main/java/io/tarantool/driver/core/connection/TarantoolClusterConnectionManager.java b/src/main/java/io/tarantool/driver/core/connection/TarantoolClusterConnectionManager.java
index d588b1b27..3173de434 100644
--- a/src/main/java/io/tarantool/driver/core/connection/TarantoolClusterConnectionManager.java
+++ b/src/main/java/io/tarantool/driver/core/connection/TarantoolClusterConnectionManager.java
@@ -5,6 +5,7 @@
 import io.tarantool.driver.api.TarantoolServerAddress;
 import io.tarantool.driver.api.connection.TarantoolConnectionListeners;
 
+import java.util.Arrays;
 import java.util.Collection;
 
 /**
@@ -13,6 +14,8 @@
  * @author Alexey Kuzin
  */
 public class TarantoolClusterConnectionManager extends AbstractTarantoolConnectionManager {
+
+    private final EventManager eventManager;
     private final TarantoolClusterAddressProvider addressProvider;
 
     /**
@@ -29,10 +32,19 @@ public TarantoolClusterConnectionManager(TarantoolClientConfig config,
                                              TarantoolClusterAddressProvider addressProvider) {
         super(config, connectionFactory, listeners);
         this.addressProvider = addressProvider;
+        this.eventManager = addressProvider.getEventManager();
+        this.eventManager.subscribe(ConnectionManagerEvent.SERVER_ADDRESS_CHANGED, super::refresh);
     }
 
     @Override
     protected Collection getAddresses() {
         return addressProvider.getAddresses();
     }
+
+    @Override
+    public void close() {
+        super.close();
+        Arrays.asList(ConnectionManagerEvent.values())
+                .forEach(eventManager::unsubscribe);
+    }
 }
diff --git a/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java b/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java
index e64968242..941bd216a 100644
--- a/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java
+++ b/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java
@@ -24,5 +24,5 @@ public interface TarantoolConnectionManager extends AutoCloseable {
      *
      * @return returns true if the establishing process has been started, else false
      */
-    boolean establishLackingConnections();
+    boolean refresh();
 }
diff --git a/src/test/java/io/tarantool/driver/cluster/TestWrappedClusterAddressProvider.java b/src/test/java/io/tarantool/driver/cluster/TestWrappedClusterAddressProvider.java
index bd5454322..14126e0a9 100644
--- a/src/test/java/io/tarantool/driver/cluster/TestWrappedClusterAddressProvider.java
+++ b/src/test/java/io/tarantool/driver/cluster/TestWrappedClusterAddressProvider.java
@@ -2,9 +2,11 @@
 
 import io.tarantool.driver.api.TarantoolClusterAddressProvider;
 import io.tarantool.driver.api.TarantoolServerAddress;
+import io.tarantool.driver.core.connection.EventManager;
 import org.testcontainers.containers.TarantoolCartridgeContainer;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.stream.Collectors;
 
 public class TestWrappedClusterAddressProvider implements TarantoolClusterAddressProvider {
@@ -22,11 +24,20 @@ public TestWrappedClusterAddressProvider(TarantoolClusterAddressProvider provide
     public Collection getAddresses() {
         Collection addresses = provider.getAddresses();
 
+        if (addresses == null) {
+            return Collections.emptyList();
+        }
+
         return addresses.stream().map(a ->
-            new TarantoolServerAddress(
-                    a.getHost(),
-                    container.getMappedPort(a.getPort())
-            )
+                new TarantoolServerAddress(
+                        a.getHost(),
+                        container.getMappedPort(a.getPort())
+                )
         ).collect(Collectors.toList());
     }
+
+    @Override
+    public EventManager getEventManager() {
+        return this.provider.getEventManager();
+    }
 }
diff --git a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java
index 5b766c18d..66adb1cab 100644
--- a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java
+++ b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java
@@ -1,14 +1,14 @@
 package io.tarantool.driver.integration;
 
-import io.tarantool.driver.core.ClusterTarantoolTupleClient;
-import io.tarantool.driver.core.ProxyTarantoolTupleClient;
-import io.tarantool.driver.core.RetryingTarantoolTupleClient;
 import io.tarantool.driver.api.TarantoolClientConfig;
 import io.tarantool.driver.api.TarantoolClusterAddressProvider;
-import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
 import io.tarantool.driver.api.TarantoolServerAddress;
-import io.tarantool.driver.auth.SimpleTarantoolCredentials;
 import io.tarantool.driver.api.connection.TarantoolConnection;
+import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
+import io.tarantool.driver.auth.SimpleTarantoolCredentials;
+import io.tarantool.driver.core.ClusterTarantoolTupleClient;
+import io.tarantool.driver.core.ProxyTarantoolTupleClient;
+import io.tarantool.driver.core.RetryingTarantoolTupleClient;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.rnorth.ducttape.unreliables.Unreliables;
@@ -28,7 +28,9 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * @author Alexey Kuzin
@@ -137,7 +139,7 @@ void testMultipleRoutersReconnect_retryableRequestShouldNotFail() throws Excepti
         // after the first retry we will turn off this router
         // and it is expected that the request will go to another router
         assertTrue(client.callForSingleResult("long_running_function",
-                        Collections.singletonList(Arrays.asList(0.5, nextRouterName.get())), Boolean.class).get());
+                Collections.singletonList(Arrays.asList(0.5, nextRouterName.get())), Boolean.class).get());
 
         // start the turned off router to get requests
         result.set(container.execInContainer(
@@ -232,20 +234,20 @@ void testMultipleConnectionsReconnect_retryableRequestShouldNotFail() throws Exc
 
         // initiate another long-running request
         request1 = client.callForSingleResult("long_running_function", Collections.singletonList(0.5), Boolean.class)
-        .thenApply(r -> {
-            try {
-                // partial disconnect -> one connection is restored
-                assertEquals(3, connections.size());
-
-                // close remaining connections
-                for (int i = 1; i < 3; i++) {
-                    connections.get(i).close();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            return r;
-        });
+                .thenApply(r -> {
+                    try {
+                        // partial disconnect -> one connection is restored
+                        assertEquals(3, connections.size());
+
+                        // close remaining connections
+                        for (int i = 1; i < 3; i++) {
+                            connections.get(i).close();
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    return r;
+                });
 
         // the request should return normally (reconnection effect)
         assertTrue(request1.get());
diff --git a/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java b/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java
index 776bd91aa..754930747 100644
--- a/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java
+++ b/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java
@@ -1,17 +1,17 @@
 package io.tarantool.driver.integration;
 
-import io.tarantool.driver.core.ClusterTarantoolTupleClient;
 import io.tarantool.driver.api.TarantoolClientConfig;
 import io.tarantool.driver.api.TarantoolClusterAddressProvider;
 import io.tarantool.driver.api.TarantoolServerAddress;
+import io.tarantool.driver.auth.SimpleTarantoolCredentials;
+import io.tarantool.driver.auth.TarantoolCredentials;
 import io.tarantool.driver.cluster.BinaryClusterDiscoveryEndpoint;
 import io.tarantool.driver.cluster.BinaryDiscoveryClusterAddressProvider;
-import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig;
 import io.tarantool.driver.cluster.HTTPClusterDiscoveryEndpoint;
 import io.tarantool.driver.cluster.HTTPDiscoveryClusterAddressProvider;
-import io.tarantool.driver.auth.SimpleTarantoolCredentials;
-import io.tarantool.driver.auth.TarantoolCredentials;
+import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig;
 import io.tarantool.driver.cluster.TestWrappedClusterAddressProvider;
+import io.tarantool.driver.core.ClusterTarantoolTupleClient;
 import io.tarantool.driver.exceptions.TarantoolClientException;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -21,7 +21,9 @@
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * @author Sergey Volgin
diff --git a/src/test/java/io/tarantool/driver/integration/ProxyTarantoolClientMixedInstancesIT.java b/src/test/java/io/tarantool/driver/integration/ProxyTarantoolClientMixedInstancesIT.java
index d87b791c4..06e0a2877 100644
--- a/src/test/java/io/tarantool/driver/integration/ProxyTarantoolClientMixedInstancesIT.java
+++ b/src/test/java/io/tarantool/driver/integration/ProxyTarantoolClientMixedInstancesIT.java
@@ -42,7 +42,6 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -116,7 +115,7 @@ public static void initClient() {
         ClusterTarantoolTupleClient clusterClient = new ClusterTarantoolTupleClient(
                 config, getClusterAddressProvider());
 
-        client =  new RetryingTarantoolTupleClient(new ProxyTarantoolTupleClient(clusterClient),
+        client = new RetryingTarantoolTupleClient(new ProxyTarantoolTupleClient(clusterClient),
                 TarantoolRequestRetryPolicies.AttemptsBoundRetryPolicyFactory
                         .builder(10, thr -> thr instanceof TarantoolNoSuchProcedureException)
                         .withDelay(100)
diff --git a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java
index 105337d0f..92696dcd0 100644
--- a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java
+++ b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java
@@ -1,11 +1,21 @@
 package io.tarantool.driver.integration;
 
 import io.tarantool.driver.api.TarantoolClient;
+import io.tarantool.driver.api.TarantoolClientConfig;
 import io.tarantool.driver.api.TarantoolClientFactory;
 import io.tarantool.driver.api.TarantoolResult;
 import io.tarantool.driver.api.TarantoolServerAddress;
+import io.tarantool.driver.api.tuple.DefaultTarantoolTupleFactory;
 import io.tarantool.driver.api.tuple.TarantoolTuple;
+import io.tarantool.driver.api.tuple.TarantoolTupleFactory;
+import io.tarantool.driver.auth.SimpleTarantoolCredentials;
+import io.tarantool.driver.cluster.BinaryClusterDiscoveryEndpoint;
+import io.tarantool.driver.cluster.BinaryDiscoveryClusterAddressProvider;
+import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig;
+import io.tarantool.driver.cluster.TestWrappedClusterAddressProvider;
 import io.tarantool.driver.exceptions.TarantoolNoSuchProcedureException;
+import io.tarantool.driver.mappers.DefaultMessagePackMapperFactory;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -13,8 +23,10 @@
 import org.testcontainers.containers.output.WaitingConsumer;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
 
@@ -32,6 +44,10 @@ public class ReconnectIT extends SharedCartridgeContainer {
     private static String USER_NAME;
     private static String PASSWORD;
 
+    private static final DefaultMessagePackMapperFactory mapperFactory = DefaultMessagePackMapperFactory.getInstance();
+    private static final TarantoolTupleFactory tupleFactory =
+            new DefaultTarantoolTupleFactory(mapperFactory.defaultComplexTypesMapper());
+
     @BeforeAll
     public static void setUp() throws TimeoutException {
         startCluster();
@@ -109,7 +125,7 @@ public void test_should_reconnect_ifReconnectIsInvoked() throws Exception {
         // start routers
         container.execInContainer("cartridge", "start", "--run-dir=/tmp/run", "--data-dir=/tmp/data", "-d");
 
-        client.establishLackingConnections();
+        client.refresh();
         Thread.sleep(3000);
 
         // getting all routers uuids after restarting
@@ -136,6 +152,80 @@ private TarantoolClient> getRetr
                 .build();
     }
 
+    @Test
+    void should_removeUnavailableHostsFromAddressProvider_ifDiscoveryProcedureReturnStatusNotHealthyAndNotAvailable()
+            throws InterruptedException {
+        initRouterStatuses();
+        final TarantoolClient> client = initClientWithDiscovery();
+
+        final Set instancesUuids = getInstancesUuids(client);
+        assertEquals(3, instancesUuids.size());
+
+        replaceInstancesInfo(client, 1, "unavailable", 3301);
+        replaceInstancesInfo(client, 2, "unavailable", 3311);
+        Thread.sleep(1000);
+
+        final Set afterRoutersDisablingInstancesUuids = getInstancesUuids(client);
+        assertEquals(1, afterRoutersDisablingInstancesUuids.size());
+
+        replaceInstancesInfo(client, 1, "available", 3301);
+        replaceInstancesInfo(client, 2, "available", 3311);
+        Thread.sleep(1000);
+
+        final Set afterRoutersEnablingInstancesUuids = getInstancesUuids(client);
+        assertEquals(3, afterRoutersEnablingInstancesUuids.size());
+    }
+
+    private TarantoolClient> initClientWithDiscovery() {
+        final BinaryClusterDiscoveryEndpoint discoveryEndpoint = BinaryClusterDiscoveryEndpoint.builder()
+                .withEntryFunction("get_routers_status")
+                .withEndpointProvider(() -> Arrays.asList(getTarantoolServerAddresses()))
+                .withClientConfig(TarantoolClientConfig.builder()
+                        .withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD))
+                        .build()
+                )
+                .build();
+
+        final BinaryDiscoveryClusterAddressProvider binaryDiscoveryClusterAddressProvider =
+                new BinaryDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig.builder()
+                        .withEndpoint(discoveryEndpoint)
+                        .withDelay(50)
+                        .build());
+
+        return TarantoolClientFactory.createClient()
+                .withAddressProvider(
+                        new TestWrappedClusterAddressProvider(binaryDiscoveryClusterAddressProvider, container))
+                .withCredentials(USER_NAME, PASSWORD)
+                .withConnections(10)
+                .withProxyMethodMapping()
+                .build();
+    }
+
+    private void initRouterStatuses() {
+        final TarantoolClient> initClient =
+                TarantoolClientFactory.createClient()
+                        .withAddresses(getTarantoolServerAddresses())
+                        .withCredentials(USER_NAME, PASSWORD)
+                        .withConnections(1)
+                        .build();
+        initClient.call("init_router_status").join();
+    }
+
+    private void replaceInstancesInfo(TarantoolClient> client,
+                                      int id, String status, Integer port) {
+        final TarantoolTuple tuple = tupleFactory
+                .create(id, 1, UUID.randomUUID().toString(), status, String.format("%s:%s", "localhost", port));
+
+        client.space("instances_info").replace(tuple).join();
+    }
+
+    @NotNull
+    private TarantoolServerAddress[] getTarantoolServerAddresses() {
+        return new TarantoolServerAddress[]{new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3301)),
+                new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3311)),
+                new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3312))};
+    }
+
     /**
      * Return all instances uuids from cluster, using round robin connection selection strategy
      *
@@ -145,16 +235,16 @@ private TarantoolClient> getRetr
     private Set getInstancesUuids(TarantoolClient> client) {
         String firstUuid = getInstanceUuid(client);
 
-        final Set routerUuids = new HashSet<>();
-        routerUuids.add(firstUuid);
+        final Set instancesUuids = new HashSet<>();
+        instancesUuids.add(firstUuid);
 
         String currentUuid = "";
         while (!firstUuid.equals(currentUuid)) {
             currentUuid = getInstanceUuid(client);
-            routerUuids.add(currentUuid);
+            instancesUuids.add(currentUuid);
         }
 
-        return routerUuids;
+        return instancesUuids;
     }
 
     private String getInstanceUuid(TarantoolClient> client) {
diff --git a/src/test/resources/cartridge/app/roles/api_router.lua b/src/test/resources/cartridge/app/roles/api_router.lua
index da2bbcde7..f6a8e17b0 100644
--- a/src/test/resources/cartridge/app/roles/api_router.lua
+++ b/src/test/resources/cartridge/app/roles/api_router.lua
@@ -2,6 +2,7 @@ local vshard = require('vshard')
 local cartridge_rpc = require('cartridge.rpc')
 local fiber = require('fiber')
 local crud = require('crud')
+local uuid = require('uuid')
 local log = require('log')
 
 local function get_schema()
@@ -14,6 +15,22 @@ local function truncate_space(space_name)
     crud.truncate(space_name)
 end
 
+local function get_routers_status()
+    local res = crud.select('instances_info')
+
+    local result = {}
+    for k, v in pairs(res.rows) do
+        result[v[3]] = { status = v[4], uri = v[5] }
+    end
+    return result
+end
+
+local function init_router_status()
+    crud.insert('instances_info', { 1, 1, uuid.str(), 'available', 'localhost:3301' })
+    crud.insert('instances_info', { 2, 1, uuid.str(), 'available', 'localhost:3311' })
+    crud.insert('instances_info', { 3, 1, uuid.str(), 'available', 'localhost:3312' })
+end
+
 local retries_holder = {
 }
 local function setup_retrying_function(retries)
@@ -158,6 +175,8 @@ local function init(opts)
     rawset(_G, 'box_error_non_network_error', box_error_non_network_error)
     rawset(_G, 'crud_error_timeout', crud_error_timeout)
     rawset(_G, 'custom_crud_select', custom_crud_select)
+    rawset(_G, 'get_routers_status', get_routers_status)
+    rawset(_G, 'init_router_status', init_router_status)
     rawset(_G, 'test_no_such_procedure', test_no_such_procedure)
 
     return true
diff --git a/src/test/resources/cartridge/app/roles/api_storage.lua b/src/test/resources/cartridge/app/roles/api_storage.lua
index 132cd6080..c2017f6f0 100644
--- a/src/test/resources/cartridge/app/roles/api_storage.lua
+++ b/src/test/resources/cartridge/app/roles/api_storage.lua
@@ -81,6 +81,23 @@ local function init_space()
     test_space_with_double_field:create_index('id', { parts = { 'id' }, if_not_exists = true, })
     test_space_with_double_field:create_index('bucket_id', { parts = { 'bucket_id' }, unique = false, if_not_exists = true, })
 
+    local instances_info = box.schema.space.create(
+            'instances_info',
+            {
+                format = {
+                    { 'id', 'unsigned' },
+                    { 'bucket_id', 'unsigned' },
+                    { 'uuid', 'string' },
+                    { 'status', 'string' },
+                    { 'uri', 'string' },
+                },
+                if_not_exists = true,
+            }
+    )
+
+    instances_info:create_index('id', { parts = { 'id' }, if_not_exists = true, })
+    instances_info:create_index('bucket_id', { parts = { 'bucket_id' }, unique = false, if_not_exists = true, })
+
     -- cursor test spaces
     local cursor_test_space = box.schema.space.create('cursor_test_space', { if_not_exists = true })
     cursor_test_space:format({