Skip to content

Commit

Permalink
Merge pull request #8 from ipfs/feat/proto-ext-poc
Browse files Browse the repository at this point in the history
Extend peer task queue to work with want-have / want-block
  • Loading branch information
Stebalien committed Nov 11, 2019
2 parents 1e8b1e8 + fd33b91 commit f0529d7
Show file tree
Hide file tree
Showing 7 changed files with 979 additions and 242 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ipfs/go-peertaskqueue
go 1.12

require (
github.com/ipfs/go-ipfs-pq v0.0.1
github.com/ipfs/go-ipfs-pq v0.0.2
github.com/libp2p/go-libp2p-core v0.0.1
github.com/multiformats/go-multihash v0.0.5 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529 h1:izQqDLe/uSPKe6NYr3FjwnvU0AAg0im/4DLVXplLFUQ=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand Down
86 changes: 34 additions & 52 deletions peertask/peertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,90 +8,72 @@ import (
)

// FIFOCompare is a basic task comparator that returns tasks in the order created.
var FIFOCompare = func(a, b *TaskBlock) bool {
var FIFOCompare = func(a, b *QueueTask) bool {
return a.created.Before(b.created)
}

// PriorityCompare respects the target peer's task priority. For tasks involving
// different peers, the oldest task is prioritized.
var PriorityCompare = func(a, b *TaskBlock) bool {
var PriorityCompare = func(a, b *QueueTask) bool {
if a.Target == b.Target {
return a.Priority > b.Priority
}
return FIFOCompare(a, b)
}

// WrapCompare wraps a TaskBlock comparison function so it can be used as
// WrapCompare wraps a QueueTask comparison function so it can be used as
// comparison for a priority queue
func WrapCompare(f func(a, b *TaskBlock) bool) func(a, b pq.Elem) bool {
func WrapCompare(f func(a, b *QueueTask) bool) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*TaskBlock), b.(*TaskBlock))
return f(a.(*QueueTask), b.(*QueueTask))
}
}

// Identifier is a unique identifier for a task. It's used by the client library
// Topic is a non-unique name for a task. It's used by the client library
// to act on a task once it exits the queue.
type Identifier interface{}
type Topic interface{}

// Task is a single task to be executed as part of a task block.
type Task struct {
Identifier Identifier
Priority int
}
// Data is used by the client to associate extra information with a Task
type Data interface{}

// TaskBlock is a block of tasks to execute on a single peer.
type TaskBlock struct {
Tasks []Task
// Task is a single task to be executed in Priority order.
type Task struct {
// Topic for the task
Topic Topic
// Priority of the task
Priority int
Target peer.ID

// A callback to signal that this task block has been completed
Done func([]Task)
// The size of the task
// - peers with most active work are deprioritized
// - peers with most pending work are prioritized
Work int
// Arbitrary data associated with this Task by the client
Data Data
}

// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
// QueueTask contains a Task, and also some bookkeeping information.
// It is used internally by the PeerTracker to keep track of tasks.
type QueueTask struct {
Task
Target peer.ID
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}

// NewTaskBlock creates a new task block with the given tasks, priority, target
// peer, and task completion function.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
return &TaskBlock{
Tasks: tasks,
Priority: priority,
Target: target,
Done: done,
toPrune: make(map[Identifier]struct{}, len(tasks)),
created: time.Now(),
}
}

// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled of the queue to execute (because they've already been removed).
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
pt.toPrune[identifier] = struct{}{}
}

// PruneTasks removes all tasks previously marked as prunable from the lists of
// tasks in the block
func (pt *TaskBlock) PruneTasks() {
newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune))
for _, task := range pt.Tasks {
if _, ok := pt.toPrune[task.Identifier]; !ok {
newTasks = append(newTasks, task)
}
// NewQueueTask creates a new QueueTask from the given Task.
func NewQueueTask(task Task, target peer.ID, created time.Time) *QueueTask {
return &QueueTask{
Task: task,
Target: target,
created: created,
}
pt.Tasks = newTasks
}

// Index implements pq.Elem.
func (pt *TaskBlock) Index() int {
func (pt *QueueTask) Index() int {
return pt.index
}

// SetIndex implements pq.Elem.
func (pt *TaskBlock) SetIndex(i int) {
func (pt *QueueTask) SetIndex(i int) {
pt.index = i
}
97 changes: 75 additions & 22 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
type hookFunc func(p peer.ID, event peerTaskQueueEvent)

// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// Tasks are added to the queue, then popped off alternately between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
Expand All @@ -29,6 +29,7 @@ type PeerTaskQueue struct {
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
}

// Option is a function that configures the peer task queue
Expand All @@ -51,6 +52,16 @@ func IgnoreFreezing(ignoreFreezing bool) Option {
}
}

// TaskMerger is an option that specifies merge behaviour when pushing a task
// with the same Topic as an existing Topic.
func TaskMerger(tmfp peertracker.TaskMerger) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.taskMerger
ptq.taskMerger = tmfp
return TaskMerger(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
Expand Down Expand Up @@ -96,6 +107,7 @@ func New(options ...Option) *PeerTaskQueue {
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
taskMerger: &peertracker.DefaultTaskMerger{},
}
ptq.Options(options...)
return ptq
Expand All @@ -120,56 +132,98 @@ func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
}
}

// PushBlock adds a new block of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
// PushTasks adds a new group of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to)
peerTracker = peertracker.New(to, ptq.taskMerger)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
}

peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
ptq.lock.Lock()
for _, task := range e {
peerTracker.TaskDone(task.Identifier)
}
ptq.pQueue.Update(peerTracker.Index())
ptq.lock.Unlock()
})
peerTracker.PushTasks(tasks...)
ptq.pQueue.Update(peerTracker.Index())
}

// PopBlock 'pops' the next block of tasks to be performed. Returns nil if no block exists.
func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
// PopTasks finds the peer with the highest priority and pops as many tasks
// off the peer's queue as necessary to cover targetMinWork, in priority order.
// If there are not enough tasks to cover targetMinWork it just returns
// whatever is in the peer's queue.
// - Peers with the most "active" work are deprioritized.
// This heuristic is for fairness, we try to keep all peers "busy".
// - Peers with the most "pending" work are prioritized.
// This heuristic is so that peers with a lot to do get asked for work first.
// The third response argument is pending work: the amount of work in the
// queue for this peer.
func (ptq *PeerTaskQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

if ptq.pQueue.Len() == 0 {
return nil
return "", nil, -1
}

var peerTracker *peertracker.PeerTracker

// Choose the highest priority peer
peerTracker = ptq.pQueue.Peek().(*peertracker.PeerTracker)
if peerTracker == nil {
return "", nil, -1
}
peerTracker := ptq.pQueue.Pop().(*peertracker.PeerTracker)

out := peerTracker.PopBlock()
// Get the highest priority tasks for the given peer
out, pendingWork := peerTracker.PopTasks(targetMinWork)

// If the peer has no more tasks, remove its peer tracker
if peerTracker.IsIdle() {
ptq.pQueue.Pop()
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
ptq.pQueue.Push(peerTracker)
// We may have modified the peer tracker's state (by popping tasks), so
// update its position in the priority queue
ptq.pQueue.Update(peerTracker.Index())
}
return out

return peerTracker.Target(), out, pendingWork
}

// TasksDone is called to indicate that the given tasks have completed
// for the given peer
func (ptq *PeerTaskQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

// Get the peer tracker for the peer
peerTracker, ok := ptq.peerTrackers[to]
if !ok {
return
}

// Tell the peer tracker that the tasks have completed
for _, task := range tasks {
peerTracker.TaskDone(task)
}

// This may affect the peer's position in the peer queue, so update if
// necessary
ptq.pQueue.Update(peerTracker.Index())
}

// Remove removes a task from the queue.
func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
func (ptq *PeerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

peerTracker, ok := ptq.peerTrackers[p]
if ok {
if peerTracker.Remove(identifier) {
if peerTracker.Remove(topic) {
// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
Expand All @@ -184,7 +238,6 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
ptq.pQueue.Update(peerTracker.Index())
}
}
ptq.lock.Unlock()
}

// FullThaw completely thaws all peers in the queue so they can execute tasks.
Expand Down

0 comments on commit f0529d7

Please sign in to comment.