Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ private static QueryInfo immediateFailureQueryInfo(
ImmutableList.of(),
ImmutableSet.of(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
ImmutableList.of(),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
queryInfo.getQueryType(),
queryInfo.getRetryPolicy()),
new QueryIOMetadata(ImmutableList.of(), Optional.empty()),
Optional.empty(),
createQueryFailureInfo(failure, Optional.empty()),
ImmutableList.of(),
queryInfo.getQueryStats().getCreateTime(),
Expand All @@ -276,6 +277,7 @@ public void queryCompletedEvent(QueryInfo queryInfo)
queryInfo.getQueryType(),
queryInfo.getRetryPolicy()),
getQueryIOMetadata(queryInfo),
queryInfo.getSelectColumnsLineageInfo(),
createQueryFailureInfo(queryInfo.getFailureInfo(), queryInfo.getStages()),
queryInfo.getWarnings(),
queryStats.getCreateTime(),
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/QueryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.spi.ErrorType;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoWarning;
import io.trino.spi.eventlistener.ColumnLineageInfo;
import io.trino.spi.eventlistener.RoutineInfo;
import io.trino.spi.eventlistener.TableInfo;
import io.trino.spi.resourcegroups.QueryType;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class QueryInfo
private final List<TrinoWarning> warnings;
private final Set<Input> inputs;
private final Optional<Output> output;
private final Optional<List<ColumnLineageInfo>> selectColumnsLineageInfo;
private final boolean finalQueryInfo;
private final Optional<ResourceGroupId> resourceGroupId;
private final Optional<QueryType> queryType;
Expand Down Expand Up @@ -117,6 +119,7 @@ public QueryInfo(
@JsonProperty("warnings") List<TrinoWarning> warnings,
@JsonProperty("inputs") Set<Input> inputs,
@JsonProperty("output") Optional<Output> output,
@JsonProperty("selectColumnsLineageInfo") Optional<List<ColumnLineageInfo>> selectColumnsLineageInfo,
@JsonProperty("referencedTables") List<TableInfo> referencedTables,
@JsonProperty("routines") List<RoutineInfo> routines,
@JsonProperty("finalQueryInfo") boolean finalQueryInfo,
Expand Down Expand Up @@ -154,6 +157,7 @@ public QueryInfo(
requireNonNull(queryType, "queryType is null");
requireNonNull(retryPolicy, "retryPolicy is null");
requireNonNull(version, "version is null");
requireNonNull(selectColumnsLineageInfo, "selectColumnsLineageInfo is null");

this.queryId = queryId;
this.session = session;
Expand Down Expand Up @@ -193,6 +197,13 @@ public QueryInfo(
this.retryPolicy = retryPolicy;
this.pruned = pruned;
this.version = version;
this.selectColumnsLineageInfo = selectColumnsLineageInfo.map(ImmutableList::copyOf);
}

@JsonProperty
public Optional<List<ColumnLineageInfo>> getSelectColumnsLineageInfo()
{
return selectColumnsLineageInfo;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import io.trino.spi.TrinoWarning;
import io.trino.spi.eventlistener.ColumnLineageInfo;
import io.trino.spi.eventlistener.RoutineInfo;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.eventlistener.TableInfo;
Expand Down Expand Up @@ -182,6 +183,7 @@ public class QueryStateMachine

private final AtomicReference<Set<Input>> inputs = new AtomicReference<>(ImmutableSet.of());
private final AtomicReference<Optional<Output>> output = new AtomicReference<>(Optional.empty());
private final AtomicReference<Optional<List<ColumnLineageInfo>>> selectColumnsLineageInfo = new AtomicReference<>(Optional.empty());
private final AtomicReference<List<TableInfo>> referencedTables = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<List<RoutineInfo>> routines = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<Map<String, Metrics>> catalogMetadataMetrics = new AtomicReference<>(ImmutableMap.of());
Expand Down Expand Up @@ -693,6 +695,7 @@ QueryInfo getQueryInfo(Optional<StagesInfo> stages)
warningCollector.getWarnings(),
inputs.get(),
output.get(),
selectColumnsLineageInfo.get(),
referencedTables.get(),
routines.get(),
finalInfo,
Expand Down Expand Up @@ -1029,6 +1032,12 @@ public void setOutput(Optional<Output> output)
this.output.set(output);
}

public void setSelectColumnsLineageInfo(Optional<List<ColumnLineageInfo>> selectOutputColumnsLineage)
{
requireNonNull(selectOutputColumnsLineage, "selectOutputColumnsLineage is null");
this.selectColumnsLineageInfo.set(selectOutputColumnsLineage);
}

public void setReferencedTables(List<TableInfo> tables)
{
requireNonNull(tables, "tables is null");
Expand Down Expand Up @@ -1499,6 +1508,7 @@ public static QueryInfo pruneQueryInfo(QueryInfo queryInfo, NodeVersion version)
queryInfo.getWarnings(),
queryInfo.getInputs(),
queryInfo.getOutput(),
Optional.empty(),
queryInfo.getReferencedTables(),
queryInfo.getRoutines(),
queryInfo.isFinalQueryInfo(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ private static Analysis analyze(
stateMachine.setUpdateType(analysis.getUpdateType());
stateMachine.setReferencedTables(analysis.getReferencedTables());
stateMachine.setRoutines(analysis.getRoutines());
stateMachine.setSelectColumnsLineageInfo(analysis.getSelectColumnsLineageInfo());

stateMachine.endAnalysis();

Expand Down
27 changes: 27 additions & 0 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.trino.spi.eventlistener.BaseViewReferenceInfo;
import io.trino.spi.eventlistener.ColumnDetail;
import io.trino.spi.eventlistener.ColumnInfo;
import io.trino.spi.eventlistener.ColumnLineageInfo;
import io.trino.spi.eventlistener.ColumnMaskReferenceInfo;
import io.trino.spi.eventlistener.MaterializedViewReferenceInfo;
import io.trino.spi.eventlistener.RoutineInfo;
Expand Down Expand Up @@ -100,6 +101,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -276,6 +278,31 @@ public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> pa
this.queryType = requireNonNull(queryType, "queryType is null");
}

public Optional<List<ColumnLineageInfo>> getSelectColumnsLineageInfo()
{
// This single check should handle all cases where we don't want to produce lineage info:
// - EXPLAIN ✓
// - INSERT/UPDATE/DELETE/MERGE ✓
// - ALTER TABLE ADD COLUMN ✓
// - SET COLUMN TYPE or any other DDL ✓
if (!(root instanceof Query)) {
return Optional.empty();
}

RelationType rootRelation = getOutputDescriptor();
List<ColumnLineageInfo> lineageInfo = rootRelation.getVisibleFields().stream()
// sort output fields by their index to ensure consistent ordering of lineage info
.sorted(Comparator.comparingInt(rootRelation::indexOf))
.map(field -> new ColumnLineageInfo(
field.getName().orElse(""),
getSourceColumns(field)
.stream()
.map(SourceColumn::getColumnDetail)
.collect(toImmutableSet())))
.collect(toImmutableList());
return lineageInfo.isEmpty() ? Optional.empty() : Optional.of(lineageInfo);
}

public Statement getStatement()
{
return root;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public QueryInfo getFullQueryInfo()
ImmutableList.of(),
ImmutableSet.of(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
ImmutableList.of(),
state.isDone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ private static QueryInfo createQueryInfo(Optional<StagesInfo> stagesInfo)
ImmutableList.of(new TrinoWarning(new WarningCode(1, "name"), "message")),
ImmutableSet.of(new Input(Optional.of("connectorName"), "catalog", new CatalogVersion("default"), "schema", "talble", Optional.empty(), ImmutableList.of(new Column("name", "type")), new PlanFragmentId("id"), new PlanNodeId("1"))),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
ImmutableList.of(),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void testConstructor()
ImmutableList.of(),
ImmutableSet.of(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
ImmutableList.of(),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query
ImmutableList.of(),
ImmutableSet.of(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
ImmutableList.of(),
false,
Expand Down
Loading