Skip to content

Commit

Permalink
sql,roachpb: add plan hash correct value to persisted stats
Browse files Browse the repository at this point in the history
Previously, we didn't have the plan hash/gist values, so
a dummy value was being used instead. Now that we have the values,
this commit stores them correctly.
The plan hash is saved in its own column and is part of the
statement key. A plan gist is a string saved in the metadata
that can later be converted back into a logical plan.

Partially addresses cockroachdb#72129

Release note (sql change): The plan hash and plan gist are now saved
to the persisted statement statistics.
  • Loading branch information
maryliag committed Feb 1, 2022
1 parent 7adf8b7 commit 3da0623
Show file tree
Hide file tree
Showing 19 changed files with 116 additions and 42 deletions.
6 changes: 4 additions & 2 deletions pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (m *StatementStatisticsKey) FingerprintID() StmtFingerprintID {
m.Failed,
m.ImplicitTxn,
m.Database,
m.PlanHash,
)
}

Expand All @@ -34,7 +35,7 @@ func (m *StatementStatisticsKey) FingerprintID() StmtFingerprintID {
// these are the axes we use to bucket queries for stats collection
// (see stmtKey).
func ConstructStatementFingerprintID(
anonymizedStmt string, failed bool, implicitTxn bool, database string,
anonymizedStmt string, failed bool, implicitTxn bool, database string, planHash uint64,
) StmtFingerprintID {
fnv := util.MakeFNV64()
for _, c := range anonymizedStmt {
Expand All @@ -53,6 +54,7 @@ func ConstructStatementFingerprintID(
} else {
fnv.Add('E')
}
fnv.Add(planHash)
return StmtFingerprintID(fnv.Sum())
}

Expand Down Expand Up @@ -161,13 +163,13 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.RowsWritten.Add(other.RowsWritten, s.Count, other.Count)
s.Nodes = util.CombineUniqueInt64(s.Nodes, other.Nodes)
s.PlanGists = util.CombineUniqueString(s.PlanGists, other.PlanGists)

s.ExecStats.Add(other.ExecStats)

if other.SensitiveInfo.LastErr != "" {
s.SensitiveInfo.LastErr = other.SensitiveInfo.LastErr
}

if s.SensitiveInfo.MostRecentPlanTimestamp.Before(other.SensitiveInfo.MostRecentPlanTimestamp) {
s.SensitiveInfo = other.SensitiveInfo
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ message StatementStatistics {
// Nodes is the ordered list of nodes ids on which the statement was executed.
repeated int64 nodes = 24;

// plan_gists is a list of compressed versions of a plan that can be converted
// (lossily) back into a logical plan.
// Each statement contains only one plan gist, but the same statement fingerprint id
// can contain more than one value.
repeated string plan_gists = 26;

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!

reserved 13, 14, 17, 18, 19, 20;
Expand Down
32 changes: 21 additions & 11 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func getCombinedStatementStats(
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
limit := SQLStatsResponseMax.Get(&settings.SV)
whereClause, args := getFilterAndParams(startTime, endTime, limit, testingKnobs)
statements, err := collectCombinedStatements(ctx, ie, whereClause, args)
whereClause, args := getFilterAndParams(startTime, endTime, testingKnobs)
statements, err := collectCombinedStatements(ctx, ie, whereClause, args, limit)
if err != nil {
return nil, err
}
Expand All @@ -90,7 +90,7 @@ func getCombinedStatementStats(
}

func getFilterAndParams(
start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs,
start, end *time.Time, testingKnobs *sqlstats.TestingKnobs,
) (string, []interface{}) {
var args []interface{}
var buffer strings.Builder
Expand All @@ -109,17 +109,15 @@ func getFilterAndParams(
args = append(args, *end)
}

// Retrieve the top rows ordered by aggregation time and service latency.
buffer.WriteString(fmt.Sprintf(`
ORDER BY aggregated_ts DESC,(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float DESC
LIMIT $%d`, len(args)+1))
args = append(args, limit)

return buffer.String(), args
}

func collectCombinedStatements(
ctx context.Context, ie *sql.InternalExecutor, whereClause string, qargs []interface{},
ctx context.Context,
ie *sql.InternalExecutor,
whereClause string,
qargs []interface{},
limit int64,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {

query := fmt.Sprintf(
Expand All @@ -133,7 +131,19 @@ func collectCombinedStatements(
sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics
%s`, whereClause)
%s
ORDER BY aggregated_ts DESC,(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float DESC
LIMIT $%d`, whereClause, len(qargs)+1)

qargs = append(qargs, limit)

//buffer.WriteString(`GROUP BY
// aggregated_ts,
// fingerprint_id,
// transaction_fingerprint_id,
// app_name,
// aggregation_interval,
// statistics`)

const expectedNumDatums = 8

Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5051,10 +5051,8 @@ CREATE TABLE crdb_internal.cluster_statement_statistics (
transactionFingerprintID := tree.NewDBytes(
tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(uint64(statistics.Key.TransactionFingerprintID))))

// TODO(azhng): properly update plan_hash value once we can expose it
// from the optimizer.
planHash := tree.NewDBytes(
tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(0)))
tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(statistics.Key.PlanHash)))

metadataJSON, err := sqlstatsutil.BuildStmtMetadataJSON(statistics)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (ex *connExecutor) recordStatementSummary(
FullScan: flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan),
Failed: stmtErr != nil,
Database: planner.SessionData().Database,
PlanHash: planner.instrumentation.planGist.Hash(),
}

// We only populate the transaction fingerprint ID field if we are in an
Expand Down Expand Up @@ -204,6 +205,7 @@ func (ex *connExecutor) recordStatementSummary(
Nodes: getNodesFromPlanner(planner),
StatementType: stmt.AST.StatementType(),
Plan: planner.instrumentation.PlanForStats(ctx),
PlanGist: planner.instrumentation.planGist.String(),
StatementError: stmtErr,
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (ih *instrumentationHelper) Finish(
ImplicitTxn: ih.implicitTxn,
Database: p.SessionData().Database,
Failed: retErr != nil,
PlanHash: ih.planGist.Hash(),
}
// We populate transaction fingerprint ID if this is an implicit transaction.
// See executor_statement_metrics.go:recordStatementSummary() for further
Expand Down
13 changes: 8 additions & 5 deletions pkg/sql/instrumentation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ func TestSampledStatsCollection(t *testing.T) {
stmt string,
implicitTxn bool,
database string,
planHash uint64,
) *roachpb.CollectedStatementStatistics {
t.Helper()
key := roachpb.StatementStatisticsKey{
Query: stmt,
ImplicitTxn: implicitTxn,
Database: database,
Failed: false,
PlanHash: planHash,
}
var stats *roachpb.CollectedStatementStatistics
require.NoError(t, server.SQLServer().(*Server).sqlStats.
Expand All @@ -65,7 +67,8 @@ func TestSampledStatsCollection(t *testing.T) {
if statistics.Key.Query == key.Query &&
statistics.Key.ImplicitTxn == key.ImplicitTxn &&
statistics.Key.Database == key.Database &&
statistics.Key.Failed == key.Failed {
statistics.Key.Failed == key.Failed &&
statistics.Key.PlanHash == key.PlanHash {
stats = statistics
}

Expand Down Expand Up @@ -127,7 +130,7 @@ func TestSampledStatsCollection(t *testing.T) {
toggleSampling(true)
queryDB(t, db, selectOrderBy)

stats := getStmtStats(t, s, selectOrderBy, true /* implicitTxn */, "defaultdb")
stats := getStmtStats(t, s, selectOrderBy, true /* implicitTxn */, "defaultdb", uint64(0))

require.Equal(t, int64(2), stats.Stats.Count, "expected to have collected two sets of general stats")
require.Equal(t, int64(1), stats.Stats.ExecStats.Count, "expected to have collected exactly one set of execution stats")
Expand All @@ -152,8 +155,8 @@ func TestSampledStatsCollection(t *testing.T) {
toggleSampling(true)
doTxn(t)

aggStats := getStmtStats(t, s, aggregation, false /* implicitTxn */, "defaultdb")
selectStats := getStmtStats(t, s, selectOrderBy, false /* implicitTxn */, "defaultdb")
aggStats := getStmtStats(t, s, aggregation, false /* implicitTxn */, "defaultdb", uint64(0))
selectStats := getStmtStats(t, s, selectOrderBy, false /* implicitTxn */, "defaultdb", uint64(0))

require.Equal(t, int64(2), aggStats.Stats.Count, "expected to have collected two sets of general stats")
require.Equal(t, int64(1), aggStats.Stats.ExecStats.Count, "expected to have collected exactly one set of execution stats")
Expand Down Expand Up @@ -190,7 +193,7 @@ func TestSampledStatsCollection(t *testing.T) {

// Make sure DEALLOCATE statements are grouped together rather than having
// one key per prepared statement name.
stats := getStmtStats(t, s, "DEALLOCATE _", true /* implicitTxn */, "defaultdb")
stats := getStmtStats(t, s, "DEALLOCATE _", true /* implicitTxn */, "defaultdb", uint64(0))

require.Equal(t, int64(2), stats.Stats.Count, "expected to have collected two sets of general stats")
require.Equal(t, int64(0), stats.Stats.ExecStats.Count, "expected to have collected zero execution stats")
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *PersistedSQLStats) doFlushSingleStmtStats(

serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID))
serializedTransactionFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.Key.TransactionFingerprintID))
serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(uint64(dummyPlanHash))
serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(scopedStats.Key.PlanHash)

insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) {
rowsAffected, err := s.insertStatementStats(
Expand Down Expand Up @@ -417,7 +417,7 @@ WHERE fingerprint_id = $2
"plan_hash: %d, "+
"node_id: %d",
serializedFingerprintID, serializedTransactionFingerprintID, stats.Key.App,
aggregatedTs, dummyPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
aggregatedTs, serializedPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
}

return nil
Expand Down Expand Up @@ -581,12 +581,12 @@ FOR UPDATE
if row == nil {
return errors.AssertionFailedf(
"statement statistics not found fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d",
serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
}

if len(row) != 1 {
return errors.AssertionFailedf("unexpectedly found %d returning columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash %d, node_id: %d",
len(row), serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash,
len(row), serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash,
s.cfg.SQLIDContainer.SQLInstanceID())
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ import (
"github.com/cockroachdb/errors"
)

const (
// TODO(azhng): currently we do not have the ability to compute a hash for
// query plan. This is currently being worked on by the SQL Queries team.
// Once we are able get consistent hash value from a query plan, we should
// update this.
dummyPlanHash = int64(0)
)

// ErrConcurrentSQLStatsCompaction is reported when two sql stats compaction
// jobs are issued concurrently. This is a sentinel error.
var ErrConcurrentSQLStatsCompaction = errors.New("another sql stats compaction job is already running")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
"failed": {{.Bool}},
"implicitTxn": {{.Bool}},
"vec": {{.Bool}},
"fullScan": {{.Bool}}
"fullScan": {{.Bool}},
"planGists": {{.StringArray}}
}
`

Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (s *stmtStatsMetadata) jsonFields() jsonFields {
{"implicitTxn", (*jsonBool)(&s.Key.ImplicitTxn)},
{"vec", (*jsonBool)(&s.Key.Vec)},
{"fullScan", (*jsonBool)(&s.Key.FullScan)},
{"planGists", (*stringArray)(&s.Stats.PlanGists)},
}
}

Expand Down Expand Up @@ -135,6 +136,39 @@ func (a *int64Array) encodeJSON() (json.JSON, error) {
return builder.Build(), nil
}

// stringArray is a []string with decodeJSON/encodeJSON methods that convert it
// from and to a JSON array whose elements are JSON strings.
type stringArray []string

// decodeJSON walks the indexed elements of js, decodes each one as a
// jsonString, and appends the resulting string to the receiver. Values already
// present in the receiver are kept. The first fetch or decode error aborts the
// walk and is returned as-is.
func (a *stringArray) decodeJSON(js json.JSON) error {
	for idx, n := 0, js.Len(); idx < n; idx++ {
		elem, err := js.FetchValIdx(idx)
		if err != nil {
			return err
		}

		var str jsonString
		if err = str.decodeJSON(elem); err != nil {
			return err
		}
		*a = append(*a, string(str))
	}

	return nil
}

// encodeJSON encodes every element of the receiver as a jsonString and adds
// the results, in order, to a JSON array builder sized to the receiver's
// length. It returns the built array, or nil and the first encoding error.
func (a *stringArray) encodeJSON() (json.JSON, error) {
	strs := *a
	arr := json.NewArrayBuilder(len(strs))

	for i := range strs {
		// Encode a copy of the element, so the encoder never sees a pointer
		// into the receiver's backing array.
		item := jsonString(strs[i])
		encoded, err := item.encodeJSON()
		if err != nil {
			return nil, err
		}
		arr.Add(encoded)
	}

	return arr.Build(), nil
}

type stmtFingerprintIDArray []roachpb.StmtFingerprintID

func (s *stmtFingerprintIDArray) decodeJSON(js json.JSON) error {
Expand Down
26 changes: 19 additions & 7 deletions pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func GetRandomizedCollectedStatementStatisticsForTest(
}

type randomData struct {
Bool bool
String string
Int64 int64
Float float64
IntArray []int64
Time time.Time
Bool bool
String string
Int64 int64
Float float64
IntArray []int64
StringArray []string
Time time.Time
}

var alphabet = []rune("abcdefghijklmkopqrstuvwxyz")
Expand All @@ -57,11 +58,22 @@ func genRandomData() randomData {
}
r.String = b.String()

// Generate a randomized array of length 5.
arrLen := 5
r.StringArray = make([]string, arrLen)
for i := 0; i < arrLen; i++ {
// Randomly generating 10-character string.
b := strings.Builder{}
for i := 0; i < 10; i++ {
b.WriteRune(alphabet[rand.Intn(26)])
}
r.StringArray[i] = b.String()
}

r.Int64 = rand.Int63()
r.Float = rand.Float64()

// Generate a randomized array of length 5.
arrLen := 5
r.IntArray = make([]int64, arrLen)
for i := 0; i < arrLen; i++ {
r.IntArray[i] = rand.Int63()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (s *StmtStatsIterator) Next() bool {
Failed: stmtKey.failed,
App: s.container.appName,
Database: database,
PlanHash: stmtKey.planHash,
TransactionFingerprintID: stmtKey.transactionFingerprintID,
},
ID: stmtFingerprintID,
Expand Down
Loading

0 comments on commit 3da0623

Please sign in to comment.