Permalink
Browse files

push: Thread-safety fix for --concurrent-instances w/ --workspace=docker

The previous logic for --workspace=docker was not thread-safe, which could
cause problems if --concurrent-instances was set to 2 or more.

This commit also refactors shutdown handling of workspaces in general,
permitting more flexibility if the workspace package is used as a library
in another program. The previous shutdown model was only well-suited for
command-line applications / single processes, but not for a request-driven
architecture that needs to shut down workspaces used by only a single request.
  • Loading branch information...
evanelias committed Nov 30, 2018
1 parent 1548c01 commit a8a3d88d4b6fa6300bd314c0880494633149ddf9
Showing with 91 additions and 50 deletions.
  1. +55 −35 workspace/localdocker.go
  2. +7 −3 workspace/localdocker_test.go
  3. +29 −12 workspace/workspace.go
@@ -4,24 +4,29 @@ import (
"errors"
"fmt"
"strings"
"sync"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/skeema/tengo"
)
// LocalDocker is a Workspace created inside of a Docker container on localhost.
// The schema is dropped when done interacting with the workspace, but the
// container may optionally remain running, be stopped, or be destroyed
// entirely.
// The schema is dropped when done interacting with the workspace in Cleanup(),
// but the container remains running. The container may optionally be stopped
// or destroyed via Shutdown().
type LocalDocker struct {
schemaName string
d *tengo.DockerizedInstance
releaseLock releaseFunc
schemaName string
d *tengo.DockerizedInstance
releaseLock releaseFunc
cleanupAction CleanupAction
}
var dockerClient *tengo.DockerClient
var seenContainerNames = map[string]bool{}
var cstore struct {
dockerClient *tengo.DockerClient
containers map[string]*LocalDocker
sync.Mutex
}
// NewLocalDocker finds or creates a containerized MySQL instance, creates a
// temporary schema on it, and returns it.
@@ -30,22 +35,29 @@ func NewLocalDocker(opts Options) (ld *LocalDocker, err error) {
return nil, fmt.Errorf("NewLocalDocker: unsupported flavor %s", opts.Flavor)
}
if dockerClient == nil {
if dockerClient, err = tengo.NewDockerClient(tengo.DockerClientOptions{}); err != nil {
cstore.Lock()
defer cstore.Unlock()
if cstore.dockerClient == nil {
if cstore.dockerClient, err = tengo.NewDockerClient(tengo.DockerClientOptions{}); err != nil {
return
}
cstore.containers = make(map[string]*LocalDocker)
tengo.UseFilteredDriverLogger()
}
ld = &LocalDocker{
schemaName: opts.SchemaName,
schemaName: opts.SchemaName,
cleanupAction: opts.CleanupAction,
}
image := opts.Flavor.String()
containerName := fmt.Sprintf("skeema-%s", strings.Replace(image, ":", "-", -1))
if !seenContainerNames[containerName] {
log.Infof("Using container %s (image=%s) for workspace operations", containerName, image)
if opts.ContainerName == "" {
opts.ContainerName = fmt.Sprintf("skeema-%s", strings.Replace(image, ":", "-", -1))
}
if cstore.containers[opts.ContainerName] == nil {
log.Infof("Using container %s (image=%s) for workspace operations", opts.ContainerName, image)
}
ld.d, err = dockerClient.GetOrCreateInstance(tengo.DockerizedInstanceOptions{
Name: containerName,
ld.d, err = cstore.dockerClient.GetOrCreateInstance(tengo.DockerizedInstanceOptions{
Name: opts.ContainerName,
Image: image,
RootPassword: opts.RootPassword,
DefaultConnParams: opts.DefaultConnParams,
@@ -66,26 +78,10 @@ func NewLocalDocker(opts Options) (ld *LocalDocker, err error) {
}
}()
if !seenContainerNames[containerName] {
if opts.CleanupAction == CleanupActionStop {
RegisterShutdownFunc(func() {
log.Infof("Stopping container %s", containerName)
ld.d.Stop()
delete(seenContainerNames, containerName)
})
} else if opts.CleanupAction == CleanupActionDestroy {
RegisterShutdownFunc(func() {
log.Infof("Destroying container %s", containerName)
ld.d.Destroy()
delete(seenContainerNames, containerName)
})
} else {
RegisterShutdownFunc(func() {
delete(seenContainerNames, containerName)
})
}
if cstore.containers[opts.ContainerName] == nil {
cstore.containers[opts.ContainerName] = ld
RegisterShutdownFunc(ld.shutdown)
}
seenContainerNames[containerName] = true
if has, err := ld.d.HasSchema(ld.schemaName); err != nil {
return ld, fmt.Errorf("Unable to check for existence of temp schema on %s: %s", ld.d.Instance, err)
@@ -135,3 +131,27 @@ func (ld *LocalDocker) Cleanup() error {
}
return nil
}
// shutdown handles shutdown logic for a specific LocalDocker instance. A single
// string arg may optionally be supplied as a container name prefix: if the
// container name does not begin with the prefix, no shutdown occurs.
func (ld *LocalDocker) shutdown(args ...interface{}) bool {
if len(args) > 0 {
if prefix, ok := args[0].(string); !ok || !strings.HasPrefix(ld.d.Name, prefix) {
return false
}
}
cstore.Lock()
defer cstore.Unlock()
if ld.cleanupAction == CleanupActionStop {
log.Infof("Stopping container %s", ld.d.Name)
ld.d.Stop()
} else if ld.cleanupAction == CleanupActionDestroy {
log.Infof("Destroying container %s", ld.d.Name)
ld.d.Destroy()
}
delete(cstore.containers, ld.d.Name)
return true
}
@@ -91,7 +91,7 @@ func (s WorkspaceIntegrationSuite) TestLocalDocker(t *testing.T) {
lookupOpts := tengo.DockerizedInstanceOptions{
Name: containerName,
}
if _, err := dockerClient.GetInstance(lookupOpts); err != nil {
if _, err := cstore.dockerClient.GetInstance(lookupOpts); err != nil {
t.Errorf("Unable to re-fetch container %s by name: %s", containerName, err)
}
@@ -107,11 +107,15 @@ func (s WorkspaceIntegrationSuite) TestLocalDocker(t *testing.T) {
if err := ld.Cleanup(); err == nil {
t.Error("Expected cleanup error since a table had rows, but err was nil")
}
Shutdown()
Shutdown("no-match") // intentionally should have no effect, container name doesn't match supplied prefix
if ok, err := ld.d.CanConnect(); !ok || err != nil {
t.Errorf("Expected container to still be running, but CanConnect returned %t / %v", ok, err)
}
Shutdown("skeema-") // should match
if ok, err := ld.d.CanConnect(); ok || err == nil {
t.Error("Expected container to be destroyed, but CanConnect returned true")
}
if _, err := dockerClient.GetInstance(lookupOpts); err == nil {
if _, err := cstore.dockerClient.GetInstance(lookupOpts); err == nil {
t.Errorf("Expected container %s to be destroyed, but able re-fetch container by name without error", containerName)
}
}
@@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/jmoiron/sqlx"
@@ -47,20 +48,22 @@ const (
// CleanupAction represents how to clean up a workspace.
type CleanupAction int
// Constants enumerating different cleanup actions
// Constants enumerating different cleanup actions. These may affect the
// behavior of Workspace.Cleanup() and/or Shutdown().
const (
// CleanupActionNone means to perform no special cleanup
CleanupActionNone CleanupAction = iota
// CleanupActionDrop means to drop the schema. Only used with TypeTempSchema.
// CleanupActionDrop means to drop the schema in Workspace.Cleanup(). Only
// used with TypeTempSchema.
CleanupActionDrop
// CleanupActionStop means to stop the MySQL instance container. Only used with
// TypeLocalDocker.
// CleanupActionStop means to stop the MySQL instance container in Shutdown().
// Only used with TypeLocalDocker.
CleanupActionStop
// CleanupActionDestroy means to destroy the MySQL instance container. Only
// used with TypeLocalDocker.
// CleanupActionDestroy means to destroy the MySQL instance container in
// Shutdown(). Only used with TypeLocalDocker.
CleanupActionDestroy
)
@@ -71,6 +74,7 @@ type Options struct {
CleanupAction CleanupAction
Instance *tengo.Instance // only TypeTempSchema
Flavor tengo.Flavor // only TypeLocalDocker
ContainerName string // only TypeLocalDocker
SchemaName string
DefaultCharacterSet string
DefaultCollation string
@@ -116,6 +120,7 @@ func OptionsForDir(dir *fs.Dir, instance *tengo.Instance) (Options, error) {
if opts.Flavor == tengo.FlavorUnknown && instance != nil {
opts.Flavor = instance.Flavor()
}
opts.ContainerName = fmt.Sprintf("skeema-%s", strings.Replace(opts.Flavor.String(), ":", "-", -1))
if cleanup, err := dir.Config.GetEnum("docker-cleanup", "none", "stop", "destroy"); err != nil {
return Options{}, err
} else if cleanup == "stop" {
@@ -138,25 +143,37 @@ func OptionsForDir(dir *fs.Dir, instance *tengo.Instance) (Options, error) {
return opts, nil
}
var shutdownFuncs []func()
// ShutdownFunc is a function that manages final cleanup of a Workspace upon
// completion of a request or process. It may optionally use args, passed
// through by Shutdown(), to determine whether or not a Workspace needs to be
// cleaned up. It should return true if cleanup occurred (meaning that the
// ShutdownFunc should be de-registered from future calls to Shutdown()), or
// false otherwise.
type ShutdownFunc func(...interface{}) bool
var shutdownFuncs []ShutdownFunc
// Shutdown performs any necessary cleanup operations prior to the program
// exiting. For example, if containers need to be stopped or destroyed, it is
// most efficient to do so at program exit, rather than needlessly doing so
// for each workspace invocation.
// It is recommended that programs importing this package call Shutdown as a
// deferred function in main().
func Shutdown() {
func Shutdown(args ...interface{}) {
retainedFuncs := make([]ShutdownFunc, 0, len(shutdownFuncs))
for _, f := range shutdownFuncs {
f()
if deregister := f(args...); !deregister {
retainedFuncs = append(retainedFuncs, f)
}
}
shutdownFuncs = []func(){}
shutdownFuncs = retainedFuncs
}
// RegisterShutdownFunc registers a function to be executed by Shutdown.
// Structs satisfying the Workspace interface may optionally use this function
// to
func RegisterShutdownFunc(f func()) {
// to track actions to perform at shutdown time, such as stopping or destroying
// containers.
func RegisterShutdownFunc(f ShutdownFunc) {
shutdownFuncs = append(shutdownFuncs, f)
}

0 comments on commit a8a3d88

Please sign in to comment.