Skip to content

Commit

Permalink
[ARTEMIS-2176] RA connection properties are not propagated to XARecov…
Browse files Browse the repository at this point in the history
…eryConfig

(cherry picked from commit eb41be7)

downstream: ENTMQBR-2706
  • Loading branch information
spyrkob authored and brusdev committed Mar 5, 2020
1 parent 40d8b89 commit c97c51b
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,41 @@ public class XARecoveryConfig {
private final Map<String, String> properties;
private final ClientProtocolManagerFactory clientProtocolManager;

// ServerLocator properties
private Long callFailoverTimeout;
private Long callTimeout;
private Long clientFailureCheckPeriod;
private Integer confirmationWindowSize;
private String connectionLoadBalancingPolicyClassName;
private Long connectionTTL;
private Integer consumerMaxRate;
private Integer consumerWindowSize;
private Integer initialConnectAttempts;
private Integer producerMaxRate;
private Integer producerWindowSize;
private Integer minLargeMessageSize;
private Long retryInterval;
private Double retryIntervalMultiplier;
private Long maxRetryInterval;
private Integer reconnectAttempts;
private Integer initialMessagePacketSize;
private Integer scheduledThreadPoolMaxSize;
private Integer threadPoolMaxSize;
private boolean autoGroup;
private boolean blockOnAcknowledge;
private boolean blockOnNonDurableSend;
private boolean blockOnDurableSend;
private boolean preAcknowledge;
private boolean useGlobalPools;
private boolean cacheLargeMessagesClient;
private boolean compressLargeMessage;
private boolean failoverOnInitialConnection;

public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
String userName,
String password,
Map<String, String> properties) {
if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
} else {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
}

return new XARecoveryConfig(factory.getServerLocator(), userName, password, properties);
}

public XARecoveryConfig(final boolean ha,
Expand Down Expand Up @@ -112,6 +137,37 @@ public XARecoveryConfig(final boolean ha,
this(ha, discoveryConfiguration, username, password, properties, null);
}

private XARecoveryConfig(ServerLocator serverLocator,
String username,
String password,
Map<String, String> properties) {
ClientProtocolManagerFactory clientProtocolManager = serverLocator.getProtocolManagerFactory();
if (serverLocator.getDiscoveryGroupConfiguration() != null) {
this.discoveryConfiguration = serverLocator.getDiscoveryGroupConfiguration();
this.transportConfiguration = null;
} else {
TransportConfiguration[] transportConfiguration = serverLocator.getStaticTransportConfigurations();
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) {
if (clientProtocolManager != null) {
newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
} else {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
}
}

this.transportConfiguration = newTransportConfiguration;
this.discoveryConfiguration = null;
}
this.username = username;
this.password = password;
this.ha = serverLocator.isHA();
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.clientProtocolManager = clientProtocolManager;

readLocatorProperties(serverLocator);
}

public boolean isHA() {
return ha;
}
Expand Down Expand Up @@ -146,12 +202,87 @@ public ClientProtocolManagerFactory getClientProtocolManager() {
* @return locator
*/
public ServerLocator createServerLocator() {
ServerLocator serverLocator;
if (getDiscoveryConfiguration() != null) {
return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
serverLocator = ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
} else {
return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
}

writeLocatorProperties(serverLocator);

return serverLocator;
}

private void writeLocatorProperties(ServerLocator serverLocator) {
serverLocator.setAutoGroup(this.autoGroup);
serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge);
serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend);
serverLocator.setBlockOnDurableSend(this.blockOnDurableSend);
serverLocator.setPreAcknowledge(this.preAcknowledge);
serverLocator.setUseGlobalPools(this.useGlobalPools);
serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient);
serverLocator.setCompressLargeMessage(this.compressLargeMessage);
serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection);

serverLocator.setConsumerMaxRate(this.consumerMaxRate);
serverLocator.setConsumerWindowSize(this.consumerWindowSize);
serverLocator.setMinLargeMessageSize(this.minLargeMessageSize);
serverLocator.setProducerMaxRate(this.producerMaxRate);
serverLocator.setProducerWindowSize(this.producerWindowSize);
serverLocator.setConfirmationWindowSize(this.confirmationWindowSize);
serverLocator.setReconnectAttempts(this.reconnectAttempts);
serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize);
serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize);
serverLocator.setInitialConnectAttempts(this.initialConnectAttempts);
serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize);

serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod);
serverLocator.setCallTimeout(this.callTimeout);
serverLocator.setCallFailoverTimeout(this.callFailoverTimeout);
serverLocator.setConnectionTTL(this.connectionTTL);
serverLocator.setRetryInterval(this.retryInterval);
serverLocator.setMaxRetryInterval(this.maxRetryInterval);

serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier);

serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName);
}

private void readLocatorProperties(ServerLocator locator) {

this.autoGroup = locator.isAutoGroup();
this.blockOnAcknowledge = locator.isBlockOnAcknowledge();
this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend();
this.blockOnDurableSend = locator.isBlockOnDurableSend();
this.preAcknowledge = locator.isPreAcknowledge();
this.useGlobalPools = locator.isUseGlobalPools();
this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient();
this.compressLargeMessage = locator.isCompressLargeMessage();
this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection();

this.consumerMaxRate = locator.getConsumerMaxRate();
this.consumerWindowSize = locator.getConsumerWindowSize();
this.minLargeMessageSize = locator.getMinLargeMessageSize();
this.producerMaxRate = locator.getProducerMaxRate();
this.producerWindowSize = locator.getProducerWindowSize();
this.confirmationWindowSize = locator.getConfirmationWindowSize();
this.reconnectAttempts = locator.getReconnectAttempts();
this.threadPoolMaxSize = locator.getThreadPoolMaxSize();
this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize();
this.initialConnectAttempts = locator.getInitialConnectAttempts();
this.initialMessagePacketSize = locator.getInitialMessagePacketSize();

this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod();
this.callTimeout = locator.getCallTimeout();
this.callFailoverTimeout = locator.getCallFailoverTimeout();
this.connectionTTL = locator.getConnectionTTL();
this.retryInterval = locator.getRetryInterval();
this.maxRetryInterval = locator.getMaxRetryInterval();

this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier();

this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,40 @@ public void testConnectionParameterStringParsing() throws Exception {
}
}

@Test
public void testConnectionFactoryPropertiesApplyToRecoveryConfig() throws Exception {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
ActiveMQDestination queue = (ActiveMQDestination) ActiveMQJMSClient.createQueue("test");
session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true);
session.close();

ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();

ra.setConnectorClassName(INVM_CONNECTOR_FACTORY);
ra.setUserName("userGlobal");
ra.setPassword("passwordGlobal");
ra.setConnectionTTL(100L);
ra.setCallFailoverTimeout(100L);
ra.start(new BootstrapContext());

Set<XARecoveryConfig> resources = ra.getRecoveryManager().getResources();
assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getConnectionTTL());
assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getCallFailoverTimeout());


for (XARecoveryConfig resource : resources) {
assertEquals(100L, resource.createServerLocator().getConnectionTTL());
assertEquals(100L, resource.createServerLocator().getCallFailoverTimeout());
}

ra.stop();
assertEquals(0, resources.size());
locator.close();

}

@Override
public boolean useSecurity() {
return false;
Expand Down

0 comments on commit c97c51b

Please sign in to comment.