Skip to content

Commit

Permalink
Allow multiple sources in a task
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed May 3, 2016
1 parent d42609e commit e8a5a90
Show file tree
Hide file tree
Showing 24 changed files with 417 additions and 245 deletions.
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto;

import com.facebook.presto.metadata.Split;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Longs;
Expand All @@ -24,12 +25,17 @@
public class ScheduledSplit
{
private final long sequenceId;
private final PlanNodeId planNodeId;
private final Split split;

/**
 * Creates a scheduled split from its JSON properties.
 *
 * NOTE(review): this view is a rendered diff without +/- markers. The
 * single-line, two-argument signature directly below appears to be the
 * pre-change line, superseded by the three-argument signature that follows
 * it - confirm against the real file.
 *
 * @param sequenceId stored unchanged in the {@code sequenceId} field
 * @param planNodeId plan node id stored with the split; passed through
 *        {@code requireNonNull} with the message "planNodeId is null"
 * @param split the split to carry; passed through {@code requireNonNull}
 *        with the message "split is null"
 */
@JsonCreator
public ScheduledSplit(@JsonProperty("sequenceId") long sequenceId, @JsonProperty("split") Split split)
public ScheduledSplit(
@JsonProperty("sequenceId") long sequenceId,
@JsonProperty("planNodeId") PlanNodeId planNodeId,
@JsonProperty("split") Split split)
{
this.sequenceId = sequenceId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.split = requireNonNull(split, "split is null");
}

Expand All @@ -39,6 +45,12 @@ public long getSequenceId()
return sequenceId;
}

/**
 * Returns the plan node id this split was constructed with.
 * The constructor runs the id through {@code requireNonNull}, so a
 * constructed instance does not hold a null id.
 *
 * @return the value of the {@code planNodeId} field
 */
@JsonProperty
public PlanNodeId getPlanNodeId()
{
return planNodeId;
}

@JsonProperty
public Split getSplit()
{
Expand Down Expand Up @@ -69,6 +81,7 @@ public String toString()
{
return toStringHelper(this)
.add("sequenceId", sequenceId)
.add("planNodeId", planNodeId)
.add("split", split)
.toString();
}
Expand Down
Expand Up @@ -57,6 +57,7 @@ public final class SystemSessionProperties
public static final String COLUMNAR_PROCESSING_DICTIONARY = "columnar_processing_dictionary";
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
public static final String PLAN_WITH_TABLE_NODE_PARTITIONING = "plan_with_table_node_partitioning";
public static final String COLOCATED_JOIN = "colocated_join";
public static final String INITIAL_SPLITS_PER_NODE = "initial_splits_per_node";
public static final String SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL = "split_concurrency_adjustment_interval";
public static final String OPTIMIZE_METADATA_QUERIES = "optimize_metadata_queries";
Expand Down Expand Up @@ -219,6 +220,11 @@ public SystemSessionProperties(
PLAN_WITH_TABLE_NODE_PARTITIONING,
"Experimental: Adapt plan to pre-partitioned tables",
true,
false),
booleanSessionProperty(
COLOCATED_JOIN,
"Experimental: Use a colocated join when possible",
false,
false));
}

Expand Down Expand Up @@ -322,6 +328,11 @@ public static boolean planWithTableNodePartitioning(Session session)
return session.getProperty(PLAN_WITH_TABLE_NODE_PARTITIONING, Boolean.class);
}

/**
 * Reads the {@code colocated_join} session property.
 *
 * @param session session whose property value is looked up
 * @return the boolean value of the property for the given session
 */
public static boolean isColocatedJoinEnabled(Session session)
{
    Boolean colocatedJoin = session.getProperty(COLOCATED_JOIN, Boolean.class);
    return colocatedJoin;
}

public static int getInitialSplitsPerNode(Session session)
{
return session.getProperty(INITIAL_SPLITS_PER_NODE, Integer.class);
Expand Down
Expand Up @@ -278,7 +278,7 @@ public QueryInfo getQueryInfo(StageInfo rootStage)
}

PlanFragment plan = stageInfo.getPlan();
if (plan != null && plan.getPartitionedSourceNode() instanceof TableScanNode) {
if (plan != null && plan.getPartitionedSourceNodes().stream().anyMatch(TableScanNode.class::isInstance)) {
rawInputDataSize += stageStats.getRawInputDataSize().toBytes();
rawInputPositions += stageStats.getRawInputPositions();

Expand Down
Expand Up @@ -52,7 +52,6 @@
import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.concurrent.MoreFutures.firstCompletedFuture;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
Expand Down Expand Up @@ -160,8 +159,7 @@ public synchronized void schedulingComplete()
stateMachine.transitionToFinished();
}

PlanNodeId partitionedSource = stateMachine.getFragment().getPartitionedSource();
if (partitionedSource != null) {
for (PlanNodeId partitionedSource : stateMachine.getFragment().getPartitionedSources()) {
for (RemoteTask task : getAllTasks()) {
task.noMoreSplits(partitionedSource);
}
Expand Down Expand Up @@ -291,39 +289,34 @@ public synchronized RemoteTask scheduleTask(Node node, int partition)
{
requireNonNull(node, "node is null");

return scheduleTask(node, partition, null, ImmutableList.<Split>of());
return scheduleTask(node, partition, ImmutableMultimap.of());
}

/**
 * Hands the given splits to a task on {@code node}, creating that task when
 * the node has none yet.
 *
 * NOTE(review): this view is a rendered diff without +/- markers, so
 * pre-change and post-change lines are interleaved (for example the
 * {@code Iterable<Split>} signature followed by the {@code Multimap}
 * signature, and the {@code partitionedSource} lines followed by their
 * multimap-based replacements). Presumably only the multimap variants
 * remain in the real file - confirm before relying on this text.
 *
 * @param node node to schedule on; passed through {@code requireNonNull}
 * @param partition forwarded to {@code scheduleTask} when a new task is created
 * @param splits splits keyed by plan node id; every key must be contained in
 *        the fragment's partitioned sources, otherwise {@code checkArgument}
 *        fails with "Invalid splits"
 * @return a set holding the newly created task, or an empty set when the
 *         splits were added to an already existing task
 */
public synchronized Set<RemoteTask> scheduleSplits(Node node, int partition, Iterable<Split> splits)
public synchronized Set<RemoteTask> scheduleSplits(Node node, int partition, Multimap<PlanNodeId, Split> splits)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");

PlanNodeId partitionedSource = stateMachine.getFragment().getPartitionedSource();
checkState(partitionedSource != null, "Partitioned source is null");
checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");

// Only tasks created by this call are reported back to the caller.
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
Collection<RemoteTask> tasks = this.tasks.get(node);
if (tasks == null) {
// No task registered for this node yet: create one seeded with the splits.
newTasks.add(scheduleTask(node, partition, partitionedSource, splits));
newTasks.add(scheduleTask(node, partition, splits));
}
else {
// Reuse the first task the iterator yields for this node.
RemoteTask task = tasks.iterator().next();
task.addSplits(ImmutableMultimap.<PlanNodeId, Split>builder()
.putAll(partitionedSource, splits)
.build());
task.addSplits(splits);
}
return newTasks.build();
}

private synchronized RemoteTask scheduleTask(Node node, int partition, PlanNodeId sourceId, Iterable<Split> sourceSplits)
private synchronized RemoteTask scheduleTask(Node node, int partition, Multimap<PlanNodeId, Split> sourceSplits)
{
TaskId taskId = new TaskId(stateMachine.getStageId(), String.valueOf(nextTaskId.getAndIncrement()));

ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
for (Split sourceSplit : sourceSplits) {
initialSplits.put(sourceId, sourceSplit);
}
initialSplits.putAll(sourceSplits);
for (Entry<PlanNodeId, URI> entry : exchangeLocations.entries()) {
initialSplits.put(entry.getKey(), createRemoteSplitFor(taskId, entry.getValue()));
}
Expand Down

0 comments on commit e8a5a90

Please sign in to comment.