Skip to content

Commit

Permalink
Clean up driver eventer goroutines
Browse files Browse the repository at this point in the history
This fixes a few cases where driver eventer goroutines are leaked during
normal operations, and especially in tests.

This change makes a few modifications:

First, it switches drivers to use `Context`s to manage shutdown events.
Previously, shutdown relied on callers invoking a `.Shutdown()` function that
is specific to internal drivers and requires casting. Using `Context`s
provides a consistent, idiomatic way to manage the lifecycle of both internal
and external drivers.

Also, I discovered a few places in the plugin catalog code where we don't
clean up a temporary driver instance: we dispense a driver to inspect and
validate the schema config, but never properly clean it up afterwards.
  • Loading branch information
Mahmood Ali committed May 26, 2020
1 parent 29eca94 commit d6c75e3
Show file tree
Hide file tree
Showing 24 changed files with 207 additions and 158 deletions.
4 changes: 0 additions & 4 deletions client/pluginmanager/drivermanager/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@ func (i *instanceManager) cleanup() {
return
}

if internalPlugin, ok := i.plugin.Plugin().(drivers.InternalDriverPlugin); ok {
internalPlugin.Shutdown()
}

if !i.plugin.Exited() {
i.plugin.Kill()
if err := i.storeReattach(nil); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions devices/gpu/nvidia/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

log "github.com/hashicorp/go-hclog"

"github.com/hashicorp/nomad/devices/gpu/nvidia"
Expand All @@ -9,10 +11,10 @@ import (

func main() {
// Serve the plugin
plugins.Serve(factory)
plugins.ServeCtx(factory)
}

// factory returns a new instance of the Nvidia GPU plugin
func factory(log log.Logger) interface{} {
return nvidia.NewNvidiaDevice(log)
func factory(ctx context.Context, log log.Logger) interface{} {
return nvidia.NewNvidiaDevice(ctx, log)
}
4 changes: 2 additions & 2 deletions devices/gpu/nvidia/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
// PluginConfig is the nvidia factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Factory: func(l log.Logger) interface{} { return NewNvidiaDevice(l) },
Factory: func(ctx context.Context, l log.Logger) interface{} { return NewNvidiaDevice(ctx, l) },
}

// pluginInfo describes the plugin
Expand Down Expand Up @@ -99,7 +99,7 @@ type NvidiaDevice struct {
}

// NewNvidiaDevice returns a new nvidia device plugin.
func NewNvidiaDevice(log log.Logger) *NvidiaDevice {
func NewNvidiaDevice(_ context.Context, log log.Logger) *NvidiaDevice {
nvmlClient, err := nvml.NewNvmlClient()
logger := log.Named(pluginName)
if err != nil && err.Error() != nvml.UnavailableLib.Error() {
Expand Down
7 changes: 4 additions & 3 deletions drivers/docker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package main

import (
"context"
"os"

log "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -42,10 +43,10 @@ func main() {
}

// Serve the plugin
plugins.Serve(factory)
plugins.ServeCtx(factory)
}

// factory returns a new instance of the docker driver plugin
func factory(log log.Logger) interface{} {
return docker.NewDockerDriver(log)
func factory(ctx context.Context, log log.Logger) interface{} {
return docker.NewDockerDriver(ctx, log)
}
3 changes: 2 additions & 1 deletion drivers/docker/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package docker

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -123,7 +124,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewDockerDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewDockerDriver(ctx, l) },
}

// pluginInfo is the response returned for the PluginInfo RPC
Expand Down
22 changes: 6 additions & 16 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ type Driver struct {
// coordinate shutdown
ctx context.Context

// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc

// tasks is the in memory datastore mapping taskIDs to taskHandles
tasks *taskStore

Expand All @@ -120,16 +116,14 @@ type Driver struct {
}

// NewDockerDriver returns a docker implementation of a driver plugin
func NewDockerDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewDockerDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}

Expand Down Expand Up @@ -1622,10 +1616,6 @@ func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
return ulimits, nil
}

func (d *Driver) Shutdown() {
d.signalShutdown()
}

func isDockerTransientError(err error) bool {
if err == nil {
return false
Expand Down
6 changes: 4 additions & 2 deletions drivers/docker/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func cleanSlate(client *docker.Client, imageID string) {
// A driver plugin interface and cleanup function is returned
func dockerDriverHarness(t *testing.T, cfg map[string]interface{}) *dtestutil.DriverHarness {
logger := testlog.HCLogger(t)
harness := dtestutil.NewDriverHarness(t, NewDockerDriver(logger))
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() { cancel() })
harness := dtestutil.NewDriverHarness(t, NewDockerDriver(ctx, logger))
if cfg == nil {
cfg = map[string]interface{}{
"gc": map[string]interface{}{
Expand All @@ -190,7 +192,7 @@ func dockerDriverHarness(t *testing.T, cfg map[string]interface{}) *dtestutil.Dr
InternalPlugins: map[loader.PluginID]*loader.InternalPluginConfig{
PluginID: {
Config: cfg,
Factory: func(hclog.Logger) interface{} {
Factory: func(context.Context, hclog.Logger) interface{} {
return harness
},
},
Expand Down
6 changes: 5 additions & 1 deletion drivers/docker/fingerprint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package docker

import (
"context"
"testing"

"github.com/hashicorp/nomad/client/testutil"
Expand All @@ -20,7 +21,10 @@ func TestDockerDriver_FingerprintHealth(t *testing.T) {
}
testutil.DockerCompatible(t)

d := NewDockerDriver(testlog.HCLogger(t)).(*Driver)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewDockerDriver(ctx, testlog.HCLogger(t)).(*Driver)

fp := d.buildFingerprint()
require.Equal(t, drivers.HealthStateHealthy, fp.Health)
Expand Down
22 changes: 6 additions & 16 deletions drivers/exec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewExecDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) },
}

// pluginInfo is the response returned for the PluginInfo RPC
Expand Down Expand Up @@ -107,10 +107,6 @@ type Driver struct {
// coordinate shutdown
ctx context.Context

// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc

// logger will log to the Nomad agent
logger hclog.Logger

Expand Down Expand Up @@ -144,15 +140,13 @@ type TaskState struct {
}

// NewExecDriver returns a new DrivePlugin implementation
func NewExecDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}

Expand Down Expand Up @@ -201,10 +195,6 @@ func (d *Driver) SetConfig(cfg *base.Config) error {
return nil
}

func (d *Driver) Shutdown() {
d.signalShutdown()
}

func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
Expand Down
60 changes: 48 additions & 12 deletions drivers/exec/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func TestExecDriver_Fingerprint_NonLinux(t *testing.T) {
t.Skip("Test only available not on Linux")
}

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)

fingerCh, err := harness.Fingerprint(context.Background())
Expand All @@ -78,7 +81,10 @@ func TestExecDriver_Fingerprint(t *testing.T) {

ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)

fingerCh, err := harness.Fingerprint(context.Background())
Expand All @@ -97,7 +103,10 @@ func TestExecDriver_StartWait(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -129,7 +138,10 @@ func TestExecDriver_StartWaitStopKill(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -190,7 +202,10 @@ func TestExecDriver_StartWaitRecover(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
dctx, dcancel := context.WithCancel(context.Background())
defer dcancel()

d := NewExecDriver(dctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -262,7 +277,10 @@ func TestExecDriver_DestroyKillsAll(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

Expand Down Expand Up @@ -360,7 +378,10 @@ func TestExecDriver_Stats(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
dctx, dcancel := context.WithCancel(context.Background())
defer dcancel()

d := NewExecDriver(dctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -403,7 +424,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -452,7 +476,10 @@ func TestExecDriver_User(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -486,7 +513,10 @@ func TestExecDriver_HandlerExec(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -574,7 +604,10 @@ func TestExecDriver_DevicesAndMounts(t *testing.T) {
err = ioutil.WriteFile(filepath.Join(tmpDir, "testfile"), []byte("from-host"), 600)
require.NoError(err)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -678,7 +711,10 @@ func TestExecDriver_NoPivotRoot(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)

config := &Config{NoPivotRoot: true}
Expand Down
10 changes: 8 additions & 2 deletions drivers/exec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ func TestExecDriver_StartWaitStop(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Expand Down Expand Up @@ -82,7 +85,10 @@ func TestExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
require := require.New(t)

d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

Expand Down
Loading

0 comments on commit d6c75e3

Please sign in to comment.