Skip to content

Commit

Permalink
Implement skeleton for coordinator-only execute
Browse files Browse the repository at this point in the history
  • Loading branch information
homar authored and findepi committed Apr 1, 2022
1 parent f5c627a commit 1c1fb30
Show file tree
Hide file tree
Showing 23 changed files with 380 additions and 10 deletions.
2 changes: 2 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Expand Up @@ -100,6 +100,8 @@ Optional<TableExecuteHandle> getTableHandleForExecute(

void finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState);

/**
 * Performs the connector-side work of a coordinator-only table-execute procedure
 * (one that does not read table data), invoking the connector once with the given handle.
 */
void executeTableExecute(Session session, TableExecuteHandle handle);

TableProperties getTableProperties(Session session, TableHandle handle);

/**
Expand Down
Expand Up @@ -363,6 +363,14 @@ public void finishTableExecute(Session session, TableExecuteHandle tableExecuteH
metadata.finishTableExecute(session.toConnectorSession(catalogName), tableExecuteHandle.getConnectorHandle(), fragments, tableExecuteState);
}

@Override
public void executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle)
{
    // Route the call to the connector that owns the handle's catalog.
    CatalogName catalog = tableExecuteHandle.getCatalogName();
    getMetadata(session, catalog)
            .executeTableExecute(session.toConnectorSession(catalog), tableExecuteHandle.getConnectorHandle());
}

@Override
public Optional<SystemTable> getSystemTable(Session session, QualifiedObjectName tableName)
{
Expand Down
Expand Up @@ -91,6 +91,6 @@ public int hashCode()
@Override
public String toString()
{
    // Render as "catalog:connectorHandle" (the "Execute[...]" wrapper was dropped by this commit).
    return catalogName + ":" + connectorHandle;
}
}
@@ -0,0 +1,141 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator;

import io.trino.Session;
import io.trino.metadata.Metadata;
import io.trino.metadata.TableExecuteHandle;
import io.trino.spi.Page;
import io.trino.sql.planner.plan.PlanNodeId;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

/**
 * Coordinator-only operator backing {@code ALTER TABLE ... EXECUTE} procedures that do not
 * read table data. It calls {@link Metadata#executeTableExecute} exactly once and then emits
 * a single empty marker page; it accepts no input and produces no data rows.
 */
public class SimpleTableExecuteOperator
        implements Operator
{
    // Zero-channel page emitted once so downstream operators observe completion
    private static final Page PAGE = new Page(0);

    public static class SimpleTableExecuteOperatorOperatorFactory
            implements OperatorFactory
    {
        private final int operatorId;
        private final PlanNodeId planNodeId;
        private final Metadata metadata;
        private final Session session;
        private final TableExecuteHandle executeHandle;
        private boolean closed;

        public SimpleTableExecuteOperatorOperatorFactory(
                int operatorId,
                PlanNodeId planNodeId,
                Metadata metadata,
                Session session,
                TableExecuteHandle executeHandle)
        {
            this.operatorId = operatorId;
            this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
            // NOTE: the messages below previously said "planNodeId is null" for both
            // metadata and session (copy-paste error); fixed to name the right argument.
            this.metadata = requireNonNull(metadata, "metadata is null");
            this.session = requireNonNull(session, "session is null");
            this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
        }

        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!closed, "Factory is already closed");
            OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, SimpleTableExecuteOperator.class.getSimpleName());
            return new SimpleTableExecuteOperator(
                    context,
                    metadata,
                    session,
                    executeHandle);
        }

        @Override
        public void noMoreOperators()
        {
            closed = true;
        }

        @Override
        public OperatorFactory duplicate()
        {
            return new SimpleTableExecuteOperatorOperatorFactory(
                    operatorId,
                    planNodeId,
                    metadata,
                    session,
                    executeHandle);
        }
    }

    private final OperatorContext operatorContext;
    private final Metadata metadata;
    private final Session session;
    private final TableExecuteHandle executeHandle;

    // Set after the metadata call has run and the marker page has been returned
    private boolean finished;

    public SimpleTableExecuteOperator(
            OperatorContext operatorContext,
            Metadata metadata,
            Session session,
            TableExecuteHandle executeHandle)
    {
        this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
        this.metadata = requireNonNull(metadata, "metadata is null");
        this.session = requireNonNull(session, "session is null");
        this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
    }

    @Override
    public OperatorContext getOperatorContext()
    {
        return operatorContext;
    }

    @Override
    public boolean needsInput()
    {
        // Source-style operator: never accepts input
        return false;
    }

    @Override
    public void addInput(Page page)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public Page getOutput()
    {
        if (finished) {
            return null;
        }

        // Perform the connector metadata operation exactly once
        metadata.executeTableExecute(session, executeHandle);
        finished = true;
        return PAGE;
    }

    @Override
    public void finish() {}

    @Override
    public boolean isFinished()
    {
        return finished;
    }
}
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Expand Up @@ -114,6 +114,7 @@ public class Analysis
private String updateType;
private Optional<UpdateTarget> target = Optional.empty();
private boolean skipMaterializedViewRefresh;
private Optional<Boolean> tableExecuteReadsData;

private final Map<NodeRef<Table>, Query> namedQueries = new LinkedHashMap<>();

Expand Down Expand Up @@ -276,6 +277,16 @@ public void setSkipMaterializedViewRefresh(boolean skipMaterializedViewRefresh)
this.skipMaterializedViewRefresh = skipMaterializedViewRefresh;
}

/**
 * Returns whether the analyzed table-execute procedure reads table data.
 *
 * @throws IllegalStateException if analysis has not recorded the flag yet
 */
public boolean isTableExecuteReadsData()
{
    if (tableExecuteReadsData.isEmpty()) {
        throw new IllegalStateException("tableExecuteReadsData not set");
    }
    return tableExecuteReadsData.get();
}

/**
 * Records whether the analyzed ALTER TABLE EXECUTE procedure reads table data.
 * Set during analysis and read later by the planner to decide between a data-reading
 * plan and a coordinator-only metadata plan.
 */
public void setTableExecuteReadsData(boolean readsData)
{
this.tableExecuteReadsData = Optional.of(readsData);
}

public void setAggregates(QuerySpecification node, List<FunctionCall> aggregates)
{
this.aggregates.put(NodeRef.of(node), ImmutableList.copyOf(aggregates));
Expand Down
Expand Up @@ -1096,6 +1096,7 @@ protected Scope visitTableExecute(TableExecute node, Optional<Scope> scope)
tableProperties)
.orElseThrow(() -> semanticException(NOT_SUPPORTED, node, "Procedure '%s' cannot be executed on table '%s'", procedureName, tableName));

analysis.setTableExecuteReadsData(procedureMetadata.getExecutionMode().isReadsData());
analysis.setTableExecuteHandle(executeHandle);

analysis.setUpdateType("ALTER TABLE EXECUTE");
Expand Down
Expand Up @@ -83,6 +83,7 @@
import io.trino.operator.ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory;
import io.trino.operator.SetBuilderOperator.SetBuilderOperatorFactory;
import io.trino.operator.SetBuilderOperator.SetSupplier;
import io.trino.operator.SimpleTableExecuteOperator.SimpleTableExecuteOperatorOperatorFactory;
import io.trino.operator.SourceOperatorFactory;
import io.trino.operator.SpatialIndexBuilderOperator.SpatialIndexBuilderOperatorFactory;
import io.trino.operator.SpatialIndexBuilderOperator.SpatialPredicate;
Expand Down Expand Up @@ -209,6 +210,7 @@
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SampleNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.SortNode;
import io.trino.sql.planner.plan.SpatialJoinNode;
import io.trino.sql.planner.plan.StatisticAggregationsDescriptor;
Expand Down Expand Up @@ -3327,6 +3329,21 @@ private List<Integer> createColumnValueAndRowIdChannels(List<Symbol> outputSymbo
return Arrays.asList(columnValueAndRowIdChannels);
}

@Override
public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node, LocalExecutionPlanContext context)
{
    // Metadata-only execution: a single driver instance invokes the connector once.
    context.setDriverInstanceCount(1);
    OperatorFactory operatorFactory = new SimpleTableExecuteOperatorOperatorFactory(
            context.getNextOperatorId(),
            node.getId(),
            metadata,
            session,
            node.getExecuteHandle());
    return new PhysicalOperation(operatorFactory, makeLayout(node), context, UNGROUPED_EXECUTION);
}

@Override
public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecutionPlanContext context)
{
Expand Down
Expand Up @@ -60,6 +60,7 @@
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.StatisticAggregations;
import io.trino.sql.planner.plan.StatisticsWriterNode;
import io.trino.sql.planner.plan.TableExecuteNode;
Expand Down Expand Up @@ -826,10 +827,18 @@ private static Map<NodeRef<LambdaArgumentDeclaration>, Symbol> buildLambdaDeclar
private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute statement)
{
Table table = statement.getTable();
TableHandle tableHandle = analysis.getTableHandle(table);
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, table.getName());
TableExecuteHandle executeHandle = analysis.getTableExecuteHandle().orElseThrow();

if (!analysis.isTableExecuteReadsData()) {
SimpleTableExecuteNode node = new SimpleTableExecuteNode(
idAllocator.getNextId(),
symbolAllocator.newSymbol("rows", BIGINT),
executeHandle);
return new RelationPlan(node, analysis.getRootScope(), node.getOutputSymbols(), Optional.empty());
}

TableHandle tableHandle = analysis.getTableHandle(table);
RelationPlan tableScanPlan = createRelationPlan(analysis, table);
PlanBuilder sourcePlanBuilder = newPlanBuilder(tableScanPlan, analysis, ImmutableMap.of(), ImmutableMap.of());
if (statement.getWhere().isPresent()) {
Expand Down
Expand Up @@ -46,6 +46,7 @@
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SimplePlanRewriter;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.StatisticsWriterNode;
import io.trino.sql.planner.plan.TableDeleteNode;
import io.trino.sql.planner.plan.TableFinishNode;
Expand Down Expand Up @@ -301,6 +302,13 @@ public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteCont
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitSimpleTableExecuteNode(SimpleTableExecuteNode node, RewriteContext<FragmentProperties> context)
{
    // Metadata-only table execute must be placed in a coordinator-only fragment.
    FragmentProperties properties = context.get();
    properties.setCoordinatorOnlyDistribution();
    return context.defaultRewrite(node, properties);
}

@Override
public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<FragmentProperties> context)
{
Expand Down
Expand Up @@ -61,6 +61,7 @@
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.SortNode;
import io.trino.sql.planner.plan.SpatialJoinNode;
import io.trino.sql.planner.plan.StatisticsWriterNode;
Expand Down Expand Up @@ -600,6 +601,16 @@ public PlanWithProperties visitTableExecute(TableExecuteNode node, PreferredProp
return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties);
}

@Override
public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode node, PreferredProperties context)
{
    // Leaf node executed as a single stream; no exchange needs to be added around it.
    ActualProperties properties = ActualProperties.builder()
            .global(singleStreamPartition())
            .build();
    return new PlanWithProperties(node, properties);
}

private PlanWithProperties visitTableWriter(PlanNode node, Optional<PartitioningScheme> partitioningScheme, PlanNode source, PreferredProperties preferredProperties)
{
PlanWithProperties newSource = source.accept(this, preferredProperties);
Expand Down
Expand Up @@ -49,6 +49,7 @@
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.SortNode;
import io.trino.sql.planner.plan.SpatialJoinNode;
import io.trino.sql.planner.plan.StatisticsWriterNode;
Expand Down Expand Up @@ -580,6 +581,12 @@ public PlanWithProperties visitTopNRanking(TopNRankingNode node, StreamPreferred
return planAndEnforceChildren(node, requiredProperties, requiredProperties);
}

@Override
public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode node, StreamPreferredProperties context)
{
    // Require single-stream execution both for this node and for its children.
    StreamPreferredProperties required = singleStream();
    return planAndEnforceChildren(node, required, required);
}

//
// Table Writer and Table Execute
//
Expand Down
Expand Up @@ -63,6 +63,7 @@
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SampleNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.SortNode;
import io.trino.sql.planner.plan.SpatialJoinNode;
import io.trino.sql.planner.plan.StatisticsWriterNode;
Expand Down Expand Up @@ -481,6 +482,15 @@ public ActualProperties visitTableExecute(TableExecuteNode node, List<ActualProp
.build();
}

@Override
public ActualProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode node, List<ActualProperties> inputProperties)
{
    // A simple table-execute is a metadata-only operation, which always runs on the coordinator.
    return ActualProperties.builder().global(coordinatorSingleStreamPartition()).build();
}

@Override
public ActualProperties visitJoin(JoinNode node, List<ActualProperties> inputProperties)
{
Expand Down
Expand Up @@ -54,6 +54,7 @@
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SampleNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SimpleTableExecuteNode;
import io.trino.sql.planner.plan.SortNode;
import io.trino.sql.planner.plan.SpatialJoinNode;
import io.trino.sql.planner.plan.StatisticsWriterNode;
Expand Down Expand Up @@ -451,6 +452,12 @@ public StreamProperties visitTableExecute(TableExecuteNode node, List<StreamProp
return properties.withUnspecifiedPartitioning();
}

@Override
public StreamProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode node, List<StreamProperties> context)
{
// coordinator-only metadata execution produces exactly one stream
return StreamProperties.singleStream();
}

@Override
public StreamProperties visitRefreshMaterializedView(RefreshMaterializedViewNode node, List<StreamProperties> inputProperties)
{
Expand Down

0 comments on commit 1c1fb30

Please sign in to comment.