Valmeida scaleout compute nodes 2 #514

Closed
wants to merge 7 commits into from
@@ -233,7 +233,23 @@ public enum Type implements com.google.protobuf.ProtocolMessageEnum {
* worker -> master
* </pre>
*/
ADD_WORKER_ACK(5, 6), ;
ADD_WORKER_ACK(5, 6),
/**
* <code>STOP_WORKER = 7;</code>
*
* <pre>
* master -&gt; workers
* </pre>
*/
STOP_WORKER(6, 7),
/**
* <code>STOP_WORKER_ACK = 8;</code>
*
* <pre>
* worker -&gt; master
* </pre>
*/
STOP_WORKER_ACK(7, 8);

/**
* <code>SHUTDOWN = 1;</code>
@@ -283,6 +299,22 @@ public enum Type implements com.google.protobuf.ProtocolMessageEnum {
* </pre>
*/
public static final int ADD_WORKER_ACK_VALUE = 6;
/**
* <code>STOP_WORKER = 7;</code>
*
* <pre>
* master -&gt; workers
* </pre>
*/
public static final int STOP_WORKER_VALUE = 7;
/**
* <code>STOP_WORKER_ACK = 8;</code>
*
* <pre>
* worker -&gt; master
* </pre>
*/
public static final int STOP_WORKER_ACK_VALUE = 8;

@Override
public final int getNumber() {
@@ -303,6 +335,10 @@ public static Type valueOf(int value) {
return REMOVE_WORKER_ACK;
case 6:
return ADD_WORKER_ACK;
case 7:
return STOP_WORKER;
case 8:
return STOP_WORKER_ACK;
default:
return null;
}
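For reference, the two new control-message types round-trip through the generated valueOf(int) and getNumber() methods shown in this hunk. A minimal sketch, assuming the enum above is ControlMessage.Type inside the generated ControlProto class (the enclosing class and its package are not visible in this diff):

// Sketch only; the import path is an assumption, since the enclosing generated class is not shown above.
import edu.washington.escience.myria.proto.ControlProto.ControlMessage;

public class StopWorkerTypeCheck {
  public static void main(final String[] args) {
    // Wire numbers match the diff: STOP_WORKER = 7, STOP_WORKER_ACK = 8.
    System.out.println(ControlMessage.Type.STOP_WORKER.getNumber()); // 7
    System.out.println(ControlMessage.Type.valueOf(8));              // STOP_WORKER_ACK
    System.out.println(ControlMessage.Type.valueOf(99));             // null for unknown values
  }
}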
62 changes: 62 additions & 0 deletions src/edu/washington/escience/myria/api/WorkerCollection.java
@@ -1,17 +1,20 @@
package edu.washington.escience.myria.api;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;

import edu.washington.escience.myria.parallel.Server;
import edu.washington.escience.myria.parallel.SocketInfo;
@@ -59,6 +62,65 @@ public String getWorker(@PathParam("workerId") final String workerId) {
return workerInfo.toString();
}

/**
* @param sWorkerId the worker identifier, as a string.
* @param uriInfo information about the URL of the request.
* @return the URI of the worker started.
*/
@POST
@Path("/start/worker-{workerId}")
public Response startWorker(@PathParam("workerId") final String sWorkerId, @Context final UriInfo uriInfo) {
Integer workerId;
try {
workerId = Integer.parseInt(sWorkerId);
} catch (final NumberFormatException e) {
/* Parsing failed, throw a 400 (Bad Request) */
throw new MyriaApiException(Status.BAD_REQUEST, e);
}
if (server.isWorkerAlive(workerId)) {
/* Worker already alive, throw a 400 (Bad Request) */
throw new MyriaApiException(Status.BAD_REQUEST, "Worker already alive");
}
try {
server.startWorker(workerId);
} catch (final RuntimeException e) {
throw new MyriaApiException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}

URI queryUri = uriInfo.getRequestUri();
return Response.status(Status.ACCEPTED).location(queryUri).build();
}

/**
* @param sWorkerId the worker identifier, as a string.
* @param uriInfo information about the URL of the request.
* @return the URI of the worker stopped.
*/
@POST
@Path("/stop/worker-{workerId}")
public Response stopWorker(@PathParam("workerId") final String sWorkerId, @Context final UriInfo uriInfo) {
Integer workerId;
try {
workerId = Integer.parseInt(sWorkerId);
} catch (final NumberFormatException e) {
/* Parsing failed, throw a 400 (Bad Request) */
throw new MyriaApiException(Status.BAD_REQUEST, e);
}
SocketInfo workerInfo = server.getWorkers().get(workerId);
if (workerInfo == null) {
/* Worker not in catalog, throw a 400 (Bad Request) */
throw new MyriaApiException(Status.BAD_REQUEST, "Worker " + sWorkerId + " not deployed");
}
if (!server.isWorkerAlive(workerId)) {
/* Worker not alive, throw a 400 (Bad Request) */
throw new MyriaApiException(Status.BAD_REQUEST, "Worker " + sWorkerId + " not alive");
}
server.stopWorker(workerId);

URI queryUri = uriInfo.getRequestUri();
return Response.status(Status.ACCEPTED).location(queryUri).build();
}

/**
* @return the set of workers (identifier : host-port string) known by this server.
*/
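The two endpoints added above expose worker start/stop over HTTP: POST /start/worker-{id} answers 202 (Accepted) or 400 (Bad Request) if the worker is already alive, and POST /stop/worker-{id} answers 202 or 400 if the worker is unknown or not alive. A hypothetical client call, assuming the resource is mounted at /workers and the master's REST port is 8753 (neither appears in this diff):

import java.net.HttpURLConnection;
import java.net.URL;

public class WorkerControlClient {
  public static void main(final String[] args) throws Exception {
    // Hypothetical base URL; substitute the real master host, port, and resource path.
    final URL stopUrl = new URL("http://localhost:8753/workers/stop/worker-2");
    final HttpURLConnection conn = (HttpURLConnection) stopUrl.openConnection();
    conn.setRequestMethod("POST");
    // Expect 202 (Accepted) on success, 400 (Bad Request) if the worker is unknown or not alive.
    System.out.println("HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}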
71 changes: 71 additions & 0 deletions src/edu/washington/escience/myria/parallel/Server.java
@@ -158,6 +158,14 @@ public void run() {
}
}
break;
case STOP_WORKER_ACK:
workerID = controlM.getWorkerId();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Received STOP_WORKER_ACK from worker #" + workerID);
}
stoppingWorkers.remove(workerID);
connectionPool.sendShortMessage(workerID, IPCUtils.CONTROL_SHUTDOWN);
break;
default:
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Unexpected control message received at master: " + controlM);
@@ -237,6 +245,13 @@ public void run() {
*/
private final ConcurrentHashMap<Integer, Long> aliveWorkers;

/**
* The set of workers currently being stopped. This set never intersects the alive worker set. When a worker
* sends a STOP_WORKER_ACK message, it is removed from this set.
*/
private final ConcurrentHashMap<Integer, Long> stoppingWorkers;

/**
* Scheduled new workers, when a scheduled worker sends the first heartbeat, it'll be removed from this set.
*/
@@ -467,6 +482,7 @@ public Server(final String catalogFileName) throws FileNotFoundException, Catalo
execEnvVars.put(MyriaConstants.EXEC_ENV_VAR_EXECUTION_MODE, getExecutionMode());

aliveWorkers = new ConcurrentHashMap<Integer, Long>();
stoppingWorkers = new ConcurrentHashMap<Integer, Long>();
scheduledWorkers = new ConcurrentHashMap<Integer, SocketInfo>();
scheduledWorkersTime = new ConcurrentHashMap<Integer, Long>();

@@ -1174,6 +1190,61 @@ public Set<Integer> getAliveWorkers() {
return ImmutableSet.copyOf(aliveWorkers.keySet());
}

/**
* @param workerId the worker identifier
* @return whether the worker is alive
*/
public boolean isWorkerAlive(final Integer workerId) {
return aliveWorkers.containsKey(workerId);
}

/**
* Retrieves the worker info for the given worker identifier. Returns null if the worker does not exist in the
* worker list.
*
* @param workerId the worker identifier
* @return the worker information (host:port)
*/
public SocketInfo getWorkerInfo(final Integer workerId) {
return workers.get(workerId);
}

/**
* Starts a worker, bringing it alive.
*
* @param workerId the worker identifier
*/
public void startWorker(final Integer workerId) {
SocketInfo workerInfo = getWorkerInfo(workerId);
if (workerInfo == null) {
throw new RuntimeException("Worker id: " + workerId + " not found");
}
sendAddWorker = new Thread(new SendAddWorker(workerId, workerInfo, aliveWorkers.size()));
new Thread(new NewWorkerScheduler(workerId, workerInfo.getHost(), workerInfo.getPort())).start();
}

/**
* Stops a worker.
*
* @param workerId the worker identifier
*/
public void stopWorker(final Integer workerId) {
SocketInfo workerInfo = getWorkerInfo(workerId);
if (workerInfo == null) {
throw new RuntimeException("Worker id: " + workerId + " not found");
}
if (connectionPool.isRemoteAlive(workerId)) {
// TODO add process kill
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stopping worker #{} : {}", workerId, workerInfo);
}
connectionPool.sendShortMessage(workerId, IPCUtils.stopWorkerTM(workerId));
}
aliveWorkers.remove(workerId);
stoppingWorkers.put(workerId, System.currentTimeMillis());
}

/**
* @return the set of known workers in this Master.
*/
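On the master side the stop handshake is: stopWorker() sends STOP_WORKER and moves the worker from aliveWorkers into stoppingWorkers; when STOP_WORKER_ACK arrives, the worker is dropped from stoppingWorkers and a CONTROL_SHUTDOWN is sent. A self-contained toy (not Server code) that mirrors just that bookkeeping:

import java.util.concurrent.ConcurrentHashMap;

public class StopHandshakeSketch {
  private final ConcurrentHashMap<Integer, Long> aliveWorkers = new ConcurrentHashMap<Integer, Long>();
  private final ConcurrentHashMap<Integer, Long> stoppingWorkers = new ConcurrentHashMap<Integer, Long>();

  void stopWorker(final int workerId) {
    // In Server.stopWorker() the STOP_WORKER control message is sent at this point.
    aliveWorkers.remove(workerId);
    stoppingWorkers.put(workerId, System.currentTimeMillis());
  }

  void onStopWorkerAck(final int workerId) {
    // In the STOP_WORKER_ACK branch the master then sends CONTROL_SHUTDOWN to the worker.
    stoppingWorkers.remove(workerId);
  }

  public static void main(final String[] args) {
    final StopHandshakeSketch s = new StopHandshakeSketch();
    s.aliveWorkers.put(2, System.currentTimeMillis());
    s.stopWorker(2);
    System.out.println(s.aliveWorkers.keySet() + " " + s.stoppingWorkers.keySet()); // [] [2]
    s.onStopWorkerAck(2);
    System.out.println(s.stoppingWorkers.keySet()); // []
  }
}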
75 changes: 72 additions & 3 deletions src/edu/washington/escience/myria/parallel/Worker.java
@@ -118,6 +118,24 @@ public void run() {
connectionPool.putRemote(workerId, SocketInfo.fromProtobuf(cm.getRemoteAddress()));
sendMessageToMaster(IPCUtils.addWorkerAckTM(workerId));
break;
case STOP_WORKER:
if (LOGGER.isInfoEnabled()) {
LOGGER.info("received STOP_WORKER " + workerId);
}
if (activeQueries.isEmpty()) {
connectionPool.removeRemote(workerId).await();
sendMessageToMaster(IPCUtils.stopWorkerAckTM(workerId));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("worker " + workerId + " stopped");
}
status = WorkerStatus.Stopped;
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("worker " + workerId + " stopping, waiting to active queries to finish");
}
status = WorkerStatus.Stopping;
}
break;
default:
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Unexpected control message received at worker: " + cm.getType());
@@ -144,6 +162,11 @@ private class QueryMessageProcessor implements Runnable {

@Override
public final void run() {
/* A stopping worker must not accept any more queries. */
if (status == WorkerStatus.Stopping) {
return;
}

try {
WorkerQueryPartition q = null;
while (true) {
@@ -179,10 +202,12 @@ public final void run() {
private class HeartbeatReporter extends ErrorLoggingTimerTask {
@Override
public synchronized void runInner() {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("sending heartbeat to server");
if (status == WorkerStatus.Running) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("sending heartbeat to server");
}
sendMessageToMaster(IPCUtils.CONTROL_WORKER_HEARTBEAT).awaitUninterruptibly();
}
sendMessageToMaster(IPCUtils.CONTROL_WORKER_HEARTBEAT).awaitUninterruptibly();
}
}

@@ -267,6 +292,36 @@ ExecutorService getQueryExecutor() {
*/
private final IPCConnectionPool connectionPool;

/**
* Possible worker status.
*
* @author valmeida
*
*/
public enum WorkerStatus {
/**
The worker is stopped.
*/
Stopped,
/**
* The worker is stopping; at this point it does not accept new queries and no heartbeats are sent to the master.
*/
Stopping,
/**
* The worker is up and running.
*/
Running,
/**
* The worker has stopped for some unknown reason. This status should be used for failure handling only.
*/
Zombie
}

/**
* The worker status.
*/
private volatile WorkerStatus status = WorkerStatus.Stopped;

/**
* An indicator that the worker is shutting down.
*/
@@ -624,6 +679,8 @@ public Worker(final String workingDirectory, final QueryExecutionMode mode) thro
}
LOGGER.info("Worker: Connection info " + jsonConnInfo);
execEnvVars.put(MyriaConstants.EXEC_ENV_VAR_DATABASE_CONN_INFO, ConnectionInfo.of(databaseSystem, jsonConnInfo));

status = WorkerStatus.Running;
}

/**
@@ -688,6 +745,18 @@ public void operationComplete(final ChannelFuture future) throws Exception {
}
});
}
if (status == WorkerStatus.Stopping && activeQueries.isEmpty()) {
try {
connectionPool.removeRemote(myID).await();
sendMessageToMaster(IPCUtils.stopWorkerAckTM(myID));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("worker " + myID + " stopped");
}
status = WorkerStatus.Stopped;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
});
}
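The worker-side lifecycle is: the constructor ends in Running; STOP_WORKER moves an idle worker straight to Stopped (the ack is sent immediately), otherwise to Stopping, where no heartbeats are sent and no new queries are accepted; when the last active query completes, the worker acks and becomes Stopped. A simplified restatement of that decision (not Worker code; it assumes this PR's Worker class is on the classpath so its public WorkerStatus enum resolves):

import edu.washington.escience.myria.parallel.Worker.WorkerStatus;

public class WorkerLifecycleSketch {
  static WorkerStatus onStopWorker(final int numActiveQueries) {
    // Stop immediately when idle, otherwise drain the active queries first.
    return numActiveQueries == 0 ? WorkerStatus.Stopped : WorkerStatus.Stopping;
  }

  public static void main(final String[] args) {
    WorkerStatus status = WorkerStatus.Running; // set at the end of the Worker constructor
    status = onStopWorker(3);                   // queries still running
    System.out.println(status);                 // Stopping: no heartbeats, no new queries
    status = onStopWorker(0);                   // last query finished
    System.out.println(status);                 // Stopped: STOP_WORKER_ACK sent at this point
  }
}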
4 changes: 3 additions & 1 deletion src/edu/washington/escience/myria/util/DeploymentUtils.java
@@ -9,6 +9,8 @@

import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;

import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.tool.MyriaConfigurationReader;

@@ -160,7 +162,7 @@ public static void startWorker(final String address, final String workingDir, fi
String workerDir = description + "/" + "worker_" + workerId;
String classpath = "'conf:libs/*'";
String librarypath = "sqlite4java-282";
String heapSize = maxHeapSize;
String heapSize = Objects.firstNonNull(maxHeapSize, "");
if (description == null) {
/* built in system test */
path = workingDir;
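The one-line change above defaults a null maxHeapSize to the empty string, presumably to keep a literal "null" out of the worker launch command (the rest of the method is collapsed in this diff). A quick illustration of the Guava helper's behavior (the "-Xmx2g" flag is just an example value):

import com.google.common.base.Objects;

public class HeapSizeDefault {
  public static void main(final String[] args) {
    final String configured = null; // e.g. no max heap size set in the deployment config
    System.out.println(Objects.firstNonNull("-Xmx2g", ""));             // -Xmx2g
    System.out.println(Objects.firstNonNull(configured, "").isEmpty()); // true: falls back to the empty default
    // Objects.firstNonNull(null, null) would throw a NullPointerException.
  }
}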