Make NodeSelector an interface
cberner committed Dec 3, 2015
1 parent dc17c59 commit 16332b5
Showing 6 changed files with 203 additions and 131 deletions.
NodeAssignmentStats.java (new file)
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import com.facebook.presto.spi.Node;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public final class NodeAssignmentStats
{
private final NodeTaskMap nodeTaskMap;
private final Map<Node, Integer> assignmentCount = new HashMap<>();
private final Map<Node, Integer> splitCountByNode = new HashMap<>();
private final Map<String, Integer> queuedSplitCountByNode = new HashMap<>();

public NodeAssignmentStats(NodeTaskMap nodeTaskMap, NodeScheduler.NodeMap nodeMap, List<RemoteTask> existingTasks)
{
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");

// pre-populate the assignment counts with zeros. This makes getOrDefault() faster
for (Node node : nodeMap.getNodesByHostAndPort().values()) {
assignmentCount.put(node, 0);
}

for (RemoteTask task : existingTasks) {
String nodeId = task.getNodeId();
queuedSplitCountByNode.put(nodeId, queuedSplitCountByNode.getOrDefault(nodeId, 0) + task.getQueuedPartitionedSplitCount());
}
}

public int getTotalSplitCount(Node node)
{
return assignmentCount.getOrDefault(node, 0) + splitCountByNode.computeIfAbsent(node, nodeTaskMap::getPartitionedSplitsOnNode);
}

public int getTotalQueuedSplitCount(Node node)
{
return queuedSplitCountByNode.getOrDefault(node.getNodeIdentifier(), 0) + assignmentCount.getOrDefault(node, 0);
}

public void addAssignedSplit(Node node)
{
assignmentCount.merge(node, 1, (x, y) -> x + y);
}
}
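
For orientation, here is a minimal sketch of the call pattern this class is built for: read the combined load, pick the least-loaded candidate, then record the assignment so the next split sees the updated count. The pickLeastLoaded helper and its parameters are hypothetical and not part of this commit; only getTotalSplitCount and addAssignedSplit come from the class above.

// Hypothetical helper (not in this commit) illustrating NodeAssignmentStats usage.
private static Node pickLeastLoaded(NodeAssignmentStats stats, List<Node> candidates, int maxSplitsPerNode)
{
    Node chosen = null;
    int min = Integer.MAX_VALUE;
    for (Node node : candidates) {
        // splits assigned in this scheduling round plus splits already running on the node
        int load = stats.getTotalSplitCount(node);
        if (load < min && load < maxSplitsPerNode) {
            chosen = node;
            min = load;
        }
    }
    if (chosen != null) {
        // keep the in-memory count current for subsequent picks
        stats.addAssignedSplit(chosen);
    }
    return chosen;
}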
NodeScheduler.java
@@ -32,11 +32,9 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -108,130 +106,80 @@ public NodeSelector createNodeSelector(String dataSourceName)
return new NodeMap(byHostAndPort.build(), byHost.build(), coordinatorNodeIds);
}, 5, TimeUnit.SECONDS);

return new NodeSelector(nodeMap);
return new SimpleNodeSelector(nodeMap);
}

public class NodeSelector
private class SimpleNodeSelector
implements NodeSelector
{
private final AtomicReference<Supplier<NodeMap>> nodeMap;

public NodeSelector(Supplier<NodeMap> nodeMap)
public SimpleNodeSelector(Supplier<NodeMap> nodeMap)
{
this.nodeMap = new AtomicReference<>(nodeMap);
}

@Override
public void lockDownNodes()
{
nodeMap.set(Suppliers.ofInstance(nodeMap.get().get()));
}

@Override
public List<Node> allNodes()
{
return ImmutableList.copyOf(nodeMap.get().get().getNodesByHostAndPort().values());
}

@Override
public Node selectCurrentNode()
{
// TODO: this is a hack to force scheduling on the coordinator
return nodeManager.getCurrentNode();
}

@Override
public List<Node> selectRandomNodes(int limit)
{
return selectNodes(limit, randomizedNodes());
return selectNodes(limit, randomizedNodes(nodeMap.get().get(), includeCoordinator), doubleScheduling);
}

private List<Node> selectNodes(int limit, Iterator<Node> candidates)
{
checkArgument(limit > 0, "limit must be at least 1");

List<Node> selected = new ArrayList<>(limit);
while (selected.size() < limit && candidates.hasNext()) {
selected.add(candidates.next());
}

if (doubleScheduling && !selected.isEmpty()) {
// Cycle the nodes until we reach the limit
int uniqueNodes = selected.size();
int i = 0;
while (selected.size() < limit) {
if (i >= uniqueNodes) {
i = 0;
}
selected.add(selected.get(i));
i++;
}
}
return selected;
}

/**
* Identifies the nodes for running the specified splits.
*
* @param splits the splits that need to be assigned to nodes
* @return a multimap from node to splits only for splits for which we could identify a node to schedule on.
* If we cannot find an assignment for a split, it is not included in the map.
*/
public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks)
@Override
public Multimap<Node, Split> computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
{
Multimap<Node, Split> assignment = HashMultimap.create();
Map<Node, Integer> assignmentCount = new HashMap<>();
// pre-populate the assignment counts with zeros. This makes getOrDefault() faster
for (Node node : nodeMap.get().get().getNodesByHostAndPort().values()) {
assignmentCount.put(node, 0);
}

// maintain a temporary local cache of partitioned splits on the node
Map<Node, Integer> splitCountByNode = new HashMap<>();

Map<String, Integer> queuedSplitCountByNode = new HashMap<>();

for (RemoteTask task : existingTasks) {
String nodeId = task.getNodeId();
queuedSplitCountByNode.put(nodeId, queuedSplitCountByNode.getOrDefault(nodeId, 0) + task.getQueuedPartitionedSplitCount());
}
NodeMap nodeMap = this.nodeMap.get().get();
NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);

ResettableRandomizedIterator<Node> randomCandidates = randomizedNodes();
ResettableRandomizedIterator<Node> randomCandidates = randomizedNodes(nodeMap, includeCoordinator);
for (Split split : splits) {
randomCandidates.reset();

List<Node> candidateNodes;
NodeMap nodeMap = this.nodeMap.get().get();
if (!split.isRemotelyAccessible()) {
candidateNodes = selectNodesBasedOnHint(nodeMap, split.getAddresses());
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
}
else {
candidateNodes = selectNodes(minCandidates, randomCandidates);
candidateNodes = selectNodes(minCandidates, randomCandidates, doubleScheduling);
}
if (candidateNodes.isEmpty()) {
log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}

// compute and cache number of splits currently assigned to each node
// NOTE: This does not use the Stream API for performance reasons.
for (Node node : candidateNodes) {
if (!splitCountByNode.containsKey(node)) {
splitCountByNode.put(node, nodeTaskMap.getPartitionedSplitsOnNode(node));
}
}

Node chosenNode = null;
int min = Integer.MAX_VALUE;

for (Node node : candidateNodes) {
int totalSplitCount = assignmentCount.getOrDefault(node, 0) + splitCountByNode.get(node);

int totalSplitCount = assignmentStats.getTotalSplitCount(node);
if (totalSplitCount < min && totalSplitCount < maxSplitsPerNode) {
chosenNode = node;
min = totalSplitCount;
}
}
if (chosenNode == null) {
for (Node node : candidateNodes) {
int assignedSplitCount = assignmentCount.getOrDefault(node, 0);
int queuedSplitCount = queuedSplitCountByNode.getOrDefault(node.getNodeIdentifier(), 0);
int totalSplitCount = queuedSplitCount + assignedSplitCount;
int totalSplitCount = assignmentStats.getTotalQueuedSplitCount(node);
if (totalSplitCount < min && totalSplitCount < maxSplitsPerNodePerTaskWhenFull) {
chosenNode = node;
min = totalSplitCount;
@@ -240,80 +188,103 @@ public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks)
}
if (chosenNode != null) {
assignment.put(chosenNode, split);
assignmentCount.put(chosenNode, assignmentCount.getOrDefault(chosenNode, 0) + 1);
assignmentStats.addAssignedSplit(chosenNode);
}
}
return assignment;
}
}
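
// Worked example of the two-tier choice in computeAssignments above (all
// numbers illustrative, not from this commit): with maxSplitsPerNode = 100,
// candidates n1 and n2 both reporting a total of 100 splits are at the cap,
// so the first pass selects nothing. The fallback pass then compares queued
// counts against maxSplitsPerNodePerTaskWhenFull, e.g. n1 queued 12 vs n2
// queued 7 picks n2. If both passes fail, the split is left out of the
// returned multimap, as the javadoc notes.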

private ResettableRandomizedIterator<Node> randomizedNodes()
{
NodeMap nodeMap = this.nodeMap.get().get();
ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
.filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
.collect(toImmutableList());
return new ResettableRandomizedIterator<>(nodes);
private static List<Node> selectNodes(int limit, Iterator<Node> candidates, boolean doubleScheduling)
{
checkArgument(limit > 0, "limit must be at least 1");

List<Node> selected = new ArrayList<>(limit);
while (selected.size() < limit && candidates.hasNext()) {
selected.add(candidates.next());
}

private List<Node> selectNodesBasedOnHint(NodeMap nodeMap, List<HostAddress> addresses)
{
Set<Node> chosen = new LinkedHashSet<>(minCandidates);
Set<String> coordinatorIds = nodeMap.getCoordinatorNodeIds();
if (doubleScheduling && !selected.isEmpty()) {
// Cycle the nodes until we reach the limit
int uniqueNodes = selected.size();
int i = 0;
while (selected.size() < limit) {
if (i >= uniqueNodes) {
i = 0;
}
selected.add(selected.get(i));
i++;
}
}
return selected;
}
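
// Example of the doubleScheduling cycling above (nodes illustrative): with
// limit = 5 and an iterator yielding only [n1, n2], the selected list is
// padded by cycling to [n1, n2, n1, n2, n1]. With doubleScheduling enabled,
// callers therefore receive exactly `limit` entries whenever at least one
// candidate exists; without it, they may receive fewer.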

private static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator)
{
ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
.filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
.collect(toImmutableList());
return new ResettableRandomizedIterator<>(nodes);
}

private static List<Node> selectExactNodes(NodeMap nodeMap, List<HostAddress> hosts, boolean includeCoordinator)
{
Set<Node> chosen = new LinkedHashSet<>();
Set<String> coordinatorIds = nodeMap.getCoordinatorNodeIds();

for (HostAddress hint : addresses) {
nodeMap.getNodesByHostAndPort().get(hint).stream()
for (HostAddress host : hosts) {
nodeMap.getNodesByHostAndPort().get(host).stream()
.filter(node -> includeCoordinator || !coordinatorIds.contains(node.getNodeIdentifier()))
.forEach(chosen::add);

InetAddress address;
try {
address = host.toInetAddress();
}
catch (UnknownHostException e) {
// skip hosts that don't resolve
continue;
}

// consider a split with a host without a port as being accessible by all nodes in that host
if (!host.hasPort()) {
nodeMap.getNodesByHost().get(address).stream()
.filter(node -> includeCoordinator || !coordinatorIds.contains(node.getNodeIdentifier()))
.forEach(chosen::add);
}
}

// if the chosen set is empty and the host is the coordinator, force pick the coordinator
if (chosen.isEmpty() && !includeCoordinator) {
for (HostAddress host : hosts) {
// In the code below, before calling `chosen::add`, it could have been checked that
// `coordinatorIds.contains(node.getNodeIdentifier())`. But checking the condition isn't necessary
// because every node satisfies it. Otherwise, `chosen` wouldn't have been empty.

nodeMap.getNodesByHostAndPort().get(host).stream()
.forEach(chosen::add);

InetAddress address;
try {
address = hint.toInetAddress();
address = host.toInetAddress();
}
catch (UnknownHostException e) {
// skip addresses that don't resolve
// skip hosts that don't resolve
continue;
}

// consider a split with a host hint without a port as being accessible by all nodes in that host
if (!hint.hasPort()) {
// consider a split with a host without a port as being accessible by all nodes in that host
if (!host.hasPort()) {
nodeMap.getNodesByHost().get(address).stream()
.filter(node -> includeCoordinator || !coordinatorIds.contains(node.getNodeIdentifier()))
.forEach(chosen::add);
}
}

// if the chosen set is empty and the hint includes the coordinator, force pick the coordinator
if (chosen.isEmpty() && !includeCoordinator) {
for (HostAddress hint : addresses) {
// In the code below, before calling `chosen::add`, it could have been checked that
// `coordinatorIds.contains(node.getNodeIdentifier())`. But checking the condition isn't necessary
// because every node satisfies it. Otherwise, `chosen` wouldn't have been empty.

nodeMap.getNodesByHostAndPort().get(hint).stream()
.forEach(chosen::add);

InetAddress address;
try {
address = hint.toInetAddress();
}
catch (UnknownHostException e) {
// skip addresses that don't resolve
continue;
}

// consider a split with a host hint without a port as being accessible by all nodes in that host
if (!hint.hasPort()) {
nodeMap.getNodesByHost().get(address).stream()
.forEach(chosen::add);
}
}
}

return ImmutableList.copyOf(chosen);
}

return ImmutableList.copyOf(chosen);
}
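
// Example of the matching rules in selectExactNodes above (addresses
// illustrative): a split address "10.0.0.5" without a port matches every
// node registered on that machine via getNodesByHost, while "10.0.0.5:8080"
// matches only the node bound to that exact host and port. The final
// fallback re-runs the match without filtering out the coordinator, so a
// split whose hosts resolve only to the coordinator can still be scheduled.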

private static class NodeMap
public static class NodeMap
{
private final SetMultimap<HostAddress, Node> nodesByHostAndPort;
private final SetMultimap<InetAddress, Node> nodesByHost;
@@ -328,7 +299,7 @@ public NodeMap(SetMultimap<HostAddress, Node> nodesByHostAndPort,
this.coordinatorNodeIds = coordinatorNodeIds;
}

private SetMultimap<HostAddress, Node> getNodesByHostAndPort()
public SetMultimap<HostAddress, Node> getNodesByHostAndPort()
{
return nodesByHostAndPort;
}
