Fix Scheduler IntelliJ warnings
dain committed May 5, 2015
1 parent 0f25cbd commit 0942f23
Showing 8 changed files with 107 additions and 198 deletions.
@@ -17,7 +17,6 @@
 import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.Node;
 import com.facebook.presto.spi.NodeManager;
-import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.AbstractIterator;
@@ -105,43 +104,38 @@ public void reset()
         scheduleRandom.set(0);
     }

-    public NodeSelector createNodeSelector(final String dataSourceName)
+    public NodeSelector createNodeSelector(String dataSourceName)
     {
         // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
         // done as close to when the split is about to be scheduled
-        Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(new Supplier<NodeMap>()
-        {
-            @Override
-            public NodeMap get()
-            {
+        Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
             ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
             ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
             ImmutableSetMultimap.Builder<Rack, Node> byRack = ImmutableSetMultimap.builder();

             Set<Node> nodes;
             if (dataSourceName != null) {
                 nodes = nodeManager.getActiveDatasourceNodes(dataSourceName);
             }
             else {
                 nodes = nodeManager.getActiveNodes();
             }

             for (Node node : nodes) {
                 try {
                     byHostAndPort.put(node.getHostAndPort(), node);

                     InetAddress host = InetAddress.getByName(node.getHttpUri().getHost());
                     byHost.put(host, node);

                     byRack.put(Rack.of(host), node);
                 }
                 catch (UnknownHostException e) {
                     // ignore
                 }
             }

             return new NodeMap(byHostAndPort.build(), byHost.build(), byRack.build());
-            }
         }, 5, TimeUnit.SECONDS);

         return new NodeSelector(nodeMap);
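For context on the 5-second argument above: Guava's Suppliers.memoizeWithExpiration caches the computed NodeMap and re-runs the lambda at most once per expiration window, so concurrent schedulers share one snapshot of the cluster. A minimal standalone sketch of that behavior (the MemoizeDemo class is illustrative, not part of this commit):

import java.util.concurrent.TimeUnit;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class MemoizeDemo
{
    public static void main(String[] args)
    {
        // The delegate runs at most once per expiration window; callers
        // inside the window all observe the cached value.
        Supplier<Long> cached = Suppliers.memoizeWithExpiration(System::nanoTime, 5, TimeUnit.SECONDS);

        long first = cached.get();
        long second = cached.get();
        System.out.println(first == second); // true: second call is served from the cache
    }
}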
@@ -176,17 +170,10 @@ public List<Node> selectRandomNodes(int limit)
     {
         checkArgument(limit > 0, "limit must be at least 1");

-        final String coordinatorIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();
+        String coordinatorIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();

         FluentIterable<Node> nodes = FluentIterable.from(lazyShuffle(nodeMap.get().get().getNodesByHostAndPort().values()))
-                .filter(new Predicate<Node>()
-                {
-                    @Override
-                    public boolean apply(Node node)
-                    {
-                        return includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier());
-                    }
-                });
+                .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()));

         if (doubleScheduling) {
             nodes = nodes.cycle();
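The lambda is accepted here because Guava's com.google.common.base.Predicate declares a single abstract method, apply, which makes it a valid lambda target under Java 8 even though it predates java.util.function. A small sketch of the same conversion (class and variable names are illustrative):

import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;

public class PredicateLambdaDemo
{
    public static void main(String[] args)
    {
        // The lambda compiles to an instance of Guava's Predicate,
        // exactly like the anonymous class on the removed lines.
        Predicate<String> nonEmpty = s -> !s.isEmpty();
        System.out.println(FluentIterable.from(ImmutableList.of("a", "", "b"))
                .filter(nonEmpty)
                .toList()); // [a, b]
    }
}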
@@ -195,12 +182,11 @@ public boolean apply(Node node)
     }

     /**
-     * Identifies the nodes for runnning the specified splits.
+     * Identifies the nodes for running the specified splits.
      *
      * @param splits the splits that need to be assigned to nodes
-     *
      * @return a multimap from node to splits only for splits for which we could identify a node to schedule on.
      * If we cannot find an assignment for a split, it is not included in the map.
      */
     public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks)
     {
@@ -231,11 +217,10 @@ public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks)
         checkCondition(!candidateNodes.isEmpty(), NO_NODES_AVAILABLE, "No nodes available to run query");

         // compute and cache number of splits currently assigned to each node
-        for (Node node : candidateNodes) {
-            if (!splitCountByNode.containsKey(node)) {
-                splitCountByNode.put(node, nodeTaskMap.getPartitionedSplitsOnNode(node));
-            }
-        }
+        candidateNodes.stream()
+                .filter(node -> !splitCountByNode.containsKey(node))
+                .forEach(node -> splitCountByNode.put(node, nodeTaskMap.getPartitionedSplitsOnNode(node)));

         Node chosenNode = null;
         int min = Integer.MAX_VALUE;

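A note on the caching step above: since the code is now on Java 8, Map.computeIfAbsent could express the same contains-then-put logic in a single call, assuming splitCountByNode is an ordinary Map. A self-contained sketch with stand-in types (none of these names are from the commit):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo
{
    // Stand-in for nodeTaskMap.getPartitionedSplitsOnNode(node)
    private static int lookupSplitCount(String node)
    {
        System.out.println("computing for " + node);
        return node.length();
    }

    public static void main(String[] args)
    {
        List<String> candidateNodes = Arrays.asList("node-1", "node-2", "node-1");
        Map<String, Integer> splitCountByNode = new HashMap<>();

        // computeIfAbsent invokes the mapping function only for missing keys,
        // so "computing for ..." prints once per distinct node.
        for (String node : candidateNodes) {
            splitCountByNode.computeIfAbsent(node, ComputeIfAbsentDemo::lookupSplitCount);
        }
        System.out.println(splitCountByNode);
    }
}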
@@ -271,20 +256,17 @@ public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks)
         return assignment;
     }

-    private List<Node> selectCandidateNodes(NodeMap nodeMap, final Split split)
+    private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
     {
         Set<Node> chosen = new LinkedHashSet<>(minCandidates);
         String coordinatorIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();

         // first look for nodes that match the hint
         for (HostAddress hint : split.getAddresses()) {
-            for (Node node : nodeMap.getNodesByHostAndPort().get(hint)) {
-                if (includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
-                    if (chosen.add(node)) {
-                        scheduleLocal.incrementAndGet();
-                    }
-                }
-            }
+            nodeMap.getNodesByHostAndPort().get(hint).stream()
+                    .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
+                    .filter(chosen::add)
+                    .forEach(node -> scheduleLocal.incrementAndGet());

             InetAddress address;
             try {
@@ -298,13 +280,10 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, final Split split)
             // consider a split with a host hint without a port as being accessible
             // by all nodes in that host
             if (!hint.hasPort() || split.isRemotelyAccessible()) {
-                for (Node node : nodeMap.getNodesByHost().get(address)) {
-                    if (includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
-                        if (chosen.add(node)) {
-                            scheduleLocal.incrementAndGet();
-                        }
-                    }
-                }
+                nodeMap.getNodesByHost().get(address).stream()
+                        .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
+                        .filter(chosen::add)
+                        .forEach(node -> scheduleLocal.incrementAndGet());
             }
         }

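The .filter(chosen::add) step in both stream pipelines above does double duty: Set.add returns true only when the element was not already present, so the filter simultaneously deduplicates nodes and gates the scheduleLocal counter. A runnable sketch of the idiom (all names illustrative):

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class FilterAddDemo
{
    public static void main(String[] args)
    {
        Set<String> chosen = new LinkedHashSet<>();
        AtomicInteger scheduleLocal = new AtomicInteger();

        // Set.add returns true only for elements not already present,
        // so the counter is bumped once per distinct node; duplicates fall out.
        Stream.of("a", "b", "a", "c", "b")
                .filter(chosen::add)
                .forEach(node -> scheduleLocal.incrementAndGet());

        System.out.println(chosen);              // [a, b, c]
        System.out.println(scheduleLocal.get()); // 3
    }
}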
@@ -355,30 +334,29 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, final Split split)
         // if the chosen set is empty and the hint includes the coordinator,
         // force pick the coordinator
         if (chosen.isEmpty() && !includeCoordinator) {
-            final HostAddress coordinatorHostAddress = nodeManager.getCurrentNode().getHostAndPort();
-            if (FluentIterable.from(split.getAddresses()).anyMatch(new Predicate<HostAddress>() {
-                @Override
-                public boolean apply(HostAddress hostAddress)
-                {
-                    // Exact match of the coordinator
-                    if (hostAddress.equals(coordinatorHostAddress)) {
-                        return true;
-                    }
-                    // If the split is remotely accessible or the split location doesn't specify a port,
-                    // we can ignore the coordinator's port and match just the ip address
-                    return (!hostAddress.hasPort() || split.isRemotelyAccessible()) &&
-                            hostAddress.getHostText().equals(coordinatorHostAddress.getHostText());
-                }
-            })) {
+            HostAddress coordinatorHostAddress = nodeManager.getCurrentNode().getHostAndPort();
+            if (split.getAddresses().stream().anyMatch(host -> canSplitRunOnHost(split, coordinatorHostAddress, host))) {
                 chosen.add(nodeManager.getCurrentNode());
             }
         }

         return ImmutableList.copyOf(chosen);
     }

+    private boolean canSplitRunOnHost(Split split, HostAddress coordinatorHost, HostAddress host)
+    {
+        // Exact match of the coordinator
+        if (host.equals(coordinatorHost)) {
+            return true;
+        }
+        // If the split is remotely accessible or the split location doesn't specify a port,
+        // we can ignore the coordinator's port and match just the ip address
+        return (!host.hasPort() || split.isRemotelyAccessible()) &&
+                host.getHostText().equals(coordinatorHost.getHostText());
+    }
 }

-    private static <T> Iterable<T> lazyShuffle(final Iterable<T> iterable)
+    private static <T> Iterable<T> lazyShuffle(Iterable<T> iterable)
     {
         return new Iterable<T>()
         {
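Extracting canSplitRunOnHost lets a one-line Stream.anyMatch replace the fifteen-line anonymous Predicate; anyMatch also short-circuits as soon as one address matches. A small sketch of the same shape (the hosts and helper method are made up):

import java.util.Arrays;
import java.util.List;

public class AnyMatchDemo
{
    // Stand-in for an extracted predicate like canSplitRunOnHost
    private static boolean isLocalhost(String host)
    {
        return host.equals("127.0.0.1") || host.equals("localhost");
    }

    public static void main(String[] args)
    {
        List<String> addresses = Arrays.asList("10.0.0.5", "localhost", "10.0.0.7");
        // anyMatch stops at the first element satisfying the extracted predicate.
        System.out.println(addresses.stream().anyMatch(AnyMatchDemo::isLocalhost)); // true
    }
}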
@@ -387,8 +365,8 @@ public Iterator<T> iterator()
             {
                 return new AbstractIterator<T>()
                 {
-                    List<T> list = Lists.newArrayList(iterable);
-                    int limit = list.size();
+                    private final List<T> list = Lists.newArrayList(iterable);
+                    private int limit = list.size();

                     @Override
                     protected T computeNext()
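lazyShuffle is built on Guava's AbstractIterator, whose contract is that computeNext() either returns the next element or calls endOfData() to signal exhaustion. A minimal sketch of that contract (the countdown is purely illustrative):

import java.util.Iterator;

import com.google.common.collect.AbstractIterator;

public class CountdownIterator extends AbstractIterator<Integer>
{
    private int remaining;

    public CountdownIterator(int start)
    {
        this.remaining = start;
    }

    @Override
    protected Integer computeNext()
    {
        // endOfData() marks the iterator as finished and returns null.
        if (remaining == 0) {
            return endOfData();
        }
        return remaining--;
    }

    public static void main(String[] args)
    {
        Iterator<Integer> it = new CountdownIterator(3);
        while (it.hasNext()) {
            System.out.print(it.next() + " "); // 3 2 1
        }
    }
}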
@@ -441,11 +419,11 @@ public SetMultimap<Rack, Node> getNodesByRack()

     private static class Rack
     {
-        private int id;
+        private final int id;

         public static Rack of(InetAddress address)
         {
             // TODO: this needs to be pluggable
             int id = InetAddresses.coerceToInteger(address) & 0xFF_FF_FF_00;
             return new Rack(id);
         }
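The bitmask in Rack.of clears the low octet of the IPv4-coerced address, so all nodes in the same /24 subnet map to the same rack id. A runnable sketch (the addresses are made up):

import java.net.InetAddress;

import com.google.common.net.InetAddresses;

public class RackIdDemo
{
    public static void main(String[] args)
    {
        InetAddress a = InetAddresses.forString("10.1.2.3");
        InetAddress b = InetAddresses.forString("10.1.2.200");

        // Masking off the low octet groups addresses by /24 subnet,
        // which is what Rack.of uses as a stand-in rack id.
        int rackA = InetAddresses.coerceToInteger(a) & 0xFF_FF_FF_00;
        int rackB = InetAddresses.coerceToInteger(b) & 0xFF_FF_FF_00;
        System.out.println(rackA == rackB); // true: same /24, same rack
    }
}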
@@ -69,18 +69,13 @@ private synchronized int getPartitionedSplitCount()
         return partitionedSplitCount;
     }

-    private synchronized void addTask(final RemoteTask task)
+    private synchronized void addTask(RemoteTask task)
     {
         remoteTasks.add(task);
-        task.addStateChangeListener(new StateMachine.StateChangeListener<TaskInfo>()
-        {
-            @Override
-            public void stateChanged(TaskInfo taskInfo)
-            {
-                if (taskInfo.getState().isDone()) {
-                    synchronized (NodeTasks.this) {
-                        remoteTasks.remove(task);
-                    }
-                }
-            }
-        });
+        task.addStateChangeListener(taskInfo -> {
+            if (taskInfo.getState().isDone()) {
+                synchronized (NodeTasks.this) {
+                    remoteTasks.remove(task);
+                }
+            }
+        });
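The listener can become a lambda because StateChangeListener exposes a single abstract method. A self-contained sketch of the pattern; the interface below is a simplified stand-in, not Presto's real StateMachine.StateChangeListener:

public class ListenerDemo
{
    // Stand-in single-method listener interface; any such interface
    // is a valid lambda target under Java 8.
    interface StateChangeListener<T>
    {
        void stateChanged(T newState);
    }

    private static <T> void notifyListener(StateChangeListener<T> listener, T state)
    {
        listener.stateChanged(state);
    }

    public static void main(String[] args)
    {
        // The lambda compiles to an instance of StateChangeListener<String>.
        notifyListener(state -> System.out.println("state is now " + state), "FINISHED");
    }
}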
@@ -17,7 +17,6 @@
 import com.facebook.presto.execution.StateMachine.StateChangeListener;
 import com.facebook.presto.metadata.Split;
 import com.facebook.presto.sql.planner.plan.PlanNodeId;
-import io.airlift.units.Duration;

 public interface RemoteTask
 {
@@ -42,7 +41,4 @@ public interface RemoteTask
     int getPartitionedSplitCount();

     int getQueuedPartitionedSplitCount();
-
-    Duration waitForTaskToFinish(Duration maxWait)
-            throws InterruptedException;
 }
