Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a double lock bug in staging/.../apiserver #84483

Merged
merged 1 commit into from Nov 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go
Expand Up @@ -123,6 +123,7 @@ func NewBackend(c *Config) (audit.Backend, error) {
config: c,
delegates: atomic.Value{},
delegateUpdateMutex: sync.Mutex{},
stopped: false,
webhookClientManager: cm,
recorder: recorder,
}
Expand Down Expand Up @@ -159,6 +160,7 @@ func NewBackend(c *Config) (audit.Backend, error) {
type backend struct {
// delegateUpdateMutex holds an update lock on the delegates
delegateUpdateMutex sync.Mutex
stopped bool
config *Config
delegates atomic.Value
webhookClientManager webhook.ClientManager
Expand Down Expand Up @@ -201,6 +203,11 @@ func (b *backend) Run(stopCh <-chan struct{}) error {
// the primary stopChan to the current delegate map.
func (b *backend) stopAllDelegates() {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
zxl381 marked this conversation as resolved.
Show resolved Hide resolved
zxl381 marked this conversation as resolved.
Show resolved Hide resolved
if b.stopped {
return
}
b.stopped = true
for _, d := range b.GetDelegates() {
close(d.stopChan)
}
Expand Down Expand Up @@ -237,6 +244,11 @@ func (b *backend) setDelegates(delegates syncedDelegates) {
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
if _, ok := delegates[sink.UID]; ok {
klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
Expand All @@ -262,6 +274,11 @@ func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name)
klog.Error(msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are private functions, can we make them return an error instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are just called from the informer event handlers (1 call for each). I think handling the error here makes sense.

return
}
delegates := b.copyDelegates()
oldDelegate, ok := delegates[oldSink.UID]
if !ok {
Expand Down Expand Up @@ -300,6 +317,11 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is removing a sink really an error at this point? It wasn't going to get anymore updates anyway? Maybe just return without logging anything?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the purpose of this is just to catch some racy edge conditions while the API server is shutting down. I think we can safely ignore the errors here. Although I suppose we'd want to know if the backend was stopped erroneously... maybe just log it at warning level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am so sorry guys. I thought I replied two days ago, but I just found that I didn't click commit...
About updateSink(), addSink() and deleteSink(), I really appreciate your discussion, because I am not familiar with how they will be used.
Here is what I did: For updateSink() and addSink(), I kept klog.Error(). For deleteSink(), I used klog.Warning() instead.

klog.Warning(msg)
return
}
delegates := b.copyDelegates()
delegate, ok := delegates[sink.UID]
if !ok {
Expand Down