Skip to content

Commit

Permalink
Merge pull request #8415 from jmesnil/WFLY-5709_HA_configuration_fix
Browse files Browse the repository at this point in the history
[WFLY-5709] Fix Messaging HA configuration
  • Loading branch information
n1hility committed Dec 4, 2015
2 parents 3e3a003 + a39814c commit 7f259ce
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 65 deletions.
Expand Up @@ -102,6 +102,7 @@
import org.jboss.as.controller.registry.ManagementResourceRegistration;
import org.jboss.as.controller.registry.OperationEntry;
import org.jboss.as.controller.registry.Resource;
import org.jboss.as.messaging.logging.MessagingLogger;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;
import org.jboss.dmr.Property;
Expand Down Expand Up @@ -431,7 +432,7 @@ private void transformResources(OperationContext context, final ModelNode legacy
if (parameterIsAllowed(name, resourceType)) {
parentAddOp.get("params").add(new Property(name, value));
} else {
warnings.add(ROOT_LOGGER.couldNotMigrateIgnoredParameter(name, address.getParent()));
warnings.add(ROOT_LOGGER.couldNotMigrateUnsupportedAttribute(name, address.getParent()));
}
}
continue;
Expand Down Expand Up @@ -655,20 +656,20 @@ private void migrateHAPolicy(ModelNode serverAddOperation, Map<PathAddress, Mode
if (backup) {
haPolicyAddress = serverAddress.append(HA_POLICY, "shared-store-slave");
setAndDiscard(haPolicyAddOperation, serverAddOperation, ALLOW_FAILBACK, "allow-failback");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILBACK_DELAY, "failback-delay");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILOVER_ON_SHUTDOWN, "failover-on-server-shutdown");
discardUnsupportedAttribute(haPolicyAddOperation, FAILBACK_DELAY, warnings);
} else {
haPolicyAddress = serverAddress.append(HA_POLICY, "shared-store-master");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILBACK_DELAY, "failback-delay");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILOVER_ON_SHUTDOWN, "failover-on-server-shutdown");
discardUnsupportedAttribute(haPolicyAddOperation, FAILBACK_DELAY, warnings);
}
} else {
if (backup) {
haPolicyAddress = serverAddress.append(HA_POLICY, "replication-slave");
setAndDiscard(haPolicyAddOperation, serverAddOperation, ALLOW_FAILBACK, "allow-failback");
setAndDiscard(haPolicyAddOperation, serverAddOperation, FAILBACK_DELAY, "failback-delay");
setAndDiscard(haPolicyAddOperation, serverAddOperation, MAX_SAVED_REPLICATED_JOURNAL_SIZE, "max-saved-replicated-journal-size");
setAndDiscard(haPolicyAddOperation, serverAddOperation, BACKUP_GROUP_NAME, "group-name");
discardUnsupportedAttribute(haPolicyAddOperation, FAILBACK_DELAY, warnings);
} else {
haPolicyAddress = serverAddress.append(HA_POLICY, "replication-master");
setAndDiscard(haPolicyAddOperation, serverAddOperation, CHECK_FOR_LIVE_SERVER, "check-for-live-server");
Expand Down Expand Up @@ -698,6 +699,12 @@ private void setAndDiscard(ModelNode setNode, ModelNode discardNode, AttributeDe
setNode.get(newAttributeName).set(attribute);
discardNode.remove(legacyAttributeDefinition.getName());
}
}

private void discardUnsupportedAttribute(ModelNode newAddOp, AttributeDefinition legacyAttributeDefinition, List<String> warnings) {
if (newAddOp.hasDefined(legacyAttributeDefinition.getName())) {
newAddOp.remove(legacyAttributeDefinition.getName());
warnings.add(MessagingLogger.ROOT_LOGGER.couldNotMigrateUnsupportedAttribute(legacyAttributeDefinition.getName(), pathAddress(newAddOp.get(OP_ADDR))));
}
}
}
Expand Up @@ -824,6 +824,6 @@ public interface MessagingLogger extends BasicLogger {
@Message(id = 86, value = "Could not migrate attribute %s from resource %s. The attribute uses an expression that can be resolved differently depending on system properties. To be able to migrate this property, replace the expression by an actual value.")
String couldNotMigrateResourceAttributeWithExpression(String attribute, PathAddress address);

@Message(id = 87, value = "Could not migrate parameter %s from resource %s. This parameter is ignored in the new messaging-activemq subsystem.")
String couldNotMigrateIgnoredParameter(String attribute, PathAddress address);
@Message(id = 87, value = "Could not migrate attribute %s from resource %s. This attribute is no longer supported by the new messaging-activemq subsystem.")
String couldNotMigrateUnsupportedAttribute(String attribute, PathAddress address);
}
Expand Up @@ -98,19 +98,18 @@ public void testMigrateHA() throws Exception {

ModelNode haPolicyForDefaultServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "default", "ha-policy", "shared-store-master");
assertTrue(haPolicyForDefaultServer.isDefined());
// default values
assertEquals(5000, haPolicyForDefaultServer.get("failback-delay").asLong());
assertFalse(haPolicyForDefaultServer.get("failback-delay").isDefined());
assertEquals(false, haPolicyForDefaultServer.get("failover-on-server-shutdown").asBoolean());

ModelNode haPolicyForSharedStoreMasterServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "shared-store-master", "ha-policy", "shared-store-master");
assertTrue(haPolicyForSharedStoreMasterServer.isDefined());
assertEquals("${failback.delay:9876}", haPolicyForSharedStoreMasterServer.get("failback-delay").asString());
assertFalse(haPolicyForSharedStoreMasterServer.get("failback-delay").isDefined());
assertEquals("${failover.on.shutdown:true}", haPolicyForSharedStoreMasterServer.get("failover-on-server-shutdown").asString());

ModelNode haPolicyForSharedStoreSlaveServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "shared-store-slave", "ha-policy", "shared-store-slave");
assertTrue(haPolicyForSharedStoreSlaveServer.isDefined());
assertEquals("${allow.failback.1:false}", haPolicyForSharedStoreSlaveServer.get("allow-failback").asString());
assertEquals("${failback.delay.1:1234}", haPolicyForSharedStoreSlaveServer.get("failback-delay").asString());
assertFalse(haPolicyForSharedStoreSlaveServer.get("failback-delay").isDefined());
assertEquals("${failover.on.shutdown.1:true}", haPolicyForSharedStoreSlaveServer.get("failover-on-server-shutdown").asString());

ModelNode haPolicyForReplicationMasterServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "replication-master", "ha-policy", "replication-master");
Expand All @@ -121,7 +120,7 @@ public void testMigrateHA() throws Exception {
ModelNode haPolicyForReplicationSlaveServer = model.get(SUBSYSTEM, MESSAGING_ACTIVEMQ_SUBSYSTEM_NAME, "server", "replication-slave", "ha-policy", "replication-slave");
assertTrue(haPolicyForReplicationSlaveServer.isDefined());
assertEquals("${allow.failback.2:false}", haPolicyForReplicationSlaveServer.get("allow-failback").asString());
assertEquals("${failback.delay.2:1234}", haPolicyForReplicationSlaveServer.get("failback-delay").asString());
assertFalse(haPolicyForReplicationSlaveServer.get("failback-delay").isDefined());
assertEquals("${max.saved.replicated.journal.size:2}", haPolicyForReplicationSlaveServer.get("max-saved-replicated-journal-size").asString());
assertEquals("${replication.master.group.name:mygroup2}", haPolicyForReplicationSlaveServer.get("group-name").asString());
}
Expand Down
Expand Up @@ -145,14 +145,15 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.CHECK_FOR_LIVE_SERVER))
HAAttributes.CHECK_FOR_LIVE_SERVER,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT))
.addChild(
builder(ReplicationSlaveDefinition.INSTANCE)
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT,
HAAttributes.MAX_SAVED_REPLICATED_JOURNAL_SIZE,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand All @@ -174,14 +175,15 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.CHECK_FOR_LIVE_SERVER))
HAAttributes.CHECK_FOR_LIVE_SERVER,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT))
.addChild(
builder(ReplicationSlaveDefinition.CONFIGURATION_INSTANCE)
.addAttributes(
HAAttributes.CLUSTER_NAME,
HAAttributes.GROUP_NAME,
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT,
HAAttributes.MAX_SAVED_REPLICATED_JOURNAL_SIZE,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand All @@ -192,13 +194,11 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addChild(
builder(SharedStoreMasterDefinition.INSTANCE)
.addAttributes(
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN))
.addChild(
builder(SharedStoreSlaveDefinition.INSTANCE)
.addAttributes(
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand All @@ -217,13 +217,11 @@ public class MessagingSubsystemParser_1_0 implements XMLStreamConstants, XMLElem
.addChild(
builder(SharedStoreMasterDefinition.CONFIGURATION_INSTANCE)
.addAttributes(
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN))
.addChild(
builder(SharedStoreSlaveDefinition.CONFIGURATION_INSTANCE)
.addAttributes(
HAAttributes.ALLOW_FAILBACK,
HAAttributes.FAILBACK_DELAY,
HAAttributes.FAILOVER_ON_SERVER_SHUTDOWN,
HAAttributes.RESTART_BACKUP,
ScaleDownAttributes.SCALE_DOWN,
Expand Down
Expand Up @@ -93,22 +93,22 @@ public class HAAttributes {
.setRestartAllServices()
.build();

public static final SimpleAttributeDefinition FAILBACK_DELAY = create("failback-delay", LONG)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.getDefaultFailbackDelay()))
.setMeasurementUnit(MILLISECONDS)
public static final SimpleAttributeDefinition FAILOVER_ON_SERVER_SHUTDOWN = create("failover-on-server-shutdown", ModelType.BOOLEAN)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown()))
.setAllowNull(true)
.setAllowExpression(true)
.setRestartAllServices()
.build();

public static final SimpleAttributeDefinition FAILOVER_ON_SERVER_SHUTDOWN = create("failover-on-server-shutdown", ModelType.BOOLEAN)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown()))
public static SimpleAttributeDefinition GROUP_NAME = SimpleAttributeDefinitionBuilder.create(CommonAttributes.GROUP_NAME, STRING)
.setAllowNull(true)
.setAllowExpression(true)
.setRestartAllServices()
.build();

public static SimpleAttributeDefinition GROUP_NAME = SimpleAttributeDefinitionBuilder.create(CommonAttributes.GROUP_NAME, STRING)
public static final SimpleAttributeDefinition INITIAL_REPLICATION_SYNC_TIMEOUT = create("initial-replication-sync-timeout", LONG)
.setDefaultValue(new ModelNode(ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout()))
.setMeasurementUnit(MILLISECONDS)
.setAllowNull(true)
.setAllowExpression(true)
.setRestartAllServices()
Expand Down
Expand Up @@ -114,16 +114,12 @@ static HAPolicyConfiguration buildConfiguration(OperationContext context, ModelN
}

ModelNode masterConfigurationModel = model.get(CONFIGURATION, MASTER);
if (masterConfigurationModel.isDefined()) {
HAPolicyConfiguration masterConfiguration = ReplicationMasterDefinition.buildConfiguration(context, masterConfigurationModel);
haPolicyConfiguration.setLiveConfig(masterConfiguration);
}
HAPolicyConfiguration masterConfiguration = ReplicationMasterDefinition.buildConfiguration(context, masterConfigurationModel);
haPolicyConfiguration.setLiveConfig(masterConfiguration);

ModelNode slaveConfigurationModel = model.get(CONFIGURATION, SLAVE);
if (slaveConfigurationModel.isDefined()) {
HAPolicyConfiguration slaveConfiguration = ReplicationSlaveDefinition.buildConfiguration(context, slaveConfigurationModel);
haPolicyConfiguration.setBackupConfig(slaveConfiguration);
}
HAPolicyConfiguration slaveConfiguration = ReplicationSlaveDefinition.buildConfiguration(context, slaveConfigurationModel);
haPolicyConfiguration.setBackupConfig(slaveConfiguration);

return haPolicyConfiguration;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.CHECK_FOR_LIVE_SERVER;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.CLUSTER_NAME;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.GROUP_NAME;
import static org.wildfly.extension.messaging.activemq.ha.HAAttributes.INITIAL_REPLICATION_SYNC_TIMEOUT;
import static org.wildfly.extension.messaging.activemq.ha.ManagementHelper.createAddOperation;

import java.util.Arrays;
Expand Down Expand Up @@ -55,7 +56,8 @@ public class ReplicationMasterDefinition extends PersistentResourceDefinition {
public static Collection<AttributeDefinition> ATTRIBUTES = Collections.unmodifiableList(Arrays.asList(
(AttributeDefinition) CLUSTER_NAME,
GROUP_NAME,
CHECK_FOR_LIVE_SERVER
CHECK_FOR_LIVE_SERVER,
INITIAL_REPLICATION_SYNC_TIMEOUT
));

public static final ReplicationMasterDefinition INSTANCE = new ReplicationMasterDefinition(MessagingExtension.REPLICATION_MASTER_PATH, false);
Expand Down Expand Up @@ -84,7 +86,8 @@ public Collection<AttributeDefinition> getAttributes() {
static HAPolicyConfiguration buildConfiguration(OperationContext context, ModelNode model) throws OperationFailedException {
ReplicatedPolicyConfiguration haPolicyConfiguration = new ReplicatedPolicyConfiguration();

haPolicyConfiguration.setCheckForLiveServer(CHECK_FOR_LIVE_SERVER.resolveModelAttribute(context, model).asBoolean());
haPolicyConfiguration.setCheckForLiveServer(CHECK_FOR_LIVE_SERVER.resolveModelAttribute(context, model).asBoolean())
.setInitialReplicationSyncTimeout(INITIAL_REPLICATION_SYNC_TIMEOUT.resolveModelAttribute(context, model).asLong());

ModelNode clusterName = CLUSTER_NAME.resolveModelAttribute(context, model);
if (clusterName.isDefined()) {
Expand Down

0 comments on commit 7f259ce

Please sign in to comment.