-
Notifications
You must be signed in to change notification settings - Fork 77
/
x509source.go
124 lines (102 loc) · 3.23 KB
/
x509source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package workloadapi
import (
"context"
"sync"
"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/svid/x509svid"
"github.com/zeebo/errs"
)
var x509sourceErr = errs.Class("x509source")
// X509Source is a source of X509-SVIDs and X.509 bundles maintained via the
// Workload API.
type X509Source struct {
watcher *watcher
picker func([]*x509svid.SVID) *x509svid.SVID
mtx sync.RWMutex
svid *x509svid.SVID
bundles *x509bundle.Set
closeMtx sync.RWMutex
closed bool
}
// NewX509Source creates a new X509Source. It blocks until the initial update
// has been received from the Workload API. The source should be closed when
// no longer in use to free underlying resources.
func NewX509Source(ctx context.Context, options ...X509SourceOption) (_ *X509Source, err error) {
config := &x509SourceConfig{}
for _, option := range options {
option.configureX509Source(config)
}
s := &X509Source{
picker: config.picker,
}
s.watcher, err = newWatcher(ctx, config.watcher, s.setX509Context, nil)
if err != nil {
return nil, err
}
return s, nil
}
// Close closes the source, dropping the connection to the Workload API.
// Other source methods will return an error after Close has been called.
// The underlying Workload API client will also be closed if it is owned by
// the X509Source (i.e. not provided via the WithClient option).
func (s *X509Source) Close() (err error) {
s.closeMtx.Lock()
s.closed = true
s.closeMtx.Unlock()
return s.watcher.Close()
}
// GetX509SVID returns an X509-SVID from the source. It implements the
// x509svid.Source interface.
func (s *X509Source) GetX509SVID() (*x509svid.SVID, error) {
if err := s.checkClosed(); err != nil {
return nil, err
}
s.mtx.RLock()
svid := s.svid
s.mtx.RUnlock()
if svid == nil {
// This is a defensive check and should be unreachable since the source
// waits for the initial Workload API update before returning from
// New().
return nil, x509sourceErr.New("missing X509-SVID")
}
return svid, nil
}
// GetX509BundleForTrustDomain returns the X.509 bundle for the given trust
// domain. It implements the x509bundle.Source interface.
func (s *X509Source) GetX509BundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*x509bundle.Bundle, error) {
if err := s.checkClosed(); err != nil {
return nil, err
}
return s.bundles.GetX509BundleForTrustDomain(trustDomain)
}
// WaitUntilUpdated waits until the source is updated or the context is done,
// in which case ctx.Err() is returned.
func (s *X509Source) WaitUntilUpdated(ctx context.Context) error {
return s.watcher.WaitUntilUpdated(ctx)
}
// Updated returns a channel that is sent on whenever the source is updated.
func (s *X509Source) Updated() <-chan struct{} {
return s.watcher.Updated()
}
func (s *X509Source) setX509Context(x509Context *X509Context) {
var svid *x509svid.SVID
if s.picker == nil {
svid = x509Context.DefaultSVID()
} else {
svid = s.picker(x509Context.SVIDs)
}
s.mtx.Lock()
defer s.mtx.Unlock()
s.svid = svid
s.bundles = x509Context.Bundles
}
func (s *X509Source) checkClosed() error {
s.closeMtx.RLock()
defer s.closeMtx.RUnlock()
if s.closed {
return x509sourceErr.New("source is closed")
}
return nil
}