Fix race condition in pluginWatcher #93622
/retest
go func() {
	sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
	<-pluginServing
	pluginManager.Run(sourcesReady, stopChan)
how is this different than simply moving this goroutine after the p.Serve call where the close(pluginServing) call is?
That's a good point! Updated as you said.
Force-pushed from e5cfa79 to 86c5849
/retest
/cc @msau42
/lgtm
@@ -140,13 +137,18 @@ func TestPluginRegistration(t *testing.T) {
	p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
	require.NoError(t, p.Serve("v1beta1", "v1beta2"))

	// Ensure example plugin is started before plugin manager, so that the socket is present
/hold
If fsnotify is flaky and can miss events, then that seems like a product issue, where we need to design the plugin manager to be able to handle that?
Having the test start the plugin before the plugin manager seems like a workaround to get the test to pass, but masks potentially real problems. In a real system, the plugin is usually started after the plugin manager.
I think what was racing here was not the filesystem and pluginmanager, but the observation of the plugin and the registration of the handler
in real life, we add the handlers that are sent the registration events before calling pluginManager.Run()
kubernetes/pkg/kubelet/kubelet.go
Lines 1301 to 1307 in ae9f761
// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
would moving this above the goroutine where we call pluginManager.Run(sourcesReady, stopChan) be sufficient?
	fakeHandler := newFakePluginHandler()
	pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
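The ordering concern in this comment can be illustrated with a small stdlib-only sketch. Everything in it is hypothetical: the `manager` type, its `AddHandler` and `Run` methods, and the `event` struct are toy stand-ins and not the kubelet plugin manager API. In this toy model an event that arrives with no handler registered is simply counted as dropped; the real plugin manager may behave differently.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical "plugin observed" notification.
type event struct{ pluginType, name string }

// manager is a toy stand-in for a plugin manager: it dispatches
// observed plugin events to handlers keyed by plugin type.
type manager struct {
	mu       sync.RWMutex
	handlers map[string]func(name string)
}

func newManager() *manager {
	return &manager{handlers: map[string]func(string){}}
}

// AddHandler registers the callback for one plugin type.
func (m *manager) AddHandler(pluginType string, h func(name string)) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[pluginType] = h
}

// Run dispatches events until the channel is closed and returns how
// many events found no registered handler.
func (m *manager) Run(events <-chan event) (dropped int) {
	for ev := range events {
		m.mu.RLock()
		h, ok := m.handlers[ev.pluginType]
		m.mu.RUnlock()
		if !ok {
			dropped++
			continue
		}
		h(ev.name)
	}
	return dropped
}

// runOnce observes a single plugin and registers the handler either
// before Run starts or only after Run has already finished.
func runOnce(addHandlerFirst bool) (registered []string, dropped int) {
	m := newManager()
	h := func(name string) { registered = append(registered, name) }
	if addHandlerFirst {
		m.AddHandler("DevicePlugin", h)
	}

	events := make(chan event, 1)
	events <- event{"DevicePlugin", "example-plugin"}
	close(events)

	done := make(chan int)
	go func() { done <- m.Run(events) }()
	dropped = <-done

	if !addHandlerFirst {
		m.AddHandler("DevicePlugin", h) // too late: the event was already dispatched
	}
	return registered, dropped
}

func main() {
	reg, dropped := runOnce(true)
	fmt.Println("handler first:", reg, "dropped:", dropped)
	reg, dropped = runOnce(false)
	fmt.Println("handler late: ", reg, "dropped:", dropped)
}
```

Registering the handler before starting `Run` mirrors the order used in kubelet.go above, where both `AddHandler` calls precede `go kl.pluginManager.Run(...)`.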
hmm, just saw in the PR description:
Please notice when the test fails, the plugin watcher could neither find any sockets in the socket dir nor receive "CREATE" event later. I suspect fsnotify is not extremely reliable and might miss events in some cases.
As a result, I decide to make sure the example plugin server is started before plugin manager, so that the socket always exists when the plugin manager is started. I have run the test 100 times locally after this change, they all pass.
that's troubling, and I agree with @msau42 that if that is the issue we need a real fix, not to mask it in a test
I agree, let me try creating a minimal reproducible example first.
/lgtm cancel
	require.NoError(t, p.Serve("v1beta1", "v1beta2"))

	// Verify that the plugin is registered
	waitForRegistration(t, fakeHandler)
waitForRegistration only verifies a single plugin was registered, so after the first one at depth 0, it is not checking anything further... we need to pass in the particular path we're expecting to be registered, right?
we don't need to pass the particular path, we just need to reset the handler.
we don't need to pass the particular path, we just need to reset the handler.
it would be better to check the specific path, otherwise a late/duplicate event from an early plugin could make it look like a later one passed
Actually a late event would not affect the test, as the test would fail immediately if registration times out, and I think duplicate events rarely occur on a local network. But I think this check is good to have.
the change in just a couple comments on the test, and I wanted to check if you ran the whole pluginmanager package with the
now that our fake handler is handling multiple plugins, I think we should record the actual order and plugin names we encounter, like this (otherwise, the last event in will stomp evidence of unexpected calls from other plugins):
diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go
index d81f0d42e83..14dd5387e12 100644
--- a/pkg/kubelet/pluginmanager/plugin_manager_test.go
+++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go
@@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
+ "reflect"
"strconv"
"sync"
"testing"
@@ -42,26 +43,19 @@ var (
)
type fakePluginHandler struct {
- validatePluginCalled bool
- registerPluginCalled bool
- deregisterPluginCalled bool
- registerEndpoint string
+ events []string
sync.RWMutex
}
func newFakePluginHandler() *fakePluginHandler {
- return &fakePluginHandler{
- validatePluginCalled: false,
- registerPluginCalled: false,
- deregisterPluginCalled: false,
- }
+ return &fakePluginHandler{}
}
// ValidatePlugin is a fake method
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
- f.validatePluginCalled = true
+ f.events = append(f.events, fmt.Sprintf("validate %s", pluginName))
return nil
}
@@ -69,8 +63,7 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
- f.registerPluginCalled = true
- f.registerEndpoint = endpoint
+ f.events = append(f.events, fmt.Sprintf("register %s", pluginName))
return nil
}
@@ -78,11 +71,13 @@ func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
f.Lock()
defer f.Unlock()
- f.deregisterPluginCalled = true
+ f.events = append(f.events, fmt.Sprintf("deregister %s", pluginName))
}
func (f *fakePluginHandler) Reset() {
- *f = fakePluginHandler{}
+ f.Lock()
+ defer f.Unlock()
+ f.events = nil
}
func init() {
@@ -99,15 +94,18 @@ func cleanup(t *testing.T) {
os.MkdirAll(socketDir, 0755)
}
-func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, expectedEndpoint string) {
+func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
err := retryWithExponentialBackOff(
- 500*time.Millisecond,
+ 100*time.Millisecond,
func() (bool, error) {
fakePluginHandler.Lock()
defer fakePluginHandler.Unlock()
- return fakePluginHandler.validatePluginCalled &&
- fakePluginHandler.registerPluginCalled &&
- fakePluginHandler.registerEndpoint == expectedEndpoint, nil
+ expected := []string{"validate " + pluginName, "register " + pluginName}
+ if reflect.DeepEqual(fakePluginHandler.events, expected) {
+ return true, nil
+ }
+ t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
+ return false, nil
},
)
if err != nil {
@@ -160,7 +158,7 @@ func TestPluginRegistration(t *testing.T) {
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
// Verify that the plugin is registered
- waitForRegistration(t, fakeHandler, socketPath)
+ waitForRegistration(t, fakeHandler, pluginName)
}
}
(I also shortened the initial backoff time to 100ms, which shortened the test from ~8 seconds to ~4.) stress seems happy with it as well:
/hold
Signed-off-by: knight42 <anonymousknight96@gmail.com>
Force-pushed from 6b6f510 to de46e81
@liggitt Please take another look
/lgtm
would like @msau42 to take a look as well before removing the hold
/retest
/assign @saad-ali
/lgtm
/approve
/hold cancel
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: dashpole, knight42, saad-ali
/retest |
Signed-off-by: knight42 <anonymousknight96@gmail.com>
What type of PR is this?
/kind flake
What this PR does / why we need it:
Which issue(s) this PR fixes:
Part of #93605
xref: #93605 (comment)
Special notes for your reviewer:
I added some logging with the following patch:
and found that if the test fails, the output is something like
while if the test passes, the output is something like
or
EDIT:
See also #93622 (comment)
I think the root cause is more likely to be concurrently creating the socket and adding the plugin dir to w.fsWatcher in the test. I guess the execution order might be:
1. The plugin watcher has read the file names in the plugin dir but has not invoked walkFn yet (https://github.com/golang/go/blob/6f08e89ec3280bf6577c2bdb01243cbeeb1a259d/src/path/filepath/path.go#L358-L364), so the plugin watcher would not find the socket later.
2. The socket is created.
3. walkFn is invoked and the plugin dir is added to the watcher, so the watcher would not receive any events later.
I decided to watch the socket dir before traversing it.
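The suspected interleaving can be reproduced deterministically in a stdlib-only simulation. This is only a sketch under stated assumptions: `fakeDir`, `discover`, and the `between` hook are hypothetical names invented for illustration, and the in-memory directory stands in for the real socket dir and fsnotify watcher rather than using either of them.

```go
package main

import (
	"fmt"
	"sort"
)

// fakeDir is a hypothetical in-memory directory that notifies its
// watchers whenever a file is created.
type fakeDir struct {
	files    []string
	watchers []func(name string)
}

// create adds a file and fires a "CREATE" notification to every watcher.
func (d *fakeDir) create(name string) {
	d.files = append(d.files, name)
	for _, w := range d.watchers {
		w(name)
	}
}

// watch subscribes to future create notifications only.
func (d *fakeDir) watch(onCreate func(name string)) {
	d.watchers = append(d.watchers, onCreate)
}

// list returns a snapshot of the files present right now.
func (d *fakeDir) list() []string {
	return append([]string(nil), d.files...)
}

// discover returns the sockets found by combining a directory listing
// with watch notifications. between runs in the gap between the two
// steps and simulates the plugin creating its socket concurrently.
func discover(d *fakeDir, watchFirst bool, between func()) []string {
	seen := map[string]bool{} // a set, so a file seen twice is recorded once
	record := func(name string) { seen[name] = true }

	if watchFirst {
		d.watch(record) // subscribe before looking at the directory
		between()
		for _, f := range d.list() {
			record(f)
		}
	} else {
		snapshot := d.list() // names are read first ...
		between()            // ... the socket appears ...
		d.watch(record)      // ... and only then is the dir watched
		for _, f := range snapshot {
			record(f)
		}
	}

	out := make([]string, 0, len(seen))
	for name := range seen {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	d1 := &fakeDir{}
	fmt.Println("list then watch:", discover(d1, false, func() { d1.create("plugin.sock") }))

	d2 := &fakeDir{}
	fmt.Println("watch then list:", discover(d2, true, func() { d2.create("plugin.sock") }))
}
```

Watching first means a socket created in the gap can be reported twice (once by the notification and once by the listing), which is why the sketch records names in a set.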
Please notice when the test fails, the plugin watcher could neither find any sockets in the socket dir nor receive "CREATE" event later.
I suspect fsnotify is not extremely reliable and might miss events in some cases.
As a result, I decide to make sure the example plugin server is started before plugin manager, so that the socket always exists when the plugin manager is started. I have run the test 100 times locally after this change, they all pass.
Does this PR introduce a user-facing change?:
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: