-
Notifications
You must be signed in to change notification settings - Fork 279
/
authorize.go
156 lines (136 loc) · 5.22 KB
/
authorize.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Package authorize is a pomerium service that is responsible for determining
// if a given request should be authorized (AuthZ).
package authorize
import (
"context"
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/authorize/evaluator"
"github.com/pomerium/pomerium/authorize/internal/store"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/atomicutil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage"
)
// Authorize holds the long-lived pieces of the authorize service: the current
// authorize state, the evaluator store, the active options, the access tracker
// and a global cache.
type Authorize struct {
// state is the current *authorizeState. It is read with Load and replaced
// as a whole with Store when a new configuration is applied.
state *atomicutil.Value[*authorizeState]
// store is the evaluator store handed to newAuthorizeStateFromConfig.
store *store.Store
// currentOptions tracks the options from the most recently received config.
currentOptions *atomicutil.Value[*config.Options]
// accessTracker is started by Run.
accessTracker *AccessTracker
// globalCache is created in New with a time.Minute argument.
// NOTE(review): presumably the cache entry lifetime — confirm against
// storage.NewGlobalCache.
globalCache storage.Cache
// The stateLock prevents updating the evaluator store simultaneously with an evaluation.
// This should provide a consistent view of the data at a given server/record version and
// avoid partial updates.
stateLock sync.RWMutex
}
// New creates a new Authorize service from the given config.
//
// It wires up the options holder, the evaluator store, the global cache and
// the access tracker, and then builds the initial authorize state with
// newAuthorizeStateFromConfig. If building that state fails, the error is
// returned unchanged and no service is returned.
func New(cfg *config.Config) (*Authorize, error) {
	svc := new(Authorize)
	svc.currentOptions = config.NewAtomicOptions()
	svc.store = store.New()
	svc.globalCache = storage.NewGlobalCache(time.Minute)
	// The tracker receives the service itself, so it is created only after
	// the service value exists.
	svc.accessTracker = NewAccessTracker(svc, accessTrackerMaxSize, accessTrackerDebouncePeriod)

	// There is no previous evaluator on first start, hence the nil.
	initial, err := newAuthorizeStateFromConfig(cfg, svc.store, nil)
	if err != nil {
		return nil, err
	}
	svc.state = atomicutil.NewValue(initial)

	return svc, nil
}
// GetDataBrokerServiceClient returns the DataBrokerServiceClient held by the
// currently loaded authorize state.
func (a *Authorize) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
	current := a.state.Load()
	return current.dataBrokerClient
}
// Run runs the authorize service.
//
// Two goroutines are started in an errgroup derived from ctx: one runs the
// access tracker, the other waits for the databroker client connection to
// become ready. Both always report nil, so Run returns once both have
// finished.
func (a *Authorize) Run(ctx context.Context) error {
	group, groupCtx := errgroup.WithContext(ctx)

	trackAccess := func() error {
		a.accessTracker.Run(groupCtx)
		return nil
	}

	awaitDataBroker := func() error {
		// Best effort: a failure to become ready is deliberately ignored.
		_ = grpc.WaitForReady(groupCtx, a.state.Load().dataBrokerClientConnection, 10*time.Second)
		return nil
	}

	group.Go(trackAccess)
	group.Go(awaitDataBroker)

	return group.Wait()
}
// validateOptions checks that the shared key in o can be retrieved and that
// an AEAD cipher can be constructed from it. Either failure is reported as a
// bad 'SHARED_SECRET' error wrapping the underlying cause.
func validateOptions(o *config.Options) error {
	key, err := o.GetSharedKey()
	if err == nil {
		// Only attempt to build the cipher once the key itself was obtained.
		_, err = cryptutil.NewAEADCipher(key)
	}
	if err != nil {
		return fmt.Errorf("authorize: bad 'SHARED_SECRET': %w", err)
	}
	return nil
}
// newPolicyEvaluator returns a policy evaluator built from opts.
//
// It registers a policy-count metrics callback under "pomerium-authorize",
// starts a trace span, and collects the client CA, client CRL, internal
// authenticate URL, signing key and client certificate constraints from opts.
// The first of these lookups that fails aborts construction with a wrapped
// error. store and previous are passed through to evaluator.New.
func newPolicyEvaluator(
	opts *config.Options, store *store.Store, previous *evaluator.Evaluator,
) (*evaluator.Evaluator, error) {
	countPolicies := func() int64 {
		return int64(len(opts.GetAllPolicies()))
	}
	metrics.AddPolicyCountCallback("pomerium-authorize", countPolicies)

	// Tag the logger carried by the context with the service name before
	// opening the span.
	withService := func(c zerolog.Context) zerolog.Context {
		return c.Str("service", "authorize")
	}
	ctx, span := trace.StartSpan(
		log.WithContext(context.Background(), withService),
		"authorize.newPolicyEvaluator",
	)
	defer span.End()

	ca, err := opts.DownstreamMTLS.GetCA()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid client CA: %w", err)
	}

	crl, err := opts.DownstreamMTLS.GetCRL()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid client CRL: %w", err)
	}

	authURL, err := opts.GetInternalAuthenticateURL()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid authenticate url: %w", err)
	}

	key, err := opts.GetSigningKey()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid signing key: %w", err)
	}

	// It is important to add an invalid_client_certificate rule even when the
	// mTLS enforcement behavior is set to reject connections at the listener
	// level, because of the per-route TLSDownstreamClientCA setting.
	// The enforcement mode is only consulted when some client CA is configured.
	addDefaultRule := false
	if opts.HasAnyDownstreamMTLSClientCA() {
		addDefaultRule = opts.DownstreamMTLS.GetEnforcement() != config.MTLSEnforcementPolicy
	}

	constraints, err := evaluator.ClientCertConstraintsFromConfig(&opts.DownstreamMTLS)
	if err != nil {
		return nil, fmt.Errorf(
			"authorize: internal error: couldn't build client cert constraints: %w", err)
	}

	return evaluator.New(
		ctx,
		store,
		previous,
		evaluator.WithPolicies(opts.GetAllPolicies()),
		evaluator.WithClientCA(ca),
		evaluator.WithAddDefaultClientCertificateRule(addDefaultRule),
		evaluator.WithClientCRL(crl),
		evaluator.WithClientCertConstraints(constraints),
		evaluator.WithSigningKey(key),
		evaluator.WithAuthenticateURL(authURL.String()),
		evaluator.WithGoogleCloudServerlessAuthenticationServiceAccount(opts.GetGoogleCloudServerlessAuthenticationServiceAccount()),
		evaluator.WithJWTClaimsHeaders(opts.JWTClaimsHeaders),
	)
}
// OnConfigChange updates internal structures based on the new config.
//
// The new options are recorded first. A fresh authorize state is then built,
// passing the evaluator of the state that was current on entry. If that
// fails the error is logged and the existing state is kept; otherwise the
// new state replaces it.
func (a *Authorize) OnConfigChange(ctx context.Context, cfg *config.Config) {
	previous := a.state.Load()
	a.currentOptions.Store(cfg.Options)

	next, err := newAuthorizeStateFromConfig(cfg, a.store, previous.evaluator)
	if err != nil {
		log.Error(ctx).Err(err).Msg("authorize: error updating state")
		return
	}

	a.state.Store(next)
}