Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend peer task queue to work with want-have / want-block #8

Merged
merged 19 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529 h1:izQqDLe/uSPKe6NYr3FjwnvU0AAg0im/4DLVXplLFUQ=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
Expand Down
86 changes: 34 additions & 52 deletions peertask/peertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,90 +8,72 @@ import (
)

// FIFOCompare is a basic task comparator that returns tasks in the order created.
var FIFOCompare = func(a, b *TaskBlock) bool {
var FIFOCompare = func(a, b *QueueTask) bool {
return a.created.Before(b.created)
}

// PriorityCompare respects the target peer's task priority. For tasks involving
// different peers, the oldest task is prioritized.
var PriorityCompare = func(a, b *TaskBlock) bool {
var PriorityCompare = func(a, b *QueueTask) bool {
if a.Target == b.Target {
return a.Priority > b.Priority
}
return FIFOCompare(a, b)
}

// WrapCompare wraps a TaskBlock comparison function so it can be used as
// WrapCompare wraps a QueueTask comparison function so it can be used as
// comparison for a priority queue
func WrapCompare(f func(a, b *TaskBlock) bool) func(a, b pq.Elem) bool {
func WrapCompare(f func(a, b *QueueTask) bool) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*TaskBlock), b.(*TaskBlock))
return f(a.(*QueueTask), b.(*QueueTask))
}
}

// Identifier is a unique identifier for a task. It's used by the client library
// Topic is a non-unique name for a task. It's used by the client library
// to act on a task once it exits the queue.
type Identifier interface{}
type Topic interface{}

// Task is a single task to be executed as part of a task block.
type Task struct {
Identifier Identifier
Priority int
}
// Data is used by the client to associate extra information with a Task
type Data interface{}

// TaskBlock is a block of tasks to execute on a single peer.
type TaskBlock struct {
Tasks []Task
// Task is a single task to be executed in Priority order.
type Task struct {
// Topic for the task
Topic Topic
// Priority of the task
Priority int
Target peer.ID

// A callback to signal that this task block has been completed
Done func([]Task)
// The size of the task
// - peers with most active work are deprioritized
// - peers with most pending work are prioritized
Work int
// Arbitrary data associated with this Task by the client
Data Data
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
// QueueTask contains a Task, and also some bookkeeping information.
// It is used internally by the PeerTracker to keep track of tasks.
type QueueTask struct {
Task
Target peer.ID
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}

// NewTaskBlock creates a new task block with the given tasks, priority, target
// peer, and task completion function.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
return &TaskBlock{
Tasks: tasks,
Priority: priority,
Target: target,
Done: done,
toPrune: make(map[Identifier]struct{}, len(tasks)),
created: time.Now(),
}
}

// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled of the queue to execute (because they've already been removed).
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
pt.toPrune[identifier] = struct{}{}
}

// PruneTasks removes all tasks previously marked as prunable from the lists of
// tasks in the block
func (pt *TaskBlock) PruneTasks() {
newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune))
for _, task := range pt.Tasks {
if _, ok := pt.toPrune[task.Identifier]; !ok {
newTasks = append(newTasks, task)
}
// NewQueueTask creates a new QueueTask from the given Task.
func NewQueueTask(task Task, target peer.ID, created time.Time) *QueueTask {
return &QueueTask{
Task: task,
Target: target,
created: created,
}
pt.Tasks = newTasks
}

// Index implements pq.Elem.
func (pt *TaskBlock) Index() int {
func (pt *QueueTask) Index() int {
return pt.index
}

// SetIndex implements pq.Elem.
func (pt *TaskBlock) SetIndex(i int) {
func (pt *QueueTask) SetIndex(i int) {
pt.index = i
}
97 changes: 75 additions & 22 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
type hookFunc func(p peer.ID, event peerTaskQueueEvent)

// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// Tasks are added to the queue, then popped off alternately between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
Expand All @@ -29,6 +29,7 @@ type PeerTaskQueue struct {
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
}

// Option is a function that configures the peer task queue
Expand All @@ -51,6 +52,16 @@ func IgnoreFreezing(ignoreFreezing bool) Option {
}
}

// TaskMerger is an option that specifies merge behaviour when pushing a task
// with the same Topic as an existing Topic.
func TaskMerger(tmfp peertracker.TaskMerger) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.taskMerger
ptq.taskMerger = tmfp
return TaskMerger(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
Expand Down Expand Up @@ -96,6 +107,7 @@ func New(options ...Option) *PeerTaskQueue {
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
taskMerger: &peertracker.DefaultTaskMerger{},
}
ptq.Options(options...)
return ptq
Expand All @@ -120,56 +132,98 @@ func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
}
}

// PushBlock adds a new block of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
// PushTasks adds a new group of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to)
peerTracker = peertracker.New(to, ptq.taskMerger)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
}

peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
ptq.lock.Lock()
for _, task := range e {
peerTracker.TaskDone(task.Identifier)
}
ptq.pQueue.Update(peerTracker.Index())
ptq.lock.Unlock()
})
peerTracker.PushTasks(tasks...)
ptq.pQueue.Update(peerTracker.Index())
}

// PopBlock 'pops' the next block of tasks to be performed. Returns nil if no block exists.
func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
// PopTasks finds the peer with the highest priority and pops as many tasks
// off the peer's queue as necessary to cover targetMinWork, in priority order.
// If there are not enough tasks to cover targetMinWork it just returns
// whatever is in the peer's queue.
// - Peers with the most "active" work are deprioritized.
// This heuristic is for fairness, we try to keep all peers "busy".
// - Peers with the most "pending" work are prioritized.
// This heuristic is so that peers with a lot to do get asked for work first.
// The third response argument is pending work: the amount of work in the
// queue for this peer.
func (ptq *PeerTaskQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

if ptq.pQueue.Len() == 0 {
return nil
return "", nil, -1
}

var peerTracker *peertracker.PeerTracker

// Choose the highest priority peer
peerTracker = ptq.pQueue.Peek().(*peertracker.PeerTracker)
if peerTracker == nil {
return "", nil, -1
}
peerTracker := ptq.pQueue.Pop().(*peertracker.PeerTracker)

out := peerTracker.PopBlock()
// Get the highest priority tasks for the given peer
out, pendingWork := peerTracker.PopTasks(targetMinWork)

// If the peer has no more tasks, remove its peer tracker
if peerTracker.IsIdle() {
ptq.pQueue.Pop()
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
ptq.pQueue.Push(peerTracker)
// We may have modified the peer tracker's state (by popping tasks), so
// update its position in the priority queue
ptq.pQueue.Update(peerTracker.Index())
}
return out

return peerTracker.Target(), out, pendingWork
}

// TasksDone is called to indicate that the given tasks have completed
// for the given peer
func (ptq *PeerTaskQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

// Get the peer tracker for the peer
peerTracker, ok := ptq.peerTrackers[to]
if !ok {
return
}

// Tell the peer tracker that the tasks have completed
for _, task := range tasks {
peerTracker.TaskDone(task)
}

// This may affect the peer's position in the peer queue, so update if
// necessary
ptq.pQueue.Update(peerTracker.Index())
}

// Remove removes a task from the queue.
func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
func (ptq *PeerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

peerTracker, ok := ptq.peerTrackers[p]
if ok {
if peerTracker.Remove(identifier) {
if peerTracker.Remove(topic) {
// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
Expand All @@ -184,7 +238,6 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
ptq.pQueue.Update(peerTracker.Index())
}
}
ptq.lock.Unlock()
}

// FullThaw completely thaws all peers in the queue so they can execute tasks.
Expand Down