Skip to content

Commit

Permalink
scheduler: always edge merge in one direction
Browse files Browse the repository at this point in the history
When we perform a vertex merge, we should explicitly track the vertex
that it was merged into. This way, we can avoid the case where we merge
an index 0 edge from A->B and, later an index 1 edge from B->A.

With this patch, this scenario instead flips the direction of the merge
to merge from A->B for index 1.

Signed-off-by: Justin Chadwell <me@jedevc.com>
  • Loading branch information
jedevc committed Jan 16, 2024
1 parent a091126 commit 6b5891d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
36 changes: 36 additions & 0 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type state struct {
clientVertex client.Vertex
origDigest digest.Digest // original LLB digest. TODO: probably better to use string ID so this isn't needed

owners []Vertex

mu sync.Mutex
op *sharedOp
edges map[Index]*edge
Expand Down Expand Up @@ -178,6 +180,7 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) {
targetState.mpw.Add(s.mpw)
targetState.allPw[s.mpw] = struct{}{}
}
targetState.owners = append(targetState.owners, s.vtx)
}
}

Expand Down Expand Up @@ -280,6 +283,39 @@ func NewSolver(opts SolverOpt) *Solver {
return jl
}

func (jl *Solver) findMergeDirection(dest *edge, src *edge) (*edge, *edge) {
jl.mu.RLock()
defer jl.mu.RUnlock()

st, ok := jl.actives[src.edge.Vertex.Digest()]
if !ok {
return dest, src
}

// attempt to detect a cycle
owners := st.owners
for len(owners) > 0 {
var owners2 []Vertex
for _, owner := range owners {
st, ok = jl.actives[owner.Digest()]
if !ok {
continue
}

// if the destination vertex has already been merged into the target,
// we should switch the merge order
if st.vtx.Digest() == dest.edge.Vertex.Digest() {
return src, dest
}

owners2 = append(owners2, st.owners...)
}
owners = owners2
}

return dest, src
}

func (jl *Solver) setEdge(e Edge, targetEdge *edge) {
jl.mu.RLock()
defer jl.mu.RUnlock()
Expand Down
8 changes: 5 additions & 3 deletions solver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ func (s *scheduler) dispatch(e *edge) {
if e.isDep(origEdge) || origEdge.isDep(e) {
bklog.G(context.TODO()).Debugf("skip merge due to dependency")
} else {
bklog.G(context.TODO()).Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
if s.mergeTo(origEdge, e) {
s.ef.setEdge(e.edge, origEdge)
dest, src := s.ef.findMergeDirection(origEdge, e)
bklog.G(context.TODO()).Debugf("merging edge %s to %s\n", src.edge.Vertex.Name(), dest.edge.Vertex.Name())
if s.mergeTo(dest, src) {
s.ef.setEdge(src.edge, dest)
}
}
}
Expand Down Expand Up @@ -351,6 +352,7 @@ func (s *scheduler) mergeTo(target, src *edge) bool {
type edgeFactory interface {
getEdge(Edge) *edge
setEdge(Edge, *edge)
findMergeDirection(*edge, *edge) (*edge, *edge)
}

type pipeFactory struct {
Expand Down

0 comments on commit 6b5891d

Please sign in to comment.