Skip to content

Commit

Permalink
Add PlanNodeId to OperatorStats
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Jan 9, 2016
1 parent 7e23257 commit afb65e4
Show file tree
Hide file tree
Showing 87 changed files with 542 additions and 273 deletions.
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.util.CpuTimer;
import com.facebook.presto.util.CpuTimer.CpuDuration;
Expand Down Expand Up @@ -61,22 +62,23 @@ protected AbstractOperatorBenchmark(
this.localQueryRunner = requireNonNull(localQueryRunner, "localQueryRunner is null");
}

protected OperatorFactory createTableScanOperator(int operatorId, String tableName, String... columnNames)
protected OperatorFactory createTableScanOperator(int operatorId, PlanNodeId planNodeId, String tableName, String... columnNames)
{
return localQueryRunner.createTableScanOperator(operatorId, tableName, columnNames);
return localQueryRunner.createTableScanOperator(operatorId, planNodeId, tableName, columnNames);
}

public OperatorFactory createTableScanOperator(Session session, int operatorId, String tableName, String... columnNames)
public OperatorFactory createTableScanOperator(Session session, int operatorId, PlanNodeId planNodeId, String tableName, String... columnNames)
{
return localQueryRunner.createTableScanOperator(session,
operatorId,
planNodeId,
tableName,
columnNames);
}

protected OperatorFactory createHashProjectOperator(int operatorId, List<Type> types)
protected OperatorFactory createHashProjectOperator(int operatorId, PlanNodeId planNodeId, List<Type> types)
{
return localQueryRunner.createHashProjectOperator(operatorId, types);
return localQueryRunner.createHashProjectOperator(operatorId, planNodeId, types);
}

protected abstract List<Driver> createDrivers(TaskContext taskContext);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.operator.DriverFactory;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.testing.NullOutputOperator.NullOutputOperatorFactory;
import com.google.common.collect.ImmutableList;
Expand All @@ -44,7 +45,7 @@ protected DriverFactory createDriverFactory()
{
List<OperatorFactory> operatorFactories = new ArrayList<>(createOperatorFactories());

operatorFactories.add(new NullOutputOperatorFactory(999, Iterables.getLast(operatorFactories).getTypes()));
operatorFactories.add(new NullOutputOperatorFactory(999, new PlanNodeId("test"), Iterables.getLast(operatorFactories).getTypes()));

return new DriverFactory(true, true, operatorFactories);
}
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.operator.AggregationOperator.AggregationOperatorFactory;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.sql.planner.plan.AggregationNode.Step;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;

Expand All @@ -36,8 +37,8 @@ public CountAggregationBenchmark(LocalQueryRunner localQueryRunner)
@Override
protected List<? extends OperatorFactory> createOperatorFactories()
{
OperatorFactory tableScanOperator = createTableScanOperator(0, "orders", "orderkey");
AggregationOperatorFactory aggregationOperator = new AggregationOperatorFactory(1, Step.SINGLE, ImmutableList.of(COUNT.bind(ImmutableList.of(0), Optional.empty(), Optional.empty(), 1.0)));
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey");
AggregationOperatorFactory aggregationOperator = new AggregationOperatorFactory(1, new PlanNodeId("test"), Step.SINGLE, ImmutableList.of(COUNT.bind(ImmutableList.of(0), Optional.empty(), Optional.empty(), 1.0)));
return ImmutableList.of(tableScanOperator, aggregationOperator);
}

Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.operator.AggregationOperator.AggregationOperatorFactory;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.sql.planner.plan.AggregationNode.Step;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;

Expand All @@ -36,8 +37,8 @@ public DoubleSumAggregationBenchmark(LocalQueryRunner localQueryRunner)
@Override
protected List<? extends OperatorFactory> createOperatorFactories()
{
OperatorFactory tableScanOperator = createTableScanOperator(0, "orders", "totalprice");
AggregationOperatorFactory aggregationOperator = new AggregationOperatorFactory(1, Step.SINGLE, ImmutableList.of(DOUBLE_SUM.bind(ImmutableList.of(0), Optional.empty(), Optional.empty(), 1.0)));
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "orders", "totalprice");
AggregationOperatorFactory aggregationOperator = new AggregationOperatorFactory(1, new PlanNodeId("test"), Step.SINGLE, ImmutableList.of(DOUBLE_SUM.bind(ImmutableList.of(0), Optional.empty(), Optional.empty(), 1.0)));
return ImmutableList.of(tableScanOperator, aggregationOperator);
}

Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.AggregationNode.Step;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.util.DateTimeUtils;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -82,6 +83,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()

OperatorFactory tableScanOperator = createTableScanOperator(
0,
new PlanNodeId("test"),
"lineitem",
"returnflag",
"linestatus",
Expand All @@ -94,6 +96,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
TpchQuery1OperatorFactory tpchQuery1Operator = new TpchQuery1OperatorFactory(1);
HashAggregationOperatorFactory aggregationOperator = new HashAggregationOperatorFactory(
2,
new PlanNodeId("test"),
ImmutableList.of(tpchQuery1Operator.getTypes().get(0), tpchQuery1Operator.getTypes().get(1)),
Ints.asList(0, 1),
Step.SINGLE,
Expand Down Expand Up @@ -145,7 +148,7 @@ public List<Type> getTypes()
@Override
public Operator createOperator(DriverContext driverContext)
{
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, TpchQuery1Operator.class.getSimpleName());
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, new PlanNodeId("test"), TpchQuery1Operator.class.getSimpleName());
return new TpchQuery1Operator(operatorContext);
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.AggregationNode.Step;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.util.DateTimeUtils;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -54,12 +55,13 @@ protected List<? extends OperatorFactory> createOperatorFactories()
// and discount >= 0.05
// and discount <= 0.07
// and quantity < 24;
OperatorFactory tableScanOperator = createTableScanOperator(0, "lineitem", "extendedprice", "discount", "shipdate", "quantity");
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "extendedprice", "discount", "shipdate", "quantity");

FilterAndProjectOperator.FilterAndProjectOperatorFactory tpchQuery6Operator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(1, new TpchQuery6Processor(), ImmutableList.<Type>of(DOUBLE));
FilterAndProjectOperator.FilterAndProjectOperatorFactory tpchQuery6Operator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(1, new PlanNodeId("test"), new TpchQuery6Processor(), ImmutableList.<Type>of(DOUBLE));

AggregationOperatorFactory aggregationOperator = new AggregationOperatorFactory(
2,
new PlanNodeId("test"),
Step.SINGLE,
ImmutableList.of(
DOUBLE_SUM.bind(ImmutableList.of(0), Optional.empty(), Optional.empty(), 1.0)
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.AggregationNode.Step;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
Expand All @@ -40,9 +41,10 @@ public HashAggregationBenchmark(LocalQueryRunner localQueryRunner)
@Override
protected List<? extends OperatorFactory> createOperatorFactories()
{
OperatorFactory tableScanOperator = createTableScanOperator(0, "orders", "orderstatus", "totalprice");
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderstatus", "totalprice");
List<Type> types = ImmutableList.of(tableScanOperator.getTypes().get(0));
HashAggregationOperatorFactory aggregationOperator = new HashAggregationOperatorFactory(1,
new PlanNodeId("test"),
types,
Ints.asList(0),
Step.SINGLE,
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.testing.NullOutputOperator.NullOutputOperatorFactory;
import com.google.common.collect.ImmutableList;
Expand All @@ -40,8 +41,8 @@ public class HashBuildAndJoinBenchmark
extends AbstractOperatorBenchmark
{
private final boolean hashEnabled;
private final OperatorFactory ordersTableScan = createTableScanOperator(0, "orders", "orderkey", "totalprice");
private final OperatorFactory lineItemTableScan = createTableScanOperator(0, "lineitem", "orderkey", "quantity");
private final OperatorFactory ordersTableScan = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey", "totalprice");
private final OperatorFactory lineItemTableScan = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "orderkey", "quantity");

public HashBuildAndJoinBenchmark(Session session, LocalQueryRunner localQueryRunner)
{
Expand All @@ -66,13 +67,13 @@ protected List<Driver> createDrivers(TaskContext taskContext)
OperatorFactory source = ordersTableScan;
Optional<Integer> hashChannel = Optional.empty();
if (hashEnabled) {
source = createHashProjectOperator(1, ImmutableList.<Type>of(BIGINT, DOUBLE));
source = createHashProjectOperator(1, new PlanNodeId("test"), ImmutableList.<Type>of(BIGINT, DOUBLE));
driversBuilder.add(source);
hashChannel = Optional.of(2);
}

// hash build
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(2, source.getTypes(), Ints.asList(0), hashChannel, 1_500_000);
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(2, new PlanNodeId("test"), source.getTypes(), Ints.asList(0), hashChannel, 1_500_000);
driversBuilder.add(hashBuilder);
DriverFactory hashBuildDriverFactory = new DriverFactory(true, false, driversBuilder.build());
Driver hashBuildDriver = hashBuildDriverFactory.createDriver(taskContext.addPipelineContext(true, false).addDriverContext());
Expand All @@ -83,14 +84,14 @@ protected List<Driver> createDrivers(TaskContext taskContext)
source = lineItemTableScan;
hashChannel = Optional.empty();
if (hashEnabled) {
source = createHashProjectOperator(1, ImmutableList.<Type>of(BIGINT, BIGINT));
source = createHashProjectOperator(1, new PlanNodeId("test"), ImmutableList.<Type>of(BIGINT, BIGINT));
joinDriversBuilder.add(source);
hashChannel = Optional.of(2);
}

OperatorFactory joinOperator = LookupJoinOperators.innerJoin(2, hashBuilder.getLookupSourceSupplier(), source.getTypes(), Ints.asList(0), hashChannel);
OperatorFactory joinOperator = LookupJoinOperators.innerJoin(2, new PlanNodeId("test"), hashBuilder.getLookupSourceSupplier(), source.getTypes(), Ints.asList(0), hashChannel);
joinDriversBuilder.add(joinOperator);
joinDriversBuilder.add(new NullOutputOperatorFactory(3, joinOperator.getTypes()));
joinDriversBuilder.add(new NullOutputOperatorFactory(3, new PlanNodeId("test"), joinOperator.getTypes()));
DriverFactory joinDriverFactory = new DriverFactory(true, true, joinDriversBuilder.build());
Driver joinDriver = joinDriverFactory.createDriver(taskContext.addPipelineContext(true, true).addDriverContext());

Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.operator.HashBuilderOperator.HashBuilderOperatorFactory;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
Expand All @@ -38,8 +39,8 @@ public HashBuildBenchmark(LocalQueryRunner localQueryRunner)
@Override
protected List<Driver> createDrivers(TaskContext taskContext)
{
OperatorFactory ordersTableScan = createTableScanOperator(0, "orders", "orderkey", "totalprice");
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(1, ordersTableScan.getTypes(), Ints.asList(0), Optional.empty(), 1_500_000);
OperatorFactory ordersTableScan = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey", "totalprice");
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(1, new PlanNodeId("test"), ordersTableScan.getTypes(), Ints.asList(0), Optional.empty(), 1_500_000);

DriverFactory driverFactory = new DriverFactory(true, true, ordersTableScan, hashBuilder);
Driver driver = driverFactory.createDriver(taskContext.addPipelineContext(true, true).addDriverContext());
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.operator.LookupSourceSupplier;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.testing.NullOutputOperator.NullOutputOperatorFactory;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -50,8 +51,8 @@ from lineitem join orders using (orderkey)
protected List<Driver> createDrivers(TaskContext taskContext)
{
if (lookupSourceSupplier == null) {
OperatorFactory ordersTableScan = createTableScanOperator(0, "orders", "orderkey", "totalprice");
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(1, ordersTableScan.getTypes(), Ints.asList(0), Optional.empty(), 1_500_000);
OperatorFactory ordersTableScan = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey", "totalprice");
HashBuilderOperatorFactory hashBuilder = new HashBuilderOperatorFactory(1, new PlanNodeId("test"), ordersTableScan.getTypes(), Ints.asList(0), Optional.empty(), 1_500_000);

DriverContext driverContext = taskContext.addPipelineContext(false, false).addDriverContext();
Driver driver = new DriverFactory(false, false, ordersTableScan, hashBuilder).createDriver(driverContext);
Expand All @@ -61,11 +62,11 @@ protected List<Driver> createDrivers(TaskContext taskContext)
lookupSourceSupplier = hashBuilder.getLookupSourceSupplier();
}

OperatorFactory lineItemTableScan = createTableScanOperator(0, "lineitem", "orderkey", "quantity");
OperatorFactory lineItemTableScan = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "orderkey", "quantity");

OperatorFactory joinOperator = LookupJoinOperators.innerJoin(1, lookupSourceSupplier, lineItemTableScan.getTypes(), Ints.asList(0), Optional.empty());
OperatorFactory joinOperator = LookupJoinOperators.innerJoin(1, new PlanNodeId("test"), lookupSourceSupplier, lineItemTableScan.getTypes(), Ints.asList(0), Optional.empty());

NullOutputOperatorFactory output = new NullOutputOperatorFactory(2, joinOperator.getTypes());
NullOutputOperatorFactory output = new NullOutputOperatorFactory(2, new PlanNodeId("test"), joinOperator.getTypes());

DriverFactory driverFactory = new DriverFactory(true, true, lineItemTableScan, joinOperator, output);
DriverContext driverContext = taskContext.addPipelineContext(true, true).addDriverContext();
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.operator.LimitOperator.LimitOperatorFactory;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.OrderByOperator.OrderByOperatorFactory;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;

Expand All @@ -37,12 +38,13 @@ public OrderByBenchmark(LocalQueryRunner localQueryRunner)
@Override
protected List<? extends OperatorFactory> createOperatorFactories()
{
OperatorFactory tableScanOperator = createTableScanOperator(0, "orders", "totalprice", "clerk");
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "orders", "totalprice", "clerk");

LimitOperatorFactory limitOperator = new LimitOperatorFactory(1, tableScanOperator.getTypes(), ROWS);
LimitOperatorFactory limitOperator = new LimitOperatorFactory(1, new PlanNodeId("test"), tableScanOperator.getTypes(), ROWS);

OrderByOperatorFactory orderByOperator = new OrderByOperatorFactory(
2,
new PlanNodeId("test"),
limitOperator.getTypes(),
ImmutableList.of(1),
ROWS,
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableList;

Expand All @@ -40,9 +41,10 @@ public PredicateFilterBenchmark(LocalQueryRunner localQueryRunner)
@Override
protected List<? extends OperatorFactory> createOperatorFactories()
{
OperatorFactory tableScanOperator = createTableScanOperator(0, "orders", "totalprice");
OperatorFactory tableScanOperator = createTableScanOperator(0, new PlanNodeId("test"), "orders", "totalprice");
FilterAndProjectOperator.FilterAndProjectOperatorFactory filterAndProjectOperator = new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
1,
new PlanNodeId("test"),
new GenericPageProcessor(new DoubleFilter(50000.00), ImmutableList.of(singleColumn(DOUBLE, 0))),
ImmutableList.<Type>of(DOUBLE));

Expand Down

0 comments on commit afb65e4

Please sign in to comment.