Skip to content

Commit

Permalink
Add support for graceful worker shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
nezihyigitbasi authored and cberner committed Nov 29, 2015
1 parent 7b62e02 commit fa763cf
Show file tree
Hide file tree
Showing 34 changed files with 885 additions and 59 deletions.
Expand Up @@ -31,6 +31,7 @@
import java.util.List;

import static com.facebook.presto.connector.jmx.Types.checkType;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.predicate.TupleDomain.fromFixedValues;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -76,7 +77,7 @@ public ConnectorSplitSource getPartitionSplits(ConnectorSession session, Connect
//TODO is there a better way to get the node column?
JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0);

List<ConnectorSplit> splits = nodeManager.getActiveNodes()
List<ConnectorSplit> splits = nodeManager.getNodes(ACTIVE)
.stream()
.filter(node -> {
NullableValue value = NullableValue.of(VARCHAR, utf8Slice(node.getNodeIdentifier()));
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.SchemaTableName;
Expand Down Expand Up @@ -132,7 +133,7 @@ private class TestingNodeManager
implements NodeManager
{
@Override
public Set<Node> getActiveNodes()
public Set<Node> getNodes(NodeState state)
{
// Testing stub: the requested state is not consulted; the same node set is returned for every state.
return nodes;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
Expand Down Expand Up @@ -91,7 +92,7 @@ private static class TestingNodeManager
private static final Node LOCAL_NODE = new TestingNode();

@Override
public Set<Node> getActiveNodes()
public Set<Node> getNodes(NodeState state)
{
// Testing stub: the state argument is ignored and only the single LOCAL_NODE is ever reported.
return ImmutableSet.of(LOCAL_NODE);
}
Expand Down
Expand Up @@ -21,14 +21,20 @@
import com.facebook.presto.spi.InMemoryRecordSet;
import com.facebook.presto.spi.InMemoryRecordSet.Builder;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.predicate.TupleDomain;

import javax.inject.Inject;

import java.util.Set;

import static com.facebook.presto.metadata.MetadataUtil.TableMetadataBuilder.tableMetadataBuilder;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.NodeState.INACTIVE;
import static com.facebook.presto.spi.NodeState.SHUTTING_DOWN;
import static com.facebook.presto.spi.SystemTable.Distribution.SINGLE_COORDINATOR;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
Expand All @@ -44,7 +50,7 @@ public class NodeSystemTable
.column("http_uri", VARCHAR)
.column("node_version", VARCHAR)
.column("coordinator", BOOLEAN)
.column("active", BOOLEAN)
.column("state", VARCHAR)
.build();

private final InternalNodeManager nodeManager;
Expand Down Expand Up @@ -72,15 +78,19 @@ public RecordCursor cursor(ConnectorSession session, TupleDomain<Integer> constr
{
Builder table = InMemoryRecordSet.builder(NODES_TABLE);
AllNodes allNodes = nodeManager.getAllNodes();
for (Node node : allNodes.getActiveNodes()) {
table.addRow(node.getNodeIdentifier(), node.getHttpUri().toString(), getNodeVersion(node), isCoordinator(node), Boolean.TRUE);
}
for (Node node : allNodes.getInactiveNodes()) {
table.addRow(node.getNodeIdentifier(), node.getHttpUri().toString(), getNodeVersion(node), isCoordinator(node), Boolean.FALSE);
}
addRows(table, allNodes.getActiveNodes(), ACTIVE);
addRows(table, allNodes.getInactiveNodes(), INACTIVE);
addRows(table, allNodes.getShuttingDownNodes(), SHUTTING_DOWN);
return table.build().cursor();
}

/**
 * Appends one row per node to the given table builder, tagging every row with the same state.
 *
 * Each row holds, in order: the node identifier, the node's HTTP URI as a string,
 * the node version, the coordinator flag, and the state name in lower case.
 *
 * @param table builder that receives the rows
 * @param nodes nodes to emit, one row each
 * @param state state written into the last column of every emitted row
 */
private void addRows(Builder table, Set<Node> nodes, NodeState state)
{
    // Lower-case with a fixed locale. The no-arg toLowerCase() follows the JVM default
    // locale, and under a Turkish locale 'I' maps to a dotless i, so "ACTIVE" would not
    // become "active" and queries filtering on the state column would silently miss rows.
    String stateName = state.toString().toLowerCase(java.util.Locale.ENGLISH);
    for (Node node : nodes) {
        table.addRow(node.getNodeIdentifier(), node.getHttpUri().toString(), getNodeVersion(node), isCoordinator(node), stateName);
    }
}

private static String getNodeVersion(Node node)
{
if (node instanceof PrestoNode) {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;

import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.SystemTable.Distribution.ALL_COORDINATORS;
import static com.facebook.presto.spi.SystemTable.Distribution.ALL_NODES;
import static com.facebook.presto.spi.SystemTable.Distribution.SINGLE_COORDINATOR;
Expand Down Expand Up @@ -74,7 +75,7 @@ public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLa
nodes.addAll(nodeManager.getCoordinators());
}
else if (tableDistributionMode == ALL_NODES) {
nodes.addAll(nodeManager.getActiveNodes());
nodes.addAll(nodeManager.getNodes(ACTIVE));
}
Set<Node> nodeSet = nodes.build();
for (Node node : nodeSet) {
Expand Down
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
Expand Down Expand Up @@ -108,7 +109,7 @@ public NodeSelector createNodeSelector(String dataSourceName)
nodes = nodeManager.getActiveDatasourceNodes(dataSourceName);
}
else {
nodes = nodeManager.getActiveNodes();
nodes = nodeManager.getNodes(ACTIVE);
}

for (Node node : nodes) {
Expand Down
Expand Up @@ -371,4 +371,9 @@ public SqlTaskIoStats getIoStats()
return new SqlTaskIoStats(taskContext.getInputDataSize(), taskContext.getInputPositions(), taskContext.getOutputDataSize(), taskContext.getOutputPositions());
}
}

/**
 * Registers a listener for this task's state changes by delegating to
 * {@code taskStateMachine.addStateChangeListener}.
 *
 * @param stateChangeListener listener for {@link TaskState} changes
 */
public void addStateChangeListener(StateChangeListener<TaskState> stateChangeListener)
{
taskStateMachine.addStateChangeListener(stateChangeListener);
}
}
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.Session;
import com.facebook.presto.TaskSource;
import com.facebook.presto.event.query.QueryMonitor;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.MemoryPoolAssignment;
Expand Down Expand Up @@ -356,4 +357,11 @@ private void updateStats()

cachedStats.resetTo(tempIoStats);
}

/**
 * Adds a state change listener to the task identified by {@code taskId}.
 *
 * @param taskId id of the task the listener is attached to; must not be null
 * @param stateChangeListener listener to register on that task; must not be null
 * @throws NullPointerException if either argument is null
 */
@Override
public void addStateChangeListener(TaskId taskId, StateChangeListener<TaskState> stateChangeListener)
{
    requireNonNull(taskId, "taskId is null");
    // Validate the listener before touching the task map so a null listener fails fast
    // with a clear message, consistent with the taskId check above.
    // NOTE(review): tasks.getUnchecked looks like a loading-cache lookup that may create
    // the task entry as a side effect -- confirm; if so, this ordering also avoids that.
    requireNonNull(stateChangeListener, "stateChangeListener is null");
    tasks.getUnchecked(taskId).addStateChangeListener(stateChangeListener);
}
}
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.OutputBuffers;
import com.facebook.presto.Session;
import com.facebook.presto.TaskSource;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.MemoryPoolAssignmentsRequest;
import com.facebook.presto.sql.planner.PlanFragment;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -90,4 +91,9 @@ public interface TaskManager
* eventually exist are queried.
*/
TaskInfo abortTaskResults(TaskId taskId, TaskId outputId);

/**
 * Adds a state change listener to the specified task.
 *
 * @param taskId identifies the task the listener is attached to
 * @param stateChangeListener listener registered for that task's {@link TaskState} changes
 */
void addStateChangeListener(TaskId taskId, StateChangeListener<TaskState> stateChangeListener);
}
Expand Up @@ -50,6 +50,8 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemory;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.NodeState.SHUTTING_DOWN;
import static com.facebook.presto.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
Expand Down Expand Up @@ -246,18 +248,23 @@ private boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries)

private void updateNodes(MemoryPoolAssignmentsRequest assignments)
{
Set<Node> activeNodes = nodeManager.getActiveNodes();
ImmutableSet<String> activeNodeIds = activeNodes.stream()
ImmutableSet.Builder builder = new ImmutableSet.Builder();
Set<Node> aliveNodes = builder
.addAll(nodeManager.getNodes(ACTIVE))
.addAll(nodeManager.getNodes(SHUTTING_DOWN))
.build();

ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
.map(Node::getNodeIdentifier)
.collect(toImmutableSet());

// Remove nodes that don't exist anymore
// Make a copy to materialize the set difference
Set<String> deadNodes = ImmutableSet.copyOf(difference(nodes.keySet(), activeNodeIds));
Set<String> deadNodes = ImmutableSet.copyOf(difference(nodes.keySet(), aliveNodeIds));
nodes.keySet().removeAll(deadNodes);

// Add new nodes
for (Node node : activeNodes) {
for (Node node : aliveNodes) {
if (!nodes.containsKey(node.getNodeIdentifier())) {
nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(httpClient, memoryInfoCodec, assignmentsRequestJsonCodec, locationFactory.createMemoryInfoLocation(node)));
}
Expand Down
Expand Up @@ -24,11 +24,13 @@ public class AllNodes
{
private final Set<Node> activeNodes;
private final Set<Node> inactiveNodes;
private final Set<Node> shuttingDownNodes;

public AllNodes(Set<Node> activeNodes, Set<Node> inactiveNodes)
public AllNodes(Set<Node> activeNodes, Set<Node> inactiveNodes, Set<Node> shuttingDownNodes)
{
// Every argument is null-checked (with a message naming the argument) and then stored via ImmutableSet.copyOf,
// so the fields always hold immutable, non-null sets.
this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null"));
this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null"));
this.shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
}

public Set<Node> getActiveNodes()
Expand All @@ -40,4 +42,9 @@ public Set<Node> getInactiveNodes()
{
return inactiveNodes;
}

/**
 * Returns the set of shutting-down nodes held by this object.
 * The set is the immutable copy taken from the constructor argument.
 */
public Set<Node> getShuttingDownNodes()
{
return shuttingDownNodes;
}
}

0 comments on commit fa763cf

Please sign in to comment.