Skip to content

Commit

Permalink
[WFLY-5709] Fix Messaging HA configuration
Browse files Browse the repository at this point in the history
* the shared-store-colocated configuration creates replication
  live/backup configuration instead of shared-store ones.
* live and backup configuration must always be created for colocated
  configurations.
* remove failback-delay attribute that is ignored in Artemis since
  1.0.0.wildfly-008
* add attribute initial-replication-sync-timeout to specify
  deterministically how long to wait for the initial replication.

JIRA: https://issues.jboss.org/browse/WFLY-5709
  • Loading branch information
jmesnil committed Nov 19, 2015
1 parent dcf554c commit a5fd1ad
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 62 deletions.
Expand Up @@ -102,6 +102,7 @@
import org.jboss.as.controller.registry.ManagementResourceRegistration;
import org.jboss.as.controller.registry.OperationEntry;
import org.jboss.as.controller.registry.Resource;
import org.jboss.as.messaging.logging.MessagingLogger;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;
import org.jboss.dmr.Property;
Expand Down Expand Up @@ -629,20 +630,20 @@ private void migrateHAPolicy(ModelNode serverAddOperation, Map<PathAddress, Mode
if (backup) {
haPolicyAddress = serverAddress.append(HA_POLICY, "shared-store-slave");
setAndDiscard(haPolicyAddOperation, serverAddOperation, ALLOW_FAILBACK, "allow-failback");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILBACK_DELAY, "failback-delay");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILOVER_ON_SHUTDOWN, "failover-on-server-shutdown");
discardUnsupportedAttribute(haPolicyAddOperation, FAILBACK_DELAY, warnings);
} else {
haPolicyAddress = serverAddress.append(HA_POLICY, "shared-store-master");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILBACK_DELAY, "failback-delay");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILOVER_ON_SHUTDOWN, "failover-on-server-shutdown");
discardUnsupportedAttribute(haPolicyAddOperation, FAILBACK_DELAY, warnings);
}
} else {
if (backup) {
haPolicyAddress = serverAddress.append(HA_POLICY, "replication-slave");
setAndDiscard(haPolicyAddOperation, serverAddOperation, ALLOW_FAILBACK, "allow-failback");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILBACK_DELAY, "failback-delay");
setAndDiscard(haPolicyAddOperation, serverAddOperation, MAX_SAVED_REPLICATED_JOURNAL_SIZE, "max-saved-replicated-journal-size");
setAndDiscard(haPolicyAddOperation, serverAddOperation, BACKUP_GROUP_NAME, "group-name");
discardUnsupportedAttribute(haPolicyAddOperation, FAILBACK_DELAY, warnings);
} else {
haPolicyAddress = serverAddress.append(HA_POLICY, "replication-master");
setAndDiscard(haPolicyAddOperation, serverAddOperation, CHECK_FOR_LIVE_SERVER, "check-for-live-server");
Expand Down Expand Up @@ -672,6 +673,12 @@ private void setAndDiscard(ModelNode setNode, ModelNode discardNode, AttributeDe
setNode.get(newAttributeName).set(attribute);
discardNode.remove(legacyAttributeDefinition.getName());
}
}

private void discardUnsupportedAttribute(ModelNode newAddOp, AttributeDefinition legacyAttributeDefinition, List<String> warnings) {
if (newAddOp.hasDefined(legacyAttributeDefinition.getName())) {
newAddOp.remove(legacyAttributeDefinition.getName());
warnings.add(MessagingLogger.ROOT_LOGGER.couldNotMigrateUnsupportedAttribute(legacyAttributeDefinition.getName(), pathAddress(newAddOp.get(OP_ADDR))));
}
}
}
Expand Up @@ -823,4 +823,7 @@ public interface MessagingLogger extends BasicLogger {

@Message(id = 86, value = "Could not migrate attribute %s from resource %s. The attribute uses an expression that can be resolved differently depending on system properties. To be able to migrate this property, replace the expression by an actual value.")
String couldNotMigrateResourceAttributeWithExpression(String attribute, PathAddress address);

@Message(id = 87, value = "Could not migrate attribute %s from resource %s. This attribute is no longer supported by the new messaging-activemq subsystem.")
String couldNotMigrateUnsupportedAttribute(String attribute, PathAddress address);
}
Expand Up @@ -98,19 +98,18 @@ public void testMigrateHA() throws Exception {

ModelNode haPolicyForDefaultServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "default", "ha-policy", "shared-store-master");
assertTrue(haPolicyForDefaultServer.isDefined());
// default values
assertEquals(5000, haPolicyForDefaultServer.get("failback-delay").asLong());
assertFalse(haPolicyForDefaultServer.get("failback-delay").isDefined());
assertEquals(false, haPolicyForDefaultServer.get("failover-on-server-shutdown").asBoolean());

ModelNode haPolicyForSharedStoreMasterServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "shared-store-master", "ha-policy", "shared-store-master");
assertTrue(haPolicyForSharedStoreMasterServer.isDefined());
assertEquals("${failback.delay:9876}", haPolicyForSharedStoreMasterServer.get("failback-delay").asString());
assertFalse(haPolicyForSharedStoreMasterServer.get("failback-delay").isDefined());
assertEquals("${failover.on.shutdown:true}", haPolicyForSharedStoreMasterServer.get("failover-on-server-shutdown").asString());

ModelNode haPolicyForSharedStoreSlaveServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "shared-store-slave", "ha-policy", "shared-store-slave");
assertTrue(haPolicyForSharedStoreSlaveServer.isDefined());
assertEquals("${allow.failback.1:false}", haPolicyForSharedStoreSlaveServer.get("allow-failback").asString());
assertEquals("${failback.delay.1:1234}", haPolicyForSharedStoreSlaveServer.get("failback-delay").asString());
assertFalse(haPolicyForSharedStoreSlaveServer.get("failback-delay").isDefined());
assertEquals("${failover.on.shutdown.1:true}", haPolicyForSharedStoreSlaveServer.get("failover-on-server-shutdown").asString());

ModelNode haPolicyForReplicationMasterServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "replication-master", "ha-policy", "replication-master");
Expand All @@ -121,7 +120,7 @@ public void testMigrateHA() throws Exception {
ModelNode haPolicyForReplicationSlaveServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "replication-slave", "ha-policy", "replication-slave");
assertTrue(haPolicyForReplicationSlaveServer.isDefined());
assertEquals("${allow.failback.2:false}", haPolicyForReplicationSlaveServer.get("allow-failback").asString());
assertEquals("${failback.delay.2:1234}", haPolicyForReplicationSlaveServer.get("failback-delay").asString());
assertFalse(haPolicyForReplicationSlaveServer.get("failback-delay").isDefined());
assertEquals("${max.saved.replicated.journal.size:2}", haPolicyForReplicationSlaveServer.get("max-saved-replicated-journal-size").asString());
assertEquals("${replication.master.group.name:mygroup2}", haPolicyForReplicationSlaveServer.get("group-name").asString());
}
Expand Down
Expand Up @@ -145,14 +145,15 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.CHECK_FOR_LIVE_SERVER))
HAAttributes.CHECK_FOR_LIVE_SERVER,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT))
.addChild(
builder(ReplicationSlaveDefinition.INSTANCE)
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT,
HAAttributes.MAX_SAVED_REPLICATED_JOURNAL_SIZE,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand All @@ -174,14 +175,15 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.CHECK_FOR_LIVE_SERVER))
HAAttributes.CHECK_FOR_LIVE_SERVER,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT))
.addChild(
builder(ReplicationSlaveDefinition.CONFIGURATION_INSTANCE)
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT,
HAAttributes.MAX_SAVED_REPLICATED_JOURNAL_SIZE,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand All @@ -192,13 +194,11 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addChild(
builder(SharedStoreMasterDefinition.INSTANCE)
.addAttributes(
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN))
.addChild(
builder(SharedStoreSlaveDefinition.INSTANCE)
.addAttributes(
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand All @@ -217,13 +217,11 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addChild(
builder(SharedStoreMasterDefinition.CONFIGURATION_INSTANCE)
.addAttributes(
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN))
.addChild(
builder(SharedStoreSlaveDefinition.CONFIGURATION_INSTANCE)
.addAttributes(
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand Down
Expand Up @@ -93,22 +93,22 @@ public class HAAttributes {
.setRestartAllServices()
.build();

public static final SimpleAttributeDefinition FAILBACK_DELAY = create("failback-delay", LONG)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.getDefaultFailbackDelay()))
.setMeasurementUnit(MILLISECONDS)
public static final SimpleAttributeDefinition FAILOVER_ON_SERVER_SHUTDOWN = create("failover-on-server-shutdown", ModelType.BOOLEAN)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown()))
.setAllowNull(true)
.setAllowExpression(true)
.setRestartAllServices()
.build();

public static final SimpleAttributeDefinition FAILOVER_ON_SERVER_SHUTDOWN = create("failover-on-server-shutdown", ModelType.BOOLEAN)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown()))
public static SimpleAttributeDefinition GROUP_NAME = SimpleAttributeDefinitionBuilder.create(CommonAttributes.GROUP_NAME, STRING)
.setAllowNull(true)
.setAllowExpression(true)
.setRestartAllServices()
.build();

public static SimpleAttributeDefinition GROUP_NAME = SimpleAttributeDefinitionBuilder.create(CommonAttributes.GROUP_NAME, STRING)
public static final SimpleAttributeDefinition INITIAL_REPLICATION_SYNC_TIMEOUT = create("initial-replication-sync-timeout", LONG)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout()))
.setMeasurementUnit(MILLISECONDS)
.setAllowNull(true)
.setAllowExpression(true)
.setRestartAllServices()
Expand Down
Expand Up @@ -114,16 +114,12 @@ static HAPolicyConfiguration buildConfiguration(OperationContext context, ModelN
}

ModelNode masterConfigurationModel = model.get(CONFIGURATION, MASTER);
if (masterConfigurationModel.isDefined()) {
HAPolicyConfiguration masterConfiguration = ReplicationMasterDefinition.buildConfiguration(context, masterConfigurationModel);
haPolicyConfiguration.setLiveConfig(masterConfiguration);
}
HAPolicyConfiguration masterConfiguration = ReplicationMasterDefinition.buildConfiguration(context, masterConfigurationModel);
haPolicyConfiguration.setLiveConfig(masterConfiguration);

ModelNode slaveConfigurationModel = model.get(CONFIGURATION, SLAVE);
if (slaveConfigurationModel.isDefined()) {
HAPolicyConfiguration slaveConfiguration = ReplicationSlaveDefinition.buildConfiguration(context, slaveConfigurationModel);
haPolicyConfiguration.setBackupConfig(slaveConfiguration);
}
HAPolicyConfiguration slaveConfiguration = ReplicationSlaveDefinition.buildConfiguration(context, slaveConfigurationModel);
haPolicyConfiguration.setBackupConfig(slaveConfiguration);

return haPolicyConfiguration;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.CHECK_FOR_LIVE_SERVER;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.CLUSTER_NAME;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.GROUP_NAME;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT;
import static org.wildfly.extension.messaging.activemq.ha.ManagementHelper.createAddOperation;

import java.util.Arrays;
Expand Down Expand Up @@ -55,7 +56,8 @@ public class ReplicationMasterDefinition extends PersistentResourceDefinition {
public static Collection<AttributeDefinition> ATTRIBUTES = Collections.unmodifiableList(Arrays.asList(
(AttributeDefinition) CLUSTER_NAME,
GROUP_NAME,
CHECK_FOR_LIVE_SERVER
CHECK_FOR_LIVE_SERVER,
INITIAL_REPLICATION_SYNC_TIMEOUT
));

public static final ReplicationMasterDefinition INSTANCE = new ReplicationMasterDefinition(MessagingExtension.REPLICATION_MASTER_PATH, false);
Expand Down Expand Up @@ -84,7 +86,8 @@ public Collection<AttributeDefinition> getAttributes() {
static HAPolicyConfiguration buildConfiguration(OperationContext context, ModelNode model) throws OperationFailedException {
ReplicatedPolicyConfiguration haPolicyConfiguration = new ReplicatedPolicyConfiguration();

haPolicyConfiguration.setCheckForLiveServer(CHECK_FOR_LIVE_SERVER.resolveModelAttribute(context, model).asBoolean());
haPolicyConfiguration.setCheckForLiveServer(CHECK_FOR_LIVE_SERVER.resolveModelAttribute(context, model).asBoolean())
.setInitialReplicationSyncTimeout(INITIAL_REPLICATION_SYNC_TIMEOUT.resolveModelAttribute(context, model).asLong());

ModelNode clusterName = CLUSTER_NAME.resolveModelAttribute(context, model);
if (clusterName.isDefined()) {
Expand Down
Expand Up @@ -25,8 +25,8 @@
import static org.wildfly.extension.messaging.activemq.CommonAttributes.HA_POLICY;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.ALLOW_FAILBACK;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.CLUSTER_NAME;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.FAILBACK_DELAY;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.GROUP_NAME;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.MAX_SAVED_REPLICATED_JOURNAL_SIZE;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.RESTART_BACKUP;
import static org.wildfly.extension.messaging.activemq.ha.ManagementHelper.createAddOperation;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class ReplicationSlaveDefinition extends PersistentResourceDefinition {
attributes.add(CLUSTER_NAME);
attributes.add(GROUP_NAME);
attributes.add(ALLOW_FAILBACK);
attributes.add(FAILBACK_DELAY);
attributes.add(INITIAL_REPLICATION_SYNC_TIMEOUT);
attributes.add(MAX_SAVED_REPLICATED_JOURNAL_SIZE);
attributes.add(RESTART_BACKUP);

Expand Down Expand Up @@ -97,7 +97,7 @@ public Collection<AttributeDefinition> getAttributes() {
static HAPolicyConfiguration buildConfiguration(OperationContext context, ModelNode model) throws OperationFailedException {
ReplicaPolicyConfiguration haPolicyConfiguration = new ReplicaPolicyConfiguration()
.setAllowFailBack(ALLOW_FAILBACK.resolveModelAttribute(context, model).asBoolean())
.setFailbackDelay(FAILBACK_DELAY.resolveModelAttribute(context, model).asLong())
.setInitialReplicationSyncTimeout(INITIAL_REPLICATION_SYNC_TIMEOUT.resolveModelAttribute(context, model).asLong())
.setMaxSavedReplicatedJournalsSize(MAX_SAVED_REPLICATED_JOURNAL_SIZE.resolveModelAttribute(context, model).asInt())
.setScaleDownConfiguration(addScaleDownConfiguration(context, model))
.setRestartBackup(RESTART_BACKUP.resolveModelAttribute(context, model).asBoolean());
Expand Down

0 comments on commit a5fd1ad

Please sign in to comment.