Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send events before adding watchers in traversePluginDir #75110

Merged
merged 2 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kubelet/util/pluginwatcher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
Expand Down
72 changes: 33 additions & 39 deletions pkg/kubelet/util/pluginwatcher/plugin_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,31 +102,39 @@ func (w *Watcher) Start() error {
}
w.fsWatcher = fsWatcher

// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
if err := w.traversePluginDir(w.path); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}

// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}

w.wg.Add(1)
go func(fsWatcher *fsnotify.Watcher) {
defer w.wg.Done()

for {
select {
case event := <-fsWatcher.Events:
//TODO: Handle errors by taking corrective measures

w.wg.Add(1)
func() {
defer w.wg.Done()

if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
err := w.handleDeleteEvent(event)
if err != nil {
klog.Errorf("error %v when handling delete event: %s", err, event)
}
if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
return
}()
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
err := w.handleDeleteEvent(event)
if err != nil {
klog.Errorf("error %v when handling delete event: %s", err, event)
}
}
continue
case err := <-fsWatcher.Errors:
if err != nil {
Expand All @@ -139,20 +147,6 @@ func (w *Watcher) Start() error {
}
}(fsWatcher)

// Traverse plugin dir after starting the plugin processing goroutine
if err := w.traversePluginDir(w.path); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}

// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}

return nil
}

Expand Down Expand Up @@ -211,14 +205,14 @@ func (w *Watcher) traversePluginDir(dir string) error {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
case mode&os.ModeSocket != 0:
w.wg.Add(1)
go func() {
bertinatto marked this conversation as resolved.
Show resolved Hide resolved
defer w.wg.Done()
w.fsWatcher.Events <- fsnotify.Event{
Name: path,
Op: fsnotify.Create,
}
}()
event := fsnotify.Event{
Name: path,
Op: fsnotify.Create,
}
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the implementation of handleCreateEvent, I think it's more direct to just do:

if !w.containsBlacklistedDir(path) {
  return w.traversePluginDir(path)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I follow, do you mean inside handleCreateEvent? If so, then it wouldn't call handlePluginRegistration... or am I missing somehting?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean "handle create event" doesn't really seem like what this is doing, unless I'm not following the control flow. If you look at what handle create event is doing, the snippet I commented above is all that gets hit in that function, so I'm suggesting to just inline that piece here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, OK... I understand now...

This part of the code creates "synthetic create events" if we find a socket file. Then we "handle" this event right here (as opposed to sending it to a processing goroutine like it was before).

Since we're dealing with a socket file, we don't really want to traversePluginDir() it, but instead we want to register the plugin that's listening to this unix domain socket file (i.e., handlePluginRegistration()).

However, we also need to make sure that we ignore black-listed directories (like you pointed in the snippet above) and files prefixed with a "."; this would do it:

			if !w.containsBlacklistedDir(path) {
				if !strings.HasPrefix(path, ".") {
					// TODO: Handle errors by taking corrective measures
					if err := w.handlePluginRegistration(path); err != nil {
						klog.Errorf("error %v when handling create event for file: %s", err, path)
					}
				}
			}

IMO handleCreateEvent() looks like would be a better fit though (since we're almost re-implementing it in-line).

Does this make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, I completely misread what handleCreateEvent was doing.

Copy link
Contributor

@tedyu tedyu Apr 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the err be returned, in the same way error is returned for mode.IsDir() case ?

I created PR #77244

klog.Errorf("error %v when handling create event: %s", err, event)
}
default:
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/stretchr/testify/require"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
)
Expand Down Expand Up @@ -173,9 +174,6 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
plugins[i] = p
}

w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()

var wg sync.WaitGroup
for i := 0; i < len(plugins); i++ {
wg.Add(1)
Expand All @@ -189,6 +187,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
}(plugins[i])
}

w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()

bertinatto marked this conversation as resolved.
Show resolved Hide resolved
c := make(chan struct{})
go func() {
defer close(c)
Expand All @@ -198,7 +199,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
select {
case <-c:
return
case <-time.After(2 * time.Second):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("Timeout while waiting for the plugin registration status")
}
}
Expand Down Expand Up @@ -238,11 +239,22 @@ func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)

c := make(chan struct{})
go func() {
defer close(c)
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively, you can just do this serially at the end of the test and rely on the whole test timing out for the failure condition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're processing plugins serially now, newWatcherWithHandler below would block forever trying to write to the event/registration channels. This goroutine prevents that by receiving info from these channels beforehand.


w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()

require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
select {
case <-c:
return
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("Timeout while waiting for the plugin registration status")
}
}

func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
Expand All @@ -259,7 +271,7 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam
select {
case event := <-eventChan:
return event == expected
case <-time.After(2 * time.Second):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("Timed out while waiting for registration status %v", expected)
}

Expand Down