Skip to content

Commit

Permalink
Record output column details to EventListener
Browse files Browse the repository at this point in the history
This captures the columns created/inserted/updated in a given query
  • Loading branch information
Praveen2112 committed Mar 31, 2021
1 parent ec68911 commit 5a038f8
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 25 deletions.
Expand Up @@ -76,6 +76,7 @@
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.execution.QueryState.QUEUED;
import static io.trino.execution.StageInfo.getAllStages;
import static io.trino.sql.planner.planprinter.PlanPrinter.textDistributedPlan;
Expand Down Expand Up @@ -367,6 +368,10 @@ private static QueryIOMetadata getQueryIOMetadata(QueryInfo queryInfo)
queryInfo.getOutput().get().getCatalogName(),
queryInfo.getOutput().get().getSchema(),
queryInfo.getOutput().get().getTable(),
queryInfo.getOutput().get().getColumns()
.map(columns -> columns.stream()
.map(Column::getName)
.collect(toImmutableList())),
tableFinishInfo.map(TableFinishInfo::getConnectorOutputMetadata),
tableFinishInfo.map(TableFinishInfo::isJsonLengthLimitExceeded)));
}
Expand Down
16 changes: 12 additions & 4 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Streams;
import io.trino.execution.Column;
import io.trino.metadata.NewTableLayout;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.ResolvedFunction;
Expand Down Expand Up @@ -216,14 +217,14 @@ public Optional<Output> getTarget()
{
return target.map(target -> {
QualifiedObjectName name = target.getName();
return new Output(name.getCatalogName(), name.getSchemaName(), name.getObjectName());
return new Output(name.getCatalogName(), name.getSchemaName(), name.getObjectName(), target.getColumns());
});
}

public void setUpdateType(String updateType, QualifiedObjectName targetName, Optional<Table> targetTable)
public void setUpdateType(String updateType, QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<Column>> targetColumns)
{
this.updateType = updateType;
this.target = Optional.of(new UpdateTarget(targetName, targetTable));
this.target = Optional.of(new UpdateTarget(targetName, targetTable, targetColumns));
}

public void resetUpdateType()
Expand Down Expand Up @@ -1513,11 +1514,13 @@ private static class UpdateTarget
{
private final QualifiedObjectName name;
private final Optional<Table> table;
private final Optional<List<Column>> columns;

public UpdateTarget(QualifiedObjectName name, Optional<Table> table)
public UpdateTarget(QualifiedObjectName name, Optional<Table> table, Optional<List<Column>> columns)
{
this.name = requireNonNull(name, "name is null");
this.table = requireNonNull(table, "table is null");
this.columns = requireNonNull(columns, "columns is null").map(ImmutableList::copyOf);
}

public QualifiedObjectName getName()
Expand All @@ -1529,5 +1532,10 @@ public Optional<Table> getTable()
{
return table;
}

public Optional<List<Column>> getColumns()
{
return columns;
}
}
}
20 changes: 17 additions & 3 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Output.java
Expand Up @@ -15,10 +15,14 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.execution.Column;

import javax.annotation.concurrent.Immutable;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -28,16 +32,19 @@ public final class Output
private final String catalogName;
private final String schema;
private final String table;
private final Optional<List<Column>> columns;

@JsonCreator
public Output(
@JsonProperty("catalogName") String catalogName,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table)
@JsonProperty("table") String table,
@JsonProperty("columns") Optional<List<Column>> columns)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.schema = requireNonNull(schema, "schema is null");
this.table = requireNonNull(table, "table is null");
this.columns = requireNonNull(columns, "columns is null").map(ImmutableList::copyOf);
}

@JsonProperty
Expand All @@ -58,6 +65,12 @@ public String getTable()
return table;
}

@JsonProperty
public Optional<List<Column>> getColumns()
{
return columns;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -70,12 +83,13 @@ public boolean equals(Object o)
Output output = (Output) o;
return Objects.equals(catalogName, output.catalogName) &&
Objects.equals(schema, output.schema) &&
Objects.equals(table, output.table);
Objects.equals(table, output.table) &&
Objects.equals(columns, output.columns);
}

@Override
public int hashCode()
{
return Objects.hash(catalogName, schema, table);
return Objects.hash(catalogName, schema, table, columns);
}
}
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.execution.Column;
import io.trino.execution.warnings.WarningCollector;
import io.trino.metadata.FunctionKind;
import io.trino.metadata.FunctionMetadata;
Expand Down Expand Up @@ -397,8 +398,6 @@ protected Scope visitInsert(Insert insert, Optional<Scope> scope)
// analyze the query that creates the data
Scope queryScope = analyze(insert.getQuery(), createScope(scope));

analysis.setUpdateType("INSERT", targetTable, Optional.empty());

// verify the insert destination columns match the query
Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
if (targetTableHandle.isEmpty()) {
Expand Down Expand Up @@ -477,6 +476,14 @@ protected Scope visitInsert(Insert insert, Optional<Scope> scope)
Joiner.on(", ").join(queryTypes));
}

analysis.setUpdateType(
"INSERT",
targetTable,
Optional.empty(),
Optional.of(insertColumns.stream()
.map(insertColumn -> new Column(insertColumn, tableMetadata.getColumn(insertColumn).getType().toString()))
.collect(toImmutableList())));

return createAndAssignScope(insert, scope, Field.newUnqualified("rows", BIGINT));
}

Expand All @@ -502,8 +509,6 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
Query query = parseView(optionalView.get().getOriginalSql(), name, refreshMaterializedView);
Scope queryScope = process(query, scope);

analysis.setUpdateType("REFRESH MATERIALIZED VIEW", targetTable, Optional.empty());

// verify the insert destination columns match the query
Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
if (targetTableHandle.isEmpty()) {
Expand Down Expand Up @@ -545,6 +550,14 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
"Query: [" + Joiner.on(", ").join(queryTypes) + "]");
}

analysis.setUpdateType(
"REFRESH MATERIALIZED VIEW",
targetTable,
Optional.empty(),
Optional.of(insertColumns.stream()
.map(insertColumn -> new Column(insertColumn, tableMetadata.getColumn(insertColumn).getType().toString()))
.collect(toImmutableList())));

return createAndAssignScope(refreshMaterializedView, scope, Field.newUnqualified("rows", BIGINT));
}

Expand Down Expand Up @@ -642,7 +655,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
Scope tableScope = analyzer.analyzeForUpdate(table, scope, UpdateKind.DELETE);
node.getWhere().ifPresent(where -> analyzeWhere(node, tableScope, where));

analysis.setUpdateType("DELETE", tableName, Optional.of(table));
analysis.setUpdateType("DELETE", tableName, Optional.of(table), Optional.empty());

return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
Expand All @@ -651,7 +664,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
protected Scope visitAnalyze(Analyze node, Optional<Scope> scope)
{
QualifiedObjectName tableName = createQualifiedObjectName(session, node, node.getTableName());
analysis.setUpdateType("ANALYZE", tableName, Optional.empty());
analysis.setUpdateType("ANALYZE", tableName, Optional.empty(), Optional.empty());

// verify the target table exists and it's not a view
if (metadata.getView(session, tableName).isPresent()) {
Expand Down Expand Up @@ -696,7 +709,6 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
{
// turn this into a query that has a new table writer node on top.
QualifiedObjectName targetTable = createQualifiedObjectName(session, node, node.getName());
analysis.setUpdateType("CREATE TABLE", targetTable, Optional.empty());

Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
if (targetTableHandle.isPresent()) {
Expand All @@ -707,6 +719,7 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
Optional.empty(),
node.isWithData(),
true));
analysis.setUpdateType("CREATE TABLE", targetTable, Optional.empty(), Optional.of(ImmutableList.of()));
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
throw semanticException(TABLE_ALREADY_EXISTS, node, "Destination table '%s' already exists", targetTable);
Expand Down Expand Up @@ -783,14 +796,21 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
node.isWithData(),
false));

analysis.setUpdateType(
"CREATE TABLE",
targetTable,
Optional.empty(),
Optional.of(columns.build().stream()
.map(column -> new Column(column.getName(), column.getType().toString()))
.collect(toImmutableList())));

return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}

@Override
protected Scope visitCreateView(CreateView node, Optional<Scope> scope)
{
QualifiedObjectName viewName = createQualifiedObjectName(session, node, node.getName());
analysis.setUpdateType("CREATE VIEW", viewName, Optional.empty());

// analyze the query that creates the view
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, groupProvider, accessControl, session, warningCollector, CorrelationSupport.ALLOWED);
Expand All @@ -801,6 +821,14 @@ protected Scope visitCreateView(CreateView node, Optional<Scope> scope)

validateColumns(node, queryScope.getRelationType());

analysis.setUpdateType(
"CREATE VIEW",
viewName,
Optional.empty(),
Optional.of(queryScope.getRelationType().getVisibleFields().stream()
.map(field -> new Column(field.getName().orElseThrow(), field.getType().toString()))
.collect(toImmutableList())));

return createAndAssignScope(node, scope);
}

Expand Down Expand Up @@ -975,7 +1003,6 @@ protected Scope visitCall(Call node, Optional<Scope> scope)
protected Scope visitCreateMaterializedView(CreateMaterializedView node, Optional<Scope> scope)
{
QualifiedObjectName viewName = createQualifiedObjectName(session, node, node.getName());
analysis.setUpdateType("CREATE MATERIALIZED VIEW", viewName, Optional.empty());

if (node.isReplace() && node.isNotExists()) {
throw semanticException(NOT_SUPPORTED, node, "'CREATE OR REPLACE' and 'IF NOT EXISTS' clauses can not be used together");
Expand All @@ -994,6 +1021,14 @@ protected Scope visitCreateMaterializedView(CreateMaterializedView node, Optiona

validateColumns(node, queryScope.getRelationType());

analysis.setUpdateType(
"CREATE MATERIALIZED VIEW",
viewName,
Optional.empty(),
Optional.of(queryScope.getRelationType().getVisibleFields().stream()
.map(field -> new Column(field.getName().orElseThrow(), field.getType().toString()))
.collect(toImmutableList())));

return createAndAssignScope(node, scope);
}

Expand Down Expand Up @@ -1879,7 +1914,6 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
List<ColumnMetadata> updatedColumns = allColumns.stream()
.filter(column -> assignmentTargets.contains(column.getName()))
.collect(toImmutableList());
analysis.setUpdateType("UPDATE", tableName, Optional.of(table));
analysis.setUpdatedColumns(updatedColumns);

// Analyzer checks for select permissions but UPDATE has a separate permission, so disable access checks
Expand Down Expand Up @@ -1930,6 +1964,14 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
analysis.recordSubqueries(update, analyses.get(index));
}

analysis.setUpdateType(
"UPDATE",
tableName,
Optional.of(table),
Optional.of(updatedColumns.stream()
.map(column -> new Column(column.getName(), column.getType().toString()))
.collect(toImmutableList())));

return createAndAssignScope(update, scope, Field.newUnqualified("rows", BIGINT));
}

Expand Down
Expand Up @@ -13,9 +13,13 @@
*/
package io.trino.sql.analyzer;

import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.trino.execution.Column;
import org.testng.annotations.Test;

import java.util.Optional;

import static org.testng.Assert.assertEquals;

public class TestOutput
Expand All @@ -25,7 +29,7 @@ public class TestOutput
@Test
public void testRoundTrip()
{
Output expected = new Output("connectorId", "schema", "table");
Output expected = new Output("connectorId", "schema", "table", Optional.of(ImmutableList.of(new Column("column", "type"))));

String json = codec.toJson(expected);
Output actual = codec.fromJson(json);
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;
Expand All @@ -24,15 +25,17 @@ public class QueryOutputMetadata
private final String catalogName;
private final String schema;
private final String table;
private final Optional<List<String>> columns;

private final Optional<String> connectorOutputMetadata;
private final Optional<Boolean> jsonLengthLimitExceeded;

public QueryOutputMetadata(String catalogName, String schema, String table, Optional<String> connectorOutputMetadata, Optional<Boolean> jsonLengthLimitExceeded)
public QueryOutputMetadata(String catalogName, String schema, String table, Optional<List<String>> columns, Optional<String> connectorOutputMetadata, Optional<Boolean> jsonLengthLimitExceeded)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.schema = requireNonNull(schema, "schema is null");
this.table = requireNonNull(table, "table is null");
this.columns = requireNonNull(columns, "columns is null");
this.connectorOutputMetadata = requireNonNull(connectorOutputMetadata, "connectorOutputMetadata is null");
this.jsonLengthLimitExceeded = requireNonNull(jsonLengthLimitExceeded, "jsonLengthLimitExceeded is null");
}
Expand All @@ -55,6 +58,12 @@ public String getTable()
return table;
}

@JsonProperty
public Optional<List<String>> getColumns()
{
return columns;
}

@JsonProperty
public Optional<String> getConnectorOutputMetadata()
{
Expand Down

0 comments on commit 5a038f8

Please sign in to comment.