From 2cce844a204b43d59fe0f5eba43d4cd6723532af Mon Sep 17 00:00:00 2001 From: Vittorio Rigamonti Date: Mon, 12 Sep 2016 13:57:13 +0200 Subject: [PATCH] HRCPP-286 Added failover capabilities --- include/infinispan/hotrod/Configuration.h | 29 +++-- .../infinispan/hotrod/ConfigurationBuilder.h | 69 ++++++++--- include/infinispan/hotrod/Flag.h | 2 + .../infinispan/hotrod/RemoteCacheManager.h | 15 +++ .../infinispan/hotrod/ServerConfiguration.h | 10 +- include/infinispan/hotrod/portable.h | 5 + jni/pom.xml | 6 +- .../ClusterConfigurationBuilder.java | 15 +++ .../configuration/ConfigurationBuilder.java | 29 ++--- .../ServerConfigurationBuilder.java | 6 +- jni/src/main/swig/java.i | 3 + .../jni/hotrod/CrossSiteFailoverTest.java | 9 ++ .../infinispan/client/jni/hotrod/JniTest.java | 8 +- src/hotrod/api/RemoteCacheManager.cpp | 9 ++ src/hotrod/impl/RemoteCacheManagerImpl.cpp | 10 ++ src/hotrod/impl/RemoteCacheManagerImpl.h | 3 + .../impl/configuration/Configuration.cpp | 2 + .../impl/operations/RetryOnFailureOperation.h | 27 +++-- src/hotrod/impl/protocol/Codec20.cpp | 13 +- src/hotrod/impl/transport/TransportFactory.h | 6 +- .../transport/tcp/TcpTransportFactory.cpp | 114 +++++++++++++++++- .../impl/transport/tcp/TcpTransportFactory.h | 6 +- 22 files changed, 315 insertions(+), 81 deletions(-) create mode 100644 jni/src/main/java/org/infinispan/client/hotrod/configuration/ClusterConfigurationBuilder.java create mode 100644 jni/src/test/java/org/infinispan/client/jni/hotrod/CrossSiteFailoverTest.java diff --git a/include/infinispan/hotrod/Configuration.h b/include/infinispan/hotrod/Configuration.h index 56545579..4ef1860c 100644 --- a/include/infinispan/hotrod/Configuration.h +++ b/include/infinispan/hotrod/Configuration.h @@ -33,6 +33,8 @@ class Configuration HR_EXTERN static const char* PROTOCOL_VERSION_22; HR_EXTERN static const char* PROTOCOL_VERSION_23; HR_EXTERN static const char* PROTOCOL_VERSION_24; + HR_EXTERN static const char* DEFAULT_CLUSTER_NAME; + Configuration(const std::string &_protocolVersion, @@ -40,7 +42,7 @@ class Configuration int _connectionTimeout, bool _forceReturnValue, int _keySizeEstimate, - std::vector _serversConfiguration, + std::map > _serversConfiguration, int _socketTimeout, const SslConfiguration _sslConfiguration, bool _tcpNoDelay, @@ -50,9 +52,17 @@ class Configuration protocolVersion(_protocolVersion), protocolVersionPtr(), connectionPoolConfiguration(_connectionPoolConfiguration), connectionTimeout(_connectionTimeout), forceReturnValue(_forceReturnValue), - keySizeEstimate(_keySizeEstimate), servers(_serversConfiguration), + keySizeEstimate(_keySizeEstimate), socketTimeout(_socketTimeout), sslConfiguration(_sslConfiguration),tcpNoDelay(_tcpNoDelay), - valueSizeEstimate(_valueSizeEstimate), maxRetries(_maxRetries), balancingStrategyProducer(bsp) {} + valueSizeEstimate(_valueSizeEstimate), maxRetries(_maxRetries), balancingStrategyProducer(bsp) + { + std::map> tmpMap; + for(auto pair : _serversConfiguration) + { + tmpMap.insert(std::make_pair(portable::string(pair.first), portable::vector(pair.second))); + } + serversMap=tmpMap; + } /** * DEPRECATED. Use getProtocolVersionCString(). @@ -106,13 +116,18 @@ class Configuration HR_EXTERN const int& getKeySizeEstimate() const; /** - * Returns the vector of server configurations where each server configuration instance + * Returns the vector of the failover server configurations where each server configuration instance * describes a HotRod server address and port. * *\return vector of server configurations */ - std::vector getServersConfiguration() const { - return servers.std_vector(); + std::map > getServersMapConfiguration() const { + std::map > temp; + for (auto pair : serversMap.std_map()) + { + temp.insert(make_pair(pair.first.std_string(), pair.second.std_vector())); + } + return temp; } /** @@ -159,7 +174,7 @@ class Configuration int connectionTimeout; bool forceReturnValue; int keySizeEstimate; - portable::vector servers; + portable::map > serversMap; int socketTimeout; SslConfiguration sslConfiguration; bool tcpNoDelay; diff --git a/include/infinispan/hotrod/ConfigurationBuilder.h b/include/infinispan/hotrod/ConfigurationBuilder.h index b390d1dd..fe12fbcc 100644 --- a/include/infinispan/hotrod/ConfigurationBuilder.h +++ b/include/infinispan/hotrod/ConfigurationBuilder.h @@ -16,7 +16,18 @@ namespace infinispan { namespace hotrod { - +class ClusterConfigurationBuilder +{ +public: + ClusterConfigurationBuilder(std::vector& servers) : servers(servers) {} + ClusterConfigurationBuilder& addClusterNode(const std::string host, const int port) + { + servers.push_back(ServerConfigurationBuilder().host(host).port(port)); + return *this; + } +private: + std::vector& servers; +}; /** * ConfigurationBuilder used to generate immutable Configuration objects that are in turn * used to configure RemoteCacheManager instances. @@ -41,18 +52,30 @@ class ConfigurationBuilder __pragma(warning(suppress:4355)) sslConfigurationBuilder() {} + void validate() {} - /** - * Adds a server to this Configuration. ServerConfigurationBuilder is in turn used - * to actually configure a server. - * - *\return ServerConfigurationBuilder instance to be used for server configuration - */ - ServerConfigurationBuilder& addServer() { - m_servers.push_back(ServerConfigurationBuilder()); - return m_servers[m_servers.size() - 1]; + + ClusterConfigurationBuilder addCluster(const std::string& clusterName) { + return ClusterConfigurationBuilder(m_serversMap[clusterName]); } + /** + * Adds a server to this Configuration. ServerConfigurationBuilder is in turn used + * to actually configure a server. + * + *\return ServerConfigurationBuilder instance to be used for server configuration + */ + ServerConfigurationBuilder& addServer() { + if (m_serversMap.find(Configuration::DEFAULT_CLUSTER_NAME) == m_serversMap.end()) + { + m_serversMap[Configuration::DEFAULT_CLUSTER_NAME]; + } + auto& servers = m_serversMap[Configuration::DEFAULT_CLUSTER_NAME]; + servers.push_back(ServerConfigurationBuilder()); + return servers[servers.size() - 1]; + } + + /** * Adds multiple servers to this Configuration. ConfigurationBuilder is in turn used * to actually configure added servers as well as other configuration settings. @@ -204,13 +227,21 @@ class ConfigurationBuilder *\return Configuration instance to be used for RemoteCacheManager configuration */ Configuration create() { - std::vector servers; - if (m_servers.size() > 0) { - for (std::vector::iterator it = m_servers.begin(); it < m_servers.end(); it++) { - servers.push_back(it->create()); - } - } else { - servers.push_back(ServerConfigurationBuilder().create()); + std::map> serversMap; + for (auto p: m_serversMap) + { + std::vector scVec; + for (auto e : p.second) + { + scVec.push_back(e.create()); + } + serversMap.insert(std::make_pair(p.first, scVec)); + } + if (serversMap.size()==0) + { + std::vector scVec; + scVec.push_back(ServerConfigurationBuilder().create()); + serversMap.insert(std::make_pair(Configuration::DEFAULT_CLUSTER_NAME, scVec)); } return Configuration(m_protocolVersion, @@ -218,7 +249,7 @@ class ConfigurationBuilder m_connectionTimeout, m_forceReturnValue, m_keySizeEstimate, - servers, + serversMap, m_socketTimeout, sslConfigurationBuilder.create(), m_tcpNoDelay, @@ -252,7 +283,7 @@ class ConfigurationBuilder bool m_forceReturnValue; int m_keySizeEstimate; std::string m_protocolVersion; - std::vector m_servers; + std::map >m_serversMap; int m_socketTimeout; bool m_tcpNoDelay; int m_valueSizeEstimate; diff --git a/include/infinispan/hotrod/Flag.h b/include/infinispan/hotrod/Flag.h index 36245cea..9e5b9829 100644 --- a/include/infinispan/hotrod/Flag.h +++ b/include/infinispan/hotrod/Flag.h @@ -11,6 +11,8 @@ enum Flag DEFAULT_MAXIDLE = 0x04 }; +enum ClusterStatus { SWITCHED, NOT_SWITCHED, ALREADY_SWITCHED }; + }} // namespace #endif /* ISPN_HOTROD_FLAG_H */ diff --git a/include/infinispan/hotrod/RemoteCacheManager.h b/include/infinispan/hotrod/RemoteCacheManager.h index c45b62b3..6a6aa3a7 100644 --- a/include/infinispan/hotrod/RemoteCacheManager.h +++ b/include/infinispan/hotrod/RemoteCacheManager.h @@ -249,6 +249,21 @@ class HR_EXTERN RemoteCacheManager return *rcache; } + /** + * Switch the client on the main cluster + * + * \return ClusterStatus::SWITCHED or ClusterStatus::NOT_SWITCHED if no fully working + * cluster is available + */ + ClusterStatus clusterSwitch(); + + /** + * Switch the client on the main cluster + * + * \return ClusterStatus::SWITCHED or ClusterStatus::NOT_SWITCHED if the named cluster + * doesn't exists + */ + ClusterStatus clusterSwitch(std::string clusterName); private: void *impl; diff --git a/include/infinispan/hotrod/ServerConfiguration.h b/include/infinispan/hotrod/ServerConfiguration.h index ce6eb6a9..09795f22 100644 --- a/include/infinispan/hotrod/ServerConfiguration.h +++ b/include/infinispan/hotrod/ServerConfiguration.h @@ -29,10 +29,7 @@ class HR_EXTERN ServerConfiguration const std::string &getHost() const { - if (hostPtr.get() == NULL) { - const_cast(this)->hostPtr.set(new std::string(host.c_string()), &deleteString); - } - return *hostPtr.get(); + return host; } /** * Returns host of this ServerConfiguration @@ -41,7 +38,7 @@ class HR_EXTERN ServerConfiguration */ const char *getHostCString() const { - return host.c_string(); + return host.data(); } /** @@ -55,9 +52,8 @@ class HR_EXTERN ServerConfiguration } private: - portable::string host; + std::string host; __pragma(warning(suppress:4251)) - portable::local_ptr hostPtr; int port; static void deleteString(std::string *str) { delete str; } diff --git a/include/infinispan/hotrod/portable.h b/include/infinispan/hotrod/portable.h index ec26f693..97e0702c 100644 --- a/include/infinispan/hotrod/portable.h +++ b/include/infinispan/hotrod/portable.h @@ -94,6 +94,11 @@ class HR_EXTERN string { if (m_dynamic != 0) delete[] m_dynamic; } + inline bool operator<(const string &str) const { + return strncmp(this->c_string(), str.c_string(), + this->m_length < str.m_length ? this->m_length : str.m_length); + } + inline string &operator=(const std::string &str) { if (m_dynamic != 0) delete[] m_dynamic; diff --git a/jni/pom.xml b/jni/pom.xml index c5414284..1e42c291 100644 --- a/jni/pom.xml +++ b/jni/pom.xml @@ -17,11 +17,13 @@ Infinispan HotRod JNI Wrapper - 9.0.0.Alpha1 + 9.0.0.Alpha2 1.1.0.Final 4.17.13.Final 6.8 true + 1.8 + 1.8 @@ -38,7 +40,7 @@ - + diff --git a/jni/src/main/java/org/infinispan/client/hotrod/configuration/ClusterConfigurationBuilder.java b/jni/src/main/java/org/infinispan/client/hotrod/configuration/ClusterConfigurationBuilder.java new file mode 100644 index 00000000..c036fd1d --- /dev/null +++ b/jni/src/main/java/org/infinispan/client/hotrod/configuration/ClusterConfigurationBuilder.java @@ -0,0 +1,15 @@ +package org.infinispan.client.hotrod.configuration; + +public class ClusterConfigurationBuilder { + private org.infinispan.client.hotrod.jni.ClusterConfigurationBuilder ccb; + private ConfigurationBuilder builder; + public ClusterConfigurationBuilder(ConfigurationBuilder builder, org.infinispan.client.hotrod.jni.ClusterConfigurationBuilder ccb) { + this.ccb=ccb; + this.builder=builder; + } + public ClusterConfigurationBuilder addClusterNode(String host, int port) + { + ccb.addClusterNode(host, port); + return this; + } +} diff --git a/jni/src/main/java/org/infinispan/client/hotrod/configuration/ConfigurationBuilder.java b/jni/src/main/java/org/infinispan/client/hotrod/configuration/ConfigurationBuilder.java index ec8cc8e7..0b3a76aa 100644 --- a/jni/src/main/java/org/infinispan/client/hotrod/configuration/ConfigurationBuilder.java +++ b/jni/src/main/java/org/infinispan/client/hotrod/configuration/ConfigurationBuilder.java @@ -1,8 +1,8 @@ package org.infinispan.client.hotrod.configuration; import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.infinispan.client.hotrod.RemoteCacheManager; @@ -11,10 +11,10 @@ import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash; import org.infinispan.client.hotrod.impl.transport.TransportFactory; import org.infinispan.client.hotrod.impl.transport.tcp.RequestBalancingStrategy; +import org.infinispan.client.hotrod.jni.ServerConfigurationBuilderVector; import org.infinispan.commons.configuration.Builder; import org.infinispan.commons.marshall.Marshaller; import org.infinispan.commons.marshall.jboss.GenericJBossMarshaller; -import org.infinispan.commons.util.Util; /** * ConfigurationBuilder used to generate immutable {@link Configuration} objects to pass to the @@ -37,7 +37,7 @@ public class ConfigurationBuilder implements ConfigurationChildBuilder, Builder< private Marshaller marshaller; private boolean pingOnStartup = true; private String protocolVersion = ConfigurationProperties.DEFAULT_PROTOCOL_VERSION; - private List servers = new ArrayList(); + private Map serversMap = new HashMap(); private int socketTimeout = ConfigurationProperties.DEFAULT_SO_TIMEOUT; private final SslConfigurationBuilder ssl; private boolean tcpNoDelay = true; @@ -69,10 +69,13 @@ public org.infinispan.client.hotrod.jni.ConfigurationBuilder getJniConfiguration @Override public ServerConfigurationBuilder addServer() { - ServerConfigurationBuilder builder = new ServerConfigurationBuilder(this); - this.servers.add(builder); - return builder; + return new ServerConfigurationBuilder(this,jniConfigurationBuilder.addServer()); } + + public ClusterConfigurationBuilder addCluster(String clusterName) { + return new ClusterConfigurationBuilder(this, jniConfigurationBuilder.addCluster(clusterName)); + } + @Override public ConfigurationBuilder addServers(String servers) { @@ -262,7 +265,7 @@ public ConfigurationBuilder withProperties(Properties properties) { // } this.pingOnStartup(typed.getBooleanProperty(ConfigurationProperties.PING_ON_STARTUP, pingOnStartup)); this.protocolVersion(typed.getProperty(ConfigurationProperties.PROTOCOL_VERSION, protocolVersion)); - this.servers.clear(); + this.serversMap.clear(); this.addServers(typed.getProperty(ConfigurationProperties.SERVER_LIST, "")); this.socketTimeout(typed.getIntProperty(ConfigurationProperties.SO_TIMEOUT, socketTimeout)); this.tcpNoDelay(typed.getBooleanProperty(ConfigurationProperties.TCP_NO_DELAY, tcpNoDelay)); @@ -282,14 +285,6 @@ public void validate() { @Override public Configuration create() { - List servers = new ArrayList(); - if (this.servers.size() > 0) - for (ServerConfigurationBuilder server : this.servers) { - servers.add(server.create()); - } - else { - servers.add(new ServerConfiguration("127.0.0.1", ConfigurationProperties.DEFAULT_HOTROD_PORT)); - } return new Configuration(this.jniConfigurationBuilder.create()); } @@ -320,7 +315,7 @@ public ConfigurationBuilder read(Configuration template) { this.marshallerClass = template.marshallerClass(); this.pingOnStartup(true); this.protocolVersion(template.protocolVersion()); - this.servers.clear(); + this.serversMap.clear(); for (ServerConfiguration server : template.servers()) { this.addServer().host(server.host()).port(server.port()); } diff --git a/jni/src/main/java/org/infinispan/client/hotrod/configuration/ServerConfigurationBuilder.java b/jni/src/main/java/org/infinispan/client/hotrod/configuration/ServerConfigurationBuilder.java index 3aed8e30..cfec19be 100644 --- a/jni/src/main/java/org/infinispan/client/hotrod/configuration/ServerConfigurationBuilder.java +++ b/jni/src/main/java/org/infinispan/client/hotrod/configuration/ServerConfigurationBuilder.java @@ -1,7 +1,5 @@ package org.infinispan.client.hotrod.configuration; -import org.infinispan.commons.configuration.Builder; - /** * ServerConfigurationBuilder. * @@ -12,9 +10,9 @@ public class ServerConfigurationBuilder extends AbstractConfigurationChildBuilde private org.infinispan.client.hotrod.jni.ServerConfigurationBuilder jniServerConfigurationBuilder; - ServerConfigurationBuilder(ConfigurationBuilder builder) { + ServerConfigurationBuilder(ConfigurationBuilder builder, org.infinispan.client.hotrod.jni.ServerConfigurationBuilder scb) { super(builder); - jniServerConfigurationBuilder = builder.getJniConfigurationBuilder().addServer(); + jniServerConfigurationBuilder = scb; } public ServerConfigurationBuilder host(String host) { diff --git a/jni/src/main/swig/java.i b/jni/src/main/swig/java.i index 5fb8b5cf..d32dabee 100644 --- a/jni/src/main/swig/java.i +++ b/jni/src/main/swig/java.i @@ -178,6 +178,9 @@ class RelayBytes { %template(StringVectorReturn) std::vector; %template(IntegerVectorReturn) std::vector; %template(InetSocketAddressvectorReturn) std::vector; +%template(ServerConfigurationBuilderVector) std::vector; +%template(ServerConfigurationVector) std::vector; +%template(ServerConfigurationMap) std::map >; %inline %{ bool isNull(std::shared_ptr ptr) { diff --git a/jni/src/test/java/org/infinispan/client/jni/hotrod/CrossSiteFailoverTest.java b/jni/src/test/java/org/infinispan/client/jni/hotrod/CrossSiteFailoverTest.java new file mode 100644 index 00000000..0b74aef1 --- /dev/null +++ b/jni/src/test/java/org/infinispan/client/jni/hotrod/CrossSiteFailoverTest.java @@ -0,0 +1,9 @@ +package org.infinispan.client.jni.hotrod; + +import org.infinispan.client.hotrod.xsite.SiteDownFailoverTest; +import org.testng.annotations.Test; + +@Test(groups = "functional", testName = "client.hotrod.CrossSiteFailoverTest") +public class CrossSiteFailoverTest extends SiteDownFailoverTest { + +} \ No newline at end of file diff --git a/jni/src/test/java/org/infinispan/client/jni/hotrod/JniTest.java b/jni/src/test/java/org/infinispan/client/jni/hotrod/JniTest.java index cf11e5c1..15e55bcf 100644 --- a/jni/src/test/java/org/infinispan/client/jni/hotrod/JniTest.java +++ b/jni/src/test/java/org/infinispan/client/jni/hotrod/JniTest.java @@ -8,6 +8,7 @@ import java.util.TreeSet; import org.infinispan.client.hotrod.*; +import org.infinispan.client.hotrod.xsite.SiteDownFailoverTest; import org.testng.IMethodSelector; import org.testng.IMethodSelectorContext; import org.testng.ITestNGMethod; @@ -75,7 +76,8 @@ public static void main(String[] args) { SocketTimeoutErrorTest.class, SegmentOwnershipLocalTest.class, SegmentOwnershipDistTest.class, - ServerShutdownTest.class + ServerShutdownTest.class, + SiteDownFailoverTest.class // SslTest.class, // SSL not implemented // TransportObjectFactoryTest.class, // omitting }); @@ -96,9 +98,7 @@ public static void main(String[] args) { "ForceReturnValuesTest.testDifferentInstancesForDifferentForceReturnValues", "ForceReturnValuesTest.testSameInstanceForSameForceReturnValues", "HotRodIntegrationTest.testGetWithMetadata", - "RemoteCacheManagerTest.testGetUndefinedCache", - "ServerShutdownTest.testServerShutdownWithConnectedClient", - "ServerShutdownTest.testServerShutdownWithoutConnectedClient" + "RemoteCacheManagerTest.testGetUndefinedCache" )); Set expectedSkips = Collections.emptySet(); diff --git a/src/hotrod/api/RemoteCacheManager.cpp b/src/hotrod/api/RemoteCacheManager.cpp index f2f932a6..62c1bc0f 100644 --- a/src/hotrod/api/RemoteCacheManager.cpp +++ b/src/hotrod/api/RemoteCacheManager.cpp @@ -57,4 +57,13 @@ const Configuration& RemoteCacheManager::getConfiguration() { return IMPL->getConfiguration(); } +ClusterStatus RemoteCacheManager::clusterSwitch() +{ + return IMPL->clusterSwitch(); +} +ClusterStatus RemoteCacheManager::clusterSwitch(std::string clusterName) +{ + return IMPL->clusterSwitch(clusterName); +} + }} // namespace infinispan::hotrod diff --git a/src/hotrod/impl/RemoteCacheManagerImpl.cpp b/src/hotrod/impl/RemoteCacheManagerImpl.cpp index bc5ba1b4..8b435061 100644 --- a/src/hotrod/impl/RemoteCacheManagerImpl.cpp +++ b/src/hotrod/impl/RemoteCacheManagerImpl.cpp @@ -139,4 +139,14 @@ PingResult RemoteCacheManagerImpl::ping(RemoteCacheImpl& remoteCache) { return remoteCache.ping(); } +ClusterStatus RemoteCacheManagerImpl::clusterSwitch(std::string clusterName) +{ + return transportFactory->clusterSwitch(clusterName); +} + +ClusterStatus RemoteCacheManagerImpl::clusterSwitch() +{ + return transportFactory->clusterSwitch(); +} + }} // namespace infinispan::hotrod diff --git a/src/hotrod/impl/RemoteCacheManagerImpl.h b/src/hotrod/impl/RemoteCacheManagerImpl.h index c66417d4..427d3e47 100644 --- a/src/hotrod/impl/RemoteCacheManagerImpl.h +++ b/src/hotrod/impl/RemoteCacheManagerImpl.h @@ -27,6 +27,9 @@ class RemoteCacheManagerImpl void stop(); bool isStarted(); const Configuration& getConfiguration(); + ClusterStatus clusterSwitch(); + ClusterStatus clusterSwitch(std::string clusterName); + private: sys::Mutex lock; diff --git a/src/hotrod/impl/configuration/Configuration.cpp b/src/hotrod/impl/configuration/Configuration.cpp index 87ed910a..acd0f0de 100644 --- a/src/hotrod/impl/configuration/Configuration.cpp +++ b/src/hotrod/impl/configuration/Configuration.cpp @@ -12,6 +12,8 @@ const char* Configuration::PROTOCOL_VERSION_21 = "2.1"; const char* Configuration::PROTOCOL_VERSION_22 = "2.2"; const char* Configuration::PROTOCOL_VERSION_23 = "2.3"; const char* Configuration::PROTOCOL_VERSION_24 = "2.4"; +const char* Configuration::DEFAULT_CLUSTER_NAME = "DEFAULT_CLUSTER_NAME"; + const char *Configuration::getProtocolVersionCString() const diff --git a/src/hotrod/impl/operations/RetryOnFailureOperation.h b/src/hotrod/impl/operations/RetryOnFailureOperation.h index 3983afb1..cca50f0a 100755 --- a/src/hotrod/impl/operations/RetryOnFailureOperation.h +++ b/src/hotrod/impl/operations/RetryOnFailureOperation.h @@ -28,6 +28,7 @@ template class RetryOnFailureOperation : public HotRodOperation } catch (const TransportException& te) { // Invalidate transport since this exception means that this // instance is no longer usable and should be destroyed + //std::cout << "Transport: " << ((transport::TcpTransport*)transport)->getServerAddress().getPort() << std::endl; transport::InetSocketAddress isa(te.getHostCString(),te.getPort()); transportFactory->invalidateTransport(isa, transport); logErrorAndThrowExceptionIfNeeded(retryCount, te); @@ -62,17 +63,21 @@ template class RetryOnFailureOperation : public HotRodOperation } } - void logErrorAndThrowExceptionIfNeeded(int i, const HotRodClientException& e) { - if (i >= transportFactory->getMaxRetries() - 1 - || transportFactory->getMaxRetries() < 0) { - ERROR("Exception encountered, retry %d of %d: %s", - i, transportFactory->getMaxRetries(), e.what()); - throw; // Rethrow. The exception is rethrown as const! - } else { - TRACE("Exception encountered, retry %d of %d: %s", - i, transportFactory->getMaxRetries(), e.what()); - } - } + void logErrorAndThrowExceptionIfNeeded(int& retryCount, + const HotRodClientException& e) { + if (retryCount >= transportFactory->getMaxRetries() - 1) { + if (transportFactory->clusterSwitch() == ClusterStatus::SWITCHED) { + retryCount = 0; // reset retry counter + } else { + ERROR("Exception encountered, retry %d of %d: %s", retryCount, + transportFactory->getMaxRetries(), e.what()); + throw; // Rethrow. The exception is rethrown as const! + } + } else { + TRACE("Exception encountered, retry %d of %d: %s", retryCount, + transportFactory->getMaxRetries(), e.what()); + } + } virtual transport::Transport& getTransport(int retryCount) = 0; virtual T executeOperation(transport::Transport& transport) = 0; diff --git a/src/hotrod/impl/protocol/Codec20.cpp b/src/hotrod/impl/protocol/Codec20.cpp index 8db8b712..bb1b1ddb 100644 --- a/src/hotrod/impl/protocol/Codec20.cpp +++ b/src/hotrod/impl/protocol/Codec20.cpp @@ -151,9 +151,18 @@ void Codec20::readNewTopologyAndHash(Transport& transport, HeaderParams& params) } TransportFactory &tf = transport.getTransportFactory(); - int currentTopology = tf.getTopologyId(params.cacheName); + bool noTopologyInfo=false; + int currentTopology = 0; + try + { + currentTopology = tf.getTopologyId(params.cacheName); + } + catch (std::exception &e) + { + noTopologyInfo=true; + } int topologyAge = tf.getTopologyAge(); - if (params.topologyAge == topologyAge && currentTopology != newTopologyId) { + if (noTopologyInfo || (params.topologyAge == topologyAge && currentTopology != newTopologyId)) { params.topologyId = newTopologyId; tf.updateServers(addresses); if (hashFunctionVersion == 0) { diff --git a/src/hotrod/impl/transport/TransportFactory.h b/src/hotrod/impl/transport/TransportFactory.h index 9df3c64e..63562253 100644 --- a/src/hotrod/impl/transport/TransportFactory.h +++ b/src/hotrod/impl/transport/TransportFactory.h @@ -5,6 +5,7 @@ #include "hotrod/impl/transport/Transport.h" #include "hotrod/impl/Topology.h" #include "hotrod/impl/TopologyInfo.h" +#include "infinispan/hotrod/Flag.h" #include #include #include @@ -28,13 +29,15 @@ class InetSocketAddress; class TransportFactory { + friend class infinispan::hotrod::RemoteCacheManagerImpl; public: virtual void start(protocol::Codec& codec, int defaultTopologyId) = 0; virtual void destroy() = 0; virtual Transport& getTransport(const std::vector& cacheName) = 0; virtual Transport& getTransport(const std::vector& key, const std::vector& cacheName) = 0; - + virtual ClusterStatus clusterSwitch() = 0; + virtual ClusterStatus clusterSwitch(std::string) = 0; virtual void releaseTransport(Transport& transport) = 0; virtual void invalidateTransport( const InetSocketAddress& serverAddress, Transport* transport) = 0; @@ -89,7 +92,6 @@ class TransportFactory private: static TransportFactory* newInstance(const Configuration& config); - friend class infinispan::hotrod::RemoteCacheManagerImpl; }; }}} // namespace infinispan::hotrod::transport diff --git a/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp b/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp index f3f9db0b..ac4fb509 100644 --- a/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp +++ b/src/hotrod/impl/transport/tcp/TcpTransportFactory.cpp @@ -28,9 +28,21 @@ void TcpTransportFactory::start( { ScopedLock l(lock); topologyAge = 0; - std::vector configuredServers = configuration.getServersConfiguration(); - for (std::vector::const_iterator iter = configuredServers.begin(); - iter != configuredServers.end(); iter++) + transportFactory.reset(new TransportObjectFactory(codec, *this)); + auto serversMap = configuration.getServersMapConfiguration(); + std::vector* configuredServers; + if (serversMap.find(Configuration::DEFAULT_CLUSTER_NAME)!=serversMap.end()) + { + configuredServers = &serversMap[Configuration::DEFAULT_CLUSTER_NAME]; + currCluster=Configuration::DEFAULT_CLUSTER_NAME; + } + else + { + configuredServers = &serversMap.begin()->second; + currCluster=serversMap.begin()->first; + } + for (std::vector::const_iterator iter = configuredServers->begin(); + iter != configuredServers->end(); iter++) { initialServers.push_back(InetSocketAddress(iter->getHostCString(), iter->getPort())); } @@ -45,7 +57,6 @@ void TcpTransportFactory::start( } topologyInfo = new TopologyInfo(defaultTopologyId, initialServers, configuration); - transportFactory.reset(new TransportObjectFactory(codec, *this)); createAndPreparePool(); @@ -53,6 +64,27 @@ void TcpTransportFactory::start( pingServers(); } +std::vector TcpTransportFactory::getNextWorkingServersConfiguration() { + for (auto p: configuration.getServersMapConfiguration()){ + if (p.first==currCluster) + continue; + for (auto v: p.second) + { + try + { + pingExternalServer(InetSocketAddress(v.getHost(), v.getPort())); + currCluster=p.first; + return p.second; + } + catch (Exception &e) + { + + } + } + } + return std::vector(); +} + Transport& TcpTransportFactory::getTransport(const std::vector& /*cacheName*/) { const InetSocketAddress* server = &balancer->nextServer(); return borrowTransportFromPool(*server); @@ -89,6 +121,71 @@ void TcpTransportFactory::invalidateTransport( pool->invalidateObject(serverAddress, dynamic_cast(transport)); } +ClusterStatus TcpTransportFactory::clusterSwitch() +{ + auto configuredServers = getNextWorkingServersConfiguration(); + if (configuredServers.size()==0) + { + return NOT_SWITCHED; + } + ScopedLock l(lock); + topologyAge = 0; + initialServers.clear(); + for (auto iter = configuredServers.begin(); + iter != configuredServers.end(); iter++) + { + initialServers.push_back(InetSocketAddress(iter->getHostCString(), iter->getPort())); + } + + auto producerFn=configuration.getBalancingStrategy(); + if (producerFn!= nullptr) { + balancer.reset((*producerFn)()); + } + else + { + balancer.reset(RoundRobinBalancingStrategy::newInstance()); + } + topologyInfo->updateServers(initialServers); + + createAndPreparePool(); + + balancer->setServers(initialServers); + pingServers(); + return SWITCHED; +} + +ClusterStatus TcpTransportFactory::clusterSwitch(std::string clusterName) +{ + auto servers=configuration.getServersMapConfiguration(); + if (servers.find(clusterName)==servers.end()) + return NOT_SWITCHED; + auto configuredServers = servers[clusterName]; + ScopedLock l(lock); + topologyAge = 0; + initialServers.clear(); + for (auto iter = configuredServers.begin(); + iter != configuredServers.end(); iter++) + { + initialServers.push_back(InetSocketAddress(iter->getHostCString(), iter->getPort())); + } + + auto producerFn=configuration.getBalancingStrategy(); + if (producerFn!= nullptr) { + balancer.reset((*producerFn)()); + } + else + { + balancer.reset(RoundRobinBalancingStrategy::newInstance()); + } + topologyInfo->updateServers(initialServers); + + createAndPreparePool(); + + balancer->setServers(initialServers); + pingServers(); + return SWITCHED; +} + bool TcpTransportFactory::isTcpNoDelay() { return configuration.isTcpNoDelay(); } @@ -133,6 +230,13 @@ void TcpTransportFactory::createAndPreparePool() } } +void TcpTransportFactory::pingExternalServer(InetSocketAddress s) { + transport::TcpTransport& t = transportFactory->makeObject(s); + transportFactory->ping(t); + transportFactory->destroyObject(s, t); +} + + void TcpTransportFactory::pingServers() { std::vector s = topologyInfo->getServers(); for (std::vector::const_iterator iter = s.begin(); iter != s.end(); iter++) { @@ -144,7 +248,7 @@ void TcpTransportFactory::pingServers() { transportFactory->ping(*transport); connectionPool->returnObject(*iter, *transport); } catch (const Exception &e) { - TRACE("Initial ping has thrown an exception when pinging %s:%d : %s", + ERROR("Initial ping has thrown an exception when pinging %s:%d : %s", iter->getHostname().c_str(), iter->getPort(), e.what()); // Ping's objective is to retrieve a potentially newer // version of the Hot Rod cluster topology, so ignore diff --git a/src/hotrod/impl/transport/tcp/TcpTransportFactory.h b/src/hotrod/impl/transport/tcp/TcpTransportFactory.h index 42ca757c..f5490550 100644 --- a/src/hotrod/impl/transport/tcp/TcpTransportFactory.h +++ b/src/hotrod/impl/transport/tcp/TcpTransportFactory.h @@ -33,7 +33,8 @@ class TcpTransportFactory : public TransportFactory void releaseTransport(Transport& transport); void invalidateTransport( const InetSocketAddress& address, Transport* transport); - + ClusterStatus clusterSwitch(); + ClusterStatus clusterSwitch(std::string clusterName); bool isTcpNoDelay(); int getMaxRetries(); int getSoTimeout(); @@ -66,11 +67,14 @@ class TcpTransportFactory : public TransportFactory std::shared_ptr transportFactory; std::shared_ptr connectionPool; std::shared_ptr balancer; + std::string currCluster; void createAndPreparePool(); void updateTransportCount(); void pingServers(); Transport& borrowTransportFromPool(const InetSocketAddress& server); ConnectionPool* getConnectionPool(); + std::vector getNextWorkingServersConfiguration(); + void pingExternalServer(InetSocketAddress s); }; }}} // namespace infinispan::hotrod::transport