/
store.go
165 lines (139 loc) · 4.76 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Package store contains a datastore for authorization policy evaluation.
package store
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-jose/go-jose/v3"
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/rego"
opastorage "github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/types"
octrace "go.opencensus.io/trace"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage"
)
// A Store stores data for the OPA rego policy evaluation.
type Store struct {
opastorage.Store
}
// New creates a new Store.
func New() *Store {
return &Store{
Store: inmem.New(),
}
}
// UpdateGoogleCloudServerlessAuthenticationServiceAccount updates the google cloud serverless authentication
// service account in the store.
func (s *Store) UpdateGoogleCloudServerlessAuthenticationServiceAccount(serviceAccount string) {
s.write("/google_cloud_serverless_authentication_service_account", serviceAccount)
}
// UpdateJWTClaimHeaders updates the jwt claim headers in the store.
func (s *Store) UpdateJWTClaimHeaders(jwtClaimHeaders map[string]string) {
s.write("/jwt_claim_headers", jwtClaimHeaders)
}
// UpdateRoutePolicies updates the route policies in the store.
func (s *Store) UpdateRoutePolicies(routePolicies []config.Policy) {
s.write("/route_policies", routePolicies)
}
// UpdateSigningKey updates the signing key stored in the database. Signing operations
// in rego use JWKs, so we take in that format.
func (s *Store) UpdateSigningKey(signingKey *jose.JSONWebKey) {
s.write("/signing_key", signingKey)
}
func (s *Store) write(rawPath string, value interface{}) {
ctx := context.TODO()
err := opastorage.Txn(ctx, s.Store, opastorage.WriteParams, func(txn opastorage.Transaction) error {
return s.writeTxn(txn, rawPath, value)
})
if err != nil {
log.Error(ctx).Err(err).Msg("opa-store: error writing data")
return
}
}
func (s *Store) writeTxn(txn opastorage.Transaction, rawPath string, value interface{}) error {
p, ok := opastorage.ParsePath(rawPath)
if !ok {
return fmt.Errorf("invalid path")
}
if len(p) > 1 {
err := opastorage.MakeDir(context.Background(), s, txn, p[:len(p)-1])
if err != nil {
return err
}
}
var op opastorage.PatchOp = opastorage.ReplaceOp
_, err := s.Read(context.Background(), txn, p)
if opastorage.IsNotFound(err) {
op = opastorage.AddOp
} else if err != nil {
return err
}
return s.Write(context.Background(), txn, op, p, value)
}
// GetDataBrokerRecordOption returns a function option that can retrieve databroker data.
func (s *Store) GetDataBrokerRecordOption() func(*rego.Rego) {
return rego.Function2(®o.Function{
Name: "get_databroker_record",
Decl: types.NewFunction(
types.Args(types.S, types.S),
types.NewObject(nil, types.NewDynamicProperty(types.S, types.S)),
),
}, func(bctx rego.BuiltinContext, op1 *ast.Term, op2 *ast.Term) (*ast.Term, error) {
ctx, span := trace.StartSpan(bctx.Context, "rego.get_databroker_record")
defer span.End()
recordType, ok := op1.Value.(ast.String)
if !ok {
return nil, fmt.Errorf("invalid record type: %T", op1)
}
span.AddAttributes(octrace.StringAttribute("record_type", recordType.String()))
value, ok := op2.Value.(ast.String)
if !ok {
return nil, fmt.Errorf("invalid record id: %T", op2)
}
span.AddAttributes(octrace.StringAttribute("record_id", value.String()))
req := &databroker.QueryRequest{
Type: string(recordType),
Limit: 1,
}
req.SetFilterByIDOrIndex(string(value))
res, err := storage.GetQuerier(ctx).Query(ctx, req)
if err != nil {
log.Error(ctx).Err(err).Msg("authorize/store: error retrieving record")
return ast.NullTerm(), nil
}
if len(res.GetRecords()) == 0 {
return ast.NullTerm(), nil
}
msg, _ := res.GetRecords()[0].GetData().UnmarshalNew()
if msg == nil {
return ast.NullTerm(), nil
}
// exclude expired records
if hasExpiresAt, ok := msg.(interface{ GetExpiresAt() *timestamppb.Timestamp }); ok && hasExpiresAt.GetExpiresAt() != nil {
if hasExpiresAt.GetExpiresAt().AsTime().Before(time.Now()) {
return ast.NullTerm(), nil
}
}
obj := toMap(msg)
regoValue, err := ast.InterfaceToValue(obj)
if err != nil {
log.Error(ctx).Err(err).Msg("authorize/store: error converting object to rego")
return ast.NullTerm(), nil
}
return ast.NewTerm(regoValue), nil
})
}
func toMap(msg proto.Message) map[string]interface{} {
bs, _ := json.Marshal(msg)
var obj map[string]interface{}
_ = json.Unmarshal(bs, &obj)
return obj
}