Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Walk: add breadth-first option #63

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
}
return false
}
options := &walkOptions{}
Concurrent()(options)

// If we have a ProgressTracker, we wrap the visit function to handle it
v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
if v == nil {
return WalkDepth(ctx, GetLinksDirect(ng), root, visit, Concurrent())
return WalkDepth(ctx, GetLinksDirect(ng), root, visit, options)
}

visitProgress := func(c cid.Cid, depth int) bool {
Expand All @@ -204,7 +206,7 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
}
return false
}
return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, Concurrent())
return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, options)
}

// GetMany gets many nodes from the DAG at once.
Expand Down Expand Up @@ -284,9 +286,16 @@ const defaultConcurrentFetch = 32

// walkOptions represents the parameters of a graph walking algorithm.
// Values are normally built by applying WalkOption setters to a zero value.
type walkOptions struct {
// Skip visit to root.
// FIXME: Rename. The current name can be misleading, we're not skipping
// the root during the walk, on the contrary: we skip the visit which
// could have returned false and make the walk actually skip the root node
// (effectively ending the walk).
SkipRoot bool
// Concurrency selects the depth-first implementation used by WalkDepth:
// values greater than 1 use the parallel walk, anything else the
// sequential one.
Concurrency int
// ErrorHandler, when non-nil, is called with the CID and the error of a
// failed link lookup (as seen in WalkBreadth). The error it returns
// replaces the original one; returning nil lets the walk continue.
ErrorHandler func(c cid.Cid, err error) error
// Walk the DAG in breadth-first order instead of the default depth-first.
BreadthFirst bool
}

// WalkOption is a setter for walkOptions
Expand All @@ -309,6 +318,14 @@ func SkipRoot() WalkOption {
}
}

// BreadthFirst is a WalkOption that switches the walk from the default
// depth-first order to breadth-first order.
func BreadthFirst() WalkOption {
	return func(opts *walkOptions) {
		opts.BreadthFirst = true
	}
}

// Concurrent is a WalkOption indicating that node fetching should be done in
// parallel, with the default concurrency factor.
// NOTE: When using that option, the walk order is *not* guaranteed.
Expand Down Expand Up @@ -373,28 +390,67 @@ func OnError(handler func(c cid.Cid, err error) error) WalkOption {
}
}

// WalkGraph will walk the dag in order (depth first) starting at the given root.
func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error {
// Walk will walk the dag starting at the given root, in depth-first order by
// default or in breadth-first order when the BreadthFirst option is given.
func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, opts ...WalkOption) error {
options := &walkOptions{}
for _, opt := range opts {
opt(options)
}

visitDepth := func(c cid.Cid, depth int) bool {
return visit(c)
}

return WalkDepth(ctx, getLinks, c, visitDepth, options...)
if options.BreadthFirst {
return WalkBreadth(ctx, getLinks, c, 0, visitDepth, options)
} else {
return WalkDepth(ctx, getLinks, c, visitDepth, options)
}
}

// WalkBreadth walks the DAG starting at the given root in breadth-first order,
// processing one depth level at a time.
//
// depth is the value reported to visit for root; it grows by one for every
// level below it. visit is called for each CID encountered and can be used to
// limit DAG exploration: when it returns false, the links of that node are not
// fetched. When options.SkipRoot is set, visit is not called for nodes at
// depth 0 and their links are always followed.
//
// An error returned by getLinks is first handed to options.ErrorHandler (when
// set); whatever error remains afterwards aborts the walk and is returned.
// A nil options is treated as the default (zero value) options, since callers
// outside this package cannot construct a walkOptions themselves.
//
// NOTE(review): the SkipRoot check compares against depth 0 rather than the
// initial depth, so it only takes effect when the walk starts with
// depth == 0 -- confirm this is intended.
// FIXME: Add parallel implementation.
func WalkBreadth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *walkOptions) error {
	if options == nil {
		options = &walkOptions{}
	}

	current := []cid.Cid{root}
	var next []cid.Cid
	for len(current) > 0 {
		for _, c := range current {
			skipVisit := options.SkipRoot && depth == 0
			if !skipVisit && !visit(c, depth) {
				continue
			}

			links, err := getLinks(ctx, c)
			if err != nil && options.ErrorHandler != nil {
				err = options.ErrorHandler(c, err)
			}
			if err != nil {
				return err
			}

			for _, link := range links {
				next = append(next, link.Cid)
			}
		}
		depth++
		// Swap the two buffers instead of allocating a new slice per level:
		// the level just processed is truncated and reused to collect the
		// children of the next one. The slices never share a backing array,
		// so appending to next cannot disturb the range over current.
		current, next = next, current[:0]
	}
	return nil
}

// WalkDepth walks the dag starting at the given root and passes the current
// depth to a given visit function. The visit function can be used to limit DAG
// exploration.
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error {
opts := &walkOptions{}
for _, opt := range options {
opt(opts)
}
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error {

if opts.Concurrency > 1 {
return parallelWalkDepth(ctx, getLinks, c, visit, opts)
if options.Concurrency > 1 {
return parallelWalkDepth(ctx, getLinks, c, visit, options)
} else {
return sequentialWalkDepth(ctx, getLinks, c, 0, visit, opts)
return sequentialWalkDepth(ctx, getLinks, c, 0, visit, options)
}
}

Expand Down