Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres: handle unknown types #3632

Merged
merged 1 commit into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/storage/postgres/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,24 @@ func TestBackend(t *testing.T) {
assert.NoError(t, stream.Err())
})

t.Run("unknown type", func(t *testing.T) {
_, err := backend.pool.Exec(ctx, `
INSERT INTO `+schemaName+"."+recordsTableName+` (type, id, version, data)
VALUES ('unknown', '1', 1000, '{"@type":"UNKNOWN","value":{}}')
`)
assert.NoError(t, err)

_, err = backend.Get(ctx, "unknown", "1")
assert.ErrorIs(t, err, storage.ErrNotFound)

_, _, stream, err := backend.SyncLatest(ctx, "unknown-test", nil)
if assert.NoError(t, err) {
_, err := storage.RecordStreamToList(stream)
assert.NoError(t, err)
stream.Close()
}
})

return nil
}))
}
131 changes: 79 additions & 52 deletions pkg/storage/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -94,46 +96,52 @@ func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint6
}

func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) {
var recordID string
var version uint64
var data pgtype.JSONB
var modifiedAt pgtype.Timestamptz
var deletedAt pgtype.Timestamptz
query := `
SELECT type, id, version, data, modified_at, deleted_at
FROM ` + schemaName + `.` + recordChangesTableName + `
WHERE version > $1
`
args := []any{afterRecordVersion}
if recordType != "" {
query += ` AND type = $2`
args = append(args, recordType)
}
query += `
ORDER BY version ASC
LIMIT 1
`
err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt)
if isNotFound(err) {
return nil, storage.ErrNotFound
} else if err != nil {
return nil, fmt.Errorf("error querying next changed record: %w", err)
}
for {
var recordID string
var version uint64
var data pgtype.JSONB
var modifiedAt pgtype.Timestamptz
var deletedAt pgtype.Timestamptz
query := `
SELECT type, id, version, data, modified_at, deleted_at
FROM ` + schemaName + `.` + recordChangesTableName + `
WHERE version > $1
`
args := []any{afterRecordVersion}
if recordType != "" {
query += ` AND type = $2`
args = append(args, recordType)
}
query += `
ORDER BY version ASC
LIMIT 1
`
err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt)
if isNotFound(err) {
return nil, storage.ErrNotFound
} else if err != nil {
return nil, fmt.Errorf("error querying next changed record: %w", err)
}
afterRecordVersion = version

var any anypb.Any
err = protojson.Unmarshal(data.Bytes, &any)
if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
var any anypb.Any
err = protojson.Unmarshal(data.Bytes, &any)
if isUnknownType(err) {
// ignore
continue
} else if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}

return &databroker.Record{
Version: version,
Type: recordType,
Id: recordID,
Data: &any,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
DeletedAt: timestamppbFromTimestamptz(deletedAt),
}, nil
return &databroker.Record{
Version: version,
Type: recordType,
Id: recordID,
Data: &any,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
DeletedAt: timestamppbFromTimestamptz(deletedAt),
}, nil
}
}

func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
Expand Down Expand Up @@ -165,13 +173,15 @@ func getRecord(ctx context.Context, q querier, recordType, recordID string) (*da
if isNotFound(err) {
return nil, storage.ErrNotFound
} else if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
}

var any anypb.Any
err = protojson.Unmarshal(data.Bytes, &any)
if err != nil {
return nil, err
if isUnknownType(err) {
return nil, storage.ErrNotFound
} else if err != nil {
return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
}

return &databroker.Record{
Expand All @@ -193,7 +203,7 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression,
query += "WHERE "
err := addFilterExpressionToQuery(&query, &args, expr)
if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
}
}
query += `
Expand All @@ -203,7 +213,7 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression,
`
rows, err := q.Query(ctx, query, args...)
if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
}
defer rows.Close()

Expand All @@ -215,13 +225,16 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression,
var modifiedAt pgtype.Timestamptz
err = rows.Scan(&recordType, &id, &version, &data, &modifiedAt)
if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
}

var any anypb.Any
err = protojson.Unmarshal(data.Bytes, &any)
if err != nil {
return nil, err
if isUnknownType(err) {
// ignore records with an unknown type
continue
} else if err != nil {
return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
}

records = append(records, &databroker.Record{
Expand All @@ -232,7 +245,12 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
})
}
return records, rows.Err()
err = rows.Err()
if err != nil {
return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
}

return records, nil
}

func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
Expand All @@ -245,15 +263,15 @@ func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
`
rows, err := q.Query(ctx, query)
if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
}
defer rows.Close()

for rows.Next() {
var kind, endpoint string
err = rows.Scan(&kind, &endpoint)
if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
}

services = append(services, &registry.Service{
Expand All @@ -263,7 +281,7 @@ func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
}
err = rows.Err()
if err != nil {
return nil, err
return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
}

return services, nil
Expand All @@ -287,7 +305,7 @@ func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
data, err := jsonbFromAny(record.GetData())
if err != nil {
return err
return fmt.Errorf("postgres: failed to convert any to json: %w", err)
}

modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
Expand Down Expand Up @@ -325,7 +343,7 @@ func putRecordAndChange(ctx context.Context, q querier, record *databroker.Recor
}
err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
if err != nil && !isNotFound(err) {
return err
return fmt.Errorf("postgres: failed to execute query: %w", err)
}

return nil
Expand Down Expand Up @@ -398,3 +416,12 @@ func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
func isNotFound(err error) bool {
return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
}

func isUnknownType(err error) bool {
if err == nil {
return false
}

return errors.Is(err, protoregistry.NotFound) ||
strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
}