From 4385428bbd260c3538ded86a6e9c9093ddf58728 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sun, 15 Dec 2019 16:57:09 -0800 Subject: [PATCH] Add watch dog as subprocess of the driver Add sub reaper to wait for stunnel processes --- Dockerfile.dev | 19 +++ .../kubernetes/volume_path/specs/example.yaml | 4 + pkg/driver/driver.go | 17 ++- pkg/driver/efs_watch_dog.go | 133 ++++++++++++++++++ pkg/driver/efs_watch_dog_test.go | 26 ++++ pkg/driver/reaper.go | 72 ++++++++++ pkg/driver/reaper_test.go | 30 ++++ pkg/driver/sanity_test.go | 16 ++- 8 files changed, 311 insertions(+), 6 deletions(-) create mode 100644 Dockerfile.dev create mode 100644 pkg/driver/efs_watch_dog.go create mode 100644 pkg/driver/efs_watch_dog_test.go create mode 100644 pkg/driver/reaper.go create mode 100644 pkg/driver/reaper_test.go diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 000000000..d9d63b746 --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,19 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +FROM amazonlinux:2 +RUN yum install util-linux amazon-efs-utils -y +COPY bin/aws-efs-csi-driver /bin/aws-efs-csi-driver +COPY THIRD-PARTY / + +ENTRYPOINT ["/bin/aws-efs-csi-driver"] diff --git a/examples/kubernetes/volume_path/specs/example.yaml b/examples/kubernetes/volume_path/specs/example.yaml index 16396d3fd..5a9e56422 100644 --- a/examples/kubernetes/volume_path/specs/example.yaml +++ b/examples/kubernetes/volume_path/specs/example.yaml @@ -16,6 +16,8 @@ spec: - ReadWriteMany persistentVolumeReclaimPolicy: Retain storageClassName: efs-sc + mountOptions: + - tls csi: driver: efs.csi.aws.com volumeHandle: fs-e8a95a42:/dir1 @@ -44,6 +46,8 @@ spec: - ReadWriteMany persistentVolumeReclaimPolicy: Retain storageClassName: efs-sc + mountOptions: + - tls csi: driver: efs.csi.aws.com volumeHandle: fs-e8a95a42:/dir2 diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 06ffcb19a..2560eb142 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -38,6 +38,8 @@ type Driver struct { srv *grpc.Server mounter Mounter + + efsWatchdog Watchdog } func NewDriver(endpoint string) *Driver { @@ -46,10 +48,12 @@ func NewDriver(endpoint string) *Driver { klog.Fatalln(err) } + watchdog := newExecWatchdog("amazon-efs-mount-watchdog") return &Driver{ - endpoint: endpoint, - nodeID: cloud.GetMetadata().GetInstanceID(), - mounter: newNodeMounter(), + endpoint: endpoint, + nodeID: cloud.GetMetadata().GetInstanceID(), + mounter: newNodeMounter(), + efsWatchdog: watchdog, } } @@ -79,6 +83,13 @@ func (d *Driver) Run() error { csi.RegisterIdentityServer(d.srv, d) csi.RegisterNodeServer(d.srv, d) + klog.Info("Starting watchdog") + d.efsWatchdog.start() + + reaper := newReaper() + klog.Info("Staring subreaper") + reaper.start() + klog.Infof("Listening for connections on address: %#v", listener.Addr()) return d.srv.Serve(listener) } diff --git a/pkg/driver/efs_watch_dog.go b/pkg/driver/efs_watch_dog.go new file mode 100644 index 000000000..855deb469 --- /dev/null +++ b/pkg/driver/efs_watch_dog.go @@ -0,0 +1,133 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "fmt" + "os/exec" + "sync" + + "k8s.io/klog" +) + +// Watchdog defines the interface for process monitoring and supervising +type Watchdog interface { + // start starts the watch dog along with the process + start() + + // stop stops the watch dog along with the process + stop() +} + +// execWatchdog is a watch dog that monitors a process and restart it +// if it has crashed accidentally +type execWatchdog struct { + // the command to be exec and monitored + execCmd string + // the command arguments + execArg []string + // the cmd that is running + cmd *exec.Cmd + // stopCh indicates if it should be stopped + stopCh chan struct{} + + mu sync.Mutex +} + +func newExecWatchdog(cmd string, arg ...string) Watchdog { + return &execWatchdog{ + execCmd: cmd, + execArg: arg, + stopCh: make(chan struct{}), + } +} + +func (w *execWatchdog) start() { + go w.runLoop(w.stopCh) +} + +// stop kills the underlying process and stops the watchdog +func (w *execWatchdog) stop() { + close(w.stopCh) + + w.mu.Lock() + if w.cmd.Process != nil { + p := w.cmd.Process + err := p.Kill() + if err != nil { + klog.Errorf("Failed to kill process: %s", err) + } + } + w.mu.Unlock() +} + +// runLoop starts the monitoring loop +func (w *execWatchdog) runLoop(stopCh <-chan struct{}) { + for { + select { + case <-stopCh: + klog.Info("stopping...") + break + default: + err := w.exec() + if err != nil { + klog.Errorf("Process %s exits %s", w.execCmd, err) + } + } + } +} + +func (w *execWatchdog) exec() error { + cmd := exec.Command(w.execCmd, w.execArg...) + cmd.Stdout = newInfoRedirect(w.execCmd) + cmd.Stderr = newErrRedirect(w.execCmd) + + w.cmd = cmd + + w.mu.Lock() + err := cmd.Start() + if err != nil { + return err + } + w.mu.Unlock() + + return cmd.Wait() +} + +type logRedirect struct { + processName string + level string + logFunc func(string, ...interface{}) +} + +func newInfoRedirect(name string) *logRedirect { + return &logRedirect{ + processName: name, + level: "Info", + logFunc: klog.V(4).Infof, + } +} + +func newErrRedirect(name string) *logRedirect { + return &logRedirect{ + processName: name, + level: "Error", + logFunc: klog.Errorf, + } +} +func (l *logRedirect) Write(p []byte) (n int, err error) { + msg := fmt.Sprintf("%s[%s]: %s", l.processName, l.level, string(p)) + l.logFunc("%s", msg) + return len(msg), nil +} diff --git a/pkg/driver/efs_watch_dog_test.go b/pkg/driver/efs_watch_dog_test.go new file mode 100644 index 000000000..16221d07b --- /dev/null +++ b/pkg/driver/efs_watch_dog_test.go @@ -0,0 +1,26 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "testing" + "time" +) + +func TestExecWatchdog(t *testing.T) { + w := newExecWatchdog("sleep", "300") + w.start() + time.Sleep(time.Second) + w.stop() +} diff --git a/pkg/driver/reaper.go b/pkg/driver/reaper.go new file mode 100644 index 000000000..46c9502b0 --- /dev/null +++ b/pkg/driver/reaper.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "os" + "os/signal" + "syscall" + + "k8s.io/klog" +) + +type reaper struct { + sigs chan os.Signal + stopCh chan struct{} +} + +func newReaper() *reaper { + sigs := make(chan os.Signal, 1) + stopCh := make(chan struct{}) + + signal.Notify(sigs, syscall.SIGCHLD) + return &reaper{ + sigs: sigs, + stopCh: stopCh, + } +} + +// start starts the reaper +func (r *reaper) start() { + go r.runLoop() +} + +// runLoop waits for all child processes that exit +// currently only stunnel process is created by efs mount helper +// and is inherited as the child process of the driver +func (r *reaper) runLoop() { + for { + select { + case <-r.sigs: + var status syscall.WaitStatus + var rusage syscall.Rusage + childPid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, &rusage) + if err != nil { + klog.Warningf("Failed to wait for child process %s", err) + } else { + klog.V(4).Infof("Waited for child process %d", childPid) + } + case <-r.stopCh: + break + } + } +} + +// stop stops the reaper +func (r *reaper) stop() { + r.stopCh <- struct{}{} +} diff --git a/pkg/driver/reaper_test.go b/pkg/driver/reaper_test.go new file mode 100644 index 000000000..332b951dd --- /dev/null +++ b/pkg/driver/reaper_test.go @@ -0,0 +1,30 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "testing" + "time" +) + +func TestReaper(t *testing.T) { + r := newReaper() + + r.start() + time.Sleep(time.Second) + r.stop() +} diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index 4dd7dcd82..4efea602c 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -28,6 +28,15 @@ import ( "github.com/kubernetes-sigs/aws-efs-csi-driver/pkg/driver/mocks" ) +type mockWatchdog struct { +} + +func (w *mockWatchdog) start() { +} + +func (w *mockWatchdog) stop() { +} + func TestSanityEFSCSI(t *testing.T) { // Setup the full driver and its environment dir, err := ioutil.TempDir("", "sanity-efs-csi") @@ -48,9 +57,10 @@ func TestSanityEFSCSI(t *testing.T) { mockCtrl := gomock.NewController(t) drv := Driver{ - endpoint: endpoint, - nodeID: "sanity", - mounter: mocks.NewMockMounter(mockCtrl), + endpoint: endpoint, + nodeID: "sanity", + mounter: mocks.NewMockMounter(mockCtrl), + efsWatchdog: &mockWatchdog{}, } defer func() { if r := recover(); r != nil {