Skip to content

Commit

Permalink
Blockable TableWriterOperator
Browse files Browse the repository at this point in the history
Now connectors can be implemented in such way that driver executing
TableWriterOperator will not be stucked in appendPage() method of a PageSink
  • Loading branch information
pnowojski authored and dain committed Feb 29, 2016
1 parent 17dc4aa commit b952af4
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 13 deletions.
Expand Up @@ -26,6 +26,7 @@
import io.airlift.slice.Slice;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class BlackHolePageSinkProvider
implements ConnectorPageSinkProvider
Expand All @@ -46,8 +47,9 @@ private static class NoOpConnectorPageSink
implements ConnectorPageSink
{
@Override
public void appendPage(Page page, Block sampleWeightBlock)
public CompletableFuture<?> appendPage(Page page, Block sampleWeightBlock)
{
return NOT_BLOCKED;
}

@Override
Expand Down
Expand Up @@ -59,6 +59,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.hive.HiveColumnHandle.SAMPLE_WEIGHT_COLUMN_NAME;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
Expand Down Expand Up @@ -247,10 +248,10 @@ public void abort()
}

@Override
public void appendPage(Page page, Block sampleWeightBlock)
public CompletableFuture<?> appendPage(Page page, Block sampleWeightBlock)
{
if (page.getPositionCount() == 0) {
return;
return NOT_BLOCKED;
}

Block[] dataBlocks = getDataBlocks(page, sampleWeightBlock);
Expand Down Expand Up @@ -278,6 +279,7 @@ public void appendPage(Page page, Block sampleWeightBlock)

writer.addRow(dataBlocks, position);
}
return NOT_BLOCKED;
}

private HiveRecordWriter createWriter(List<Object> partitionRow)
Expand Down
Expand Up @@ -24,11 +24,14 @@
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
Expand Down Expand Up @@ -121,6 +124,7 @@ private enum State
private final Optional<Integer> sampleWeightChannel;
private final List<Integer> inputChannels;

private ListenableFuture<?> blocked = NOT_BLOCKED;
private State state = State.RUNNING;
private long rowCount;
private boolean committed;
Expand Down Expand Up @@ -160,20 +164,36 @@ public void finish()
@Override
public boolean isFinished()
{
return state == State.FINISHED;
updateBlockedIfNecessary();
return state == State.FINISHED && blocked == NOT_BLOCKED;
}

@Override
public ListenableFuture<?> isBlocked()
{
updateBlockedIfNecessary();
return blocked;
}

@Override
public boolean needsInput()
{
return state == State.RUNNING;
updateBlockedIfNecessary();
return state == State.RUNNING && blocked == NOT_BLOCKED;
}

private void updateBlockedIfNecessary()
{
if (blocked != NOT_BLOCKED && blocked.isDone()) {
blocked = NOT_BLOCKED;
}
}

@Override
public void addInput(Page page)
{
requireNonNull(page, "page is null");
checkState(state == State.RUNNING, "Operator is %s", state);
checkState(needsInput(), "Operator does not need input");

Block[] blocks = new Block[inputChannels.size()];
for (int outputChannel = 0; outputChannel < inputChannels.size(); outputChannel++) {
Expand All @@ -183,7 +203,11 @@ public void addInput(Page page)
if (sampleWeightChannel.isPresent()) {
sampleWeightBlock = page.getBlock(sampleWeightChannel.get());
}
pageSink.appendPage(new Page(blocks), sampleWeightBlock);

CompletableFuture<?> future = pageSink.appendPage(new Page(blocks), sampleWeightBlock);
if (!future.isDone()) {
this.blocked = MoreFutures.toListenableFuture(future);
}
rowCount += page.getPositionCount();
}

Expand Down
@@ -0,0 +1,203 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.metadata.OutputTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;

public class TestTableWriterOperator
{
private static final String CONNECTOR_ID = "testConnectorId";
private PageSinkManager pageSinkProvider;
private BlockingPageSink blockingPageSink;
private DriverContext driverContext;

@BeforeMethod
public void setUp()
throws Exception
{
pageSinkProvider = new PageSinkManager();
blockingPageSink = new BlockingPageSink();
ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("test-%s"));
driverContext = createTaskContext(executor, TEST_SESSION)
.addPipelineContext(true, true)
.addDriverContext();

pageSinkProvider.addConnectorPageSinkProvider(CONNECTOR_ID, new ConstantPageSinkProvider(blockingPageSink));
}

@Test
public void testBlockedPageSink()
throws Exception
{
Operator operator = createTableWriterOperator();

// initial state validation
assertEquals(operator.isBlocked().isDone(), true);
assertEquals(operator.isFinished(), false);
assertEquals(operator.needsInput(), true);

// blockingPageSink that will return blocked future
operator.addInput(rowPagesBuilder(BIGINT).row(42).build().get(0));

assertEquals(operator.isBlocked().isDone(), false);
assertEquals(operator.isFinished(), false);
assertEquals(operator.needsInput(), false);
assertEquals(operator.getOutput(), null);

// complete previously blocked future
blockingPageSink.complete();

assertEquals(operator.isBlocked().isDone(), true);
assertEquals(operator.isFinished(), false);
assertEquals(operator.needsInput(), true);

// add second page
operator.addInput(rowPagesBuilder(BIGINT).row(44).build().get(0));

assertEquals(operator.isBlocked().isDone(), false);
assertEquals(operator.isFinished(), false);
assertEquals(operator.needsInput(), false);

// finish operator, state hasn't changed
operator.finish();

assertEquals(operator.isBlocked().isDone(), false);
assertEquals(operator.isFinished(), false);
assertEquals(operator.needsInput(), false);

// complete previously blocked future
blockingPageSink.complete();
// and getOutput which actually finishes the operator
assertPageEquals(
TableWriterOperator.TYPES,
operator.getOutput(),
rowPagesBuilder(TableWriterOperator.TYPES).row(2, null).build().get(0));

assertEquals(operator.isBlocked().isDone(), true);
assertEquals(operator.isFinished(), true);
assertEquals(operator.needsInput(), false);
}

@Test(expectedExceptions = IllegalStateException.class)
public void addInputFailsOnBlockedOperator()
throws Exception
{
Operator operator = createTableWriterOperator();

operator.addInput(rowPagesBuilder(BIGINT).row(42).build().get(0));

assertEquals(operator.isBlocked().isDone(), false);
assertEquals(operator.needsInput(), false);

operator.addInput(rowPagesBuilder(BIGINT).row(42).build().get(0));
}

private Operator createTableWriterOperator()
{
TableWriterOperator.TableWriterOperatorFactory factory = new TableWriterOperator.TableWriterOperatorFactory(
0,
new PlanNodeId("test"),
pageSinkProvider,
new TableWriterNode.CreateHandle(new OutputTableHandle(
CONNECTOR_ID,
new ConnectorTransactionHandle() {},
new ConnectorOutputTableHandle() {})),
ImmutableList.of(0),
Optional.empty(),
TEST_SESSION);

return factory.createOperator(driverContext);
}

private static class ConstantPageSinkProvider
implements ConnectorPageSinkProvider
{
private final ConnectorPageSink pageSink;

private ConstantPageSinkProvider(ConnectorPageSink pageSink)
{
this.pageSink = pageSink;
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle)
{
return pageSink;
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle)
{
return pageSink;
}
}

private class BlockingPageSink
implements ConnectorPageSink
{
private CompletableFuture<?> future = new CompletableFuture<>();

@Override
public CompletableFuture<?> appendPage(Page page, Block sampleWeightBlock)
{
future = new CompletableFuture<>();
return future;
}

@Override
public Collection<Slice> finish()
{
return ImmutableList.of();
}

@Override
public void abort()
{
}

public void complete()
{
future.complete(null);
}
}
}
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -103,17 +104,18 @@ public RaptorPageSink(
}

@Override
public void appendPage(Page page, Block sampleWeightBlock)
public CompletableFuture<?> appendPage(Page page, Block sampleWeightBlock)
{
if (page.getPositionCount() == 0) {
return;
return NOT_BLOCKED;
}

if (sampleWeightField >= 0) {
page = createPageWithSampleWeightBlock(page, sampleWeightBlock);
}

pageWriter.appendPage(page);
return NOT_BLOCKED;
}

@Override
Expand Down
Expand Up @@ -17,10 +17,18 @@
import io.airlift.slice.Slice;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public interface ConnectorPageSink
{
void appendPage(Page page, Block sampleWeightBlock);
CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

/**
* Returns a future that will be completed when the page sink can accept
* more pages. If the page sink can accept more pages immediately,
* this method should return {@code NOT_BLOCKED}.
*/
CompletableFuture<?> appendPage(Page page, Block sampleWeightBlock);

Collection<Slice> finish();

Expand Down

0 comments on commit b952af4

Please sign in to comment.