-
Notifications
You must be signed in to change notification settings - Fork 107
/
watcher.go
93 lines (78 loc) · 2.47 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package policywatcher
import (
"context"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/accessctl"
"github.com/oasisprotocol/oasis-core/go/common/grpc"
"github.com/oasisprotocol/oasis-core/go/common/grpc/policy/api"
"github.com/oasisprotocol/oasis-core/go/common/identity"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
sentry "github.com/oasisprotocol/oasis-core/go/sentry/api"
sentryClient "github.com/oasisprotocol/oasis-core/go/sentry/client"
)
// Compile-time assertion that *policyWatcher satisfies api.PolicyWatcher.
var _ api.PolicyWatcher = (*policyWatcher)(nil)
type policyWatcher struct {
sync.RWMutex
ctx context.Context
sentryAddrs []node.TLSAddress
identity *identity.Identity
sentryClients []*sentryClient.Client
logger *logging.Logger
}
// PolicyUpdated forwards the access policies of the given service to every
// configured sentry node.
//
// The work happens in a background goroutine so that the caller never blocks.
// The watcher's lock is held for the whole run, so overlapping notifications
// are processed one at a time. Each sentry node is retried with a constant
// one-second backoff, capped at 15 retries or until the watcher's context is
// done; a node that still cannot be updated is logged and skipped.
func (c *policyWatcher) PolicyUpdated(service grpc.ServiceName, accessPolicies map[common.Namespace]accessctl.Policy) {
	notifyAll := func() {
		c.Lock()
		defer c.Unlock()

		// The payload is identical for every sentry node and every attempt.
		update := sentry.ServicePolicies{
			Service:        service,
			AccessPolicies: accessPolicies,
		}

		for i := range c.sentryAddrs {
			sentryAddr := c.sentryAddrs[i]

			// attempt performs a single push, connecting first if there is
			// no cached client for this sentry node.
			attempt := func() error {
				cl := c.sentryClients[i]
				if cl == nil {
					fresh, err := sentryClient.New(sentryAddr, c.identity)
					if err != nil {
						return err
					}
					cl = fresh
					c.sentryClients[i] = cl
				}

				if err := cl.UpdatePolicies(c.ctx, update); err != nil {
					// Drop the cached client so that the next attempt
					// reconnects, in case our certs have rotated.
					cl.Close()
					c.sentryClients[i] = nil
					return err
				}
				return nil
			}

			policy := backoff.WithContext(
				backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 15),
				c.ctx,
			)
			if err := backoff.Retry(attempt, policy); err != nil {
				c.logger.Error("unable to push new policy to sentry node",
					"err", err,
					"sentry_address", sentryAddr,
				)
			}
		}
	}

	// Run detached so that we don't block the caller.
	go notifyAll()
}
// New returns a policy watcher that notifies the sentry nodes listed in
// sentryAddrs.
//
// ctx is stored on the watcher and used for every policy push and its
// retries. id is handed to each sentry client when it is created. No client
// is created here: the client slice starts with one nil slot per address.
func New(ctx context.Context, sentryAddrs []node.TLSAddress, id *identity.Identity) api.PolicyWatcher {
	w := new(policyWatcher)
	w.ctx = ctx
	w.sentryAddrs = sentryAddrs
	w.identity = id
	// One slot per sentry address, index-aligned with sentryAddrs.
	w.sentryClients = make([]*sentryClient.Client, len(sentryAddrs))
	w.logger = logging.GetLogger("sentry/policywatcher")
	return w
}