From 3712987765641e36332bb2a2e735c041233cce96 Mon Sep 17 00:00:00 2001 From: Bartosz Spyrko-Smietanko Date: Tue, 13 Nov 2018 15:50:33 +0000 Subject: [PATCH 1/2] [ARTEMIS-2176] RA connection properties are not propagated to XARecoveryConfig (cherry picked from commit eb41be78f36c6df42376c4b0b79e174703084fa8) downstream: ENTMQBR-2706 --- .../xa/recovery/XARecoveryConfig.java | 147 +++++++++++++++++- .../integration/ra/ResourceAdapterTest.java | 34 ++++ 2 files changed, 173 insertions(+), 8 deletions(-) diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java index 292395a55a..f9826e0210 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java @@ -45,16 +45,41 @@ public class XARecoveryConfig { private final Map properties; private final ClientProtocolManagerFactory clientProtocolManager; + // ServerLocator properties + private Long callFailoverTimeout; + private Long callTimeout; + private Long clientFailureCheckPeriod; + private Integer confirmationWindowSize; + private String connectionLoadBalancingPolicyClassName; + private Long connectionTTL; + private Integer consumerMaxRate; + private Integer consumerWindowSize; + private Integer initialConnectAttempts; + private Integer producerMaxRate; + private Integer producerWindowSize; + private Integer minLargeMessageSize; + private Long retryInterval; + private Double retryIntervalMultiplier; + private Long maxRetryInterval; + private Integer reconnectAttempts; + private Integer initialMessagePacketSize; + private Integer scheduledThreadPoolMaxSize; + private Integer threadPoolMaxSize; + private boolean autoGroup; + private boolean blockOnAcknowledge; + private boolean blockOnNonDurableSend; + private boolean blockOnDurableSend; + private boolean preAcknowledge; + private boolean useGlobalPools; + private boolean cacheLargeMessagesClient; + private boolean compressLargeMessage; + private boolean failoverOnInitialConnection; + public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password, Map properties) { - if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) { - return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory()); - } else { - return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory()); - } - + return new XARecoveryConfig(factory.getServerLocator(), userName, password, properties); } public XARecoveryConfig(final boolean ha, @@ -112,6 +137,37 @@ public XARecoveryConfig(final boolean ha, this(ha, discoveryConfiguration, username, password, properties, null); } + private XARecoveryConfig(ServerLocator serverLocator, + String username, + String password, + Map properties) { + ClientProtocolManagerFactory clientProtocolManager = serverLocator.getProtocolManagerFactory(); + if (serverLocator.getDiscoveryGroupConfiguration() != null) { + this.discoveryConfiguration = serverLocator.getDiscoveryGroupConfiguration(); + this.transportConfiguration = null; + } else { + TransportConfiguration[] transportConfiguration = serverLocator.getStaticTransportConfigurations(); + TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length]; + for (int i = 0; i < transportConfiguration.length; i++) { + if (clientProtocolManager != null) { + newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig("")); + } else { + newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig(""); + } + } + + this.transportConfiguration = newTransportConfiguration; + this.discoveryConfiguration = null; + } + this.username = username; + this.password = password; + this.ha = serverLocator.isHA(); + this.properties = properties == null ? Collections.unmodifiableMap(new HashMap()) : Collections.unmodifiableMap(properties); + this.clientProtocolManager = clientProtocolManager; + + readLocatorProperties(serverLocator); + } + public boolean isHA() { return ha; } @@ -146,12 +202,87 @@ public ClientProtocolManagerFactory getClientProtocolManager() { * @return locator */ public ServerLocator createServerLocator() { + ServerLocator serverLocator; if (getDiscoveryConfiguration() != null) { - return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager); + serverLocator = ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager); } else { - return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager); + serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager); } + writeLocatorProperties(serverLocator); + + return serverLocator; + } + + private void writeLocatorProperties(ServerLocator serverLocator) { + serverLocator.setAutoGroup(this.autoGroup); + serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge); + serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend); + serverLocator.setBlockOnDurableSend(this.blockOnDurableSend); + serverLocator.setPreAcknowledge(this.preAcknowledge); + serverLocator.setUseGlobalPools(this.useGlobalPools); + serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient); + serverLocator.setCompressLargeMessage(this.compressLargeMessage); + serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection); + + serverLocator.setConsumerMaxRate(this.consumerMaxRate); + serverLocator.setConsumerWindowSize(this.consumerWindowSize); + serverLocator.setMinLargeMessageSize(this.minLargeMessageSize); + serverLocator.setProducerMaxRate(this.producerMaxRate); + serverLocator.setProducerWindowSize(this.producerWindowSize); + serverLocator.setConfirmationWindowSize(this.confirmationWindowSize); + serverLocator.setReconnectAttempts(this.reconnectAttempts); + serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize); + serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize); + serverLocator.setInitialConnectAttempts(this.initialConnectAttempts); + serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize); + + serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod); + serverLocator.setCallTimeout(this.callTimeout); + serverLocator.setCallFailoverTimeout(this.callFailoverTimeout); + serverLocator.setConnectionTTL(this.connectionTTL); + serverLocator.setRetryInterval(this.retryInterval); + serverLocator.setMaxRetryInterval(this.maxRetryInterval); + + serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier); + + serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName); +} + + private void readLocatorProperties(ServerLocator locator) { + + this.autoGroup = locator.isAutoGroup(); + this.blockOnAcknowledge = locator.isBlockOnAcknowledge(); + this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend(); + this.blockOnDurableSend = locator.isBlockOnDurableSend(); + this.preAcknowledge = locator.isPreAcknowledge(); + this.useGlobalPools = locator.isUseGlobalPools(); + this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient(); + this.compressLargeMessage = locator.isCompressLargeMessage(); + this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection(); + + this.consumerMaxRate = locator.getConsumerMaxRate(); + this.consumerWindowSize = locator.getConsumerWindowSize(); + this.minLargeMessageSize = locator.getMinLargeMessageSize(); + this.producerMaxRate = locator.getProducerMaxRate(); + this.producerWindowSize = locator.getProducerWindowSize(); + this.confirmationWindowSize = locator.getConfirmationWindowSize(); + this.reconnectAttempts = locator.getReconnectAttempts(); + this.threadPoolMaxSize = locator.getThreadPoolMaxSize(); + this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize(); + this.initialConnectAttempts = locator.getInitialConnectAttempts(); + this.initialMessagePacketSize = locator.getInitialMessagePacketSize(); + + this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod(); + this.callTimeout = locator.getCallTimeout(); + this.callFailoverTimeout = locator.getCallFailoverTimeout(); + this.connectionTTL = locator.getConnectionTTL(); + this.retryInterval = locator.getRetryInterval(); + this.maxRetryInterval = locator.getMaxRetryInterval(); + + this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier(); + + this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java index 6216bb354d..fa7d0c8b5a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java @@ -797,6 +797,40 @@ public void testConnectionParameterStringParsing() throws Exception { } } + @Test + public void testConnectionFactoryPropertiesApplyToRecoveryConfig() throws Exception { + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(false, false, false); + ActiveMQDestination queue = (ActiveMQDestination) ActiveMQJMSClient.createQueue("test"); + session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true); + session.close(); + + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + + ra.setConnectorClassName(INVM_CONNECTOR_FACTORY); + ra.setUserName("userGlobal"); + ra.setPassword("passwordGlobal"); + ra.setConnectionTTL(100L); + ra.setCallFailoverTimeout(100L); + ra.start(new BootstrapContext()); + + Set resources = ra.getRecoveryManager().getResources(); + assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getConnectionTTL()); + assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getCallFailoverTimeout()); + + + for (XARecoveryConfig resource : resources) { + assertEquals(100L, resource.createServerLocator().getConnectionTTL()); + assertEquals(100L, resource.createServerLocator().getCallFailoverTimeout()); + } + + ra.stop(); + assertEquals(0, resources.size()); + locator.close(); + + } + @Override public boolean useSecurity() { return false; From 9ee0d65d719ef55f378d93f7afa05029b87bfe42 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 3 Mar 2020 21:17:51 +0800 Subject: [PATCH 2/2] ARTEMIS-2176 Refactoring (cherry picked from commit cb8da541107355f16eb26b21b6563a06876b741f) downstream: ENTMQBR-2706 --- .../api/config/ServerLocatorConfig.java | 88 +++++ .../api/core/client/ServerLocator.java | 5 + .../client/impl/ClientSessionFactoryImpl.java | 27 +- .../core/client/impl/ServerLocatorImpl.java | 326 +++++------------- .../xa/recovery/XARecoveryConfig.java | 110 +----- 5 files changed, 205 insertions(+), 351 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java new file mode 100644 index 0000000000..a1830fa264 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.config; + +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; + +public class ServerLocatorConfig { + public boolean failoverOnInitialConnection = ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION; + public long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; + public long connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL; + public long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT; + public long callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT; + public int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; + public int consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; + public int consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE; + public int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; + public int producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE; + public int producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE; + public boolean blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE; + public boolean blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND; + public boolean blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND; + public boolean autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP; + public boolean preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE; + public int ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE; + public String connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME; + public boolean useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS; + public int threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE; + public int scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE; + public long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL; + public double retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER; + public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL; + public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS; + public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS; + public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; + public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; + public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; + public boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING; + + public ServerLocatorConfig() { + } + + public ServerLocatorConfig(final ServerLocatorConfig locator) { + failoverOnInitialConnection = locator.failoverOnInitialConnection; + compressLargeMessage = locator.compressLargeMessage; + cacheLargeMessagesClient = locator.cacheLargeMessagesClient; + clientFailureCheckPeriod = locator.clientFailureCheckPeriod; + connectionTTL = locator.connectionTTL; + callTimeout = locator.callTimeout; + callFailoverTimeout = locator.callFailoverTimeout; + minLargeMessageSize = locator.minLargeMessageSize; + consumerWindowSize = locator.consumerWindowSize; + consumerMaxRate = locator.consumerMaxRate; + confirmationWindowSize = locator.confirmationWindowSize; + producerWindowSize = locator.producerWindowSize; + producerMaxRate = locator.producerMaxRate; + blockOnAcknowledge = locator.blockOnAcknowledge; + blockOnDurableSend = locator.blockOnDurableSend; + blockOnNonDurableSend = locator.blockOnNonDurableSend; + autoGroup = locator.autoGroup; + preAcknowledge = locator.preAcknowledge; + connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName; + ackBatchSize = locator.ackBatchSize; + useGlobalPools = locator.useGlobalPools; + scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize; + threadPoolMaxSize = locator.threadPoolMaxSize; + retryInterval = locator.retryInterval; + retryIntervalMultiplier = locator.retryIntervalMultiplier; + maxRetryInterval = locator.maxRetryInterval; + reconnectAttempts = locator.reconnectAttempts; + initialConnectAttempts = locator.initialConnectAttempts; + initialMessagePacketSize = locator.initialMessagePacketSize; + useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 38aa1a7f30..e8f8787a9f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.config.ServerLocatorConfig; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.Interceptor; @@ -802,4 +803,8 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig /** This will only instantiate internal objects such as the topology */ void initialize() throws ActiveMQException; + + ServerLocatorConfig getLocatorConfig(); + + void setLocatorConfig(ServerLocatorConfig serverLocatorConfig); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 4ca1af1125..9f509d99e2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.activemq.artemis.api.config.ServerLocatorConfig; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -156,13 +157,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, final TransportConfiguration connectorConfig, - final long callTimeout, - final long callFailoverTimeout, - final long clientFailureCheckPeriod, - final long connectionTTL, - final long retryInterval, - final double retryIntervalMultiplier, - final long maxRetryInterval, + final ServerLocatorConfig locatorConfig, final int reconnectAttempts, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, @@ -182,27 +177,27 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, checkTransportKeys(connectorFactory, connectorConfig); - this.callTimeout = callTimeout; + this.callTimeout = locatorConfig.callTimeout; - this.callFailoverTimeout = callFailoverTimeout; + this.callFailoverTimeout = locatorConfig.callFailoverTimeout; // HORNETQ-1314 - if this in an in-vm connection then disable connection monitoring if (connectorFactory.isReliable() && - clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD && - connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) { + locatorConfig.clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD && + locatorConfig.connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) { this.clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD_INVM; this.connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL_INVM; } else { - this.clientFailureCheckPeriod = clientFailureCheckPeriod; + this.clientFailureCheckPeriod = locatorConfig.clientFailureCheckPeriod; - this.connectionTTL = connectionTTL; + this.connectionTTL = locatorConfig.connectionTTL; } - this.retryInterval = retryInterval; + this.retryInterval = locatorConfig.retryInterval; - this.retryIntervalMultiplier = retryIntervalMultiplier; + this.retryIntervalMultiplier = locatorConfig.retryIntervalMultiplier; - this.maxRetryInterval = maxRetryInterval; + this.maxRetryInterval = locatorConfig.maxRetryInterval; this.reconnectAttempts = reconnectAttempts; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index fd3507e275..4c1d328187 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.config.ServerLocatorConfig; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -120,7 +121,6 @@ private enum STATE { private volatile boolean receivedTopology; - private boolean compressLargeMessage; // if the system should shutdown the pool when shutting down private transient boolean shutdownPool; @@ -133,64 +133,6 @@ private enum STATE { private transient ConnectionLoadBalancingPolicy loadBalancingPolicy; - // Settable attributes: - - private boolean cacheLargeMessagesClient; - - private long clientFailureCheckPeriod; - - private long connectionTTL; - - private long callTimeout; - - private long callFailoverTimeout; - - private int minLargeMessageSize; - - private int consumerWindowSize; - - private int consumerMaxRate; - - private int confirmationWindowSize; - - private int producerWindowSize; - - private int producerMaxRate; - - private boolean blockOnAcknowledge; - - private boolean blockOnDurableSend; - - private boolean blockOnNonDurableSend; - - private boolean autoGroup; - - private boolean preAcknowledge; - - private String connectionLoadBalancingPolicyClassName; - - private int ackBatchSize; - - private boolean useGlobalPools; - - private int scheduledThreadPoolMaxSize; - - private int threadPoolMaxSize; - - private long retryInterval; - - private double retryIntervalMultiplier; - - private long maxRetryInterval; - - private int reconnectAttempts; - - private int initialConnectAttempts; - - private boolean failoverOnInitialConnection; - - private int initialMessagePacketSize; - private final Object stateGuard = new Object(); private transient STATE state; private transient CountDownLatch latch; @@ -215,6 +157,8 @@ private enum STATE { private final Exception traceException = new Exception(); + private ServerLocatorConfig config = new ServerLocatorConfig(); + public static synchronized void clearThreadPools() { ActiveMQClient.clearThreadPools(); } @@ -222,7 +166,7 @@ public static synchronized void clearThreadPools() { private synchronized void setThreadPools() { if (threadPool != null) { return; - } else if (useGlobalPools) { + } else if (config.useGlobalPools) { threadPool = ActiveMQClient.getGlobalThreadPool(); scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool(); @@ -236,10 +180,10 @@ public ThreadFactory run() { } }); - if (threadPoolMaxSize == -1) { + if (config.threadPoolMaxSize == -1) { threadPool = Executors.newCachedThreadPool(factory); } else { - threadPool = new ActiveMQThreadPoolExecutor(0, threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory); + threadPool = new ActiveMQThreadPoolExecutor(0, config.threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory); } factory = AccessController.doPrivileged(new PrivilegedAction() { @@ -249,7 +193,7 @@ public ThreadFactory run() { } }); - scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); + scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory); } this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray); } @@ -261,7 +205,7 @@ public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecuto return false; if (this.threadPool == null && this.scheduledThreadPool == null) { - useGlobalPools = false; + config.useGlobalPools = false; shutdownPool = false; this.threadPool = threadPool; this.scheduledThreadPool = scheduledThreadPool; @@ -272,13 +216,13 @@ public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecuto } private void instantiateLoadBalancingPolicy() { - if (connectionLoadBalancingPolicyClassName == null) { + if (config.connectionLoadBalancingPolicyClassName == null) { throw new IllegalStateException("Please specify a load balancing policy class name on the session factory"); } AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { - loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(ServerLocatorImpl.class, connectionLoadBalancingPolicyClassName); + loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(ServerLocatorImpl.class, config.connectionLoadBalancingPolicyClassName); return null; } }); @@ -315,6 +259,16 @@ public synchronized void initialize() throws ActiveMQException { } } + @Override + public ServerLocatorConfig getLocatorConfig() { + return config; + } + + @Override + public void setLocatorConfig(ServerLocatorConfig config) { + this.config = config; + } + private static DiscoveryGroup createDiscoveryGroup(String nodeID, DiscoveryGroupConfiguration config) throws Exception { return new DiscoveryGroup(nodeID, config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null); @@ -336,69 +290,7 @@ private ServerLocatorImpl(final Topology topology, this.nodeID = UUIDGenerator.getInstance().generateStringUUID(); - clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; - - connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL; - - callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT; - - callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT; - - minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; - - consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; - - consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE; - - confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; - - producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE; - - producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE; - - blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE; - - blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND; - - blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND; - - autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP; - - preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE; - - ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE; - - connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME; - - useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS; - - threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE; - - scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE; - - retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL; - - retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER; - - maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL; - - reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS; - - initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS; - - failoverOnInitialConnection = ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION; - - cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; - - initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; - - cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; - - compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; - clusterConnection = false; - - useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING; } public static ServerLocator newLocator(String uri) { @@ -496,41 +388,12 @@ private ServerLocatorImpl(ServerLocatorImpl locator) { topology = locator.topology; topologyArray = locator.topologyArray; receivedTopology = locator.receivedTopology; - compressLargeMessage = locator.compressLargeMessage; - cacheLargeMessagesClient = locator.cacheLargeMessagesClient; - clientFailureCheckPeriod = locator.clientFailureCheckPeriod; - connectionTTL = locator.connectionTTL; - callTimeout = locator.callTimeout; - callFailoverTimeout = locator.callFailoverTimeout; - minLargeMessageSize = locator.minLargeMessageSize; - consumerWindowSize = locator.consumerWindowSize; - consumerMaxRate = locator.consumerMaxRate; - confirmationWindowSize = locator.confirmationWindowSize; - producerWindowSize = locator.producerWindowSize; - producerMaxRate = locator.producerMaxRate; - blockOnAcknowledge = locator.blockOnAcknowledge; - blockOnDurableSend = locator.blockOnDurableSend; - blockOnNonDurableSend = locator.blockOnNonDurableSend; - autoGroup = locator.autoGroup; - preAcknowledge = locator.preAcknowledge; - connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName; - ackBatchSize = locator.ackBatchSize; - useGlobalPools = locator.useGlobalPools; - scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize; - threadPoolMaxSize = locator.threadPoolMaxSize; - retryInterval = locator.retryInterval; - retryIntervalMultiplier = locator.retryIntervalMultiplier; - maxRetryInterval = locator.maxRetryInterval; - reconnectAttempts = locator.reconnectAttempts; - initialConnectAttempts = locator.initialConnectAttempts; - failoverOnInitialConnection = locator.failoverOnInitialConnection; - initialMessagePacketSize = locator.initialMessagePacketSize; + config = new ServerLocatorConfig(locator.config); startExecutor = locator.startExecutor; afterConnectListener = locator.afterConnectListener; groupID = locator.groupID; nodeID = locator.nodeID; clusterTransportConfiguration = locator.clusterTransportConfiguration; - useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing; } private TransportConfiguration selectConnector() { @@ -543,7 +406,7 @@ private TransportConfiguration selectConnector() { } synchronized (this) { - if (usedTopology != null && useTopologyForLoadBalancing) { + if (usedTopology != null && config.useTopologyForLoadBalancing) { if (logger.isTraceEnabled()) { logger.trace("Selecting connector from topology."); } @@ -573,7 +436,7 @@ private int getConnectorsSize() { } synchronized (this) { - if (usedTopology != null && useTopologyForLoadBalancing) { + if (usedTopology != null && config.useTopologyForLoadBalancing) { return usedTopology.length; } else { return initialConnectors.length; @@ -700,16 +563,17 @@ public ClientSessionFactory createSessionFactory(String nodeID) throws Exception @Override public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception { + assertOpen(); initialize(); - ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); addToConnecting(factory); try { try { - factory.connect(reconnectAttempts, failoverOnInitialConnection); + factory.connect(config.reconnectAttempts, config.failoverOnInitialConnection); } catch (ActiveMQException e1) { //we need to make sure is closed just for garbage collection factory.close(); @@ -730,7 +594,7 @@ public ClientSessionFactory createSessionFactory(final TransportConfiguration tr initialize(); - ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); addToConnecting(factory); try { @@ -795,7 +659,7 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException { // try each factory in the list until we find one which works try { - factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); try { addToConnecting(factory); // We always try to connect here with only one attempt, @@ -810,12 +674,12 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException { attempts++; int connectorsSize = getConnectorsSize(); - int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts; + int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts; - if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) { + if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) { throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); } - if (factory.waitForRetry(retryInterval)) { + if (factory.waitForRetry(config.retryInterval)) { throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); } retry = true; @@ -833,7 +697,7 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException { // ATM topology is never != null. Checking here just to be consistent with // how the sendSubscription happens. // in case this ever changes. - if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) { + if (topology != null && !factory.waitForTopology(config.callTimeout, TimeUnit.MILLISECONDS)) { factory.cleanup(); throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); } @@ -886,324 +750,324 @@ public String getOutgoingInterceptorList() { @Override public boolean isCacheLargeMessagesClient() { - return cacheLargeMessagesClient; + return config.cacheLargeMessagesClient; } @Override public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached) { - cacheLargeMessagesClient = cached; + config.cacheLargeMessagesClient = cached; return this; } @Override public long getClientFailureCheckPeriod() { - return clientFailureCheckPeriod; + return config.clientFailureCheckPeriod; } @Override public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod) { checkWrite(); - this.clientFailureCheckPeriod = clientFailureCheckPeriod; + this.config.clientFailureCheckPeriod = clientFailureCheckPeriod; return this; } @Override public long getConnectionTTL() { - return connectionTTL; + return config.connectionTTL; } @Override public ServerLocatorImpl setConnectionTTL(final long connectionTTL) { checkWrite(); - this.connectionTTL = connectionTTL; + this.config.connectionTTL = connectionTTL; return this; } @Override public long getCallTimeout() { - return callTimeout; + return config.callTimeout; } @Override public ServerLocatorImpl setCallTimeout(final long callTimeout) { checkWrite(); - this.callTimeout = callTimeout; + this.config.callTimeout = callTimeout; return this; } @Override public long getCallFailoverTimeout() { - return callFailoverTimeout; + return config.callFailoverTimeout; } @Override public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout) { checkWrite(); - this.callFailoverTimeout = callFailoverTimeout; + this.config.callFailoverTimeout = callFailoverTimeout; return this; } @Override public int getMinLargeMessageSize() { - return minLargeMessageSize; + return config.minLargeMessageSize; } @Override public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize) { checkWrite(); - this.minLargeMessageSize = minLargeMessageSize; + this.config.minLargeMessageSize = minLargeMessageSize; return this; } @Override public int getConsumerWindowSize() { - return consumerWindowSize; + return config.consumerWindowSize; } @Override public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize) { checkWrite(); - this.consumerWindowSize = consumerWindowSize; + this.config.consumerWindowSize = consumerWindowSize; return this; } @Override public int getConsumerMaxRate() { - return consumerMaxRate; + return config.consumerMaxRate; } @Override public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate) { checkWrite(); - this.consumerMaxRate = consumerMaxRate; + this.config.consumerMaxRate = consumerMaxRate; return this; } @Override public int getConfirmationWindowSize() { - return confirmationWindowSize; + return config.confirmationWindowSize; } @Override public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize) { checkWrite(); - this.confirmationWindowSize = confirmationWindowSize; + this.config.confirmationWindowSize = confirmationWindowSize; return this; } @Override public int getProducerWindowSize() { - return producerWindowSize; + return config.producerWindowSize; } @Override public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize) { checkWrite(); - this.producerWindowSize = producerWindowSize; + this.config.producerWindowSize = producerWindowSize; return this; } @Override public int getProducerMaxRate() { - return producerMaxRate; + return config.producerMaxRate; } @Override public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate) { checkWrite(); - this.producerMaxRate = producerMaxRate; + this.config.producerMaxRate = producerMaxRate; return this; } @Override public boolean isBlockOnAcknowledge() { - return blockOnAcknowledge; + return config.blockOnAcknowledge; } @Override public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge) { checkWrite(); - this.blockOnAcknowledge = blockOnAcknowledge; + this.config.blockOnAcknowledge = blockOnAcknowledge; return this; } @Override public boolean isBlockOnDurableSend() { - return blockOnDurableSend; + return config.blockOnDurableSend; } @Override public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend) { checkWrite(); - this.blockOnDurableSend = blockOnDurableSend; + this.config.blockOnDurableSend = blockOnDurableSend; return this; } @Override public boolean isBlockOnNonDurableSend() { - return blockOnNonDurableSend; + return config.blockOnNonDurableSend; } @Override public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) { checkWrite(); - this.blockOnNonDurableSend = blockOnNonDurableSend; + this.config.blockOnNonDurableSend = blockOnNonDurableSend; return this; } @Override public boolean isAutoGroup() { - return autoGroup; + return config.autoGroup; } @Override public ServerLocatorImpl setAutoGroup(final boolean autoGroup) { checkWrite(); - this.autoGroup = autoGroup; + this.config.autoGroup = autoGroup; return this; } @Override public boolean isPreAcknowledge() { - return preAcknowledge; + return config.preAcknowledge; } @Override public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge) { checkWrite(); - this.preAcknowledge = preAcknowledge; + this.config.preAcknowledge = preAcknowledge; return this; } @Override public int getAckBatchSize() { - return ackBatchSize; + return config.ackBatchSize; } @Override public ServerLocatorImpl setAckBatchSize(final int ackBatchSize) { checkWrite(); - this.ackBatchSize = ackBatchSize; + this.config.ackBatchSize = ackBatchSize; return this; } @Override public boolean isUseGlobalPools() { - return useGlobalPools; + return config.useGlobalPools; } @Override public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools) { checkWrite(); - this.useGlobalPools = useGlobalPools; + this.config.useGlobalPools = useGlobalPools; return this; } @Override public int getScheduledThreadPoolMaxSize() { - return scheduledThreadPoolMaxSize; + return config.scheduledThreadPoolMaxSize; } @Override public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) { checkWrite(); - this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; + this.config.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; return this; } @Override public int getThreadPoolMaxSize() { - return threadPoolMaxSize; + return config.threadPoolMaxSize; } @Override public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize) { checkWrite(); - this.threadPoolMaxSize = threadPoolMaxSize; + this.config.threadPoolMaxSize = threadPoolMaxSize; return this; } @Override public long getRetryInterval() { - return retryInterval; + return config.retryInterval; } @Override public ServerLocatorImpl setRetryInterval(final long retryInterval) { checkWrite(); - this.retryInterval = retryInterval; + this.config.retryInterval = retryInterval; return this; } @Override public long getMaxRetryInterval() { - return maxRetryInterval; + return config.maxRetryInterval; } @Override public ServerLocatorImpl setMaxRetryInterval(final long retryInterval) { checkWrite(); - maxRetryInterval = retryInterval; + this.config.maxRetryInterval = retryInterval; return this; } @Override public double getRetryIntervalMultiplier() { - return retryIntervalMultiplier; + return config.retryIntervalMultiplier; } @Override public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier) { checkWrite(); - this.retryIntervalMultiplier = retryIntervalMultiplier; + this.config.retryIntervalMultiplier = retryIntervalMultiplier; return this; } @Override public int getReconnectAttempts() { - return reconnectAttempts; + return config.reconnectAttempts; } @Override public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts) { checkWrite(); - this.reconnectAttempts = reconnectAttempts; + this.config.reconnectAttempts = reconnectAttempts; return this; } @Override public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts) { checkWrite(); - this.initialConnectAttempts = initialConnectAttempts; + this.config.initialConnectAttempts = initialConnectAttempts; return this; } @Override public int getInitialConnectAttempts() { - return initialConnectAttempts; + return config.initialConnectAttempts; } @Override public boolean isFailoverOnInitialConnection() { - return this.failoverOnInitialConnection; + return this.config.failoverOnInitialConnection; } @Override public ServerLocatorImpl setFailoverOnInitialConnection(final boolean failover) { checkWrite(); - this.failoverOnInitialConnection = failover; + this.config.failoverOnInitialConnection = failover; return this; } @Override public String getConnectionLoadBalancingPolicyClassName() { - return connectionLoadBalancingPolicyClassName; + return config.connectionLoadBalancingPolicyClassName; } @Override public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) { checkWrite(); - connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName; + config.connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName; return this; } @@ -1243,13 +1107,13 @@ public boolean removeOutgoingInterceptor(final Interceptor interceptor) { @Override public int getInitialMessagePacketSize() { - return initialMessagePacketSize; + return config.initialMessagePacketSize; } @Override public ServerLocatorImpl setInitialMessagePacketSize(final int size) { checkWrite(); - initialMessagePacketSize = size; + config.initialMessagePacketSize = size; return this; } @@ -1267,12 +1131,12 @@ public String getGroupID() { @Override public boolean isCompressLargeMessage() { - return compressLargeMessage; + return config.compressLargeMessage; } @Override public ServerLocatorImpl setCompressLargeMessage(boolean avoid) { - this.compressLargeMessage = avoid; + this.config.compressLargeMessage = avoid; return this; } @@ -1618,13 +1482,13 @@ public void factoryClosed(final ClientSessionFactory factory) { @Override public ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) { - this.useTopologyForLoadBalancing = useTopologyForLoadBalancing; + this.config.useTopologyForLoadBalancing = useTopologyForLoadBalancing; return this; } @Override public boolean getUseTopologyForLoadBalancing() { - return useTopologyForLoadBalancing; + return config.useTopologyForLoadBalancing; } @Override @@ -1746,11 +1610,11 @@ public String toString() { } } - if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts) { + if (config.initialConnectAttempts >= 0 && retryNumber > config.initialConnectAttempts) { break; } - if (latch.await(retryInterval, TimeUnit.MILLISECONDS)) + if (latch.await(config.retryInterval, TimeUnit.MILLISECONDS)) return null; } @@ -1785,7 +1649,7 @@ private synchronized void createConnectors() { connectors = new ArrayList<>(); if (initialConnectors != null) { for (TransportConfiguration initialConnector : initialConnectors) { - ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); factory.disableFinalizeCheck(); diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java index f9826e0210..f2ae7edbdd 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.activemq.artemis.api.config.ServerLocatorConfig; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -44,36 +45,7 @@ public class XARecoveryConfig { private final String password; private final Map properties; private final ClientProtocolManagerFactory clientProtocolManager; - - // ServerLocator properties - private Long callFailoverTimeout; - private Long callTimeout; - private Long clientFailureCheckPeriod; - private Integer confirmationWindowSize; - private String connectionLoadBalancingPolicyClassName; - private Long connectionTTL; - private Integer consumerMaxRate; - private Integer consumerWindowSize; - private Integer initialConnectAttempts; - private Integer producerMaxRate; - private Integer producerWindowSize; - private Integer minLargeMessageSize; - private Long retryInterval; - private Double retryIntervalMultiplier; - private Long maxRetryInterval; - private Integer reconnectAttempts; - private Integer initialMessagePacketSize; - private Integer scheduledThreadPoolMaxSize; - private Integer threadPoolMaxSize; - private boolean autoGroup; - private boolean blockOnAcknowledge; - private boolean blockOnNonDurableSend; - private boolean blockOnDurableSend; - private boolean preAcknowledge; - private boolean useGlobalPools; - private boolean cacheLargeMessagesClient; - private boolean compressLargeMessage; - private boolean failoverOnInitialConnection; + private ServerLocatorConfig locatorConfig; public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, @@ -164,8 +136,7 @@ private XARecoveryConfig(ServerLocator serverLocator, this.ha = serverLocator.isHA(); this.properties = properties == null ? Collections.unmodifiableMap(new HashMap()) : Collections.unmodifiableMap(properties); this.clientProtocolManager = clientProtocolManager; - - readLocatorProperties(serverLocator); + this.locatorConfig = serverLocator.getLocatorConfig(); } public boolean isHA() { @@ -209,82 +180,13 @@ public ServerLocator createServerLocator() { serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager); } - writeLocatorProperties(serverLocator); + if (this.locatorConfig != null) { + serverLocator.setLocatorConfig(new ServerLocatorConfig(this.locatorConfig)); + } return serverLocator; } - private void writeLocatorProperties(ServerLocator serverLocator) { - serverLocator.setAutoGroup(this.autoGroup); - serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge); - serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend); - serverLocator.setBlockOnDurableSend(this.blockOnDurableSend); - serverLocator.setPreAcknowledge(this.preAcknowledge); - serverLocator.setUseGlobalPools(this.useGlobalPools); - serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient); - serverLocator.setCompressLargeMessage(this.compressLargeMessage); - serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection); - - serverLocator.setConsumerMaxRate(this.consumerMaxRate); - serverLocator.setConsumerWindowSize(this.consumerWindowSize); - serverLocator.setMinLargeMessageSize(this.minLargeMessageSize); - serverLocator.setProducerMaxRate(this.producerMaxRate); - serverLocator.setProducerWindowSize(this.producerWindowSize); - serverLocator.setConfirmationWindowSize(this.confirmationWindowSize); - serverLocator.setReconnectAttempts(this.reconnectAttempts); - serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize); - serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize); - serverLocator.setInitialConnectAttempts(this.initialConnectAttempts); - serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize); - - serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod); - serverLocator.setCallTimeout(this.callTimeout); - serverLocator.setCallFailoverTimeout(this.callFailoverTimeout); - serverLocator.setConnectionTTL(this.connectionTTL); - serverLocator.setRetryInterval(this.retryInterval); - serverLocator.setMaxRetryInterval(this.maxRetryInterval); - - serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier); - - serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName); -} - - private void readLocatorProperties(ServerLocator locator) { - - this.autoGroup = locator.isAutoGroup(); - this.blockOnAcknowledge = locator.isBlockOnAcknowledge(); - this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend(); - this.blockOnDurableSend = locator.isBlockOnDurableSend(); - this.preAcknowledge = locator.isPreAcknowledge(); - this.useGlobalPools = locator.isUseGlobalPools(); - this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient(); - this.compressLargeMessage = locator.isCompressLargeMessage(); - this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection(); - - this.consumerMaxRate = locator.getConsumerMaxRate(); - this.consumerWindowSize = locator.getConsumerWindowSize(); - this.minLargeMessageSize = locator.getMinLargeMessageSize(); - this.producerMaxRate = locator.getProducerMaxRate(); - this.producerWindowSize = locator.getProducerWindowSize(); - this.confirmationWindowSize = locator.getConfirmationWindowSize(); - this.reconnectAttempts = locator.getReconnectAttempts(); - this.threadPoolMaxSize = locator.getThreadPoolMaxSize(); - this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize(); - this.initialConnectAttempts = locator.getInitialConnectAttempts(); - this.initialMessagePacketSize = locator.getInitialMessagePacketSize(); - - this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod(); - this.callTimeout = locator.getCallTimeout(); - this.callFailoverTimeout = locator.getCallFailoverTimeout(); - this.connectionTTL = locator.getConnectionTTL(); - this.retryInterval = locator.getRetryInterval(); - this.maxRetryInterval = locator.getMaxRetryInterval(); - - this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier(); - - this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName(); - } - @Override public int hashCode() { final int prime = 31;