Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch Hamt Children Nodes in Parallel #4979

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 325 additions & 0 deletions unixfs/hamt/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
package hamt

import (
"context"
"time"

cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
)

var log = logging.Logger("hamt")

// fetcher implements a background fetcher to retrieve missing child
// shards in large batches. To allow streaming, it attempts to
// retrieve the missing shards in the order they will be read when
// traversing the tree depth first.
type fetcher struct {
// note: the fields in this structure should only be accesses by
// the 'mainLoop' go routine, all communication should be done via
// channels

ctx context.Context
dserv ipld.DAGService

requestCh chan *Shard // channel for requesting the children of a shard
resultCh chan result // channel for retrieving the results of the request

want *Shard // want is the job id (the parent shard) of the results we want next

idle bool

done chan batchJob // when a batch job completes results are sent to this channel

todoFirst *job // do this job first since we are waiting for its results
todo jobStack // stack of jobs that still need to be done
jobs map[*Shard]*job // map of all jobs in which the results have not been collected yet

// stats relevant for streaming the complete hamt directory
doneCnt int // job's done but results not yet retrieved
hits int // job's result already ready, no delay
nearMisses int // job currently being worked on, small delay
misses int // job on todo stack but will be done in the next batch, larger delay

// other useful stats
cidCnt int

start time.Time
}

// batchSize must be at least as large as the largest number of CIDs
// requested in a single job. For best performance it should likely be
// slightly larger as jobs are popped from the todo stack in order and a
// job close to the batchSize could force a very small batch to run.
// The recommend minimum size is thus a size slightly larger than the
// maximum number children in a HAMT node (which is the largest number
// of CIDs that could be requested in a single job) or 256 + 64 = 320
const batchSize = 320
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd open an issue to run benchmarks with different values of this once this PR is merged

Copy link
Contributor Author

@kevina kevina Jul 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that, but I already did do some informal benchmarking to arrive at this value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevina is that benchmarking reproducible? Do you have scripts we can run to try it out and rederive the number ourselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually. That number is based more on reason (as outlined on comments) then benchmarks. I do not recommend we go below that size. Larger values might be beneficial at the increase of resource usage. I don't have any formal benchmarks, I mostly just observed the behavior when fetching a large directory. I can create an issue to consider increasing the value if desirable.


//
// fetcher public interface
//

// startFetcher starts a new fetcher in the background
func startFetcher(ctx context.Context, dserv ipld.DAGService) *fetcher {
log.Infof("fetcher: starting...")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Debugf / remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather keep this at the same level of the rest of the stats output for consistency.

f := &fetcher{
ctx: ctx,
dserv: dserv,
requestCh: make(chan *Shard),
resultCh: make(chan result),
idle: true,
done: make(chan batchJob),
jobs: make(map[*Shard]*job),
}
go f.mainLoop()
return f
}

// result contains the result of a job, see getResult
type result struct {
vals map[string]*Shard
errs []error
}

// get gets the missing child shards for the hamt object.
// The missing children for the passed in shard are returned. The
// children are then also retrieved in the background. The result is
// the result of the batch request and not just the single job. In
// particular, if the 'errs' field is empty the 'vals' of the result
// is guaranteed to contain all the missing child shards, but the
// map may also contain child shards of other jobs in the batch.
func (f *fetcher) get(hamt *Shard) result {
f.requestCh <- hamt
res := <-f.resultCh
return res
}

//
// fetcher internals
//

type job struct {
id *Shard
idx int /* index in the todo stack, an index of -1 means the job
is already done or being worked on now */
cids []*cid.Cid
res result
}

func (f *fetcher) mainLoop() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd split this function into smaller functions as this is rather hard to read

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that might be helpful, but I would rather do this in a separate p.r., as I have other higher priority things I want to work on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is new code, and not a refactor, i'd prefer getting it a bit cleaner before merging it in. @kevina If you'd like to hack on other things, maybe @magik6k could help out cleaning things up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't necessary agree it will be an improvement, but see my other comments.

f.start = time.Now()
for {
select {
case id := <-f.requestCh:
f.mainLoopHandleRequest(id)
case bj := <-f.done:
f.doneCnt += len(bj.jobs)
f.cidCnt += len(bj.cids)
f.launch()
if f.want != nil {
j := f.jobs[f.want]
if j.res.vals != nil {
f.mainLoopSendResult(j)
f.want = nil
}
}
log.Infof("fetcher: batch job done")
f.mainLoopLogStats()
case <-f.ctx.Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the context is cancelled what happens with the launch() goroutines that want to write to the done channel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should now be fixed.

if !f.idle {
// wait unit batch job finishes
<-f.done
}
log.Infof("fetcher: exiting")
f.mainLoopLogStats()
return
}
}
}

func (f *fetcher) mainLoopHandleRequest(id *Shard) {
if f.want != nil {
// programmer error
panic("fetcher: can not request more than one result at a time")
}
j, ok := f.jobs[id]
var err error
if !ok {
// job does not exist yet so add it
j, err = f.mainLoopAddJob(id)
if err != nil {
f.resultCh <- result{errs: []error{err}}
return
}
if j == nil {
// no children that need to be retrieved
f.resultCh <- result{vals: make(map[string]*Shard)}
return
}
if f.idle {
f.launch()
}
}
if j.res.vals != nil {
// job already completed so just send result
f.hits++
f.mainLoopSendResult(j)
return
}
if j.idx != -1 {
f.misses++
// job is not currently running so
// move job to todoFirst so that it will be done on the
// next batch job
f.todo.remove(j)
f.todoFirst = j
} else {
// job already running
f.nearMisses++
}
f.want = id
}

func (f *fetcher) mainLoopAddJob(hamt *Shard) (*job, error) {
children, err := hamt.missingChildShards()
if err != nil {
return nil, err
}
if len(children) == 0 {
return nil, nil
}
j := &job{id: hamt, cids: children}
if len(j.cids) > batchSize {
// programmer error
panic("job size larger than batchSize")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth a panic? Is the responsibility of the consumer to now upfront how many children will be requested per shard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is the responsibility of the consumer to now upfront how many children will be requested per shard. The HAMT shard does not allow more than 256 children in a single node, so this is reasonable.

}
f.todo.push(j)
f.jobs[j.id] = j
return j, nil
}

func (f *fetcher) mainLoopSendResult(j *job) {
f.resultCh <- j.res
delete(f.jobs, j.id)
f.doneCnt--
if len(j.res.errs) != 0 {
return
}
// we want the first child to be the first to run not the last so
// add the jobs in the reverse order
for i := len(j.cids) - 1; i >= 0; i-- {
hamt := j.res.vals[string(j.cids[i].Bytes())]
f.mainLoopAddJob(hamt)
}
if f.idle {
f.launch()
}
}

func (f *fetcher) mainLoopLogStats() {
log.Infof("fetcher stats (cids, done, hits, nearMisses, misses): %d %d %d %d %d", f.cidCnt, f.doneCnt, f.hits, f.nearMisses, f.misses)
elapsed := time.Now().Sub(f.start).Seconds()
log.Infof("fetcher perf (cids/sec, jobs/sec) %f %f", float64(f.cidCnt)/elapsed, float64(f.doneCnt+f.hits+f.nearMisses+f.misses)/elapsed)
}

type batchJob struct {
cids []*cid.Cid
jobs []*job
}

func (b *batchJob) add(j *job) {
b.cids = append(b.cids, j.cids...)
b.jobs = append(b.jobs, j)
j.idx = -1
}

func (f *fetcher) launch() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd split this up too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

bj := batchJob{}

// always do todoFirst
if f.todoFirst != nil {
bj.add(f.todoFirst)
f.todoFirst = nil
}

// pop requets from todo list until we hit the batchSize
for !f.todo.empty() && len(bj.cids)+len(f.todo.top().cids) <= batchSize {
j := f.todo.pop()
bj.add(j)
}

if len(bj.cids) == 0 {
if !f.idle {
log.Infof("fetcher: entering idle state: no more jobs")
}
f.idle = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This may be too much of a restructuring so just an idea for later iterations). Could the idle state be replaced with a mechanism that would stop the mainLoop goroutine and start it up (call) again when another get is issued?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly. Although the current code has the limitation that there is only one batch job running at a time. This can cause stragglers in a batch job to reduce throughput. One ideas I explored earlier was to launch a new batch job when an existing job was half done. I rejected this for now because the logic become to complicated. (For example if we unconditional launch a new job when a batch is half done we might end up with a a large number of outstating jobs, we could limit the jobs also, but that complicated the logic even more.). I would still like to explore this option at a later date. I think I will change idle to numBatchJobs and make it clear that in the current iteration it is either 0 or 1, but at some point this could change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought I think for this p.r. I will leave things as they are. I have reviewed the idle logic and am fairly certain it is correct.

return
}

// launch job
log.Infof("fetcher: starting batch job, size = %d", len(bj.cids))
f.idle = false
go func() {
ch := f.dserv.GetMany(f.ctx, bj.cids)
fetched := result{vals: make(map[string]*Shard)}
for no := range ch {
if no.Err != nil {
fetched.errs = append(fetched.errs, no.Err)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not abort early here (close context and return)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's unnecessary. The fetcher can continue even if it encounters an error here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see, at least in https://github.com/ipfs/go-ipfs/pull/4979/files#diff-689271378656b0bd9fc790b4a0a2b784R393 we throw the result away if there are errors, so it would make sense to not fetch more stuff if we know it won't be used (I might be wrong here though)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that code code also aborts the context so the amount of extra work done is minimal.

}
hamt, err := NewHamtFromDag(f.dserv, no.Node)
if err != nil {
fetched.errs = append(fetched.errs, err)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here (abort early)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}
fetched.vals[string(no.Node.Cid().Bytes())] = hamt
}
for _, job := range bj.jobs {
job.res = fetched
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do all of the jobs get all of the results? (Related to comment at line 90.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. And I do this for code simplicity and performance reasons.

}
f.done <- bj
}()
}

// jobStack is a specialized stack implementation. It has the
// property that once an item is added to the stack its position will
// never change so it can be referenced by index
type jobStack struct {
c []*job
}

func (js *jobStack) empty() bool {
return len(js.c) == 0
}

func (js *jobStack) top() *job {
return js.c[len(js.c)-1]
}

func (js *jobStack) push(j *job) {
j.idx = len(js.c)
js.c = append(js.c, j)
}

func (js *jobStack) pop() *job {
j := js.top()
js.remove(j)
return j
}

// remove marks a job as empty and attempts to remove it if it is at
// the top of a stack
func (js *jobStack) remove(j *job) {
js.c[j.idx] = nil
j.idx = -1
js.popEmpty()
}

// popEmpty removes all empty jobs at the top of the stack
func (js *jobStack) popEmpty() {
for len(js.c) > 0 && js.c[len(js.c)-1] == nil {
js.c = js.c[:len(js.c)-1]
}
}
Loading