Skip to content

Commit

Permalink
Add scheduler support for custom partitioning across nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jan 30, 2016
1 parent d168fe5 commit fc0648d
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 42 deletions.
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
import com.google.common.collect.Multimap;

import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

public class DynamicSplitPlacementPolicy
implements SplitPlacementPolicy
{
private final NodeSelector nodeSelector;
private final Supplier<? extends List<RemoteTask>> remoteTasks;

public DynamicSplitPlacementPolicy(NodeSelector nodeSelector, Supplier<? extends List<RemoteTask>> remoteTasks)
{
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
this.remoteTasks = requireNonNull(remoteTasks, "remoteTasks is null");
}

@Override
public Multimap<Node, Split> computeAssignments(Set<Split> splits)
{
return nodeSelector.computeAssignments(splits, remoteTasks.get());
}

@Override
public void lockDownNodes()
{
nodeSelector.lockDownNodes();
}

@Override
public List<Node> allNodes()
{
return nodeSelector.allNodes();
}
}
@@ -0,0 +1,113 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;

import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static java.util.Objects.requireNonNull;

public class FixedSourcePartitionedScheduler
implements StageScheduler
{
private final SqlStageExecution stage;
private final NodePartitionMap partitioning;
private final SourcePartitionedScheduler sourcePartitionedScheduler;
private boolean scheduledTasks;

public FixedSourcePartitionedScheduler(
SqlStageExecution stage,
SplitSource splitSource,
NodePartitionMap partitioning,
int splitBatchSize,
NodeSelector nodeSelector)
{
requireNonNull(stage, "stage is null");
requireNonNull(splitSource, "splitSource is null");
requireNonNull(partitioning, "partitioning is null");

this.stage = stage;
this.partitioning = partitioning;

FixedSplitPlacementPolicy splitPlacementPolicy = new FixedSplitPlacementPolicy(nodeSelector, partitioning, stage::getAllTasks);
sourcePartitionedScheduler = new SourcePartitionedScheduler(stage, splitSource, splitPlacementPolicy, splitBatchSize);
}

@Override
public ScheduleResult schedule()
{
// schedule a task on every node in the distribution
List<RemoteTask> newTasks = ImmutableList.of();
if (!scheduledTasks) {
newTasks = partitioning.getPartitionToNode().entrySet().stream()
.map(entry -> stage.scheduleTask(entry.getValue(), entry.getKey()))
.collect(toImmutableList());
scheduledTasks = true;
}

ScheduleResult schedule = sourcePartitionedScheduler.schedule();
return new ScheduleResult(schedule.isFinished(), newTasks, schedule.getBlocked());
}

@Override
public void close()
{
sourcePartitionedScheduler.close();
}

private static class FixedSplitPlacementPolicy
implements SplitPlacementPolicy
{
private final NodeSelector nodeSelector;
private final NodePartitionMap partitioning;
private final Supplier<? extends List<RemoteTask>> remoteTasks;

public FixedSplitPlacementPolicy(NodeSelector nodeSelector,
NodePartitionMap partitioning,
Supplier<? extends List<RemoteTask>> remoteTasks)
{
this.nodeSelector = nodeSelector;
this.partitioning = partitioning;
this.remoteTasks = remoteTasks;
}

@Override
public Multimap<Node, Split> computeAssignments(Set<Split> splits)
{
return nodeSelector.computeAssignments(splits, remoteTasks.get(), partitioning);
}

@Override
public void lockDownNodes()
{
}

@Override
public List<Node> allNodes()
{
return ImmutableList.copyOf(partitioning.getPartitionToNode().values());
}
}
}
Expand Up @@ -14,14 +14,20 @@
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import io.airlift.stats.CounterStat;

import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -259,4 +265,27 @@ public static List<Node> selectExactNodes(NodeMap nodeMap, List<HostAddress> hos

return ImmutableList.copyOf(chosen);
}

public static Multimap<Node, Split> selectDistributionNodes(
NodeMap nodeMap,
NodeTaskMap nodeTaskMap,
int maxSplitsPerNode,
Set<Split> splits,
List<RemoteTask> existingTasks,
NodePartitionMap partitioning)
{
Multimap<Node, Split> assignments = HashMultimap.create();
NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);

for (Split split : splits) {
// node placement is forced by the partitioning
Node node = partitioning.getNode(split);

// if node is full, don't schedule now, which will push back on the scheduling of splits
if (assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode) {
assignments.put(node, split);
}
}
return ImmutableMultimap.copyOf(assignments);
}
}
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.google.common.collect.Multimap;

import java.util.List;
Expand All @@ -39,4 +40,13 @@ public interface NodeSelector
* If we cannot find an assignment for a split, it is not included in the map.
*/
Multimap<Node, Split> computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks);

/**
* Identifies the nodes for running the specified splits based on a precomputed fixed partitioning.
*
* @param splits the splits that need to be assigned to nodes
* @return a multimap from node to splits only for splits for which we could identify a node with free space.
* If we cannot find an assignment for a split, it is not included in the map.
*/
Multimap<Node, Split> computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, NodePartitionMap partitioning);
}
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.execution.scheduler.NodeScheduler.randomizedNodes;
import static com.facebook.presto.execution.scheduler.NodeScheduler.selectDistributionNodes;
import static com.facebook.presto.execution.scheduler.NodeScheduler.selectExactNodes;
import static com.facebook.presto.execution.scheduler.NodeScheduler.selectNodes;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
Expand Down Expand Up @@ -144,4 +146,10 @@ public Multimap<Node, Split> computeAssignments(Set<Split> splits, List<RemoteTa
}
return assignment;
}

@Override
public Multimap<Node, Split> computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, NodePartitionMap partitioning)
{
return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsPerNode, splits, existingTasks, partitioning);
}
}
Expand Up @@ -13,40 +13,18 @@
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
import com.google.common.collect.Multimap;

import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

public class SplitPlacementPolicy
public interface SplitPlacementPolicy
{
private final NodeSelector nodeSelector;
private final Supplier<List<RemoteTask>> remoteTasks;

public SplitPlacementPolicy(NodeSelector nodeSelector, Supplier<List<RemoteTask>> remoteTasks)
{
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
this.remoteTasks = requireNonNull(remoteTasks, "remoteTasks is null");
}

public Multimap<Node, Split> computeAssignments(Set<Split> splits)
{
return nodeSelector.computeAssignments(splits, remoteTasks.get());
}
Multimap<Node, Split> computeAssignments(Set<Split> splits);

public void lockDownNodes()
{
nodeSelector.lockDownNodes();
}
void lockDownNodes();

public List<Node> allNodes()
{
return nodeSelector.allNodes();
}
List<Node> allNodes();
}

0 comments on commit fc0648d

Please sign in to comment.