Skip to content

Commit

Permalink
Add Prepared statement sql to query lifecycle events
Browse files Browse the repository at this point in the history
The commit makes two changes at a high level.
(1) Modifies QueryInfo and BasicQueryInfo for EXECUTE statements to include
    sql query from the corresponding PREPARE.
(2) Update QueryMetadata to include the prepared sql for QueryCreated and
    QueryCompleted events.

This partly fixes:  prestodb/presto#12009
  • Loading branch information
sshardool authored and martint committed Jun 5, 2019
1 parent 179ffe4 commit 11d3734
Show file tree
Hide file tree
Showing 23 changed files with 149 additions and 14 deletions.
Expand Up @@ -96,6 +96,7 @@ public DispatchQuery createDispatchQuery(
WarningCollector warningCollector = warningCollectorFactory.create();
QueryStateMachine stateMachine = QueryStateMachine.begin(
query,
preparedQuery.getPrepareSql(),
session,
locationFactory.createQueryLocation(session.getQueryId()),
resourceGroup,
Expand Down
Expand Up @@ -125,6 +125,7 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo)
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
queryInfo.getQuery(),
queryInfo.getPreparedQuery(),
QUEUED.toString(),
queryInfo.getSelf(),
Optional.empty(),
Expand All @@ -138,6 +139,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
queryInfo.getQuery(),
queryInfo.getPreparedQuery(),
queryInfo.getState().toString(),
queryInfo.getSelf(),
Optional.empty(),
Expand Down Expand Up @@ -205,6 +207,7 @@ private QueryMetadata createQueryMetadata(QueryInfo queryInfo)
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
queryInfo.getQuery(),
queryInfo.getPreparedQuery(),
queryInfo.getState().toString(),
queryInfo.getSelf(),
createTextQueryPlan(queryInfo),
Expand Down
10 changes: 10 additions & 0 deletions presto-main/src/main/java/io/prestosql/execution/QueryInfo.java
Expand Up @@ -52,6 +52,7 @@ public class QueryInfo
private final URI self;
private final List<String> fieldNames;
private final String query;
private final Optional<String> preparedQuery;
private final QueryStats queryStats;
private final Optional<String> setCatalog;
private final Optional<String> setSchema;
Expand Down Expand Up @@ -84,6 +85,7 @@ public QueryInfo(
@JsonProperty("self") URI self,
@JsonProperty("fieldNames") List<String> fieldNames,
@JsonProperty("query") String query,
@JsonProperty("preparedQuery") Optional<String> preparedQuery,
@JsonProperty("queryStats") QueryStats queryStats,
@JsonProperty("setCatalog") Optional<String> setCatalog,
@JsonProperty("setSchema") Optional<String> setSchema,
Expand Down Expand Up @@ -120,6 +122,7 @@ public QueryInfo(
requireNonNull(deallocatedPreparedStatements, "deallocatedPreparedStatements is null");
requireNonNull(startedTransactionId, "startedTransactionId is null");
requireNonNull(query, "query is null");
requireNonNull(preparedQuery, "preparedQuery is null");
requireNonNull(outputStage, "outputStage is null");
requireNonNull(inputs, "inputs is null");
requireNonNull(output, "output is null");
Expand All @@ -134,6 +137,7 @@ public QueryInfo(
this.self = self;
this.fieldNames = ImmutableList.copyOf(fieldNames);
this.query = query;
this.preparedQuery = preparedQuery;
this.queryStats = queryStats;
this.setCatalog = setCatalog;
this.setSchema = setSchema;
Expand Down Expand Up @@ -205,6 +209,12 @@ public String getQuery()
return query;
}

@JsonProperty
public Optional<String> getPreparedQuery()
{
return preparedQuery;
}

@JsonProperty
public QueryStats getQueryStats()
{
Expand Down
29 changes: 16 additions & 13 deletions presto-main/src/main/java/io/prestosql/execution/QueryPreparer.java
Expand Up @@ -59,7 +59,13 @@ public PreparedQuery prepareQuery(Session session, String query)
public PreparedQuery prepareQuery(Session session, Statement wrappedStatement)
throws ParsingException, PrestoException, SemanticException
{
Statement statement = unwrapExecuteStatement(wrappedStatement, sqlParser, session);
Statement statement = wrappedStatement;
Optional<String> prepareSql = Optional.empty();
if (statement instanceof Execute) {
prepareSql = Optional.of(session.getPreparedStatementFromExecute((Execute) statement));
statement = sqlParser.createStatement(prepareSql.get(), createParsingOptions(session));
}

if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
Statement innerStatement = ((Explain) statement).getStatement();
Optional<QueryType> innerQueryType = StatementUtils.getQueryType(innerStatement.getClass());
Expand All @@ -72,17 +78,7 @@ public PreparedQuery prepareQuery(Session session, Statement wrappedStatement)
parameters = ((Execute) wrappedStatement).getParameters();
}
validateParameters(statement, parameters);
return new PreparedQuery(statement, parameters);
}

private static Statement unwrapExecuteStatement(Statement statement, SqlParser sqlParser, Session session)
{
if (!(statement instanceof Execute)) {
return statement;
}

String sql = session.getPreparedStatementFromExecute((Execute) statement);
return sqlParser.createStatement(sql, createParsingOptions(session));
return new PreparedQuery(statement, parameters, prepareSql);
}

private static void validateParameters(Statement node, List<Expression> parameterValues)
Expand All @@ -100,11 +96,13 @@ public static class PreparedQuery
{
private final Statement statement;
private final List<Expression> parameters;
private final Optional<String> prepareSql;

public PreparedQuery(Statement statement, List<Expression> parameters)
public PreparedQuery(Statement statement, List<Expression> parameters, Optional<String> prepareSql)
{
this.statement = requireNonNull(statement, "statement is null");
this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null"));
this.prepareSql = requireNonNull(prepareSql, "prepareSql is null");
}

public Statement getStatement()
Expand All @@ -116,5 +114,10 @@ public List<Expression> getParameters()
{
return parameters;
}

public Optional<String> getPrepareSql()
{
return prepareSql;
}
}
}
Expand Up @@ -96,6 +96,7 @@ public class QueryStateMachine

private final QueryId queryId;
private final String query;
private final Optional<String> preparedQuery;
private final Session session;
private final URI self;
private final ResourceGroupId resourceGroup;
Expand Down Expand Up @@ -150,6 +151,7 @@ public class QueryStateMachine

private QueryStateMachine(
String query,
Optional<String> preparedQuery,
Session session,
URI self,
ResourceGroupId resourceGroup,
Expand All @@ -160,6 +162,7 @@ private QueryStateMachine(
WarningCollector warningCollector)
{
this.query = requireNonNull(query, "query is null");
this.preparedQuery = requireNonNull(preparedQuery, "preparedQuery is null");
this.session = requireNonNull(session, "session is null");
this.queryId = session.getQueryId();
this.self = requireNonNull(self, "self is null");
Expand All @@ -179,6 +182,7 @@ private QueryStateMachine(
*/
public static QueryStateMachine begin(
String query,
Optional<String> preparedQuery,
Session session,
URI self,
ResourceGroupId resourceGroup,
Expand All @@ -191,6 +195,7 @@ public static QueryStateMachine begin(
{
return beginWithTicker(
query,
preparedQuery,
session,
self,
resourceGroup,
Expand All @@ -205,6 +210,7 @@ public static QueryStateMachine begin(

static QueryStateMachine beginWithTicker(
String query,
Optional<String> preparedQuery,
Session session,
URI self,
ResourceGroupId resourceGroup,
Expand All @@ -225,6 +231,7 @@ static QueryStateMachine beginWithTicker(

QueryStateMachine queryStateMachine = new QueryStateMachine(
query,
preparedQuery,
session,
self,
resourceGroup,
Expand Down Expand Up @@ -356,6 +363,7 @@ public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStage)
stageStats.isScheduled(),
self,
query,
preparedQuery,
queryStats,
errorCode == null ? null : errorCode.getType(),
errorCode);
Expand Down Expand Up @@ -391,6 +399,7 @@ QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
self,
outputManager.getQueryOutputInfo().map(QueryOutputInfo::getColumnNames).orElse(ImmutableList.of()),
query,
preparedQuery,
getQueryStats(rootStage),
Optional.ofNullable(setCatalog.get()),
Optional.ofNullable(setSchema.get()),
Expand Down Expand Up @@ -985,6 +994,7 @@ public void pruneQueryInfo()
queryInfo.getSelf(),
queryInfo.getFieldNames(),
queryInfo.getQuery(),
queryInfo.getPreparedQuery(),
pruneQueryStats(queryInfo.getQueryStats()),
queryInfo.getSetCatalog(),
queryInfo.getSetSchema(),
Expand Down
11 changes: 11 additions & 0 deletions presto-main/src/main/java/io/prestosql/server/BasicQueryInfo.java
Expand Up @@ -52,6 +52,7 @@ public class BasicQueryInfo
private final boolean scheduled;
private final URI self;
private final String query;
private final Optional<String> preparedQuery;
private final BasicQueryStats queryStats;
private final ErrorType errorType;
private final ErrorCode errorCode;
Expand All @@ -66,6 +67,7 @@ public BasicQueryInfo(
@JsonProperty("scheduled") boolean scheduled,
@JsonProperty("self") URI self,
@JsonProperty("query") String query,
@JsonProperty("preparedQuery") Optional<String> preparedQuery,
@JsonProperty("queryStats") BasicQueryStats queryStats,
@JsonProperty("errorType") ErrorType errorType,
@JsonProperty("errorCode") ErrorCode errorCode)
Expand All @@ -80,6 +82,7 @@ public BasicQueryInfo(
this.scheduled = scheduled;
this.self = requireNonNull(self, "self is null");
this.query = requireNonNull(query, "query is null");
this.preparedQuery = requireNonNull(preparedQuery, "preparedQuery is null");
this.queryStats = requireNonNull(queryStats, "queryStats is null");
}

Expand All @@ -93,6 +96,7 @@ public BasicQueryInfo(QueryInfo queryInfo)
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getQuery(),
queryInfo.getPreparedQuery(),
new BasicQueryStats(queryInfo.getQueryStats()),
queryInfo.getErrorType(),
queryInfo.getErrorCode());
Expand All @@ -109,6 +113,7 @@ public static BasicQueryInfo immediateFailureQueryInfo(Session session, String q
false,
self,
query,
Optional.empty(),
immediateFailureQueryStats(),
errorCode == null ? null : errorCode.getType(),
errorCode);
Expand Down Expand Up @@ -162,6 +167,12 @@ public String getQuery()
return query;
}

@JsonProperty
public Optional<String> getPreparedQuery()
{
return preparedQuery;
}

@JsonProperty
public BasicQueryStats getQueryStats()
{
Expand Down
Expand Up @@ -230,6 +230,7 @@ private static QueryInfo toFullQueryInfo(DispatchQuery query)
info.getSelf(),
ImmutableList.of(),
info.getQuery(),
info.getPreparedQuery(),
queryStats,
Optional.empty(),
Optional.empty(),
Expand Down
Expand Up @@ -106,6 +106,7 @@ public BasicQueryInfo getBasicQueryInfo()
!state.isDone(),
URI.create("http://test"),
"SELECT 1",
Optional.empty(),
new BasicQueryStats(
new DateTime(1),
new DateTime(2),
Expand Down
Expand Up @@ -126,6 +126,7 @@ private QueryStateMachine createQueryStateMachine(String query, Session session,
{
return QueryStateMachine.begin(
query,
Optional.empty(),
session,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.testng.annotations.Test;

import java.net.URI;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -83,6 +84,7 @@ private Set<String> executeDeallocate(String statementName, String sqlString, Se
AccessControl accessControl = new AccessControlManager(transactionManager);
QueryStateMachine stateMachine = QueryStateMachine.begin(
sqlString,
Optional.empty(),
session,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -34,6 +34,7 @@

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand Down Expand Up @@ -104,6 +105,7 @@ private Map<String, String> executePrepare(String statementName, Statement state
TransactionManager transactionManager = createTestTransactionManager();
QueryStateMachine stateMachine = QueryStateMachine.begin(
sqlString,
Optional.empty(),
session,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -494,6 +494,7 @@ private QueryStateMachine createQueryStateMachineWithTicker(Ticker ticker)
AccessControl accessControl = new AccessControlManager(transactionManager);
QueryStateMachine stateMachine = QueryStateMachine.beginWithTicker(
QUERY,
Optional.empty(),
TEST_SESSION,
LOCATION,
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.testng.annotations.Test;

import java.net.URI;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -105,6 +106,7 @@ public void test()

QueryStateMachine stateMachine = QueryStateMachine.begin(
"reset foo",
Optional.empty(),
session,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -117,6 +117,7 @@ private QueryStateMachine createQueryStateMachine(String query, Session session,
{
return QueryStateMachine.begin(
query,
Optional.empty(),
session,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -105,6 +105,7 @@ private QueryStateMachine createQueryStateMachine(String query)
{
return QueryStateMachine.begin(
query,
Optional.empty(),
TEST_SESSION,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -107,6 +107,7 @@ private void assertSetRole(String statement, Map<String, SelectedRole> expected)
SetRole setRole = (SetRole) parser.createStatement(statement);
QueryStateMachine stateMachine = QueryStateMachine.begin(
statement,
Optional.empty(),
testSessionBuilder()
.setCatalog(CATALOG_NAME)
.build(),
Expand Down
Expand Up @@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static io.airlift.concurrent.MoreFutures.getFutureValue;
Expand Down Expand Up @@ -180,6 +181,7 @@ private void testSetSessionWithParameters(String property, Expression expression
QualifiedName qualifiedPropName = QualifiedName.of(CATALOG_NAME, property);
QueryStateMachine stateMachine = QueryStateMachine.begin(
format("set %s = 'old_value'", qualifiedPropName),
Optional.empty(),
TEST_SESSION,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down
Expand Up @@ -260,6 +260,7 @@ private QueryStateMachine createQueryStateMachine(String query, Session session,
{
return QueryStateMachine.begin(
query,
Optional.empty(),
session,
URI.create("fake://uri"),
new ResourceGroupId("test"),
Expand Down

0 comments on commit 11d3734

Please sign in to comment.