From 0740994604e4e295e9a58c63d2f1c81b87bed86f Mon Sep 17 00:00:00 2001
From: Justin Chadwell
Date: Wed, 17 Jan 2024 14:10:43 +0000
Subject: [PATCH 1/5] chore: fix minor typos in scheduler_test.go

Signed-off-by: Justin Chadwell
---
 solver/scheduler_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go
index 79497913076d..9b4ecb7e2d52 100644
--- a/solver/scheduler_test.go
+++ b/solver/scheduler_test.go
@@ -3560,7 +3560,7 @@ func (v *vertexConst) Acquire(ctx context.Context) (ReleaseFunc, error) {
 	return func() {}, nil
 }
 
-// vtxSum returns a vertex that ourputs sum of its inputs plus a constant
+// vtxSum returns a vertex that outputs sum of its inputs plus a constant
 func vtxSum(v int, opt vtxOpt) *vertexSum {
 	if opt.cacheKeySeed == "" {
 		opt.cacheKeySeed = fmt.Sprintf("sum-%d-%d", v, len(opt.inputs))
@@ -3601,7 +3601,7 @@ func (v *vertexSum) Acquire(ctx context.Context) (ReleaseFunc, error) {
 
 func vtxSubBuild(g Edge, opt vtxOpt) *vertexSubBuild {
 	if opt.cacheKeySeed == "" {
-		opt.cacheKeySeed = fmt.Sprintf("sum-%s", identity.NewID())
+		opt.cacheKeySeed = fmt.Sprintf("sub-%s", identity.NewID())
 	}
 	if opt.name == "" {
 		opt.name = opt.cacheKeySeed + "-" + identity.NewID()

From d3b1de8f94ce0133b6b941c92653a0bcc875d554 Mon Sep 17 00:00:00 2001
From: Justin Chadwell
Date: Wed, 17 Jan 2024 14:12:39 +0000
Subject: [PATCH 2/5] chore: update merging log message to include index

Signed-off-by: Justin Chadwell
---
 solver/scheduler.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/solver/scheduler.go b/solver/scheduler.go
index cee36672640d..56932a75bc48 100644
--- a/solver/scheduler.go
+++ b/solver/scheduler.go
@@ -186,7 +186,7 @@ func (s *scheduler) dispatch(e *edge) {
 		if e.isDep(origEdge) || origEdge.isDep(e) {
 			bklog.G(context.TODO()).Debugf("skip merge due to dependency")
 		} else {
-			bklog.G(context.TODO()).Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
+			bklog.G(context.TODO()).Debugf("merging edge %s[%d] to %s[%d]\n", e.edge.Vertex.Name(), e.edge.Index, origEdge.edge.Vertex.Name(), origEdge.edge.Index)
 			if s.mergeTo(origEdge, e) {
 				s.ef.setEdge(e.edge, origEdge)
 			}

From 7aee2929ee78ab66a11609894ce03002f24ac0bb Mon Sep 17 00:00:00 2001
From: Justin Chadwell
Date: Wed, 17 Jan 2024 13:57:05 +0000
Subject: [PATCH 3/5] progress: add panic for multiwriter cycles

Previously, if this case occurred, we would deadlock. To make this case
more easily identifiable and debuggable, we should panic instead.

Potentially, we could also just *skip* this case, but given that this is
an internal logic error that should never occur, it is better to surface
it.

Signed-off-by: Justin Chadwell
---
 util/progress/multiwriter.go | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/util/progress/multiwriter.go b/util/progress/multiwriter.go
index f0f7b40aff5b..6810ad68b51f 100644
--- a/util/progress/multiwriter.go
+++ b/util/progress/multiwriter.go
@@ -36,6 +36,15 @@ func (ps *MultiWriter) Add(pw Writer) {
 	if !ok {
 		return
 	}
+	if pws, ok := rw.(*MultiWriter); ok {
+		if pws.contains(ps) {
+			// this would cause a deadlock, so we should panic instead
+			// NOTE: this can be caused by a cycle in the scheduler states,
+			// which is created by a series of unfortunate edge merges
+			panic("multiwriter loop detected")
+		}
+	}
+
 	ps.mu.Lock()
 	plist := make([]*Progress, 0, len(ps.items))
 	plist = append(plist, ps.items...)
@@ -102,3 +111,24 @@ func (ps *MultiWriter) writeRawProgress(p *Progress) error {
 func (ps *MultiWriter) Close() error {
 	return nil
 }
+
+func (ps *MultiWriter) contains(pw rawProgressWriter) bool {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+	_, ok := ps.writers[pw]
+	if ok {
+		return true
+	}
+
+	for w := range ps.writers {
+		w, ok := w.(*MultiWriter)
+		if !ok {
+			continue
+		}
+		if w.contains(pw) {
+			return true
+		}
+	}
+
+	return false
+}

From 4e59c55b84c2ff58a844554cf3f31b8b979e563b Mon Sep 17 00:00:00 2001
From: Justin Chadwell
Date: Tue, 16 Jan 2024 12:29:26 +0000
Subject: [PATCH 4/5] scheduler: always edge merge in one direction

When we perform a vertex merge, we should explicitly track the vertex
that it was merged into. This way, we can avoid the case where we merge
an index 0 edge from A->B and, later, an index 1 edge from B->A. With
this patch, this scenario instead flips the direction of the merge so
that index 1 is also merged from A->B.

Signed-off-by: Justin Chadwell
---
 solver/jobs.go      | 43 +++++++++++++++++++++++++++++++++++++++++++
 solver/scheduler.go | 12 +++++++++---
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/solver/jobs.go b/solver/jobs.go
index 5ddafa614f4a..6062b47efb69 100644
--- a/solver/jobs.go
+++ b/solver/jobs.go
@@ -280,6 +280,49 @@ func NewSolver(opts SolverOpt) *Solver {
 	return jl
 }
 
+// hasOwner returns true if the provided target edge (or any of its sibling
+// edges) has the provided owner.
+func (jl *Solver) hasOwner(target Edge, owner Edge) bool {
+	jl.mu.RLock()
+	defer jl.mu.RUnlock()
+
+	st, ok := jl.actives[target.Vertex.Digest()]
+	if !ok {
+		return false
+	}
+
+	var owners []Edge
+	for _, e := range st.edges {
+		if e.owner != nil {
+			owners = append(owners, e.owner.edge)
+		}
+	}
+	for len(owners) > 0 {
+		var owners2 []Edge
+		for _, e := range owners {
+			st, ok = jl.actives[e.Vertex.Digest()]
+			if !ok {
+				continue
+			}
+
+			if st.vtx.Digest() == owner.Vertex.Digest() {
+				return true
+			}
+
+			for _, e := range st.edges {
+				if e.owner != nil {
+					owners2 = append(owners2, e.owner.edge)
+				}
+			}
+		}
+
+		// repeat recursively, this time with the linked owners' owners
+		owners = owners2
+	}
+
+	return false
+}
+
 func (jl *Solver) setEdge(e Edge, targetEdge *edge) {
 	jl.mu.RLock()
 	defer jl.mu.RUnlock()
diff --git a/solver/scheduler.go b/solver/scheduler.go
index 56932a75bc48..20220f73943c 100644
--- a/solver/scheduler.go
+++ b/solver/scheduler.go
@@ -186,9 +186,14 @@ func (s *scheduler) dispatch(e *edge) {
 		if e.isDep(origEdge) || origEdge.isDep(e) {
 			bklog.G(context.TODO()).Debugf("skip merge due to dependency")
 		} else {
-			bklog.G(context.TODO()).Debugf("merging edge %s[%d] to %s[%d]\n", e.edge.Vertex.Name(), e.edge.Index, origEdge.edge.Vertex.Name(), origEdge.edge.Index)
-			if s.mergeTo(origEdge, e) {
-				s.ef.setEdge(e.edge, origEdge)
+			dest, src := origEdge, e
+			if s.ef.hasOwner(origEdge.edge, e.edge) {
+				dest, src = src, dest
+			}
+
+			bklog.G(context.TODO()).Debugf("merging edge %s[%d] to %s[%d]\n", src.edge.Vertex.Name(), src.edge.Index, dest.edge.Vertex.Name(), dest.edge.Index)
+			if s.mergeTo(dest, src) {
+				s.ef.setEdge(src.edge, dest)
 			}
 		}
 	}
@@ -351,6 +356,7 @@ func (s *scheduler) mergeTo(target, src *edge) bool {
 type edgeFactory interface {
 	getEdge(Edge) *edge
 	setEdge(Edge, *edge)
+	hasOwner(Edge, Edge) bool
 }
 
 type pipeFactory struct {

From b7a0282baee131db4eab10ad66cde4b4aca5ede5 Mon Sep 17 00:00:00 2001
From: Justin Chadwell
Date: Tue, 16 Jan 2024 17:28:23 +0000
Subject: [PATCH 5/5] test: add a test for cyclic merges

Signed-off-by: Justin Chadwell
---
 solver/scheduler_test.go | 161 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)

diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go
index 9b4ecb7e2d52..2a3544ba4cff 100644
--- a/solver/scheduler_test.go
+++ b/solver/scheduler_test.go
@@ -3090,6 +3090,127 @@ func TestMergedEdgesLookup(t *testing.T) {
 	}
 }
 
+func TestMergedEdgesCycle(t *testing.T) {
+	t.Parallel()
+
+	for i := 0; i < 20; i++ {
+		ctx := context.TODO()
+
+		cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
+
+		l := NewSolver(SolverOpt{
+			ResolveOpFunc: testOpResolver,
+			DefaultCache:  cacheManager,
+		})
+		defer l.Close()
+
+		j0, err := l.NewJob("j0")
+		require.NoError(t, err)
+
+		defer func() {
+			if j0 != nil {
+				j0.Discard()
+			}
+		}()
+
+		// 2 different vertices, va and vb, both with the same cache key
+		va := vtxAdd(2, vtxOpt{name: "va", inputs: []Edge{
+			{Vertex: vtxConst(3, vtxOpt{})},
+			{Vertex: vtxConst(4, vtxOpt{})},
+		}})
+		vb := vtxAdd(2, vtxOpt{name: "vb", inputs: []Edge{
+			{Vertex: vtxConst(3, vtxOpt{})},
+			{Vertex: vtxConst(4, vtxOpt{})},
+		}})
+
+		// 4 edges va[0], va[1], vb[0], vb[1]
+		// by ordering them like this, we try to trigger the merges va[0]->vb[0]
+		// and vb[1]->va[1] to cause a cycle
+		g := Edge{
+			Vertex: vtxSum(1, vtxOpt{inputs: []Edge{
+				{Vertex: va, Index: 1}, // 6
+				{Vertex: vb, Index: 0}, // 5
+				{Vertex: va, Index: 0}, // 5
+				{Vertex: vb, Index: 1}, // 6
+			}}),
+		}
+		g.Vertex.(*vertexSum).setupCallCounters()
+
+		res, err := j0.Build(ctx, g)
+		require.NoError(t, err)
+		require.Equal(t, 23, unwrapInt(res))
+
+		require.NoError(t, j0.Discard())
+		j0 = nil
+	}
+}
+
+func TestMergedEdgesCycleMultipleOwners(t *testing.T) {
+	t.Parallel()
+
+	for i := 0; i < 20; i++ {
+		ctx := context.TODO()
+
+		cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
+
+		l := NewSolver(SolverOpt{
+			ResolveOpFunc: testOpResolver,
+			DefaultCache:  cacheManager,
+		})
+		defer l.Close()
+
+		j0, err := l.NewJob("j0")
+		require.NoError(t, err)
+
+		defer func() {
+			if j0 != nil {
+				j0.Discard()
+			}
+		}()
+
+		va := vtxAdd(2, vtxOpt{name: "va", inputs: []Edge{
+			{Vertex: vtxConst(3, vtxOpt{})},
+			{Vertex: vtxConst(4, vtxOpt{})},
+			{Vertex: vtxConst(5, vtxOpt{})},
+		}})
+		vb := vtxAdd(2, vtxOpt{name: "vb", inputs: []Edge{
+			{Vertex: vtxConst(3, vtxOpt{})},
+			{Vertex: vtxConst(4, vtxOpt{})},
+			{Vertex: vtxConst(5, vtxOpt{})},
+		}})
+		vc := vtxAdd(2, vtxOpt{name: "vc", inputs: []Edge{
+			{Vertex: vtxConst(3, vtxOpt{})},
+			{Vertex: vtxConst(4, vtxOpt{})},
+			{Vertex: vtxConst(5, vtxOpt{})},
+		}})
+
+		g := Edge{
+			Vertex: vtxSum(1, vtxOpt{inputs: []Edge{
+				// we trigger the merges va[0]->vb[0] and va[1]->vc[1] so that va
+				// gets merged twice
+				{Vertex: vb, Index: 0}, // 5
+				{Vertex: va, Index: 0}, // 5
+
+				{Vertex: vc, Index: 1}, // 6
+				{Vertex: va, Index: 1}, // 6
+
+				// then we trigger another merge via the first owner, vb[2]->va[2],
+				// which must be flipped
+				{Vertex: va, Index: 2}, // 7
+				{Vertex: vb, Index: 2}, // 7
+			}}),
+		}
+		g.Vertex.(*vertexSum).setupCallCounters()
+
+		res, err := j0.Build(ctx, g)
+		require.NoError(t, err)
+		require.Equal(t, 37, unwrapInt(res))
+
+		require.NoError(t, j0.Discard())
+		j0 = nil
+	}
+}
+
 func TestCacheLoadError(t *testing.T) {
 	t.Parallel()
 
@@ -3432,6 +3553,8 @@ func (v *vertex) setCallCounters(cacheCount, execCount *int64) {
 		v = vv
 	case *vertexSum:
 		v = vv.vertex
+	case *vertexAdd:
+		v = vv.vertex
 	case *vertexConst:
 		v = vv.vertex
 	case *vertexSubBuild:
@@ -3599,6 +3722,44 @@ func (v *vertexSum) Acquire(ctx context.Context) (ReleaseFunc, error) {
 	return func() {}, nil
 }
 
+// vtxAdd returns a vertex that outputs each input plus a constant
+func vtxAdd(v int, opt vtxOpt) *vertexAdd {
+	if opt.cacheKeySeed == "" {
+		opt.cacheKeySeed = fmt.Sprintf("add-%d-%d", v, len(opt.inputs))
+	}
+	if opt.name == "" {
+		opt.name = opt.cacheKeySeed + "-" + identity.NewID()
+	}
+	return &vertexAdd{vertex: vtx(opt), value: v}
+}
+
+type vertexAdd struct {
+	*vertex
+	value int
+}
+
+func (v *vertexAdd) Sys() interface{} {
+	return v
+}
+
+func (v *vertexAdd) Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error) {
+	if err := v.exec(ctx, inputs); err != nil {
+		return nil, err
+	}
+	for _, inp := range inputs {
+		r, ok := inp.Sys().(*dummyResult)
+		if !ok {
+			return nil, errors.Errorf("invalid input type: %T", inp.Sys())
+		}
+		outputs = append(outputs, &dummyResult{id: identity.NewID(), intValue: r.intValue + v.value})
+	}
+	return outputs, nil
+}
+
+func (v *vertexAdd) Acquire(ctx context.Context) (ReleaseFunc, error) {
+	return func() {}, nil
+}
+
 func vtxSubBuild(g Edge, opt vtxOpt) *vertexSubBuild {
 	if opt.cacheKeySeed == "" {
 		opt.cacheKeySeed = fmt.Sprintf("sub-%s", identity.NewID())
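
For illustration only, not part of the series: the situation that PATCH 3 turns from a silent deadlock into a panic, and that PATCH 4 stops the scheduler from creating via edge merges, can be reproduced directly with two MultiWriters that feed into each other. This is a minimal sketch; it assumes the package's NewMultiWriter constructor, which is not shown in the diffs above.

    package main

    import "github.com/moby/buildkit/util/progress"

    func main() {
    	a := progress.NewMultiWriter()
    	b := progress.NewMultiWriter()

    	a.Add(b) // progress written to a is now forwarded to b
    	b.Add(a) // before PATCH 3 this created a cycle that eventually deadlocked;
    	         // with PATCH 3 applied, Add panics with "multiwriter loop detected"
    }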