Permalink
Browse files

[AS7-5887] reconnect servers automatically

  • Loading branch information...
1 parent 6fbb1ab commit ed2bc551a55ec6a8167a8657cbb5d8abc6e07748 @emuckenhuber emuckenhuber committed with bstansberry Nov 15, 2012
@@ -704,8 +704,8 @@ public ServerStatus startServer(String serverName, ModelNode domainModel, boolea
return serverInventory.startServer(serverName, domainModel, blocking);
}
- public void reconnectServer(String serverName, ModelNode domainModel, boolean running, boolean stopping) {
- serverInventory.reconnectServer(serverName, domainModel, running, stopping);
+ public void reconnectServer(String serverName, ModelNode domainModel, byte[] authKey, boolean running, boolean stopping) {
+ serverInventory.reconnectServer(serverName, domainModel, authKey, running, stopping);
}
public ServerStatus restartServer(String serverName, int gracefulTimeout, ModelNode domainModel) {
@@ -120,9 +120,9 @@ public static String getServerName(String serverProcessName) {
private volatile InternalState requiredState = InternalState.STOPPED;
private volatile InternalState internalState = InternalState.STOPPED;
- ManagedServer(final String hostControllerName, final String serverName, final ProcessControllerClient processControllerClient,
- final InetSocketAddress managementSocket, final ManagedServer.ManagedServerBootConfiguration bootConfiguration,
- final TransformationTarget transformationTarget) {
+ ManagedServer(final String hostControllerName, final String serverName, final byte[] authKey,
+ final ProcessControllerClient processControllerClient, final InetSocketAddress managementSocket,
+ final ManagedServer.ManagedServerBootConfiguration bootConfiguration, final TransformationTarget transformationTarget) {
assert hostControllerName != null : "hostControllerName is null";
assert serverName != null : "serverName is null";
@@ -137,8 +137,6 @@ public static String getServerName(String serverProcessName) {
this.bootConfiguration = bootConfiguration;
this.transformationTarget = transformationTarget;
- final byte[] authKey = new byte[16];
- new Random(new SecureRandom().nextLong()).nextBytes(authKey);
this.authKey = authKey;
serverPath = PathElement.pathElement(RUNNING_SERVER, serverName);
}
@@ -320,11 +318,30 @@ protected synchronized void serverStartFailed() {
/**
* Unregister the mgmt channel.
+ *
+ * @param old the proxy controller to unregister
+ * @param shuttingDown whether the server inventory is shutting down
*/
- protected synchronized void callbackUnregistered(final ProxyController old) {
+ protected synchronized void callbackUnregistered(final ProxyController old, final boolean shuttingDown) {
if(proxyController == old) {
this.proxyController = null;
}
+ // If the connection dropped without us stopping the process ask for reconnection
+ if(! shuttingDown && requiredState == InternalState.SERVER_STARTED) {
+ final InternalState state = internalState;
+ if(state == InternalState.PROCESS_STOPPED
+ || state == InternalState.PROCESS_STOPPING
+ || state == InternalState.STOPPED) {
+ // In case it stopped we don't reconnect
+ return;
+ }
+ try {
+ HostControllerLogger.ROOT_LOGGER.tracef("trying to reconnect to %s current-state (%s) required-state (%s)", serverName, state, requiredState);
+ internalSetState(new ReconnectTask(), state, InternalState.SERVER_STARTING);
+ } catch (Exception e) {
+ HostControllerLogger.ROOT_LOGGER.debugf(e, "failed to send reconnect task");
+ }
+ }
}
/**
@@ -158,10 +158,12 @@
*
* @param serverName the name of the server
* @param domainModel the configuration model for the domain
+ * @param authKey the authentication key
* @param running whether the process was running. If {@code false}, the existence of the server will be
* recorded but no attempt to contact it will be made
+ * @param stopping whether the process is currently stopping
*/
- void reconnectServer(final String serverName, final ModelNode domainModel, final boolean running, final boolean stopping);
+ void reconnectServer(String serverName, ModelNode domainModel, byte[] authKey, boolean running, boolean stopping);
/**
* Gets a callback handler security services can use for handling authentication data provided by
@@ -22,6 +22,7 @@
package org.jboss.as.host.controller;
+import org.jboss.as.controller.HashUtil;
import org.jboss.as.controller.ModelVersion;
import org.jboss.as.controller.PathAddress;
import org.jboss.as.controller.ProxyController;
@@ -33,11 +34,13 @@
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -172,7 +175,11 @@ public ServerStatus startServer(final String serverName, final ModelNode domainM
}
ManagedServer server = servers.get(serverName);
if(server == null) {
- final ManagedServer newServer = createManagedServer(serverName, domainModel);
+ // Create a new authKey
+ final byte[] authKey = new byte[16];
+ new Random(new SecureRandom().nextLong()).nextBytes(authKey);
+ // Create the managed server
+ final ManagedServer newServer = createManagedServer(serverName, domainModel, authKey);
server = servers.putIfAbsent(serverName, newServer);
if(server == null) {
server = newServer;
@@ -235,7 +242,7 @@ public ServerStatus stopServer(final String serverName, final int gracefulTimeou
}
@Override
- public void reconnectServer(final String serverName, final ModelNode domainModel, final boolean running, final boolean stopping) {
+ public void reconnectServer(final String serverName, final ModelNode domainModel, final byte[] authKey, final boolean running, final boolean stopping) {
if(shutdown || connectionFinished) {
throw HostControllerMessages.MESSAGES.hostAlreadyShutdown();
}
@@ -244,7 +251,7 @@ public void reconnectServer(final String serverName, final ModelNode domainModel
ROOT_LOGGER.existingServerWithState(serverName, existing.getState());
return;
}
- final ManagedServer server = createManagedServer(serverName, domainModel);
+ final ManagedServer server = createManagedServer(serverName, domainModel, authKey);
if(servers.putIfAbsent(serverName, server) != null) {
ROOT_LOGGER.existingServerWithState(serverName, existing.getState());
return;
@@ -304,7 +311,7 @@ public void stopServers(final int gracefulTimeout, final boolean blockUntilStopp
}
}
- public void shutdown(final int gracefulTimeout, final boolean blockUntilStopped) {
+ void shutdown(final boolean shutdownServers, final int gracefulTimeout, final boolean blockUntilStopped) {
final boolean shutdown = this.shutdown;
this.shutdown = true;
if(! shutdown) {
@@ -313,7 +320,10 @@ public void shutdown(final int gracefulTimeout, final boolean blockUntilStopped)
// nor can expect to receive any further notifications notifications.
return;
}
- stopServers(gracefulTimeout, blockUntilStopped);
+ if(shutdownServers) {
+ // Shutdown the servers as well
+ stopServers(gracefulTimeout, blockUntilStopped);
+ }
}
}
@@ -334,7 +344,8 @@ public ProxyController serverCommunicationRegistered(final String serverProcessN
channel.addCloseHandler(new CloseHandler<Channel>() {
public void handleClose(final Channel closed, final IOException exception) {
- server.callbackUnregistered(proxy);
+ final boolean shuttingDown = shutdown || connectionFinished;
+ server.callbackUnregistered(proxy, shuttingDown);
domainController.unregisterRunningServer(server.getServerName());
}
});
@@ -484,7 +495,7 @@ public void processInventory(final Map<String, ProcessInfo> processInfos) {
}
}
- private ManagedServer createManagedServer(final String serverName, final ModelNode domainModel) {
+ private ManagedServer createManagedServer(final String serverName, final ModelNode domainModel, final byte[] authKey) {
final String hostControllerName = domainController.getLocalHostInfo().getLocalHostName();
final ModelNode hostModel = domainModel.require(HOST).require(hostControllerName);
final ManagedServerBootCmdFactory combiner = new ManagedServerBootCmdFactory(serverName, domainModel, hostModel, environment, domainController.getExpressionResolver());
@@ -493,7 +504,7 @@ private ManagedServer createManagedServer(final String serverName, final ModelNo
final ModelVersion modelVersion = ModelVersion.create(Version.MANAGEMENT_MAJOR_VERSION, Version.MANAGEMENT_MINOR_VERSION, Version.MANAGEMENT_MICRO_VERSION);
final TransformationTarget target = TransformationTargetImpl.create(extensionRegistry.getTransformerRegistry(),
modelVersion, subsystems, null, TransformationTarget.TransformationTargetType.SERVER);
- return new ManagedServer(hostControllerName, serverName, processControllerClient, managementAddress, configuration, target);
+ return new ManagedServer(hostControllerName, serverName, authKey, processControllerClient, managementAddress, configuration, target);
}
@Override
@@ -114,14 +114,15 @@ public synchronized void start(StartContext context) throws StartException {
*/
@Override
public synchronized void stop(final StopContext context) {
- if (runningModeControl.getRestartMode() == RestartMode.SERVERS) {
+ final boolean shutdownServers = runningModeControl.getRestartMode() == RestartMode.SERVERS;
+ if (shutdownServers) {
context.asynchronous();
Runnable r = new Runnable() {
@Override
public void run() {
try {
- serverInventory.shutdown(-1, true); // TODO graceful shutdown // TODO async
+ serverInventory.shutdown(true, -1, true); // TODO graceful shutdown
serverInventory = null;
// client.getValue().setServerInventory(null);
} finally {
@@ -131,6 +132,10 @@ public void run() {
}
};
executorService.getValue().execute(r);
+ } else {
+ // We have to set the shutdown flag in any case
+ serverInventory.shutdown(false, -1, true);
+ serverInventory = null;
}
}
@@ -31,6 +31,7 @@
import org.jboss.as.domain.controller.DomainController;
import org.jboss.as.host.controller.ServerInventory;
import org.jboss.as.protocol.mgmt.ManagementChannelHandler;
+import org.jboss.as.protocol.mgmt.ManagementPongRequestHandler;
import org.jboss.as.protocol.mgmt.support.ManagementChannelInitialization;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.ServiceName;
@@ -105,6 +106,7 @@ public synchronized ManagementChannelInitialization getValue() throws IllegalSta
public HandleableCloseable.Key startReceiving(final Channel channel) {
final ManagementChannelHandler channelHandler = new ManagementChannelHandler(channel, executorService);
final ServerToHostProtocolHandler registrationHandler = new ServerToHostProtocolHandler(serverInventory.getValue(), operationExecutor, domainController, channelHandler, registrations, expressionResolver);
+ channelHandler.addHandlerFactory(new ManagementPongRequestHandler());
channelHandler.addHandlerFactory(registrationHandler);
channel.receiveMessage(channelHandler.getReceiver());
return null;
@@ -55,6 +55,7 @@
import org.jboss.as.protocol.mgmt.ActiveOperation;
import org.jboss.as.protocol.mgmt.FlushableDataOutput;
import org.jboss.as.protocol.mgmt.ManagementChannelHandler;
+import org.jboss.as.protocol.mgmt.ManagementPongRequestHandler;
import org.jboss.as.protocol.mgmt.ManagementProtocol;
import org.jboss.as.protocol.mgmt.ManagementProtocolHeader;
import org.jboss.as.protocol.mgmt.ManagementRequestContext;
@@ -19,6 +19,7 @@
package org.jboss.as.host.controller.operations;
+import org.jboss.as.controller.HashUtil;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.AUTO_START;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.SERVER_CONFIG;
@@ -130,8 +131,9 @@ private void restartedHcStartOrReconnectServers(final ModelNode servers, final M
ROOT_LOGGER.failedToStartServer(e, serverName);
}
} else if (info != null){
- //Reconnect the server
- serverInventory.reconnectServer(serverName, domainModel, info.isRunning(), info.isStopping());
+ // Reconnect the server using the current authKey
+ final byte[] authKey = info.getAuthKey();
+ serverInventory.reconnectServer(serverName, domainModel, authKey, info.isRunning(), info.isStopping());
}
}
}
@@ -173,7 +173,10 @@ public void reconnect(String hostName, int port, boolean managementSubsystemEndp
stdin.write(asAuthKey);
stdin.flush();
} catch (IOException e) {
- log.failedToSendReconnect(e, processName);
+ if(state == State.STARTED) {
+ // Only log in case the process is still running
+ log.failedToSendReconnect(e, processName);
+ }
}
}
@@ -54,7 +54,6 @@
*/
private final Object lock = new Object();
- private final Random rng;
private final ProtocolServer server;
private final Map<String, ManagedProcess> processes = new HashMap<String, ManagedProcess>();
private final Map<Key, ManagedProcess> processesByKey = new HashMap<Key, ManagedProcess>();
@@ -68,7 +67,6 @@
public ProcessController(final ProtocolServer.Configuration configuration, final PrintStream stdout, final PrintStream stderr) throws IOException {
this.stdout = stdout;
this.stderr = stderr;
- rng = new Random(new SecureRandom().nextLong());
//noinspection ThisEscapedInObjectConstruction
configuration.setConnectionHandler(new ProcessControllerServerHandler(this));
final ProtocolServer server = new ProtocolServer(configuration);
@@ -90,6 +88,14 @@ void removeManagedConnection(final Connection connection) {
}
public void addProcess(final String processName, final List<String> command, final Map<String, String> env, final String workingDirectory, final boolean isPrivileged, final boolean respawn) {
+ // Create a new authKey
+ final byte[] authKey = new byte[16];
+ new Random(new SecureRandom().nextLong()).nextBytes(authKey);
+ //
+ addProcess(processName, authKey, command, env, workingDirectory, isPrivileged, respawn);
+ }
+
+ public void addProcess(final String processName, final byte[] authKey, final List<String> command, final Map<String, String> env, final String workingDirectory, final boolean isPrivileged, final boolean respawn) {
for (String s : command) {
if (s == null) {
throw MESSAGES.nullCommandComponent();
@@ -105,8 +111,6 @@ public void addProcess(final String processName, final List<String> command, fin
// ignore
return;
}
- final byte[] authKey = new byte[16];
- rng.nextBytes(authKey);
final ManagedProcess process = new ManagedProcess(processName, command, env, workingDirectory, lock, this, authKey, isPrivileged, respawn);
processes.put(processName, process);
processesByKey.put(new Key(authKey), process);
@@ -292,7 +296,6 @@ void processRemoved(final String processName) {
}
} catch (IOException e) {
ROOT_LOGGER.failedToWriteMessage("PROCESS_REMOVED " + processName, e);
- e.printStackTrace();
removeManagedConnection(connection);
}
}
@@ -112,7 +112,7 @@ public void handleMessage(final Connection connection, final InputStream dataStr
readFully(dataStream, processAuthCode);
final boolean processRunning = StreamUtils.readBoolean(dataStream);
final boolean processStopping = StreamUtils.readBoolean(dataStream);
- inventory.put(processName, new ProcessInfo(processName, authCode, processRunning, processStopping));
+ inventory.put(processName, new ProcessInfo(processName, processAuthCode, processRunning, processStopping));
}
dataStream.close();
CLIENT_LOGGER.tracef("Received process_inventory");
@@ -160,7 +160,7 @@ public void handleMessage(final Connection connection, final InputStream dataStr
}
final String workingDirectory = readUTFZBytes(dataStream);
SERVER_LOGGER.tracef("Received add_process for process %s", processName);
- processController.addProcess(processName, Arrays.asList(command), env, workingDirectory, false, false);
+ processController.addProcess(processName, authKey, Arrays.asList(command), env, workingDirectory, false, false);
} else {
SERVER_LOGGER.tracef("Ignoring add_process message from untrusted source");
}
@@ -94,6 +94,15 @@ public void handleClose(Connection closed, IOException exception) {
return connection;
}
+ /**
+ * Get the connection.
+ *
+ * @return the connection
+ */
+ public Connection getConnection() {
+ return connection;
+ }
+
/**
* Shutdown the connection manager.
*/
Oops, something went wrong.

0 comments on commit ed2bc55

Please sign in to comment.