Agent cache manager #233
pkg/agent/agent.go
Outdated
        break Drain
    }
    for err = range a.config.ErrorCh {
        a.config.Log.Errorf(err.Error())
When ranging over a channel, the channel must be explicitly closed to end the range; otherwise the loop will block forever.
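A minimal sketch of that point, not taken from the PR: the sending goroutine closes the channel when it is finished, which is what lets the `range` loop return. The names `drainErrors` and `errCh` are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// drainErrors collects every error sent on errCh. The range loop returns
// only after the sender has closed errCh.
func drainErrors(errCh <-chan error) []error {
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	errCh := make(chan error)

	go func() {
		// The sending side owns the channel, so it is the one that closes it.
		defer close(errCh)
		errCh <- errors.New("first failure")
		errCh <- errors.New("second failure")
	}()

	for _, err := range drainErrors(errCh) {
		fmt.Println("drained:", err)
	}
}
```

Without the `close(errCh)`, `drainErrors` would wait for a third value that never arrives.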
pkg/agent/agent.go
Outdated
if err != nil {
    return err
}
uri := a.config.TrustDomain
uri.Path = "spiffe/cp"
cacheManager := manager.NewManager(
Maybe this should be in the cache package? It would then be cache.NewManager.
Makes sense, will change the package.
pkg/agent/agent.go
Outdated
cacheManager := manager.NewManager(
    a.Cache, a.serverCerts,
    uri.String(), a.config.ServerAddress.String(),
    a.config.ErrorCh, a.BaseSVID, a.baseSVIDKey, regEntries, a.config.Log)
We should use a config struct for all of these arguments.
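One way the suggestion could look, as a sketch only. `ManagerConfig` and its field names are hypothetical and are not the struct the PR ended up with; the point is that a long positional argument list becomes named fields that can be validated in one place.

```go
package main

import (
	"errors"
	"fmt"
)

// ManagerConfig is a hypothetical options struct; the field names are
// illustrative and not taken from the PR.
type ManagerConfig struct {
	ServerSPIFFEID string
	ServerAddr     string
	BaseSVID       []byte
	Logf           func(format string, args ...interface{})
}

// Manager stands in for the cache manager.
type Manager struct {
	cfg ManagerConfig
}

// NewManager validates the config once and fills in defaults.
func NewManager(cfg ManagerConfig) (*Manager, error) {
	if cfg.ServerAddr == "" {
		return nil, errors.New("manager config: ServerAddr is required")
	}
	if cfg.Logf == nil {
		cfg.Logf = func(string, ...interface{}) {} // default to a no-op logger
	}
	return &Manager{cfg: cfg}, nil
}

func main() {
	m, err := NewManager(ManagerConfig{
		ServerSPIFFEID: "spiffe://example.org/spiffe/cp",
		ServerAddr:     "127.0.0.1:8081",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("manager configured for", m.cfg.ServerAddr)
}
```

Call sites then name each value they pass, and adding a field later does not break existing callers.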
pkg/agent/agent.go
Outdated
    a.Cache, a.serverCerts,
    uri.String(), a.config.ServerAddress.String(),
    a.config.ErrorCh, a.BaseSVID, a.baseSVIDKey, regEntries, a.config.Log)
go cacheManager.UpdateCache(a.stop)
@boz had once recommended not passing channels around (particularly between packages), and after having played around with them a bit I tend to agree. Here's the pattern I used the last time I ran into this:
spire/pkg/server/endpoint/endpoint.go
Lines 178 to 210 in 18eb814
func (e *endpoint) listenAndServe() error {
    grpcListener, err := net.Listen(e.grpcAddr.Network(), e.grpcAddr.String())
    if err != nil {
        return err
    }
    errChan := make(chan error)
    go func() {
        errChan <- e.grpcServer.Serve(grpcListener)
    }()
    go func() {
        errChan <- e.httpServer.ListenAndServe()
    }()
    // Differentiate between shutdown and an actual error condition
    err = <-errChan
    if e.isCleanShutdown(err) {
        // Ensure that the second server shutdown cleanly
        err = <-errChan
        if e.isCleanShutdown(err) {
            return nil
        }
        return err
    }
    e.grpcServer.Stop()
    _ = e.httpServer.Close()
    _ = <-errChan
    return err
}
And then on the consuming side:
Line 153 in 18eb814
go func() { server.Config.ErrorCh <- server.endpoints.ListenAndServe() }()
I have found other common libraries to have similar block-forever behavior (like net/http server and grpc server)
Passing in a channel to signal when to stop isn't all that bad here -- it's similar to passing around a context.Context and having things listen to Done() -- but it gets pretty messy if it's not just closed to signal the shutdown.
If you want to continue using a channel, UpdateCache() should take a <-chan struct{} to ensure that the signaling only goes one way.
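A small sketch of the receive-only stop channel, under the assumption that the update loop reacts to some tick source. `updateLoop`, `tick`, and `demo` are hypothetical names; only the `<-chan struct{}` parameter type comes from the comment.

```go
package main

import "fmt"

// updateLoop counts ticks until stop is closed. Because stop is declared
// as <-chan struct{}, this function can only receive from it; sending on
// it or closing it here would not compile.
func updateLoop(stop <-chan struct{}, tick <-chan struct{}) int {
	updates := 0
	for {
		select {
		case <-stop:
			return updates
		case <-tick:
			updates++
		}
	}
}

// demo drives the loop with three ticks and then signals shutdown by
// closing the channel, which is the only thing the owner ever does to it.
func demo() int {
	stop := make(chan struct{})
	tick := make(chan struct{})
	result := make(chan int)

	go func() { result <- updateLoop(stop, tick) }()

	for i := 0; i < 3; i++ {
		tick <- struct{}{} // unbuffered: returns once the loop has received it
	}
	close(stop)
	return <-result
}

func main() {
	fmt.Println("updates before stop:", demo())
}
```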
With that said, I think the general approach to these things in this project needs work. Yes, as @evan2645 said, things like Config.ErrorCh are red flags.
Try decoupling the objects - give them all a common interface of something along the lines of:
type AsyncObject interface {
    Shutdown()             // signals shutdown; internally the `context.CancelFunc` is called.
    Done() <-chan struct{} // channel is readable when all goroutines have returned.
    Err() error            // blocks until done and then returns the error, if one occurred.
}
Use context.Context for shutting down - when an object is created it can use context.WithCancel() to derive a new one for itself and also have a context.CancelFunc to use to shut itself down.
For composite objects:
- Wait for any of the child objects to be Done() or for your context to be Done().
- Shutdown() all children.
- Wait for all children to be Done().
- Derive an error from the Err() of the children.
- Return from the main loop and close the Done() channel.
With something like this, an object only needs to manage the lifecycle of its direct descendants, and each object is only coupled to its parent via the context.Context that it was given.
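The steps above can be sketched roughly as follows. This is one possible reading of the pattern, not the implementation used in the project: `worker`, `newWorker`, `supervise`, and `demo` are hypothetical names, and treating `context.Canceled` as "not a real error" when deriving the result is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// AsyncObject follows the interface suggested in the review comment.
type AsyncObject interface {
	Shutdown()
	Done() <-chan struct{}
	Err() error
}

// worker is a hypothetical leaf object that runs one function in a goroutine.
type worker struct {
	cancel context.CancelFunc
	done   chan struct{}
	err    error // written before done is closed, read only afterwards
}

func newWorker(parent context.Context, run func(context.Context) error) *worker {
	ctx, cancel := context.WithCancel(parent)
	w := &worker{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(w.done)
		defer cancel()
		w.err = run(ctx)
	}()
	return w
}

func (w *worker) Shutdown()             { w.cancel() }
func (w *worker) Done() <-chan struct{} { return w.done }
func (w *worker) Err() error {
	<-w.done
	return w.err
}

// supervise applies the composite steps: wait for any child or the context,
// shut every child down, wait for all of them, then derive one error.
func supervise(ctx context.Context, children ...AsyncObject) error {
	anyDone := make(chan struct{}, len(children))
	for _, c := range children {
		go func(c AsyncObject) {
			<-c.Done()
			anyDone <- struct{}{}
		}(c)
	}
	select {
	case <-anyDone:
	case <-ctx.Done():
	}
	for _, c := range children {
		c.Shutdown()
	}
	var result error
	for _, c := range children {
		err := c.Err() // blocks until this child is done
		if err != nil && !errors.Is(err, context.Canceled) && result == nil {
			result = err
		}
	}
	return result
}

func demo() error {
	ctx := context.Background()
	idle := newWorker(ctx, func(ctx context.Context) error {
		<-ctx.Done() // runs until told to stop
		return ctx.Err()
	})
	failing := newWorker(ctx, func(context.Context) error {
		return errors.New("fetch failed")
	})
	return supervise(ctx, idle, failing)
}

func main() {
	fmt.Println("supervise returned:", demo())
}
```

The parent never touches a child's channels directly; it only calls the three interface methods, which is the decoupling the comment is after.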
Thank you @boz for explaining it so well. I will make the necessary changes to use context.Context instead of a stop channel and implement the decoupling pattern you explain here.
Welp, looks like I stand corrected :) Blocking forever can be dangerous. I just hit this one in the very example I linked previously: golang/go#20239
pkg/agent/cache/cache.go
Outdated
@@ -31,6 +31,7 @@ type Cache interface {
    Entry([]*common.Selector) (entry []CacheEntry)
    SetEntry(cacheEntry CacheEntry)
    DeleteEntry([]*common.Selector) (deleted bool)
    GetEntries() map[string][]CacheEntry
nit: Entries?
@@ -87,7 +87,7 @@ service Node {
    /** Get Workload, Node Agent certs and CA trust bundles. Also used for rotation
    Base Node SVID or the Registered Node SVID used for this call)
    List can be empty to allow Node Agent cache refresh). */
    rpc FetchSVID(FetchSVIDRequest) returns (FetchSVIDResponse);
    rpc FetchSVID(stream FetchSVIDRequest) returns (stream FetchSVIDResponse);
Fancy :)
test/fixture/certs/baseSVID.pem
Outdated
jzQOkQ9Fgsxyb3MSBkdYD15zZxv8ZQ6LQRWKcVRqre3ym8TIiPHpTKwh5bbcraz6
xTNvTPLU+n+ixjXf+Uu7SYaN9WpH8UPFH0pmi3z+z3mysgh+glW43AUUUTUqaS1m
U3Y3FCpyg28KaxZQbDp/ucvm2KzFA9i03HtwmA==
-----END CERTIFICATE-----
An svid.pem has been checked in that you can use or replace.
Yeah, the rebase pulled in two other PEM files. Will consolidate.
test/fixture/certs/blogCSR.pem
Outdated
VFODJfDiU1hDrpDTxW2uWKS2m0Dd6yyH5ueudOavLxqbr/LosmMAq335zSxK72tO
6TiszGCKZLnjFOzedZ1feg4mx+is8fHRz/IbWPzVbCKCE7NtujviV/ipi37J0JhJ
0kLfE98TrrIKjTibJpINiuv7FOo5zSbtemOU5mFukmlWjApSsX0=
-----END CERTIFICATE REQUEST-----
Is this different from the existing blog_csr.pem fixture?
test/fixture/certs/blogSVID.pem
Outdated
Q47tsVSSs9OzKp2++krbsAhDo4EgUdK+b3R9vE+Ocq8h6XHZ6O7+7QGwlx+jtDL2
AVd09l/tNnGY0ICp0JgZsY0yhrbWtigzSRpZzU8CWSftFk3A/9VKTLvwtNGTM0Xv
paea0g==
-----END CERTIFICATE-----
Same here for blog_cert.pem?
    "github.com/spiffe/spire/proto/common"
)

func GetRegistrationEntries() []*common.RegistrationEntry {
Maybe this should go in test/util?
It could also read the content from https://github.com/spiffe/spire/blob/master/test/fixture/registration/good.json
Looks like it needs some work on lifecycle management and error handling.
Let me know if you want some examples for lifecycle management options.
pkg/agent/agent.go
Outdated
@@ -90,6 +91,7 @@ func New(c *Config) *Agent {
// and then blocks on the main event loop.
func (a *Agent) Run() error {
    a.Cache = cache.NewCache()
    a.stop = make(chan struct{})
Any reason why these are initialized here? What happens if Run() is called twice? It's generally preferable to have as little mutation after construction as possible IMO.
Makes sense. I will move these initializations to New(), where the Agent is constructed.
pkg/agent/cache/manager/manager.go
Outdated
cert, err := x509.ParseCertificate(baseSVID)
if err != nil {
    errorCh <- err
}
Do you want to continue if there's an error?
Seems like just returning (manager, error) would be simpler.
Also, returning an un-exported struct (manager) from an exported function NewManager is kinda iffy.
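A sketch of both suggestions together, under the assumption that the constructor's only fallible step is parsing the base SVID. The exported `Manager` type and the single-argument signature are hypothetical simplifications.

```go
package main

import (
	"crypto/x509"
	"fmt"
)

// Manager is exported so that callers outside the package can name the
// type NewManager returns.
type Manager struct {
	baseSVID *x509.Certificate
}

// NewManager fails fast instead of pushing the error onto a channel and
// continuing with a half-initialized manager.
func NewManager(baseSVIDDER []byte) (*Manager, error) {
	cert, err := x509.ParseCertificate(baseSVIDDER)
	if err != nil {
		return nil, fmt.Errorf("parse base SVID: %w", err)
	}
	return &Manager{baseSVID: cert}, nil
}

func main() {
	if _, err := NewManager([]byte("not a certificate")); err != nil {
		fmt.Println("constructor failed:", err)
	}
}
```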
pkg/agent/cache/manager/manager.go
Outdated
m.entryRequestCh = make(chan map[string][]EntryRequest)
m.regEntriesCh = make(chan []*proto.RegistrationEntry)
m.errorCh = make(chan error)
m.CacheEntryCh = make(chan cache.CacheEntry)
I don't think this stuff should be modifying m. Anything stopping UpdateCache() from being called twice?
Why is CacheEntryCh exported?
pkg/agent/cache/manager/manager.go
Outdated
    m.errorCh <- err
}
m.log.Debug("Spawning FetchID for", "entryRequests:", entryRequests)
go m.fetchSVID(entryRequests, node.NewNodeClient(conn), &wg)
Do you really want to continue if there's an error? Will node.NewNodeClient(conn) handle conn == nil gracefully?
pkg/agent/cache/manager/manager.go
Outdated
select {
case regEntries := <-m.regEntriesCh:

    EntryRequestMap := make(map[string][]EntryRequest)
Why is this capitalized?
pkg/agent/cache/manager/manager.go
Outdated
for {
    resp, err := stream.Recv()
    m.regEntriesCh <- resp.SvidUpdate.RegistrationEntries
    if err == io.EOF {
Will conn, stream, or resp be nil if there is an error?
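To illustrate the concern: a receive loop should look at `err` before it touches `resp`, since a failed `Recv()` usually comes with a nil response. The sketch below uses a hypothetical `receiver` interface and `fakeStream` in place of the generated gRPC stream type, so it runs without any gRPC dependency.

```go
package main

import (
	"fmt"
	"io"
)

// update is a hypothetical stand-in for the response message.
type update struct{ Entries []string }

// receiver is a hypothetical stand-in for the generated stream type.
type receiver interface {
	Recv() (*update, error)
}

// collect reads until io.EOF and checks err before dereferencing resp.
func collect(stream receiver) ([]string, error) {
	var all []string
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return all, nil // clean end of stream
		}
		if err != nil {
			return all, err // resp must not be used on this path
		}
		all = append(all, resp.Entries...)
	}
}

// fakeStream replays canned messages and then reports io.EOF with a nil
// response, mimicking how a finished stream behaves.
type fakeStream struct{ msgs []*update }

func (f *fakeStream) Recv() (*update, error) {
	if len(f.msgs) == 0 {
		return nil, io.EOF
	}
	next := f.msgs[0]
	f.msgs = f.msgs[1:]
	return next, nil
}

func main() {
	s := &fakeStream{msgs: []*update{{Entries: []string{"a"}}, {Entries: []string{"b", "c"}}}}
	entries, err := collect(s)
	fmt.Println(entries, err)
}
```

With the order used in the snippet under review (use `resp`, then check `err`), the final `Recv()` in this sketch would cause a nil pointer dereference.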
pkg/agent/cache/manager/manager.go
Outdated
req.entry.SVID = svid
req.entry.Expiry = cert.NotAfter
m.log.Debug("Sending to CacheEntry Channel: ", "req: ", req)
m.CacheEntryCh <- req.entry
Will stream, resp, or cert be nil if there is an error?
pkg/agent/cache/cache.go
Outdated
func (c *cacheImpl) Entry(selectors []*common.Selector) (entry []CacheEntry) {
    key := deriveCacheKey(selectors)
    c.m.Lock()
    defer c.m.Unlock()
    return c.cache[key]
    if entry, found:=c.cache[key]; found{
This doesn't look gofmt'd.
pkg/agent/agent.go
Outdated
if err != nil {
    return err
}
uri := a.config.TrustDomain
uri.Path = "spiffe/cp"
This change will be visible to everything else that uses the same Config that was passed into New(). Unexpected behavior at the very least, IMO.
conf/agent/agent.conf
Outdated
PluginDir = "conf/agent/plugin"
ServerAddress = "127.0.0.1"
ServerPort = "8081"
SocketPath ="/tmp/agent.sock"
TrustBundlePath = "conf/agent/dummy_root_ca.crt"
TrustDomain = "example.org"
JoinToken = "f5e78312-89ac-496a-8c7e-5940a9d8d8f7"
Do we want this value here or do we prefer an empty one?
- Updates cache with new and expired entries init tests update node mock package fixing test FetchSVID WIP PR comments
eb70d07 to 2db10cd
Looks a lot better.
Some deadlock issues to work through but getting closer.
cmd/spire-agent/cli/command/run.go
Outdated
select {
case <-signalCh:
    ch <- stop
    cancel()
A trick I've been doing to allow both graceful shutdown and easy "really, f--ing die" is to stop handling signals after the first one. See here.
It might not be too applicable in a non-interactive program like this server, but mentioning for food for thought. It can make dev cycles easier (although Ctrl-\ is usually fine too).
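The linked example is not available here, so the following is only a guess at the shape of that trick: handle the first signal gracefully, then call `signal.Stop` so that a second Ctrl-C gets Go's default handling and ends the process. `cancelOnFirstSignal` and `demo` are hypothetical names, and the demo writes to the channel itself instead of waiting for a real signal.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
)

// cancelOnFirstSignal waits for one signal, stops further delivery via stop,
// and then cancels the context to begin a graceful shutdown.
func cancelOnFirstSignal(sigCh <-chan os.Signal, stop func(), cancel context.CancelFunc) {
	<-sigCh
	stop()
	cancel()
}

// demo simulates the first Ctrl-C by writing to the channel directly, so the
// example finishes on its own instead of waiting for a real signal.
func demo() bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)

	go cancelOnFirstSignal(sigCh, func() { signal.Stop(sigCh) }, cancel)

	sigCh <- os.Interrupt // stand-in for a real SIGINT
	<-ctx.Done()
	return ctx.Err() == context.Canceled
}

func main() {
	fmt.Println("graceful shutdown started:", demo())
}
```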
entries, err = r.parseFile(config.Path)
fmt.Println(entries)
Do you want "in parse reg" to be printed? What about entries?
cmd/spire-agent/cli/command/run.go
Outdated
DataDir: defaultDataDir,
PluginDir: defaultPluginDir,
ErrorCh: errCh,
//ShutdownCh: shutdownCh,
delete commented code.
cmd/spire-agent/cli/command/run.go
Outdated
shutdownCh := make(chan struct{})

//shutdownCh := make(chan struct{})
//
delete commented code
cmd/spire-agent/cli/command/run.go
Outdated
@@ -298,15 +300,14 @@ func stringDefault(option string, defaultValue string) string {
    return option
}

func signalListener(ch chan struct{}) {
func signalListener(cancel context.CancelFunc) {
Is this the only place that the context can be cancelled? If so, do the WithCancel() in here. Otherwise it's not clear that the context can be cancelled elsewhere, in which case the following select will never stop blocking.
pkg/agent/cache/manager.go
Outdated
}
if len(entryRequestMap) != 0 {
    m.log.Debug("Fetching Expired Entries", "len(entryRequestMap):", len(entryRequestMap), "entryMap", entryRequestMap)
    m.entryRequestCh <- entryRequestMap
Deadlock here. Quick fix similar to above.
pkg/agent/cache/manager.go
Outdated
}
if len(entryRequestMap) != 0 {
    m.entryRequestCh <- entryRequestMap
Deadlock here. Quick fix similar to above.
pkg/agent/cache/manager.go
Outdated
csr, err := util.MakeCSR(privateKey, regEntry.SpiffeId)
if err != nil {
    m.Shutdown(err)
    break
These breaks will trigger the deadlock; Shutdown() cancels the context and then m.entryRequestCh <- entryRequestMap is executed after the for loop.
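A minimal sketch of the usual way out of this kind of deadlock: never do a bare send on an unbuffered channel once the context may already be cancelled; select on the send and on `ctx.Done()` together. `sendOrDone` and `demo` are hypothetical helpers, and the channel carries strings only to keep the example short.

```go
package main

import (
	"context"
	"fmt"
)

// sendOrDone delivers v on ch unless ctx is cancelled first, in which case
// it gives up and returns the context's error instead of blocking forever.
func sendOrDone(ctx context.Context, ch chan<- string, v string) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo reproduces the situation from the comment: the context has already
// been cancelled and nobody is receiving, yet the send attempt returns.
func demo() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // stands in for Shutdown() having cancelled the context

	requests := make(chan string) // unbuffered, with no receiver
	return sendOrDone(ctx, requests, "entry")
}

func main() {
	fmt.Println("send result:", demo())
}
```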
pkg/agent/agent.go
Outdated
ctx, cancel := context.WithCancel(ctx)
return &Agent{config: c,
    Catalog: catalog.New(config),
    ctx: ctx,
use this context for grpc calls where appropriate.
pkg/agent/agent.go
Outdated
@@ -510,35 +460,10 @@ func (a *Agent) getNodeAPIClientConn(mtls bool, svid []byte, key *ecdsa.PrivateK

    dialCreds := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))

    conn, err := grpc.Dial(a.config.ServerAddress.String(), dialCreds)
    conn, err = grpc.Dial(a.config.ServerAddress.String(), dialCreds)
Might want to incorporate the context into this Dial.
Few minor comments to add to boz's far more important ones :)
pkg/agent/agent.go
Outdated
config := &catalog.Config{
    ConfigDir: c.PluginDir,
    Log: c.Log.WithField("subsystem_name", "catalog"),
}
return &Agent{config: c, Catalog: catalog.New(config)}
ctx, cancel := context.WithCancel(ctx)
return &Agent{config: c,
style nit: newline before config
pkg/agent/agent.go
Outdated
BaseSVID: a.BaseSVID,
BaseSVIDKey: a.baseSVIDKey,
BaseRegEntries: regEntries,
Logger: a.config.Log}
style nit: newline before }
pkg/agent/agent.go
Outdated
a.CacheMgr, err = cache.NewManager(a.ctx, cmgrConfig)

go a.CacheMgr.Init()
Discussed offline the implications related to golang/go#20239.
Suggest that the Init call return once the cache manager is running.
pkg/agent/agent.go
Outdated
<-a.CacheMgr.Done()
a.config.Log.Info("Cache Update Stopped")
if a.CacheMgr.Err() != nil {
    a.config.Log.Warning("error:", a.CacheMgr.Err())
nit: we can skip the error: prefix since we're logging it at warn level. If you wanna keep it, then maybe add a space after it.
pkg/agent/cache/cache.go
Outdated
if entry.RegistrationEntry.SpiffeId == cacheEntry.RegistrationEntry.SpiffeId {
    copy(c.cache[key][i:], c.cache[key][i+1:])
    c.cache[key][len(c.cache[key])-1] = CacheEntry{}
    c.cache[key] = c.cache[key][:len(c.cache[key])-1]
non-blocking: this is pretty hard to read. maybe a comment or something will help
pkg/agent/cache/manager.go
Outdated
Cache() Cache
//fetchSVID(requests []EntryRequest, nodeClient node.NodeClient, wg *sync.WaitGroup)
//getGRPCConn(svid []byte, key *ecdsa.PrivateKey) (*grpc.ClientConn, error)
//expiredCacheEntryHandler(time.Duration, *sync.WaitGroup)
Can we remove these?
pkg/agent/cache/manager.go
Outdated
    m.Shutdown(err)
    break
}
m.log.Debug("Spawning FetchID for", "entryRequests:", entryRequests)
Do we need spaces at the end of these strings? Also, is entryRequests cleanly printable? I saw some hex debug output during your demo yesterday. Another option is to use structured logging. Similar comment about many of the other debug statements in here.
pkg/agent/cache/manager.go
Outdated
go func() {
    for {
        resp, err := stream.Recv()
        // m.log.Debug("regEntries:", resp.SvidUpdate.RegistrationEntries)
Can we remove this?
pkg/agent/cache/manager.go
Outdated
if vanityRecord != nil {
    m.log.Debug("Fetching new reg Entries for vanity spiffeid:", vanityRecord[0].RegistrationEntry.SpiffeId)
    m.log.Debug("len(entryRequestMap):", len(entryRequestMap))
Maybe we can remove some of these debug lines? I assume they were there for... debugging :) but some of them will be hard to grok for folks who are not intimately familiar with this bit of the code. Also feels like the volume of debug statements from cache manager will drown out lines from other areas in the agent
Looking great.
Sorry I missed the Shutdown() thing earlier.
pkg/agent/cache/manager.go
Outdated
func (m *manager) Shutdown(err error) {
    m.log.Debug("Shutting Down Cache Manager ", err)
    m.reason = err
There's a race condition here because m.Shutdown() is called from multiple goroutines.
Also, every Shutdown(err) will overwrite the previous reason. The initial error is generally more useful imho.
A quick fix/hack would be to wrap it in a sync.Once:
m.once.Do(func() {
    m.reason = err
})
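Expanded into a runnable sketch, the `sync.Once` idea might look like this. The `done` channel, `newManager`, and `Err()` are additions for illustration and are not part of the snippet under review.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// manager keeps only the first shutdown reason, however many goroutines
// call Shutdown.
type manager struct {
	once   sync.Once
	reason error
	done   chan struct{}
}

func newManager() *manager { return &manager{done: make(chan struct{})} }

// Shutdown is safe to call concurrently; only the first call has any effect.
func (m *manager) Shutdown(err error) {
	m.once.Do(func() {
		m.reason = err
		close(m.done)
	})
}

// Err blocks until Shutdown has run and then returns the recorded reason.
func (m *manager) Err() error {
	<-m.done
	return m.reason
}

func demo() string {
	m := newManager()
	m.Shutdown(errors.New("first failure"))

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Shutdown(errors.New("later failure")) // ignored by the Once
		}()
	}
	wg.Wait()
	return m.Err().Error()
}

func main() {
	fmt.Println("shutdown reason:", demo())
}
```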
Created an errch channel; the errch receiver drains all errors and only calls Shutdown with the first one.
err := stream.Send(&node.FetchSVIDRequest{Csrs: append([][]byte{}, req.CSR)})

if err != nil {
    m.Shutdown(err)
stream.Close()
Shutdown(err) can be called twice in the event that stream.CloseSend() in the defer returns non-nil error. Is that ok? Same thing with the Shutdown calls below
resp, err := stream.Recv()
if err != nil {
    m.Shutdown(err)
stream.Close()
select {
case m.cacheEntryCh <- req.entry:
case <-m.ctx.Done():
    m.Shutdown(m.ctx.Err())
stream.Close()
pkg/agent/cache/manager.go
Outdated
case <-m.ctx.Done():
    m.Shutdown(m.ctx.Err())
    stream.CloseSend()
    close(strmch)
Use defer close(strmch) as a prelude to this fn so you don't have to worry about it.
select {
case m.entryRequestCh <- entryRequestMap:
case <-m.ctx.Done():
    m.Shutdown(m.ctx.Err())
return?
select {
case m.entryRequestCh <- entryRequestMap:
case <-m.ctx.Done():
    m.Shutdown(m.ctx.Err())
return?
pkg/agent/cache/manager.go
Outdated
        m.log.Debug(resp)
    }
}()
stream.CloseSend()
Curious: what does CloseSend() get you over Close()? Using a single defer stream.Close() is more readable to me but maybe I'm missing something?
The gRPC ClientStream interface used here for the bi-directional stream only supports CloseSend(), which closes the send side, so any further messages from the other direction are still received.
I see, thanks. Maybe just defer stream.CloseSend() then?
Calling defer stream.CloseSend() would block the stream from closing and create a deadlock: the go func would never terminate.
Not sure I understand, but ok.
As of now it's doing
// ...
go func() {
    // ...
    stream.CloseSend()
    // ...
}()
stream.CloseSend()
// ...
which doesn't make much sense to me.
So stream.Recv() returns err == io.EOF to signal the end of messages on the stream, which would never happen without calling stream.CloseSend(). In our go func we wait until we receive io.EOF, so with a defer we would block waiting on io.EOF.
Also, I removed the stream.CloseSend() inside the go func().
pkg/agent/agent.go
Outdated
@@ -118,8 +119,12 @@ func (a *Agent) Run() error {
    for {
        select {
        case err = <-a.config.ErrorCh:
            e := a.Shutdown()
            if e != nil {
                return e
Feels like we should log this instead so we still get to know the original error
pkg/agent/agent.go
Outdated
@@ -288,10 +316,6 @@ func (a *Agent) attest() (map[string]*common.RegistrationEntry, error) {
    }
    attestor := plugins[0]

    attestorInfo := a.Catalog.Find(attestor.(common_catalog.Plugin))
    a.config.Log.Info("Preparing to attest this node against ", a.config.ServerAddress.String(),
        " using strategy '", attestorInfo.Config.PluginName, "'")
Was this intentional? It was recently committed to fix #213
Not at all intentional. I think this was a conflict-resolution issue; will fix it.
pkg/agent/cache/manager.go
Outdated
m.Shutdown(err) | ||
break | ||
} | ||
m.log.Debug("Generating CSR:", " spiffeId:", regEntry.SpiffeId, " parentId:", regEntry.ParentId) |
non-blocking: This should probably be formally structured
whack-a-deadlock
pkg/agent/cache/manager.go
Outdated
conn, err := m.getGRPCConn(svid, key) | ||
if err != nil { | ||
m.errch <- err | ||
break |
The wg.Add(1) will never be matched by a wg.Done() for this iteration if you break here. For clarity, try to move the wg.Add(1) as close to the go fn() as possible.
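A minimal sketch of that suggestion, assuming a loop shaped like the one under review. connect is a hypothetical stand-in for m.getGRPCConn, and fetchAll is an invented name; the only point is where wg.Add(1) sits relative to the early break.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// connect is a hypothetical stand-in for m.getGRPCConn.
func connect(id string) (string, error) {
	if id == "" {
		return "", errors.New("empty id")
	}
	return "conn:" + id, nil
}

// fetchAll starts one worker per id until a connect fails, then waits for
// the workers that were started.
func fetchAll(ids []string) ([]string, error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		used     []string
		firstErr error
	)
	for _, id := range ids {
		conn, err := connect(id)
		if err != nil {
			firstErr = err
			break // safe: nothing was added to wg for this iteration
		}
		wg.Add(1) // immediately before the goroutine that calls Done
		go func(c string) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			used = append(used, c)
		}(conn)
	}
	wg.Wait() // returns because every Add has a matching Done
	return used, firstErr
}

func main() {
	used, err := fetchAll([]string{"a", "b", "", "c"})
	fmt.Println(len(used), err)
}
```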
pkg/agent/cache/manager.go
Outdated
} | ||
conn, err := m.getGRPCConn(svid, key) | ||
if err != nil { | ||
m.errch <- err |
m.errch is unbuffered; this will block forever.
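To illustrate the blocking behaviour in isolation (this is not the manager code): a send on an unbuffered channel only completes when a receiver is ready, whereas a channel with free buffer space accepts the value immediately. sendWithTimeout and demo are hypothetical helpers, and the timeout exists only so the example terminates.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendWithTimeout reports whether err could be delivered on ch within d.
func sendWithTimeout(ch chan<- error, err error, d time.Duration) bool {
	select {
	case ch <- err:
		return true
	case <-time.After(d):
		return false
	}
}

// demo tries the same send on an unbuffered and a buffered channel,
// with no receiver on either.
func demo() (unbufferedSent, bufferedSent bool) {
	e := errors.New("getGRPCConn failed")
	unbuffered := make(chan error)
	buffered := make(chan error, 1)
	unbufferedSent = sendWithTimeout(unbuffered, e, 50*time.Millisecond)
	bufferedSent = sendWithTimeout(buffered, e, 50*time.Millisecond)
	return unbufferedSent, bufferedSent
}

func main() {
	fmt.Println(demo())
}
```

A buffer only postpones the problem once it fills up, so it is not a substitute for making sure something actually receives.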
pkg/agent/cache/manager.go
Outdated
defer wg.Done() | ||
stream, err := nodeClient.FetchSVID(context.Background()) | ||
if err != nil { | ||
m.errch <- err |
This will deadlock if Init() is blocking on wg.Wait().
pkg/agent/cache/manager.go
Outdated
} | ||
defer func() { | ||
if err := stream.CloseSend(); err != nil { | ||
m.errch <- err |
ditto - deadlock.
pkg/agent/cache/manager.go
Outdated
err := stream.Send(&node.FetchSVIDRequest{Csrs: append([][]byte{}, req.CSR)}) | ||
|
||
if err != nil { | ||
m.errch <- err |
deadlock.
pkg/agent/cache/manager.go
Outdated
select { | ||
case m.entryRequestCh <- entryRequestMap: | ||
case <-m.ctx.Done(): | ||
m.errch <- m.ctx.Err() |
deadlock.
pkg/agent/cache/manager.go
Outdated
if !processed { | ||
privateKey, err := ecdsa.GenerateKey(elliptic.P521(), rand.Reader) | ||
if err != nil { | ||
m.errch <- err |
deadlock.
pkg/agent/cache/manager.go
Outdated
m.log.Debugf("Generating CSR for spiffeId: %s parentId: %s" ,regEntry.SpiffeId ,regEntry.ParentId) | ||
csr, err := util.MakeCSR(privateKey, regEntry.SpiffeId) | ||
if err != nil { | ||
m.errch <- err |
deadlock.
pkg/agent/cache/manager.go
Outdated
select { | ||
case m.entryRequestCh <- entryRequestMap: | ||
case <-m.ctx.Done(): | ||
m.errch <- m.ctx.Err() |
deadlock.
pkg/agent/cache/manager.go
Outdated
|
||
case err := <-m.errch: | ||
m.Shutdown(err) | ||
for err := range m.errch { |
m.errch is not closed here; this will block forever.
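For reference, a small sketch of when a range over a channel does terminate, assuming a single sender that owns the channel and closes it when finished. produce and collect are illustrative names, not functions from this PR.

```go
package main

import "fmt"

// produce sends n errors and then closes the channel. The sender owns
// the channel, so the sender is the one that closes it.
func produce(n int) <-chan error {
	ch := make(chan error)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- fmt.Errorf("worker %d failed", i)
		}
	}()
	return ch
}

// collect ranges over ch; the loop ends only once ch is closed and drained.
func collect(ch <-chan error) []string {
	var msgs []string
	for err := range ch {
		msgs = append(msgs, err.Error())
	}
	return msgs
}

func main() {
	fmt.Println(collect(produce(3)))
}
```

Without the close in produce, collect would receive the three values and then block on the fourth receive indefinitely.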
pkg/agent/cache/manager.go
Outdated
select { | ||
case m.cacheEntryCh <- req.entry: | ||
case <-m.ctx.Done(): | ||
m.errch <- m.ctx.Err() |
deadlock.
@boz would it make sense to wait on m.errch in a separate go func() to avoid these deadlocks?
go func() {
    err := <-m.errch
    m.Shutdown(err)
    for err := range m.errch {
        m.log.Debug(err)
    }
}()
No.
- What if nothing is ever sent to m.errch?
- m.errch is not closed before the for loop, so that will block forever.
Don't use m.errch. Use Shutdown(error). Log in that method. Populate m.reason with the error that gets passed the first time Shutdown() is called, using sync.Once like I mentioned earlier.
Alternatively, look at tomb or go-lifecycle for other pattern options.
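A minimal sketch of the Shutdown(error) plus sync.Once pattern described above, under several assumptions: the field names (reason, doneCh, once), the run loop, and newManager are guesses at the shape of the manager, and the standard log package stands in for m.log. Every call is logged, but only the first error is recorded.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
)

// manager is a stripped-down, hypothetical version of the cache manager.
type manager struct {
	ctx    context.Context
	cancel context.CancelFunc
	once   sync.Once
	reason error
	doneCh chan struct{}
}

func newManager() *manager {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{ctx: ctx, cancel: cancel, doneCh: make(chan struct{})}
	go m.run()
	return m
}

// run stands in for the main loop; doneCh is closed when it returns.
func (m *manager) run() {
	defer close(m.doneCh)
	<-m.ctx.Done()
}

// Shutdown may be called from any goroutine, any number of times. It never
// blocks on a channel send, so it cannot deadlock the caller.
func (m *manager) Shutdown(err error) {
	log.Printf("shutdown requested: %v", err)
	m.once.Do(func() {
		m.reason = err // only the first error is kept
		m.cancel()
	})
}

func (m *manager) Done() <-chan struct{} { return m.doneCh }

// Err waits for the main loop to finish and returns the first error.
func (m *manager) Err() error {
	<-m.Done()
	return m.reason
}

// runShutdownTwice shows that a second Shutdown does not replace the reason.
func runShutdownTwice() string {
	m := newManager()
	m.Shutdown(errors.New("first failure"))
	m.Shutdown(errors.New("second failure"))
	return m.Err().Error()
}

func main() {
	fmt.Println(runShutdownTwice())
}
```

Because workers call Shutdown directly instead of sending on an error channel, there is no receiver that has to be alive for the error to be reported.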
OK, let me go back to using Shutdown(error) with sync.Once. Thank you.
looks great!
lgtm 🚢 🇮🇹
|
||
a.CacheMgr.Init() | ||
go func() { | ||
<-a.CacheMgr.Done() |
Will this unblock on an error condition too or only a clean shutdown? Should we shutdown the agent?
Yes. The receive on CacheMgr.Done() unblocks when doneCh is closed, which happens when the Init() method returns.
return m.doneCh | ||
} | ||
func (m *manager) Err() error { | ||
<-m.Done() |
Above near my other comment, we wait on the Done channel in agent, then once we receive off of it, we call Err() to see if there was an error... but Done channel is being read here again. Does it get closed somewhere? If it's closed we'll read a zero value and continue, but if it's not it will block
Yes, it gets closed when the main loop returns; see the comment above.
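The channel behaviour being relied on here can be checked in isolation. In this sketch (receiveTwice is an illustrative name), a closed channel can be received from any number of times without blocking, and each receive yields the zero value with ok set to false.

```go
package main

import "fmt"

// receiveTwice reads from done twice and reports the ok flag of each
// receive. On a closed channel neither receive blocks.
func receiveTwice(done <-chan struct{}) (bool, bool) {
	_, ok1 := <-done
	_, ok2 := <-done
	return ok1, ok2
}

func main() {
	done := make(chan struct{})
	close(done)
	fmt.Println(receiveTwice(done))
}
```

So waiting on Done() in the agent and again inside Err() is fine as long as the channel is closed rather than sent on.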
err := stream.Send(&node.FetchSVIDRequest{Csrs: append([][]byte{}, req.CSR)}) | ||
|
||
if err != nil { | ||
m.Shutdown(err) |
Shutdown(err) can be called twice in the event that stream.CloseSend() in the defer returns non-nil error. Is that ok? Same thing with the Shutdown calls below
} | ||
} | ||
}() | ||
<-strmch |
It looks like here we create strmch but never write to it, only close it once the go func is done. Do we need to have a go func here in that case?
All that Shutdown is doing is setting the error and then calling the cancel function; subsequent calls to cancel do nothing.
Hmm, good point. I don't think it's necessary to use a goroutine here, but I'm not really sure if it hurts to have one.
agent cache manager:
renews expired SVID in cache
updates cache with new registration entry SVIDs
changed FetchSVID endpoint to use grpc streams
#TODO Increase test coverage for cache manager(48.9%)
fixes #233