Skip to content

Commit

Permalink
Merge pull request #1 from mgorsk1/feat/trino-run-facets
Browse files Browse the repository at this point in the history
feat: add trino facets for query metadata & statistics
  • Loading branch information
mgorsk1 committed Mar 21, 2024
2 parents 56eff15 + be5a569 commit e3485c3
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 49 deletions.
2 changes: 2 additions & 0 deletions event-listener.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
event-listener.name=openlineage
openlineage.url=http://olapi:5000
openlineage.facets.trinoMetadata.enabled=true
openlineage.facets.trinoQueryStatistics.enabled=true
#openlineage.apikey=xxxx
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,20 @@ public void emit(OpenLineage.RunEvent event)
String json = objectMapper.writeValueAsString(event);
logger.info(json);

Request.Builder requestBuilder = Request.builder()
.setMethod("POST")
.setUri(URI.create(url + "/api/v1/lineage"))
.addHeader("Content-Type", "application/json")
.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8)));
Request.Builder requestBuilder =
Request.builder()
.setMethod("POST")
.setUri(URI.create(url + "/api/v1/lineage"))
.addHeader("Content-Type", "application/json")
.setBodyGenerator(
StaticBodyGenerator.createStaticBodyGenerator(
json.getBytes(StandardCharsets.UTF_8)));

apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s));

StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
StatusResponseHandler.StatusResponse status =
jettyClient.execute(
requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
logger.info("Response status: " + status.getStatusCode());
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.QueryIOMetadata;
import io.trino.spi.eventlistener.QueryMetadata;
import io.trino.spi.eventlistener.QueryOutputMetadata;
import io.trino.spi.eventlistener.QueryStatistics;

import java.lang.reflect.Field;
import java.net.URI;
import java.time.ZoneId;
import java.util.List;
Expand All @@ -34,10 +37,14 @@ public class OpenLineageListener
{
private final OpenLineage ol = new OpenLineage(URI.create("https://github.com/takezoe/trino-openlineage"));
private final OpenLineageClient client;
private final Boolean trinoMetadataFacetEnabled;
private final Boolean queryStatisticsFacetEnabled;

public OpenLineageListener(String url, Optional<String> apiKey)
public OpenLineageListener(String url, Optional<String> apiKey, Boolean trinoMetadataFacetEnabled, Boolean queryStatisticsFacetEnabled)
{
this.client = new OpenLineageClient(url, apiKey);
this.trinoMetadataFacetEnabled = trinoMetadataFacetEnabled;
this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled;
}

@Override
Expand All @@ -50,61 +57,125 @@ public void queryCreated(QueryCreatedEvent queryCreatedEvent)
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
UUID runID = UUID.randomUUID();
sendStartEvent(runID, queryCompletedEvent);
sendCompletedEvent(runID, queryCompletedEvent);
try {
sendStartEvent(runID, queryCompletedEvent);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
try {
sendCompletedEvent(runID, queryCompletedEvent);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}

private Optional<OpenLineage.RunFacet> getTrinoMetadataFacet(QueryMetadata queryMetadata)
{
if (this.trinoMetadataFacetEnabled) {
OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet();

if (queryMetadata.getPlan().isPresent()) {
trinoMetadataFacet.getAdditionalProperties().put("queryPlan", queryMetadata.getPlan().orElse(""));
}
if (queryMetadata.getTransactionId().isPresent()) {
trinoMetadataFacet.getAdditionalProperties().put("transactionId", queryMetadata.getTransactionId().orElse(""));
}
return Optional.of(trinoMetadataFacet);
}
return Optional.empty();
}

private void sendStartEvent(UUID runId, QueryCompletedEvent queryCompletedEvent)
private Optional<OpenLineage.RunFacet> getTrinoQueryStatisticsFacet(QueryStatistics queryStatistics)
throws IllegalAccessException
{
OpenLineage.RunEvent startEvent = ol.newRunEventBuilder()
.eventType(OpenLineage.RunEvent.EventType.START)
.eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC")))
.run(ol.newRunBuilder()
.runId(runId)
.build())
.job(ol.newJobBuilder()
.namespace(queryCompletedEvent.getContext().getUser())
.name(queryCompletedEvent.getMetadata().getQueryId())
.facets(ol.newJobFacetsBuilder()
.sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
.build())
.build())
.inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
.outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
.build();
if (this.queryStatisticsFacetEnabled) {
OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet();

for (Field field : queryStatistics.getClass().getDeclaredFields()) {
field.setAccessible(true);
trinoQueryStatisticsFacet
.getAdditionalProperties()
.put(field.getName(), String.valueOf(field.get(queryStatistics)));
}
return Optional.of(trinoQueryStatisticsFacet);
}
return Optional.empty();
}

private void sendStartEvent(UUID runID, QueryCompletedEvent queryCompletedEvent)
throws IllegalAccessException
{
OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
Optional<OpenLineage.RunFacet> trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata());

trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet));

OpenLineage.RunEvent startEvent =
ol.newRunEventBuilder()
.eventType(OpenLineage.RunEvent.EventType.START)
.eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC")))
.run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build())
.job(
ol.newJobBuilder()
.namespace(queryCompletedEvent.getContext().getUser())
.name(queryCompletedEvent.getMetadata().getQueryId())
.facets(
ol.newJobFacetsBuilder()
.sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
.build())
.build())
.inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
.outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
.build();

client.emit(startEvent);
}

private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEvent)
throws IllegalAccessException
{
boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED");

OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder()
.eventType(failed ? OpenLineage.RunEvent.EventType.FAIL : OpenLineage.RunEvent.EventType.COMPLETE)
.eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC")))
.run(ol.newRunBuilder().runId(runID).build())
.job(ol.newJobBuilder()
.namespace(queryCompletedEvent.getContext().getUser())
.name(queryCompletedEvent.getMetadata().getQueryId())
.facets(ol.newJobFacetsBuilder()
.sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
.build())
.build())
.inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
.outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
.build();
OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
Optional<OpenLineage.RunFacet> trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata());
Optional<OpenLineage.RunFacet> trinoQueryStatistics = getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics());

trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet));
trinoQueryStatistics.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryStatistics", runFacet));

OpenLineage.RunEvent completedEvent =
ol.newRunEventBuilder()
.eventType(
failed
? OpenLineage.RunEvent.EventType.FAIL
: OpenLineage.RunEvent.EventType.COMPLETE)
.eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC")))
.run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build())
.job(
ol.newJobBuilder()
.namespace(queryCompletedEvent.getContext().getUser())
.name(queryCompletedEvent.getMetadata().getQueryId())
.facets(
ol.newJobFacetsBuilder()
.sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
.build())
.build())
.inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
.outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
.build();

client.emit(completedEvent);
}

private List<OpenLineage.InputDataset> buildInputs(QueryIOMetadata ioMetadata)
{
return ioMetadata.getInputs().stream().map(inputMetadata ->
ol.newInputDatasetBuilder()
.namespace(getDatasetNamespace(inputMetadata.getCatalogName()))
.name(inputMetadata.getSchema() + "." + inputMetadata.getTable())
.build()
ol.newInputDatasetBuilder()
.namespace(getDatasetNamespace(inputMetadata.getCatalogName()))
.name(inputMetadata.getSchema() + "." + inputMetadata.getTable())
.build()
).collect(toImmutableList());
}

Expand All @@ -113,10 +184,11 @@ private List<OpenLineage.OutputDataset> buildOutputs(QueryIOMetadata ioMetadata)
Optional<QueryOutputMetadata> outputs = ioMetadata.getOutput();
if (outputs.isPresent()) {
QueryOutputMetadata outputMetadata = outputs.get();
return ImmutableList.of(ol.newOutputDatasetBuilder()
.namespace(getDatasetNamespace(outputMetadata.getCatalogName()))
.name(outputMetadata.getSchema() + "." + outputMetadata.getTable())
.build());
return ImmutableList.of(
ol.newOutputDatasetBuilder()
.namespace(getDatasetNamespace(outputMetadata.getCatalogName()))
.name(outputMetadata.getSchema() + "." + outputMetadata.getTable())
.build());
}
else {
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ public EventListener create(Map<String, String> config)
{
String url = requireNonNull(config.get("openlineage.url"));
String apiKey = config.get("openlineage.apikey");
Boolean trinoMetadataFacetEnabled = config.get("openlineage.facets.trinoMetadata.enabled").equalsIgnoreCase("true");
Boolean trinoQueryStatisticsFacetEnabled = config.get("openlineage.facets.trinoQueryStatistics.enabled").equalsIgnoreCase("true");

logger.info("openlineage.url: " + url);

return new OpenLineageListener(url, Optional.ofNullable(apiKey));
return new OpenLineageListener(url, Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryStatisticsFacetEnabled);
}
}

0 comments on commit e3485c3

Please sign in to comment.