rapide: randomly distribute equivalent walks
This helps avoid situations where you race someone else in a shallow DAG and end up working on a block right next to theirs in the traversal; a standalone sketch of the tie-breaking idea follows the file summary below.
Jorropo committed Feb 1, 2023
1 parent a163bb5 commit 9385e95
Showing 2 changed files with 45 additions and 21 deletions.
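As a standalone illustration of the tie-breaking idea this commit introduces (an editorial sketch: pickLeastLoaded, its inputs, and the seed are illustrative and not code from this commit): when several candidates tie for the fewest workers, each tied candidate draws a random key and the smallest key wins, selecting uniformly among the ties without counting them first.

package main

import (
	"fmt"
	mrand "math/rand"
)

// pickLeastLoaded returns the index of the smallest count, breaking ties
// uniformly at random between equal entries via lazily drawn random keys.
func pickLeastLoaded(counts []uint, rng *mrand.Rand) int {
	min := -1
	var minCount, luck uint
	for i, c := range counts {
		switch {
		case min == -1, c < minCount:
			// new strict minimum: take it and reset the tie-break key
			min, minCount, luck = i, c, 0
		case c == minCount:
			if luck == 0 {
				// draw the current minimum's key lazily, only once a tie shows up
				luck = uint(rng.Int())
			}
			if newLuck := uint(rng.Int()); newLuck < luck {
				// this tie drew a smaller key, so it becomes the new pick
				min, luck = i, newLuck
			}
		}
	}
	return min
}

func main() {
	rng := mrand.New(mrand.NewSource(1))
	// indexes 1, 2 and 4 tie at one worker; each wins roughly a third of the time
	fmt.Println(pickLeastLoaded([]uint{2, 1, 1, 3, 1}, rng))
}

Because every tied candidate effectively holds an independent random key and the smallest key survives, each of k tied children wins with probability about 1/k, which is what spreads equivalent walks across workers.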
4 changes: 3 additions & 1 deletion rapide/rapide.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
mrand "math/rand"
"sync"
"sync/atomic"

@@ -42,8 +43,9 @@ func (c *Client) Get(ctx context.Context, root cid.Cid, traversal ipsl.Traversal
errors: make([]error, len(c.ServerDrivenDownloaders)),
}

seedRand := mrand.New(mrand.NewSource(mrand.Int63()))
for i, sdd := range c.ServerDrivenDownloaders {
d.startServerDrivenWorker(ctx, sdd, &d.root, &d.errors[i])
d.startServerDrivenWorker(ctx, sdd, &d.root, &d.errors[i], seedRand.Int63()^int64(i))
}

return out
62 changes: 42 additions & 20 deletions rapide/serverdriven.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
mrand "math/rand"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/blocks"
@@ -15,22 +16,24 @@ type serverDrivenWorker struct {
impl ServerDrivenDownloader
download *download
outErr *error

current *node

tasks map[cid.Cid]*node
current *node
tasks map[cid.Cid]*node
rand mrand.Rand

// TODO: add a dontGoThere map which tells you what part of the dag this node is not able to handle
}

func (d *download) startServerDrivenWorker(ctx context.Context, impl ServerDrivenDownloader, root *node, outErr *error) {
go (&serverDrivenWorker{
func (d *download) startServerDrivenWorker(ctx context.Context, impl ServerDrivenDownloader, root *node, outErr *error, seed int64) {
w := &serverDrivenWorker{
impl: impl,
download: d,
outErr: outErr,
current: root,
tasks: make(map[cid.Cid]*node),
}).work(ctx)
rand: *mrand.New(mrand.NewSource(seed)),
}

go w.work(ctx)
}

func (w *serverDrivenWorker) work(ctx context.Context) {
@@ -175,30 +178,20 @@ func (w *serverDrivenWorker) findWork() (cid.Cid, ipsl.Traversal, bool) {
case done:
// first search in its children if it has something we could run
c.workers += 1
var minWorkers uint
var minWorkers, luck uint
var min *node
for _, child := range c.childrens {
// we run a minimum search; we want the node that currently has the fewest workers
// TODO: filter childs in the dontGoThere map
child.mu.Lock()
switch {
case min == nil:
minWorkers = child.workers
min = child
case child.workers < minWorkers:
minWorkers = child.workers
min = child
}
child.mu.Unlock()
minWorkers, min, luck = w.compareChildWithMinimums(child, minWorkers, min, luck)
}
if min != nil {
c.mu.Unlock()
c = min
continue
}

// this node is fully completed, backtracking
// TODO: add c in dontGoThere (we failed to select any child)
// this node is fully completed, do backtracking
c.workers -= 1
new := c.parent
c.mu.Unlock()
@@ -211,6 +204,35 @@
}
}

func (w *serverDrivenWorker) compareChildWithMinimums(child *node, minWorkers uint, min *node, luck uint) (uint, *node, uint) {
child.mu.Lock()
switch {
case min == nil:
minWorkers = child.workers
min = child
case child.workers < minWorkers:
minWorkers = child.workers
min = child
luck = 0
case child.workers == minWorkers:
// if scores are identical, pick randomly between the candidates to randomly distribute where downloads are placed
if luck == 0 {
// lazily initialise luck; this avoids generating a random value when strictly better candidates arrive back to back
luck = uint(w.rand.Int())
}
newLuck := uint(w.rand.Int())
if newLuck >= luck {
break
}
minWorkers = child.workers
min = child
luck = newLuck
}
child.mu.Unlock()

return minWorkers, min, luck
}

// resetCurrentChildsNodeWorkState updates the state of the current node to no longer count towards it.
func (w *serverDrivenWorker) resetCurrentChildsNodeWorkState() {
c := w.current
