Skip to content

Commit

Permalink
Create new page sink for every table writer
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Jan 6, 2015
1 parent 6b708ec commit ea60fbf
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
Expand Up @@ -18,6 +18,8 @@
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slices;

Expand All @@ -26,6 +28,9 @@

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.sql.planner.plan.TableWriterNode.CreateHandle;
import static com.facebook.presto.sql.planner.plan.TableWriterNode.InsertHandle;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -38,16 +43,19 @@ public static class TableWriterOperatorFactory
implements OperatorFactory
{
private final int operatorId;
private final ConnectorPageSink pageSink;
private final PageSinkManager pageSinkManager;
private final WriterTarget target;
private final List<Integer> inputChannels;
private final Optional<Integer> sampleWeightChannel;
private boolean closed;

public TableWriterOperatorFactory(int operatorId, ConnectorPageSink pageSink, List<Integer> inputChannels, Optional<Integer> sampleWeightChannel)
public TableWriterOperatorFactory(int operatorId, PageSinkManager pageSinkManager, WriterTarget writerTarget, List<Integer> inputChannels, Optional<Integer> sampleWeightChannel)
{
this.operatorId = operatorId;
this.inputChannels = checkNotNull(inputChannels, "inputChannels is null");
this.pageSink = checkNotNull(pageSink, "pageSink is null");
this.pageSinkManager = checkNotNull(pageSinkManager, "pageSinkManager is null");
checkArgument(writerTarget instanceof CreateHandle || writerTarget instanceof InsertHandle, "writerTarget must be CreateHandle or InsertHandle");
this.target = checkNotNull(writerTarget, "writerTarget is null");
this.sampleWeightChannel = checkNotNull(sampleWeightChannel, "sampleWeightChannel is null");
}

Expand All @@ -62,7 +70,18 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext context = driverContext.addOperatorContext(operatorId, TableWriterOperator.class.getSimpleName());
return new TableWriterOperator(context, pageSink, inputChannels, sampleWeightChannel);
return new TableWriterOperator(context, createPageSink(), inputChannels, sampleWeightChannel);
}

private ConnectorPageSink createPageSink()
{
if (target instanceof CreateHandle) {
return pageSinkManager.createPageSink(((CreateHandle) target).getHandle());
}
if (target instanceof InsertHandle) {
return pageSinkManager.createPageSink(((InsertHandle) target).getHandle());
}
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
}

@Override
Expand Down
Expand Up @@ -64,7 +64,6 @@
import com.facebook.presto.operator.index.IndexJoinLookupStats;
import com.facebook.presto.operator.index.IndexLookupSourceSupplier;
import com.facebook.presto.operator.index.IndexSourceOperator;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.Index;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.PrestoException;
Expand Down Expand Up @@ -1313,17 +1312,14 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl

Optional<Integer> sampleWeightChannel = node.getSampleWeightSymbol().map(exchange::symbolToChannel);

// create the table writer
ConnectorPageSink pageSink = getPageSink(node);

// Set table writer count
context.setDriverInstanceCount(writerCount);

List<Integer> inputChannels = node.getColumns().stream()
.map(exchange::symbolToChannel)
.collect(toImmutableList());

OperatorFactory operatorFactory = new TableWriterOperatorFactory(context.getNextOperatorId(), pageSink, inputChannels, sampleWeightChannel);
OperatorFactory operatorFactory = new TableWriterOperatorFactory(context.getNextOperatorId(), pageSinkManager, node.getTarget(), inputChannels, sampleWeightChannel);

Map<Symbol, Integer> layout = ImmutableMap.<Symbol, Integer>builder()
.put(node.getOutputSymbols().get(0), 0)
Expand Down Expand Up @@ -1522,18 +1518,6 @@ private PhysicalOperation planGroupByAggregation(AggregationNode node, final Phy
}
}

private ConnectorPageSink getPageSink(TableWriterNode node)
{
WriterTarget target = node.getTarget();
if (target instanceof CreateHandle) {
return pageSinkManager.createPageSink(((CreateHandle) target).getHandle());
}
if (target instanceof InsertHandle) {
return pageSinkManager.createPageSink(((InsertHandle) target).getHandle());
}
throw new AssertionError("Unhandled target type: " + target.getClass().getName());
}

public static List<Type> toTypes(List<ProjectionFunction> projections)
{
ImmutableList.Builder<Type> builder = ImmutableList.builder();
Expand Down

0 comments on commit ea60fbf

Please sign in to comment.