Add bucket abstraction and bucket to partition mapping
A source is divided into buckets, buckets are assigned to partitions,
and partitions are assigned to nodes. This allows the source to perform
calculations without knowing how many nodes will be used in a query.

Move PartitionFunction creation to DistributionManager
dain committed Jan 30, 2016
1 parent e40de36 commit be22100
Showing 14 changed files with 168 additions and 85 deletions.
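
The commit message above describes a level of indirection: rows hash to a fixed set of buckets, and only the scheduler decides how buckets map to partitions and partitions to nodes. A minimal, self-contained sketch of that idea follows; every name in it (BucketIndirectionSketch, BUCKET_COUNT, the round-robin table) is illustrative and not part of this commit.

    // Hypothetical sketch, not Presto code: a source buckets a row without
    // knowing the node count; the scheduler later folds buckets onto
    // however many partitions (and nodes) the query actually gets.
    public class BucketIndirectionSketch
    {
        private static final int BUCKET_COUNT = 8; // fixed by the source

        public static void main(String[] args)
        {
            long rowHash = 42L;
            int bucket = (int) Math.floorMod(rowHash, (long) BUCKET_COUNT);

            // Once the scheduler knows the partition count (say 3), it builds
            // a bucket-to-partition table, here filled round-robin.
            int partitionCount = 3;
            int[] bucketToPartition = new int[BUCKET_COUNT];
            for (int i = 0; i < BUCKET_COUNT; i++) {
                bucketToPartition[i] = i % partitionCount;
            }

            // Partitions are assigned to nodes separately (partitionToNode in
            // this commit), so routing a row is two array lookups.
            int partition = bucketToPartition[bucket];
            System.out.printf("bucket=%d -> partition=%d%n", bucket, partition);
        }
    }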
@@ -45,7 +45,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -191,14 +190,14 @@ private List<SqlStageExecution> createStages(
         stages.add(stage);

-        OptionalInt partitionCount = OptionalInt.empty();
+        Optional<int[]> bucketToPartition = Optional.empty();
         if (plan.getPartitioningHandle().isPresent()) {
             NodePartitionMap nodePartitionMap = nodePartitioningManager.getNodePartitioningMap(session, plan.getPartitioningHandle().get());
             Map<Integer, Node> partitionToNode = nodePartitionMap.getPartitionToNode();
             // todo this should asynchronously wait a standard timeout period before failing
             checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
             stageSchedulers.put(stageId, new FixedCountScheduler(stage, partitionToNode));
-            partitionCount = OptionalInt.of(nodePartitionMap.getPartitionToNode().size());
+            bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
         }
         else {
             checkArgument(plan.getFragment().getDistribution() == PlanDistribution.SOURCE, "Expected plan fragment to be source partitioned");
@@ -213,7 +212,7 @@ private List<SqlStageExecution> createStages(
                 Optional.of(stage),
                 nextStageId,
                 locationFactory,
-                subStagePlan.withPartitionCount(partitionCount),
+                subStagePlan.withBucketToPartition(bucketToPartition),
                 nodeScheduler,
                 remoteTaskFactory,
                 session,
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import com.facebook.presto.spi.Page;
+
+public interface BucketFunction
+{
+    int getBucket(Page page, int position);
+}
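
The new BucketFunction interface above is the connector-facing half of the split. As a hedged illustration of how one might be implemented, here is a hypothetical bucket function that hashes a single BIGINT column; it is not part of this commit and only assumes the standard Page/Block accessors shown.

    import com.facebook.presto.operator.BucketFunction;
    import com.facebook.presto.spi.Page;
    import com.facebook.presto.spi.block.Block;

    import static com.facebook.presto.spi.type.BigintType.BIGINT;

    // Hypothetical example implementation, not shipped in this commit.
    public class SingleBigintBucketFunction
            implements BucketFunction
    {
        private final int channel;
        private final int bucketCount;

        public SingleBigintBucketFunction(int channel, int bucketCount)
        {
            this.channel = channel;
            this.bucketCount = bucketCount;
        }

        @Override
        public int getBucket(Page page, int position)
        {
            Block block = page.getBlock(channel);
            long value = BIGINT.getLong(block, position);
            // Map the hashed value into [0, bucketCount).
            return Math.floorMod(Long.hashCode(value), bucketCount);
        }
    }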
@@ -15,12 +15,34 @@
 import com.facebook.presto.spi.Page;

-public interface PartitionFunction
+import java.util.stream.IntStream;
+
+import static java.util.Objects.requireNonNull;
+
+public class PartitionFunction
 {
-    int getPartitionCount();
+    private final BucketFunction bucketFunction;
+    private final int[] bucketToPartition;
+    private final int partitionCount;
+
+    public PartitionFunction(BucketFunction bucketFunction, int[] bucketToPartition)
+    {
+        this.bucketFunction = requireNonNull(bucketFunction, "bucketFunction is null");
+        this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null").clone();
+        partitionCount = IntStream.of(bucketToPartition).max().getAsInt() + 1;
+    }
+
+    public int getPartitionCount()
+    {
+        return partitionCount;
+    }

     /**
      * @param functionArguments the arguments to partition function in order (no extra columns)
      */
-    int getPartition(Page functionArguments, int position);
+    public int getPartition(Page functionArguments, int position)
+    {
+        int bucket = bucketFunction.getBucket(functionArguments, position);
+        return bucketToPartition[bucket];
+    }
 }
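
PartitionFunction is now a concrete class that composes any BucketFunction with a bucket-to-partition table. A short usage sketch (the lambda and the four-bucket/two-partition numbers are made up; `page` stands for any Page in scope):

    // Toy bucket function: buckets purely by row position, ignoring contents.
    BucketFunction byPosition = (p, position) -> position % 4;
    int[] bucketToPartition = {0, 1, 0, 1}; // four buckets folded onto two partitions

    PartitionFunction function = new PartitionFunction(byPosition, bucketToPartition);
    // getPartitionCount() derives 2 from the table's maximum entry plus one.
    int partition = function.getPartition(page, 5); // bucket 1 -> partition 1

Note the constructor defensively clones the table, so later mutation of the caller's array cannot change routing.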
@@ -212,6 +212,7 @@ public class LocalExecutionPlanner
     private final PageSourceProvider pageSourceProvider;
     private final IndexManager indexManager;
+    private final NodePartitioningManager nodePartitioningManager;
     private final PageSinkManager pageSinkManager;
     private final ExchangeClientSupplier exchangeClientSupplier;
     private final ExpressionCompiler compiler;
@@ -226,6 +227,7 @@ public LocalExecutionPlanner(
             SqlParser sqlParser,
             PageSourceProvider pageSourceProvider,
             IndexManager indexManager,
+            NodePartitioningManager nodePartitioningManager,
             PageSinkManager pageSinkManager,
             ExchangeClientSupplier exchangeClientSupplier,
             ExpressionCompiler compiler,
@@ -236,6 +238,7 @@ public LocalExecutionPlanner(
         requireNonNull(compilerConfig, "compilerConfig is null");
         this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
         this.indexManager = requireNonNull(indexManager, "indexManager is null");
+        this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
         this.exchangeClientSupplier = exchangeClientSupplier;
         this.metadata = requireNonNull(metadata, "metadata is null");
         this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
@@ -280,8 +283,7 @@ public LocalExecutionPlan plan(
                     .collect(toImmutableList());
         }

-        PartitionFunction partitionFunction = functionBinding.getFunctionHandle().createPartitionFunction(functionBinding, partitionChannelTypes);
+        PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(session, functionBinding, partitionChannelTypes);

         OptionalInt nullChannel = OptionalInt.empty();
         if (functionBinding.isReplicateNulls()) {
             checkArgument(functionBinding.getPartitioningColumns().size() == 1);
@@ -17,20 +17,35 @@
 import com.google.common.collect.ImmutableMap;

 import java.util.Map;
+import java.util.stream.IntStream;

 import static java.util.Objects.requireNonNull;

 public class NodePartitionMap
 {
     private final Map<Integer, Node> partitionToNode;
+    private final int[] bucketToPartition;

     public NodePartitionMap(Map<Integer, Node> partitionToNode)
     {
         this.partitionToNode = ImmutableMap.copyOf(requireNonNull(partitionToNode, "partitionToNode is null"));
+        this.bucketToPartition = IntStream.range(0, partitionToNode.size()).toArray();
+    }
+
+    public NodePartitionMap(Map<Integer, Node> partitionToNode, int[] bucketToPartition)
+    {
+        this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null");
+        this.partitionToNode = ImmutableMap.copyOf(requireNonNull(partitionToNode, "partitionToNode is null"));
     }

     public Map<Integer, Node> getPartitionToNode()
     {
         return partitionToNode;
     }
+
+    public int[] getBucketToPartition()
+    {
+        return bucketToPartition;
+    }
 }
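
The one-argument constructor keeps the old behavior: the identity table means one bucket per partition. A hedged example (node1..node3 are placeholders for real Node instances):

    Map<Integer, Node> partitionToNode = ImmutableMap.of(0, node1, 1, node2, 2, node3);
    NodePartitionMap map = new NodePartitionMap(partitionToNode);
    // map.getBucketToPartition() is {0, 1, 2}: bucket i routes to partition i,
    // exactly as if buckets did not exist.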
@@ -16,7 +16,10 @@
 import com.facebook.presto.Session;
 import com.facebook.presto.execution.scheduler.NodeScheduler;
 import com.facebook.presto.execution.scheduler.NodeSelector;
+import com.facebook.presto.operator.BucketFunction;
+import com.facebook.presto.operator.PartitionFunction;
 import com.facebook.presto.spi.Node;
+import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.sql.planner.PlanFragment.PlanDistribution;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -77,4 +80,10 @@ else if (planDistribution == FIXED) {
         }
         return new NodePartitionMap(partitionToNode.build());
     }
+
+    public PartitionFunction getPartitionFunction(Session session, PartitionFunctionBinding functionBinding, List<Type> partitionChannelTypes)
+    {
+        BucketFunction bucketFunction = functionBinding.getFunctionHandle().createBucketFunction(functionBinding, partitionChannelTypes);
+        return new PartitionFunction(bucketFunction, functionBinding.getBucketToPartition().get());
+    }
 }
@@ -20,7 +20,6 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.OptionalInt;

 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -32,15 +31,15 @@ public class PartitionFunctionBinding
     private final List<Symbol> partitioningColumns;
     private final Optional<Symbol> hashColumn;
     private final boolean replicateNulls;
-    private final OptionalInt partitionCount;
+    private final Optional<int[]> bucketToPartition;

     public PartitionFunctionBinding(PartitionFunctionHandle functionHandle, List<Symbol> partitioningColumns)
     {
         this(functionHandle,
                 partitioningColumns,
                 Optional.empty(),
                 false,
-                OptionalInt.empty());
+                Optional.empty());
     }

     public PartitionFunctionBinding(PartitionFunctionHandle functionHandle, List<Symbol> partitioningColumns, Optional<Symbol> hashColumn)
@@ -49,7 +48,7 @@ public PartitionFunctionBinding(PartitionFunctionHandle functionHandle, List<Sym
                 partitioningColumns,
                 hashColumn,
                 false,
-                OptionalInt.empty());
+                Optional.empty());
     }

     @JsonCreator
@@ -58,14 +57,14 @@ public PartitionFunctionBinding(
             @JsonProperty("partitioningColumns") List<Symbol> partitioningColumns,
             @JsonProperty("hashColumn") Optional<Symbol> hashColumn,
             @JsonProperty("replicateNulls") boolean replicateNulls,
-            @JsonProperty("partitionCount") OptionalInt partitionCount)
+            @JsonProperty("bucketToPartition") Optional<int[]> bucketToPartition)
     {
         this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
         this.partitioningColumns = ImmutableList.copyOf(requireNonNull(partitioningColumns, "partitioningColumns is null"));
         this.hashColumn = requireNonNull(hashColumn, "hashColumn is null");
         checkArgument(!replicateNulls || partitioningColumns.size() == 1, "size of partitioningColumns is not 1 when nullPartition is REPLICATE.");
         this.replicateNulls = replicateNulls;
-        this.partitionCount = requireNonNull(partitionCount, "partitionCount is null");
+        this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null");
     }

     @JsonProperty
@@ -93,14 +92,14 @@ public boolean isReplicateNulls()
     }

     @JsonProperty
-    public OptionalInt getPartitionCount()
+    public Optional<int[]> getBucketToPartition()
     {
-        return partitionCount;
+        return bucketToPartition;
     }

-    public PartitionFunctionBinding withPartitionCount(OptionalInt partitionCount)
+    public PartitionFunctionBinding withBucketToPartition(Optional<int[]> bucketToPartition)
     {
-        return new PartitionFunctionBinding(functionHandle, partitioningColumns, hashColumn, replicateNulls, partitionCount);
+        return new PartitionFunctionBinding(functionHandle, partitioningColumns, hashColumn, replicateNulls, bucketToPartition);
     }

     @Override
@@ -117,13 +116,13 @@ public boolean equals(Object o)
         Objects.equals(partitioningColumns, that.partitioningColumns) &&
                 Objects.equals(hashColumn, that.hashColumn) &&
                 replicateNulls == that.replicateNulls &&
-                Objects.equals(partitionCount, that.partitionCount);
+                Objects.equals(bucketToPartition, that.bucketToPartition);
     }

     @Override
     public int hashCode()
     {
-        return Objects.hash(functionHandle, partitioningColumns, replicateNulls, partitionCount);
+        return Objects.hash(functionHandle, partitioningColumns, replicateNulls, bucketToPartition);
     }

     @Override
@@ -134,7 +133,7 @@ public String toString()
             .add("partitioningChannels", partitioningColumns)
             .add("hashChannel", hashColumn)
             .add("replicateNulls", replicateNulls)
-            .add("partitionCount", partitionCount)
+            .add("bucketToPartition", bucketToPartition)
             .toString();
     }
 }
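
Putting the pieces together, the scheduler hunk at the top of this commit threads the mapping into each sub-stage binding roughly like this (a hedged paraphrase of that hunk; `binding` and `nodePartitionMap` stand for values built earlier in createStages):

    Optional<int[]> bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
    PartitionFunctionBinding rebound = binding.withBucketToPartition(bucketToPartition);
    // The rebound binding travels to workers with the fragment, where
    // NodePartitioningManager.getPartitionFunction() combines the table with the
    // connector's BucketFunction to build the final PartitionFunction.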
