Skip to content

Commit

Permalink
server: add limit and sort capabilities to sql stats requests
Browse files Browse the repository at this point in the history
This commit adds limit and sort fields to the combined
statements request. These params can be used to specify
how much data is returned and the priority at which to
return data (e.g. top-k). The current sort options are:
- service latency
- contention time
- execution count

Epic: none
Part of: cockroachdb#97876
Part of: cockroachdb#97875

Release note: None
  • Loading branch information
xinhaoz committed Mar 23, 2023
1 parent 0c3b492 commit 3494060
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 40 deletions.
144 changes: 104 additions & 40 deletions pkg/server/combined_statement_stats.go
Expand Up @@ -69,20 +69,17 @@ func getCombinedStatementStats(
settings *cluster.Settings,
testingKnobs *sqlstats.TestingKnobs,
) (*serverpb.StatementsResponse, error) {
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
limit := SQLStatsResponseMax.Get(&settings.SV)
showInternal := SQLStatsShowInternal.Get(&settings.SV)

whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs(
startTime, endTime, limit, testingKnobs, showInternal)
req, testingKnobs, showInternal, settings)

var statements []serverpb.StatementsResponse_CollectedStatementStatistics
var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics
var err error

if req.FetchMode == nil || req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit)
transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -96,7 +93,8 @@ func getCombinedStatementStats(
whereClause, args = buildWhereClauseForStmtsByTxn(req, transactions, testingKnobs)
}

statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, settings)
statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, settings, testingKnobs)

if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -111,6 +109,39 @@ func getCombinedStatementStats(
return response, nil
}

// Common stmt and txn columns to sort on.
const (
sortSvcLatDesc = `(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT DESC`
sortExecCountDesc = `(statistics -> 'statistics' ->> 'cnt')::INT DESC`
sortContentionTimeDesc = `(statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::FLOAT DESC`
)

func getStmtColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return sortSvcLatDesc
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return sortExecCountDesc
case serverpb.StatsSortOptions_CONTENTION_TIME:
return sortContentionTimeDesc
default:
return sortSvcLatDesc
}
}

func getTxnColumnFromSortOption(sort serverpb.StatsSortOptions) string {
switch sort {
case serverpb.StatsSortOptions_SERVICE_LAT:
return sortSvcLatDesc
case serverpb.StatsSortOptions_EXECUTION_COUNT:
return sortExecCountDesc
case serverpb.StatsSortOptions_CONTENTION_TIME:
return sortContentionTimeDesc
default:
return sortSvcLatDesc
}
}

// buildWhereClauseForStmtsByTxn builds the where clause to get the statement
// stats based on a list of transactions. The list of transactions provided must
// contain no duplicate transaction fingerprint ids.
Expand Down Expand Up @@ -156,7 +187,10 @@ func buildWhereClauseForStmtsByTxn(
// The whereClause will be in the format `WHERE A = $1 AND B = $2` and
// args will return the list of arguments in order that will replace the actual values.
func getCombinedStatementsQueryClausesAndArgs(
start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs, showInternal bool,
req *serverpb.CombinedStatementsStatsRequest,
testingKnobs *sqlstats.TestingKnobs,
showInternal bool,
settings *cluster.Settings,
) (whereClause string, orderAndLimitClause string, args []interface{}) {
var buffer strings.Builder
buffer.WriteString(testingKnobs.GetAOSTClause())
Expand All @@ -171,17 +205,37 @@ func getCombinedStatementsQueryClausesAndArgs(
catconstants.DelegatedAppNamePrefix))
}

if start != nil {
// Add start and end filters from request.
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
if startTime != nil {
buffer.WriteString(" AND aggregated_ts >= $1")
args = append(args, *start)
args = append(args, *startTime)
}

if end != nil {
args = append(args, *end)
if endTime != nil {
args = append(args, *endTime)
buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
}

// Add LIMIT from request.
limit := req.Limit
if limit == 0 {
limit = SQLStatsResponseMax.Get(&settings.SV)
}
args = append(args, limit)
orderAndLimitClause = fmt.Sprintf(` ORDER BY aggregated_ts DESC LIMIT $%d`, len(args))

// Determine sort column.
var col string
if req.FetchMode == nil {
col = "fingerprint_id"
} else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_StmtStatsOnly {
col = getStmtColumnFromSortOption(req.FetchMode.Sort)
} else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
col = getTxnColumnFromSortOption(req.FetchMode.Sort)
}

orderAndLimitClause = fmt.Sprintf(` ORDER BY %s LIMIT $%d`, col, len(args))

return buffer.String(), orderAndLimitClause, args
}
Expand All @@ -193,26 +247,32 @@ func collectCombinedStatements(
args []interface{},
orderAndLimit string,
settings *cluster.Settings,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {
table := "crdb_internal.statement_statistics_persisted"
if !settings.Version.IsActive(ctx, clusterversion.AlterSystemStatementStatisticsAddIndexRecommendations) {
table = "crdb_internal.statement_statistics_v22_1"
}
query := fmt.Sprintf(
`SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics

aostClause := testingKnobs.GetAOSTClause()

query := fmt.Sprintf(`
SELECT * FROM (
SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
max(aggregated_ts) as aggregated_ts,
metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
FROM %s %s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
%s`, table, whereClause, orderAndLimit)
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
metadata
) %s
%s`, table, whereClause, aostClause, orderAndLimit)

const expectedNumDatums = 6

Expand Down Expand Up @@ -298,21 +358,25 @@ func collectCombinedTransactions(
whereClause string,
args []interface{},
orderAndLimit string,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) {

query := fmt.Sprintf(
`SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics_persisted %s
GROUP BY
app_name,
fingerprint_id,
metadata
%s`, whereClause, orderAndLimit)
aostClause := testingKnobs.GetAOSTClause()

query := fmt.Sprintf(`
SELECT * FROM (
SELECT
app_name,
max(aggregated_ts) as aggregated_ts,
fingerprint_id,
metadata,
crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
FROM crdb_internal.transaction_statistics_persisted %s
GROUP BY
app_name,
fingerprint_id,
metadata
) %s
%s`, whereClause, aostClause, orderAndLimit)

const expectedNumDatums = 5

Expand Down
10 changes: 10 additions & 0 deletions pkg/server/serverpb/status.proto
Expand Up @@ -1562,6 +1562,13 @@ message StatementsResponse {
repeated ExtendedCollectedTransactionStatistics transactions = 5 [(gogoproto.nullable) = false];
}

enum StatsSortOptions {
SERVICE_LAT = 0;
reserved 1; // This is for CPU Time in 23.1
EXECUTION_COUNT = 2;
reserved 3; // This is for P99 in 23.1
CONTENTION_TIME = 4;
}
message CombinedStatementsStatsRequest {
enum StatsType {
StmtStatsOnly = 0;
Expand All @@ -1570,6 +1577,7 @@ message CombinedStatementsStatsRequest {

message FetchMode {
StatsType stats_type = 1;
StatsSortOptions sort = 2;
}

// Unix time range for aggregated statements.
Expand All @@ -1588,6 +1596,8 @@ message CombinedStatementsStatsRequest {
// TODO (xinhaoz) - Split this API into stmts and txns properly instead of using
// this param.
FetchMode fetch_mode = 5 [(gogoproto.nullable) = true];

int64 limit = 6;
}

// StatementDetailsRequest requests the details of a Statement, based on its keys.
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/statements.go
Expand Up @@ -28,6 +28,10 @@ func (s *statusServer) Statements(
combinedRequest := serverpb.CombinedStatementsStatsRequest{
Start: req.Start,
End: req.End,
<<<<<<< HEAD
=======
Limit: req.Limit,
>>>>>>> 20292009d1f (server: add limit and sort capabilities to sql stats requests)
}
return s.CombinedStatementStats(ctx, &combinedRequest)
}
Expand Down

0 comments on commit 3494060

Please sign in to comment.