Skip to content

Commit

Permalink
ops: fix deadlock on releasing shared mounts
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Feb 4, 2020
1 parent bf2dc85 commit 6d907b6
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 42 deletions.
46 changes: 31 additions & 15 deletions solver/llbsolver/ops/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ type execOp struct {
platform *pb.Platform
numInputs int

cacheMounts map[string]*cacheRefShare
cacheMounts map[string]*cacheRefShare
cacheMountsMu sync.Mutex
}

func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) {
Expand Down Expand Up @@ -222,21 +223,23 @@ func (e *execOp) getMountDeps() ([]dep, error) {

func (e *execOp) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, m *pb.Mount, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
g := &cacheRefGetter{
locker: CacheMountsLocker(),
cacheMounts: e.cacheMounts,
cm: e.cm,
md: e.md,
name: fmt.Sprintf("cached mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")),
locker: &e.cacheMountsMu,
cacheMounts: e.cacheMounts,
cm: e.cm,
md: e.md,
globalCacheRefs: sharedCacheRefs,
name: fmt.Sprintf("cached mount %s from exec %s", m.Dest, strings.Join(e.op.Meta.Args, " ")),
}
return g.getRefCacheDir(ctx, ref, id, sharing)
}

type cacheRefGetter struct {
locker sync.Locker
cacheMounts map[string]*cacheRefShare
cm cache.Manager
md *metadata.Store
name string
locker sync.Locker
cacheMounts map[string]*cacheRefShare
cm cache.Manager
md *metadata.Store
globalCacheRefs *cacheRefs
name string
}

func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) {
Expand All @@ -261,7 +264,7 @@ func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.Immutable

switch sharing {
case pb.CacheSharingOpt_SHARED:
return sharedCacheRefs.get(key, func() (cache.MutableRef, error) {
return g.globalCacheRefs.get(key, func() (cache.MutableRef, error) {
return g.getRefCacheDirNoCache(ctx, key, ref, id, false)
})
case pb.CacheSharingOpt_PRIVATE:
Expand Down Expand Up @@ -814,6 +817,9 @@ func CacheMountsLocker() sync.Locker {
}

func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.MutableRef, error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.shares == nil {
r.shares = map[string]*cacheRefShare{}
}
Expand All @@ -830,7 +836,6 @@ func (r *cacheRefs) get(key string, fn func() (cache.MutableRef, error)) (cache.

share = &cacheRefShare{MutableRef: mref, main: r, key: key, refs: map[*cacheRef]struct{}{}}
r.shares[key] = share

return share.clone(), nil
}

Expand All @@ -844,6 +849,9 @@ type cacheRefShare struct {

func (r *cacheRefShare) clone() cache.MutableRef {
cacheRef := &cacheRef{cacheRefShare: r}
if cacheRefCloneHijack != nil {
cacheRefCloneHijack()
}
r.mu.Lock()
r.refs[cacheRef] = struct{}{}
r.mu.Unlock()
Expand All @@ -852,22 +860,30 @@ func (r *cacheRefShare) clone() cache.MutableRef {

func (r *cacheRefShare) release(ctx context.Context) error {
if r.main != nil {
r.main.mu.Lock()
defer r.main.mu.Unlock()
delete(r.main.shares, r.key)
}
return r.MutableRef.Release(ctx)
}

var cacheRefReleaseHijack func()
var cacheRefCloneHijack func()

type cacheRef struct {
*cacheRefShare
}

func (r *cacheRef) Release(ctx context.Context) error {
if r.main != nil {
r.main.mu.Lock()
defer r.main.mu.Unlock()
}
r.mu.Lock()
defer r.mu.Unlock()
delete(r.refs, r)
if len(r.refs) == 0 {
if cacheRefReleaseHijack != nil {
cacheRefReleaseHijack()
}
return r.release(ctx)
}
return nil
Expand Down
105 changes: 78 additions & 27 deletions solver/llbsolver/ops/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)

type cmOpt struct {
Expand Down Expand Up @@ -153,6 +155,16 @@ func TestDedupPaths(t *testing.T) {
require.Equal(t, []string{"foo/bar", "foo/bara"}, res)
}

func newRefGetter(m cache.Manager, md *metadata.Store, shared *cacheRefs) *cacheRefGetter {
return &cacheRefGetter{
locker: &sync.Mutex{},
cacheMounts: map[string]*cacheRefShare{},
cm: m,
md: md,
globalCacheRefs: shared,
}
}

func TestCacheMountPrivateRefs(t *testing.T) {
t.Parallel()
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")
Expand All @@ -172,33 +184,10 @@ func TestCacheMountPrivateRefs(t *testing.T) {

defer cleanup()

g1 := &cacheRefGetter{
locker: &sync.Mutex{},
cacheMounts: map[string]*cacheRefShare{},
cm: co.manager,
md: co.md,
}

g2 := &cacheRefGetter{
locker: g1.locker,
cacheMounts: map[string]*cacheRefShare{},
cm: co.manager,
md: co.md,
}

g3 := &cacheRefGetter{
locker: g1.locker,
cacheMounts: map[string]*cacheRefShare{},
cm: co.manager,
md: co.md,
}

g4 := &cacheRefGetter{
locker: g1.locker,
cacheMounts: map[string]*cacheRefShare{},
cm: co.manager,
md: co.md,
}
g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g3 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g4 := newRefGetter(co.manager, co.md, sharedCacheRefs)

ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE)
require.NoError(t, err)
Expand Down Expand Up @@ -242,3 +231,65 @@ func TestCacheMountPrivateRefs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, ref4.ID(), ref6.ID())
}

// moby/buildkit#1322
func TestCacheMountSharedRefsDeadlock(t *testing.T) {
// not parallel
ctx := namespaces.WithNamespace(context.Background(), "buildkit-test")

tmpdir, err := ioutil.TempDir("", "cachemanager")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots"))
require.NoError(t, err)

co, cleanup, err := newCacheManager(ctx, cmOpt{
snapshotter: snapshotter,
snapshotterName: "native",
})
require.NoError(t, err)

defer cleanup()

var sharedCacheRefs = &cacheRefs{}

g1 := newRefGetter(co.manager, co.md, sharedCacheRefs)
g2 := newRefGetter(co.manager, co.md, sharedCacheRefs)

ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
require.NoError(t, err)

cacheRefReleaseHijack = func() {
time.Sleep(200 * time.Millisecond)
}
cacheRefCloneHijack = func() {
time.Sleep(400 * time.Millisecond)
}
defer func() {
cacheRefReleaseHijack = nil
cacheRefCloneHijack = nil
}()
eg, _ := errgroup.WithContext(context.TODO())

eg.Go(func() error {
return ref.Release(context.TODO())
})
eg.Go(func() error {
_, err := g2.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED)
return err
})

done := make(chan struct{})
go func() {
err = eg.Wait()
require.NoError(t, err)
close(done)
}()

select {
case <-done:
case <-time.After(10 * time.Second):
require.FailNow(t, "deadlock on releasing while getting new ref")
}
}

0 comments on commit 6d907b6

Please sign in to comment.