Skip to content

Commit

Permalink
Make TestTableWriterOperator safe for parallel testing
Browse files Browse the repository at this point in the history
Fixes #4669
  • Loading branch information
dain committed Feb 29, 2016
1 parent b952af4 commit 9968aaf
Showing 1 changed file with 20 additions and 17 deletions.
Expand Up @@ -27,7 +27,8 @@
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Collection;
Expand All @@ -47,29 +48,26 @@
public class TestTableWriterOperator
{
private static final String CONNECTOR_ID = "testConnectorId";
private PageSinkManager pageSinkProvider;
private BlockingPageSink blockingPageSink;
private DriverContext driverContext;
private ExecutorService executor;

@BeforeMethod
@BeforeClass
public void setUp()
throws Exception
{
pageSinkProvider = new PageSinkManager();
blockingPageSink = new BlockingPageSink();
ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("test-%s"));
driverContext = createTaskContext(executor, TEST_SESSION)
.addPipelineContext(true, true)
.addDriverContext();
executor = newCachedThreadPool(daemonThreadsNamed("test-%s"));
}

pageSinkProvider.addConnectorPageSinkProvider(CONNECTOR_ID, new ConstantPageSinkProvider(blockingPageSink));
@AfterClass(alwaysRun = true)
public void tearDown()
{
executor.shutdownNow();
}

@Test
public void testBlockedPageSink()
throws Exception
{
Operator operator = createTableWriterOperator();
BlockingPageSink blockingPageSink = new BlockingPageSink();
Operator operator = createTableWriterOperator(blockingPageSink);

// initial state validation
assertEquals(operator.isBlocked().isDone(), true);
Expand Down Expand Up @@ -122,7 +120,7 @@ public void testBlockedPageSink()
public void addInputFailsOnBlockedOperator()
throws Exception
{
Operator operator = createTableWriterOperator();
Operator operator = createTableWriterOperator(new BlockingPageSink());

operator.addInput(rowPagesBuilder(BIGINT).row(42).build().get(0));

Expand All @@ -132,8 +130,11 @@ public void addInputFailsOnBlockedOperator()
operator.addInput(rowPagesBuilder(BIGINT).row(42).build().get(0));
}

private Operator createTableWriterOperator()
private Operator createTableWriterOperator(BlockingPageSink blockingPageSink)
{
PageSinkManager pageSinkProvider = new PageSinkManager();
pageSinkProvider.addConnectorPageSinkProvider(CONNECTOR_ID, new ConstantPageSinkProvider(blockingPageSink));

TableWriterOperator.TableWriterOperatorFactory factory = new TableWriterOperator.TableWriterOperatorFactory(
0,
new PlanNodeId("test"),
Expand All @@ -146,7 +147,9 @@ private Operator createTableWriterOperator()
Optional.empty(),
TEST_SESSION);

return factory.createOperator(driverContext);
return factory.createOperator(createTaskContext(executor, TEST_SESSION)
.addPipelineContext(true, true)
.addDriverContext());
}

private static class ConstantPageSinkProvider
Expand Down

0 comments on commit 9968aaf

Please sign in to comment.