Skip to content

Commit

Permalink
Allow Presto clients to receive query results in binary format
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Sep 22, 2023
1 parent 9ed0fc2 commit da04bac
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ protected QueryResults createMockQueryResults()
null,
ImmutableList.of(new Column("_col0", BigintType.BIGINT)),
ImmutableList.of(ImmutableList.of(123)),
null,
StatementStats.builder().setState("FINISHED").build(),
null,
ImmutableList.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class QueryResults
private final URI nextUri;
private final List<Column> columns;
private final Iterable<List<Object>> data;
private final Iterable<String> binaryData;
private final StatementStats stats;
private final QueryError error;
private final List<PrestoWarning> warnings;
Expand All @@ -55,6 +56,7 @@ public QueryResults(
@JsonProperty("nextUri") URI nextUri,
@JsonProperty("columns") List<Column> columns,
@JsonProperty("data") List<List<Object>> data,
@JsonProperty("binaryData") List<String> binaryData,
@JsonProperty("stats") StatementStats stats,
@JsonProperty("error") QueryError error,
@JsonProperty("warnings") List<PrestoWarning> warnings,
Expand All @@ -68,6 +70,7 @@ public QueryResults(
nextUri,
columns,
fixData(columns, data),
binaryData,
stats,
error,
firstNonNull(warnings, ImmutableList.of()),
Expand All @@ -82,6 +85,7 @@ public QueryResults(
URI nextUri,
List<Column> columns,
Iterable<List<Object>> data,
Iterable<String> binaryData,
StatementStats stats,
QueryError error,
List<PrestoWarning> warnings,
Expand All @@ -94,7 +98,8 @@ public QueryResults(
this.nextUri = nextUri;
this.columns = (columns != null) ? ImmutableList.copyOf(columns) : null;
this.data = (data != null) ? unmodifiableIterable(data) : null;
checkArgument(data == null || columns != null, "data present without columns");
this.binaryData = (binaryData != null) ? unmodifiableIterable(binaryData) : null;
checkArgument((data == null && binaryData == null) || columns != null, "data present without columns");
this.stats = requireNonNull(stats, "stats is null");
this.error = error;
this.warnings = ImmutableList.copyOf(requireNonNull(warnings, "warnings is null"));
Expand Down Expand Up @@ -169,6 +174,16 @@ public Iterable<List<Object>> getData()
return data;
}

/**
* Returns an iterator to the payload (results) in binary format
*/
@Nullable
@JsonProperty
public Iterable<String> getBinaryData()
{
return binaryData;
}

/**
* Returns cumulative statistics on the query being executed
* @return {@link com.facebook.presto.client.StatementStats}
Expand Down Expand Up @@ -237,6 +252,7 @@ public String toString()
.add("nextUri", nextUri)
.add("columns", columns)
.add("hasData", data != null)
.add("hasBinaryData", binaryData != null)
.add("stats", stats)
.add("error", error)
.add("updateType", updateType)
Expand Down
5 changes: 5 additions & 0 deletions presto-docs/src/main/sphinx/develop/client-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ the columns returned by the query. Most of the response headers should be treat
browser cookies by the client, and echoed back as request headers in subsequent client requests,
as documented below.

To request the results in binary format, include binaryResults=true query parameter in the initial
``/v1/statement`` ``POST`` request. The response JSON document will contain ``binaryData`` field
with a list of base64-encoded pages in :doc:`SerializedPage </develop/serialized-page>` format. The
``data`` field will not be present.

If the JSON document returned by the ``POST`` to ``/v1/statement`` does not contain a ``nextUri`` link, the query has completed,
either successfully or unsuccessfully, and no additional requests need to be made. If the ``nextUri`` link is present in
the document, there are more query results to be fetched. The client should loop executing a ``GET`` request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List<
nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(),
responseColumns,
data,
null,
StatementStats.builder()
.setState(state)
.setWaitingForPrerequisites(state.equals("WAITING_FOR_PREREQUISITES"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -99,6 +100,7 @@ public void getQueryResults(
@QueryParam("slug") String slug,
@QueryParam("maxWait") Duration maxWait,
@QueryParam("targetResultSize") DataSize targetResultSize,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
@HeaderParam(X_FORWARDED_PROTO) String proto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context UriInfo uriInfo,
Expand All @@ -125,7 +127,7 @@ public void getQueryResults(
acquirePermitAsync,
acquirePermitTimeSeconds -> {
queryRateLimiter.addRateLimiterBlockTime(new Duration(acquirePermitTimeSeconds, SECONDS));
return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize);
return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize, binaryResults);
},
responseExecutor);
ListenableFuture<Response> queryResultsFuture = transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

Expand All @@ -61,6 +62,7 @@
import javax.ws.rs.core.UriInfo;

import java.net.URI;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -81,6 +83,7 @@
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.server.protocol.QueryResourceUtil.toStatementStats;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage;
import static com.facebook.presto.util.Failures.toFailure;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -94,6 +97,7 @@
class Query
{
private static final Logger log = Logger.get(Query.class);
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();

private final QueryManager queryManager;
private final TransactionManager transactionManager;
Expand Down Expand Up @@ -307,7 +311,7 @@ public synchronized Set<SqlFunctionId> getRemovedSessionFunctions()
return removedSessionFunctions;
}

public synchronized ListenableFuture<QueryResults> waitForResults(long token, UriInfo uriInfo, String scheme, Duration wait, DataSize targetResultSize)
public synchronized ListenableFuture<QueryResults> waitForResults(long token, UriInfo uriInfo, String scheme, Duration wait, DataSize targetResultSize, boolean binaryResults)
{
// before waiting, check if this request has already been processed and cached
Optional<QueryResults> cachedResult = getCachedResult(token);
Expand All @@ -323,7 +327,7 @@ public synchronized ListenableFuture<QueryResults> waitForResults(long token, Ur
timeoutExecutor);

// when state changes, fetch the next result
return Futures.transform(futureStateChange, ignored -> getNextResultWithRetry(token, uriInfo, scheme, targetResultSize), resultsProcessorExecutor);
return Futures.transform(futureStateChange, ignored -> getNextResultWithRetry(token, uriInfo, scheme, targetResultSize, binaryResults), resultsProcessorExecutor);
}

private synchronized ListenableFuture<?> getFutureStateChange()
Expand Down Expand Up @@ -376,9 +380,9 @@ private synchronized Optional<QueryResults> getCachedResult(long token)
return Optional.empty();
}

private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize)
private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
{
QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize);
QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize, binaryResults);
if (queryResults.getError() == null || !queryResults.getError().isRetriable()) {
return queryResults;
}
Expand Down Expand Up @@ -410,6 +414,7 @@ private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uri
createRetryUri(scheme, uriInfo),
queryResults.getColumns(),
null,
null,
StatementStats.builder()
.setState(WAITING_FOR_PREREQUISITES.toString())
.setWaitingForPrerequisites(true)
Expand All @@ -420,7 +425,7 @@ private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uri
queryResults.getUpdateCount());
}

private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize)
private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
{
// check if the result for the token have already been created
Optional<QueryResults> cachedResult = getCachedResult(token);
Expand All @@ -442,25 +447,51 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Str
// last page is removed. If another thread observes this state before the response is cached
// the pages will be lost.
Iterable<List<Object>> data = null;
List<String> binaryData = null;
try {
ImmutableList.Builder<RowIterable> pages = ImmutableList.builder();
long bytes = 0;
long rows = 0;
long bytes = 0;
long targetResultBytes = targetResultSize.toBytes();
while (bytes < targetResultBytes) {
SerializedPage serializedPage = exchangeClient.pollPage();
if (serializedPage == null) {
break;
if (binaryResults) {
ImmutableList.Builder<String> pages = ImmutableList.builder();
while (bytes < targetResultBytes) {
SerializedPage serializedPage = exchangeClient.pollPage();
if (serializedPage == null) {
break;
}

rows += serializedPage.getPositionCount();
bytes += serializedPage.getSizeInBytes();

DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1000);
writeSerializedPage(sliceOutput, serializedPage);

String encodedPage = BASE64_ENCODER.encodeToString(sliceOutput.slice().byteArray());
pages.add(encodedPage);
}
if (rows > 0) {
binaryData = pages.build();
}
}
else {
ImmutableList.Builder<RowIterable> pages = ImmutableList.builder();
while (bytes < targetResultBytes) {
SerializedPage serializedPage = exchangeClient.pollPage();
if (serializedPage == null) {
break;
}

Page page = serde.deserialize(serializedPage);
bytes += page.getLogicalSizeInBytes();
rows += page.getPositionCount();
pages.add(new RowIterable(session.toConnectorSession(), types, page));
}
if (rows > 0) {
// client implementations do not properly handle empty list of data
data = Iterables.concat(pages.build());
}

Page page = serde.deserialize(serializedPage);
bytes += page.getLogicalSizeInBytes();
rows += page.getPositionCount();
pages.add(new RowIterable(session.toConnectorSession(), types, page));
}
if (rows > 0) {
// client implementations do not properly handle empty list of data
data = Iterables.concat(pages.build());
hasProducedResult = true;
}
}
Expand Down Expand Up @@ -508,7 +539,7 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Str

URI nextResultsUri = null;
if (nextToken.isPresent()) {
nextResultsUri = createNextResultsUri(scheme, uriInfo, nextToken.getAsLong());
nextResultsUri = createNextResultsUri(scheme, uriInfo, nextToken.getAsLong(), binaryResults);
}

// update catalog, schema, and path
Expand Down Expand Up @@ -542,6 +573,7 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Str
nextResultsUri,
columns,
data,
binaryData,
toStatementStats(queryInfo),
toQueryError(queryInfo),
queryInfo.getWarnings(),
Expand Down Expand Up @@ -596,7 +628,7 @@ private ListenableFuture<?> queryDoneFuture(QueryState currentState)
return Futures.transformAsync(queryManager.getStateChange(queryId, currentState), this::queryDoneFuture, directExecutor());
}

private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, long nextToken)
private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, long nextToken, boolean binaryResults)
{
UriBuilder uri = uriInfo.getBaseUriBuilder()
.scheme(scheme)
Expand All @@ -605,6 +637,9 @@ private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, lo
.path(String.valueOf(nextToken))
.replaceQuery("")
.queryParam("slug", this.slug);
if (binaryResults) {
uri.queryParam("binaryResults", "true");
}
Optional<DataSize> targetResultSize = getTargetResultSize(session);
if (targetResultSize.isPresent()) {
uri = uri.queryParam("targetResultSize", targetResultSize.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public static Response toResponse(Query query, QueryResults queryResults, String
prependUri(queryResults.getNextUri(), xPrestoPrefixUri),
queryResults.getColumns(),
prepareJsonData(queryResults.getColumns(), queryResults.getData()),
queryResults.getBinaryData(),
queryResults.getStats(),
queryResults.getError(),
queryResults.getWarnings(),
Expand Down

0 comments on commit da04bac

Please sign in to comment.