From 6b5891d8d251f1fe2ea28a80e15cbc9456bbbf7a Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Tue, 16 Jan 2024 12:29:26 +0000 Subject: [PATCH] scheduler: always edge merge in one direction When we perform a vertex merge, we should explicitly track the vertex that it was merged into. This way, we can avoid the case where we merge an index 0 edge from A->B and, later an index 1 edge from B->A. With this patch, this scenario instead flips the direction of the merge to merge from A->B for index 1. Signed-off-by: Justin Chadwell --- solver/jobs.go | 36 ++++++++++++++++++++++++++++++++++++ solver/scheduler.go | 8 +++++--- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index 5ddafa614f4a7..48eac30a909ce 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -57,6 +57,8 @@ type state struct { clientVertex client.Vertex origDigest digest.Digest // original LLB digest. TODO: probably better to use string ID so this isn't needed + owners []Vertex + mu sync.Mutex op *sharedOp edges map[Index]*edge @@ -178,6 +180,7 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) { targetState.mpw.Add(s.mpw) targetState.allPw[s.mpw] = struct{}{} } + targetState.owners = append(targetState.owners, s.vtx) } } @@ -280,6 +283,39 @@ func NewSolver(opts SolverOpt) *Solver { return jl } +func (jl *Solver) findMergeDirection(dest *edge, src *edge) (*edge, *edge) { + jl.mu.RLock() + defer jl.mu.RUnlock() + + st, ok := jl.actives[src.edge.Vertex.Digest()] + if !ok { + return dest, src + } + + // attempt to detect a cycle + owners := st.owners + for len(owners) > 0 { + var owners2 []Vertex + for _, owner := range owners { + st, ok = jl.actives[owner.Digest()] + if !ok { + continue + } + + // if the destination vertex has already been merged into the target, + // we should switch the merge order + if st.vtx.Digest() == dest.edge.Vertex.Digest() { + return src, dest + } + + owners2 = append(owners2, st.owners...) + } + owners = owners2 + } + + return dest, src +} + func (jl *Solver) setEdge(e Edge, targetEdge *edge) { jl.mu.RLock() defer jl.mu.RUnlock() diff --git a/solver/scheduler.go b/solver/scheduler.go index cee36672640d3..9b64c6ec72c82 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -186,9 +186,10 @@ func (s *scheduler) dispatch(e *edge) { if e.isDep(origEdge) || origEdge.isDep(e) { bklog.G(context.TODO()).Debugf("skip merge due to dependency") } else { - bklog.G(context.TODO()).Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name()) - if s.mergeTo(origEdge, e) { - s.ef.setEdge(e.edge, origEdge) + dest, src := s.ef.findMergeDirection(origEdge, e) + bklog.G(context.TODO()).Debugf("merging edge %s to %s\n", src.edge.Vertex.Name(), dest.edge.Vertex.Name()) + if s.mergeTo(dest, src) { + s.ef.setEdge(src.edge, dest) } } } @@ -351,6 +352,7 @@ func (s *scheduler) mergeTo(target, src *edge) bool { type edgeFactory interface { getEdge(Edge) *edge setEdge(Edge, *edge) + findMergeDirection(*edge, *edge) (*edge, *edge) } type pipeFactory struct {