-
Notifications
You must be signed in to change notification settings - Fork 280
/
storage.go
125 lines (111 loc) · 4.06 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Package storage provide generic interface to interact with storage backend.
package storage
import (
"context"
"errors"
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
// Errors
var (
ErrNotFound = errors.New("record not found")
ErrStreamDone = errors.New("record stream done")
ErrInvalidServerVersion = status.Error(codes.Aborted, "invalid server version")
)
// Backend is the interface required for a storage backend.
type Backend interface {
// Close closes the backend.
Close() error
// Get is used to retrieve a record.
Get(ctx context.Context, recordType, id string) (*databroker.Record, error)
// GetOptions gets the options for a type.
GetOptions(ctx context.Context, recordType string) (*databroker.Options, error)
// Lease acquires a lease, or renews an existing one. If the lease is acquired true is returned.
Lease(ctx context.Context, leaseName, leaseID string, ttl time.Duration) (bool, error)
// ListTypes lists all the known record types.
ListTypes(ctx context.Context) ([]string, error)
// Put is used to insert or update records.
Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error)
// Patch is used to update specific fields of existing records.
Patch(ctx context.Context, records []*databroker.Record, fields *fieldmaskpb.FieldMask) (serverVersion uint64, patchedRecords []*databroker.Record, err error)
// SetOptions sets the options for a type.
SetOptions(ctx context.Context, recordType string, options *databroker.Options) error
// Sync syncs record changes after the specified version.
Sync(ctx context.Context, recordType string, serverVersion, recordVersion uint64) (RecordStream, error)
// SyncLatest syncs all the records.
SyncLatest(ctx context.Context, recordType string, filter FilterExpression) (serverVersion, recordVersion uint64, stream RecordStream, err error)
}
// MatchAny searches any data with a query.
func MatchAny(any *anypb.Any, query string) bool {
if any == nil {
return false
}
msg, err := any.UnmarshalNew()
if err != nil {
// ignore invalid any types
log.Error(context.TODO()).Err(err).Msg("storage: invalid any type")
return false
}
// search by query
return matchProtoMessage(msg.ProtoReflect(), query)
}
func matchProtoMessage(msg protoreflect.Message, query string) bool {
md := msg.Descriptor()
fds := md.Fields()
for i := 0; i < fds.Len(); i++ {
fd := fds.Get(i)
if !msg.Has(fd) {
continue
}
if matchProtoValue(fd, msg.Get(fd), query) {
return true
}
}
return false
}
func matchProtoValue(fd protoreflect.FieldDescriptor, v protoreflect.Value, query string) bool {
switch {
case fd.IsList():
return matchProtoListValue(fd, v.List(), query)
case fd.IsMap():
return matchProtoMapValue(fd, v.Map(), query)
default:
return matchProtoSingularValue(fd, v, query)
}
}
func matchProtoSingularValue(fd protoreflect.FieldDescriptor, v protoreflect.Value, query string) bool {
switch fd.Kind() {
case protoreflect.MessageKind:
return matchProtoMessage(v.Message(), query)
case protoreflect.StringKind:
return strings.Contains(strings.ToLower(v.String()), query)
}
return false
}
func matchProtoListValue(fd protoreflect.FieldDescriptor, l protoreflect.List, query string) bool {
for i := 0; i < l.Len(); i++ {
if matchProtoSingularValue(fd, l.Get(i), query) {
return true
}
}
return false
}
func matchProtoMapValue(fd protoreflect.FieldDescriptor, m protoreflect.Map, query string) bool {
matches := false
m.Range(func(_ protoreflect.MapKey, v protoreflect.Value) bool {
matches = matches || matchProtoSingularValue(fd, v, query)
return !matches
})
return matches
}
// IsNotFound returns true if the error is because a record was not found.
func IsNotFound(err error) bool {
return errors.Is(err, ErrNotFound) || status.Code(err) == codes.NotFound
}