Track queries using Postgres internal query ID #339

Closed
wants to merge 16 commits into from

Changes from 4 commits

15 changes: 9 additions & 6 deletions input/postgres/backends.go
@@ -10,10 +10,11 @@ import (
"github.com/pganalyze/collector/util"
)

const activitySQLDefaultOptionalFields = "waiting, NULL, NULL, NULL, NULL, NULL"
const activitySQLpg94OptionalFields = "waiting, backend_xid, backend_xmin, NULL, NULL, NULL"
const activitySQLpg96OptionalFields = `COALESCE(wait_event_type, '') = 'Lock' as waiting, backend_xid, backend_xmin, wait_event_type, wait_event, NULL`
const activitySQLpg10OptionalFields = `COALESCE(wait_event_type, '') = 'Lock' as waiting, backend_xid, backend_xmin, wait_event_type, wait_event, backend_type`
const activitySQLDefaultOptionalFields = "waiting, NULL, NULL, NULL, NULL, NULL, NULL"
const activitySQLpg94OptionalFields = "waiting, backend_xid, backend_xmin, NULL, NULL, NULL, NULL"
const activitySQLpg96OptionalFields = `COALESCE(wait_event_type, '') = 'Lock' as waiting, backend_xid, backend_xmin, wait_event_type, wait_event, NULL, NULL`
const activitySQLpg10OptionalFields = `COALESCE(wait_event_type, '') = 'Lock' as waiting, backend_xid, backend_xmin, wait_event_type, wait_event, backend_type, NULL`
const activitySQLpg14OptionalFields = `COALESCE(wait_event_type, '') = 'Lock' as waiting, backend_xid, backend_xmin, wait_event_type, wait_event, backend_type, query_id`

const pgBlockingPidsField = `
CASE
@@ -33,7 +34,9 @@ func GetBackends(logger *util.Logger, db *sql.DB, postgresVersion state.Postgres
var blockingPidsField string
var sourceTable string

if postgresVersion.Numeric >= state.PostgresVersion10 {
if postgresVersion.Numeric >= state.PostgresVersion14 {
optionalFields = activitySQLpg14OptionalFields
} else if postgresVersion.Numeric >= state.PostgresVersion10 {
optionalFields = activitySQLpg10OptionalFields
} else if postgresVersion.Numeric >= state.PostgresVersion96 {
optionalFields = activitySQLpg96OptionalFields
@@ -78,7 +81,7 @@ func GetBackends(logger *util.Logger, db *sql.DB, postgresVersion state.Postgres
&row.ClientPort, &row.BackendStart, &row.XactStart, &row.QueryStart,
&row.StateChange, &row.Waiting, &row.BackendXid, &row.BackendXmin,
&row.WaitEventType, &row.WaitEvent, &row.BackendType, pq.Array(&row.BlockedByPids),
&row.State, &row.Query)
&row.QueryID, &row.State, &row.Query)
if err != nil {
return nil, err
}
15 changes: 15 additions & 0 deletions input/postgres/statements.go
@@ -6,6 +6,7 @@ import (
"fmt"
"hash/fnv"
"strings"
"time"

"github.com/guregu/null"
"github.com/lib/pq"
@@ -234,6 +235,20 @@ func GetStatements(server *state.Server, logger *util.Logger, db *sql.DB, global
if showtext {
statementTexts[key] = receivedQuery.String
}
if queryID.Valid && showtext {
if server.PrevState.QueryIdentities == nil {
server.PrevState.QueryIdentities = make(state.QueryIdentityMap)
}
if identity, ok := server.PrevState.QueryIdentities[queryID.Int64]; ok {
identity.LastSeen = time.Now()
server.PrevState.QueryIdentities[queryID.Int64] = identity // write back the updated copy, since map values aren't addressable
} else {
server.PrevState.QueryIdentities[queryID.Int64] = state.QueryIdentity{
QueryID: queryID.Int64,
Fingerprint: util.FingerprintQuery(receivedQuery.String, server.Config.FilterQueryText, -1),
LastSeen: time.Now(),
}
}
}
if ignoreIOTiming(postgresVersion, receivedQuery) {
stats.BlkReadTime = 0
stats.BlkWriteTime = 0
1 change: 1 addition & 0 deletions output/transform/activity.go
@@ -34,6 +34,7 @@ func ActivityStateToCompactActivitySnapshot(server *state.Server, activityState
r.QueryInformations,
b.RoleIdx,
b.DatabaseIdx,
backend.QueryID,
backend.Query.String,
activityState.TrackActivityQuerySize,
)
3 changes: 3 additions & 0 deletions output/transform/logs.go
@@ -5,6 +5,7 @@ import (
"fmt"

"github.com/golang/protobuf/ptypes"
"github.com/guregu/null"
snapshot "github.com/pganalyze/collector/output/pganalyze_collector"
"github.com/pganalyze/collector/state"
uuid "github.com/satori/go.uuid"
@@ -45,6 +46,7 @@ func transformPostgresQuerySamples(server *state.Server, s snapshot.CompactLogSn
r.QueryInformations,
roleIdx,
databaseIdx,
null.NewInt(0, false),
sampleIn.Query,
-1,
)
@@ -182,6 +184,7 @@ func transformSystemLogLine(server *state.Server, r *snapshot.CompactSnapshot_Ba
r.QueryInformations,
logLine.RoleIdx,
logLine.DatabaseIdx,
null.NewInt(0, false),
logLineIn.Query,
-1,
)
12 changes: 8 additions & 4 deletions output/transform/postgres_statements.go
@@ -9,13 +9,17 @@ import (
"github.com/pganalyze/collector/util"
)

func groupStatements(statements state.PostgresStatementMap, statsMap state.DiffedPostgresStatementStatsMap) map[statementKey]statementValue {
func groupStatements(statements state.PostgresStatementMap, statsMap state.DiffedPostgresStatementStatsMap, queryIdentities state.QueryIdentityMap) map[statementKey]statementValue {
groupedStatements := make(map[statementKey]statementValue)

for sKey, stats := range statsMap {
statement, exist := statements[sKey]
if !exist {
statement = state.PostgresStatement{QueryTextUnavailable: true, Fingerprint: util.FingerprintText(util.QueryTextUnavailable)}
if identity, ok := queryIdentities[sKey.QueryID]; ok {
statement = state.PostgresStatement{QueryTextUnavailable: false, Fingerprint: identity.Fingerprint}
} else {
statement = state.PostgresStatement{QueryTextUnavailable: true, Fingerprint: util.FingerprintText(util.QueryTextUnavailable)}
}
}

key := statementKey{
@@ -67,7 +71,7 @@ func transformQueryStatistic(stats state.DiffedPostgresStatementStats, idx int32

func transformPostgresStatements(s snapshot.FullSnapshot, newState state.PersistedState, diffState state.DiffState, transientState state.TransientState, roleOidToIdx OidToIdx, databaseOidToIdx OidToIdx) snapshot.FullSnapshot {
// Statement stats from this snapshot
groupedStatements := groupStatements(transientState.Statements, diffState.StatementStats)
groupedStatements := groupStatements(transientState.Statements, diffState.StatementStats, newState.QueryIdentities)
for key, value := range groupedStatements {
idx := upsertQueryReferenceAndInformation(&s, transientState.StatementTexts, roleOidToIdx, databaseOidToIdx, key, value)

@@ -87,7 +91,7 @@ func transformPostgresStatements(s snapshot.FullSnapshot, newState state.Persist
h.CollectedAt, _ = ptypes.TimestampProto(timeKey.CollectedAt)
h.CollectedIntervalSecs = timeKey.CollectedIntervalSecs

groupedStatements = groupStatements(transientState.Statements, diffedStats)
groupedStatements = groupStatements(transientState.Statements, diffedStats, newState.QueryIdentities)
for key, value := range groupedStatements {
idx := upsertQueryReferenceAndInformation(&s, transientState.StatementTexts, roleOidToIdx, databaseOidToIdx, key, value)
statistic := transformQueryStatistic(value.statementStats, idx)
20 changes: 17 additions & 3 deletions output/transform/util.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"

"github.com/guregu/null"
snapshot "github.com/pganalyze/collector/output/pganalyze_collector"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
@@ -61,8 +62,16 @@ func upsertQueryReferenceAndInformation(s *snapshot.FullSnapshot, statementTexts
return idx
}

func upsertQueryReferenceAndInformationSimple(server *state.Server, refs []*snapshot.QueryReference, infos []*snapshot.QueryInformation, roleIdx int32, databaseIdx int32, originalQuery string, trackActivityQuerySize int) (int32, []*snapshot.QueryReference, []*snapshot.QueryInformation) {
fingerprint := util.FingerprintQuery(originalQuery, server.Config.FilterQueryText, trackActivityQuerySize)
func upsertQueryReferenceAndInformationSimple(server *state.Server, refs []*snapshot.QueryReference, infos []*snapshot.QueryInformation, roleIdx int32, databaseIdx int32, queryID null.Int, originalQuery string, trackActivityQuerySize int) (int32, []*snapshot.QueryReference, []*snapshot.QueryInformation) {
var fingerprint uint64
if server.PrevState.QueryIdentities != nil && queryID.Valid {
if identity, ok := server.PrevState.QueryIdentities[queryID.Int64]; ok {
fingerprint = identity.Fingerprint
}
}
if fingerprint == 0 {
fingerprint = util.FingerprintQuery(originalQuery, server.Config.FilterQueryText, trackActivityQuerySize)
}

fpBuf := make([]byte, 8)
binary.BigEndian.PutUint64(fpBuf, fingerprint)
@@ -82,10 +91,15 @@ func upsertQueryReferenceAndInformationSimple(server *state.Server, refs []*snap
idx := int32(len(refs))
refs = append(refs, &newRef)

normalizedQuery := util.NormalizeQuery(originalQuery, server.Config.FilterQueryText, trackActivityQuerySize)
if normalizedQuery == util.QueryTextTruncated {
normalizedQuery = ""
}

// Information
queryInformation := snapshot.QueryInformation{
QueryIdx: idx,
NormalizedQuery: util.NormalizeQuery(originalQuery, server.Config.FilterQueryText, trackActivityQuerySize),
NormalizedQuery: normalizedQuery,
}
infos = append(infos, &queryInformation)

27 changes: 27 additions & 0 deletions runner/full.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os/exec"
"runtime/debug"
"sort"
"sync"
"time"

@@ -67,6 +68,8 @@ func collectDiffAndSubmit(server *state.Server, globalCollectionOpts state.Colle
newState.StatementStats = transientState.ResetStatementStats
}

newState.QueryIdentities = pruneQueryIdentities(newState.QueryIdentities)

return newState, collectionStatus, nil
}

Expand Down Expand Up @@ -254,3 +257,27 @@ func CollectAllServers(servers []*state.Server, globalCollectionOpts state.Colle

return
}

func pruneQueryIdentities(oldMap state.QueryIdentityMap) (newMap state.QueryIdentityMap) {
if len(oldMap) < 100000 {
return oldMap
}
slice := make([]state.QueryIdentity, 0, len(oldMap))
for _, identity := range oldMap {
slice = append(slice, identity)
}
sort.Slice(slice, func(i, j int) bool {
return slice[i].LastSeen.Before(slice[j].LastSeen)
})
newMap = make(state.QueryIdentityMap, 100000)
for _, identity := range slice[:min(len(oldMap), 100000)] {
newMap[identity.QueryID] = identity
}
return
}

func min(x, y int) int {
if x < y {
return x
}
return y
}
2 changes: 2 additions & 0 deletions state/postgres_backend.go
@@ -29,6 +29,8 @@ type PostgresBackend struct {

BackendType null.String // 10+ The process type of this backend

QueryID null.Int // 14+ The internal query ID which is also available in pg_stat_statements

Query null.String // Text of this backend's most recent query

// Current overall state of this backend. Possible values are:
8 changes: 8 additions & 0 deletions state/postgres_statement.go
@@ -104,3 +104,11 @@ func (stmt DiffedPostgresStatementStats) Add(other DiffedPostgresStatementStats)
BlkWriteTime: stmt.BlkWriteTime + other.BlkWriteTime,
}
}

type QueryIdentity struct {
QueryID int64
Fingerprint uint64
LastSeen time.Time
}

type QueryIdentityMap map[int64]QueryIdentity
Member Author

It looks like query IDs are globally unique, since the relation IDs of referenced tables are used when generating the fingerprint:

https://github.com/postgres/postgres/blob/fc7852c6cb89a5384e0b4ad30874de92f63f88be/src/backend/utils/misc/queryjumble.c#L285

The question is: are duplicate query IDs across databases (that were for entirely different queries) a big enough issue that we need to track which database the query ID came from in QueryIdentityMap?

Contributor

Checking pg_stat_statements in our own database, it looks like we have ~0.15% dupes. That's pretty small, but maybe not small enough that we can just ignore it. In our case, though, all the dupes are in one database. What's the impact of collisions here? Would we end up associating stats with the incorrect query text?
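For reference, a check along these lines can be run directly against pg_stat_statements. This is only a sketch (not necessarily the exact query used for the number above); it assumes the standard queryid and dbid columns of pg_stat_statements, and a connection configured through the usual PG* environment variables:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/lib/pq"
)

func main() {
    // Connection parameters come from PGHOST/PGUSER/PGDATABASE etc.
    db, err := sql.Open("postgres", "")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Count queryids that show up under more than one database.
    var total, dupes int64
    err = db.QueryRow(`
        SELECT COUNT(*),
               COUNT(*) FILTER (WHERE db_count > 1)
          FROM (SELECT queryid, COUNT(DISTINCT dbid) AS db_count
                  FROM pg_stat_statements
                 GROUP BY queryid) q`).Scan(&total, &dupes)
    if err != nil {
        log.Fatal(err)
    }

    pct := 0.0
    if total > 0 {
        pct = 100 * float64(dupes) / float64(total)
    }
    fmt.Printf("%d of %d queryids appear in more than one database (%.2f%%)\n", dupes, total, pct)
}

Grouping by (userid, queryid) instead would separate per-role duplicates (which is what these numbers turn out to be, per the later comment in this thread) from genuine cross-database ones.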

Member Author

Yes, the stats could end up being associated with the wrong query in that case. But we only fall back to this identity map for queries that have churned out of pg_stat_statements since the last full snapshot, so it may not be a big deal.

Contributor

So you think altogether it's unlikely enough that we can just ignore it? I'm a little reluctant to go with that (because if this does bite you, it will be basically impossible to figure out), but given the odds, it doesn't seem unreasonable, and I don't have a great idea on how to surface the problem. (And I don't think it's worth talking about it as a potential problem, since it's so unlikely).

Member

Trying to think through this:

  1. Queries on two different databases having the same queryid, e.g. SELECT 1 would have the same queryid, despite it being on different databases (since it doesn't involve any table OID that'd be database specific)

  2. A hash collision with the Postgres queryid, i.e. two entirely unrelated queries in two different databases having the same queryid (if it was the same database we wouldn't know about it, since they'd just end up being a single entry in pgss)

If I understand the conversation so far, you're talking about (2) here, though I'm not sure if the 0.15% statistic that @msakrejda referenced is for actual hash collisions? (I'd be surprised; I'm assuming it's more likely these are actually identical queries).

Assuming the hash collision case (2), what would happen today:

  1. database A has queryid 1, and querytext X
  2. database B also has queryid 1, but querytext Y
  3. we loop through the stat statements entries, fingerprinting each based on the querytext, and end up with two distinct query fingerprints (and thus associate to the correct query entry in pganalyze)

Now assuming that in a future iteration of this PR we actually check the cache first and don't fingerprint if needed, the problem here would be that we are now associating to the wrong query in the case of the collision.

I'm 50/50 on whether this is a problem in practice. Hash collisions could occur (and the standard Postgres hash function used in pgss isn't the best hash function either), but it does seem like quite an unlikely edge case, and we are not in a security-sensitive context here, the way I look at it.

The one thing I would be worried about from a security perspective is in case we consider caching query text here as well (or if this mapping is relied on elsewhere to get the text), because doing a simple mapping here could cause query text to cross a security boundary (in the pganalyze app you may be restricted to viewing a single database on a server).

Maybe it's better to just use the full triplet (i.e. (database_id, role_id, queryid)) for the mapping here, to avoid any accidental errors?
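For illustration, a composite-keyed variant of the map introduced in this PR could look roughly like the following. The key type, the field names, and the use of uint32 for the OIDs are made up for this sketch (the collector's real Oid type may differ); the PR as shown keys QueryIdentityMap by queryid alone.

package sketch

import "time"

// QueryIdentity mirrors the struct added in state/postgres_statement.go in this PR.
type QueryIdentity struct {
    QueryID     int64
    Fingerprint uint64
    LastSeen    time.Time
}

// queryIdentityKey is a hypothetical composite key along the lines suggested above.
// Dropping RoleOid would give the (database_id, queryid) variant discussed further down.
type queryIdentityKey struct {
    DatabaseOid uint32
    RoleOid     uint32
    QueryID     int64
}

type queryIdentityMap map[queryIdentityKey]QueryIdentity

// lookupFingerprint returns the cached fingerprint for a (database, role, queryid)
// triplet, if one has been recorded.
func lookupFingerprint(m queryIdentityMap, dbOid, roleOid uint32, queryID int64) (uint64, bool) {
    identity, ok := m[queryIdentityKey{DatabaseOid: dbOid, RoleOid: roleOid, QueryID: queryID}]
    return identity.Fingerprint, ok
}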

Member Author

Changing to a composite key would necessarily mean increasing the cache size limit. The collector already has an unresolved memory consumption issue, and I worry that increasing the cache size further is going to make that worse.

Member Author

I'll work on building a standalone benchmark script when I'm back from holidays. There might be an argument to drop PostgresStatementKey across the board in favor of the query ID on its own.

Member
@lfittl lfittl Dec 16, 2022

Changing to a composite key would necessarily mean increasing the cache size limit.

With cache size limit you mean the 100k entries limit on the map?

I'm not sure if that'd need to be raised if the key also includes the database ID - the overall number of entries shouldn't actually increase much, unless we have hash collisions (which would no longer be the same entry), or the same query without table references runs on multiple databases.

Adding the role ID on the other hand may have such an effect (e.g. imagine a workload using short-lived roles), so there may be an argument for only doing (database_id, queryid) without the role ID.

There might be an argument to drop PostgresStatementKey across the board in favor of the query ID on its own.

I don't think we can drop it across the board because of the aforementioned issue with the same queryid having different text on different databases, and that being security-relevant (e.g. imagine db1 is for tenant1, and has a query comment like SELECT 1 /* user.email: tenant1@example.com */ and db2 has SELECT 1 /* user.email: tenant2@example.com */)

Therefore we'd need to keep the PostgresStatementKey use for any maps mapping to query texts at the very least, and anything related to statistics is also problematic (since there you need it to count each database separately).

Contributor

If I understand the conversation so far, you're talking about (2) here, though I'm not sure if the 0.15% statistic that @msakrejda referenced is for actual hash collisions? (I'd be surprised; I'm assuming it's more likely these are actually identical queries).

They were queries with identical queryids occurring multiple times in pg_stat_statements. However, investigating further, it looks like this is all due to different records across separate userids in pg_stat_statements. If I filter down to a single user, we have no collisions.

Member Author
@seanlinsley seanlinsley Dec 28, 2022

Therefore we'd need to keep the PostgresStatementKey use for any maps mapping to query texts at the very least, and anything related to statistics is also problematic (since there you need it to count each database separately).

Good point.

FWIW, indexing into an array (that contains a map per database) looks to be significantly faster than using a composite key: https://gist.github.com/seanlinsley/dd7b2bf8d09b6ba710d27044794f86c5#file-map_test-go-L182

(Screenshot: benchmark results, Dec 28, 2022)

But for this PR, it doesn't seem like QueryIdentityMap needs to be scoped per database.
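For readers without access to the gist, the two cache layouts being compared look roughly like this. The type names, the bare-fingerprint value type, and the database-index scheme are invented for the sketch; the actual benchmark code lives in the gist linked above.

package sketch

// Layout 1: a single map per server, keyed by (database index, queryid).
type compositeKey struct {
    DatabaseIdx int32
    QueryID     int64
}

type compositeCache map[compositeKey]uint64 // value: query fingerprint

func (c compositeCache) lookup(dbIdx int32, queryID int64) (uint64, bool) {
    fp, ok := c[compositeKey{DatabaseIdx: dbIdx, QueryID: queryID}]
    return fp, ok
}

// Layout 2: a slice with one map per database; index by a small database index
// first, then look up by queryid alone.
type perDatabaseCache []map[int64]uint64

func (c perDatabaseCache) lookup(dbIdx int32, queryID int64) (uint64, bool) {
    if int(dbIdx) >= len(c) || c[dbIdx] == nil {
        return 0, false
    }
    fp, ok := c[dbIdx][queryID]
    return fp, ok
}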

5 changes: 4 additions & 1 deletion state/state.go
@@ -17,7 +17,7 @@ type SchemaStats struct {
FunctionStats PostgresFunctionStatsMap
}

// PersistedState - State thats kept across collector runs to be used for diffs
// PersistedState - State that's kept across collector runs
type PersistedState struct {
CollectedAt time.Time

@@ -40,6 +40,9 @@ type PersistedState struct {

// All statement stats that have not been identified (will be cleared by the next full snapshot)
UnidentifiedStatementStats HistoricStatementStatsMap

// Keeps track of queryid -> fingerprint pairs in case a query is no longer in pg_stat_statements
QueryIdentities QueryIdentityMap
}

// TransientState - State thats only used within a collector run (and not needed for diffs)