Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend peer task queue to work with want-have / want-block #8

Merged
merged 19 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-peertaskqueue
go 1.12

require (
github.com/google/uuid v1.1.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.1
github.com/libp2p/go-libp2p-core v0.0.1
github.com/multiformats/go-multihash v0.0.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
Expand Down
83 changes: 36 additions & 47 deletions peertask/peertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (

pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/google/uuid"
)

// FIFOCompare is a basic task comparator that returns tasks in the order created.
var FIFOCompare = func(a, b *TaskBlock) bool {
var FIFOCompare = func(a, b *QueueTask) bool {
return a.created.Before(b.created)
}

// PriorityCompare respects the target peer's task priority. For tasks involving
// different peers, the oldest task is prioritized.
var PriorityCompare = func(a, b *TaskBlock) bool {
var PriorityCompare = func(a, b *QueueTask) bool {
if a.Target == b.Target {
return a.Priority > b.Priority
}
Expand All @@ -23,9 +25,9 @@ var PriorityCompare = func(a, b *TaskBlock) bool {

// WrapCompare wraps a TaskBlock comparison function so it can be used as
// comparison for a priority queue
func WrapCompare(f func(a, b *TaskBlock) bool) func(a, b pq.Elem) bool {
func WrapCompare(f func(a, b *QueueTask) bool) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*TaskBlock), b.(*TaskBlock))
return f(a.(*QueueTask), b.(*QueueTask))
}
}

Expand All @@ -35,63 +37,50 @@ type Identifier interface{}

// Task is a single task to be executed as part of a task block.
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
type Task struct {
// Identifier for the task (may not be unique)
Identifier Identifier
Priority int
}

// TaskBlock is a block of tasks to execute on a single peer.
type TaskBlock struct {
Tasks []Task
// Priority of the task
Priority int
Target peer.ID

// A callback to signal that this task block has been completed
Done func([]Task)
// Tasks can be want-have or want-block
IsWantBlock bool
// Whether to immediately send a response if the block is not found
SendDontHave bool
// The size that this task will take up in the response message
EntrySize int
// The size of the block corresponding to the identifier
BlockSize int
// Whether the block was found
HaveBlock bool
// Unique ID
Uuid uuid.UUID
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
// QueueTask contains a Task, and also some bookkeeping information.
// It is used internally by the PeerTracker to keep track of tasks.
type QueueTask struct {
Task
Target peer.ID
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}

// NewTaskBlock creates a new task block with the given tasks, priority, target
// peer, and task completion function.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
return &TaskBlock{
Tasks: tasks,
Priority: priority,
Target: target,
Done: done,
toPrune: make(map[Identifier]struct{}, len(tasks)),
created: time.Now(),
}
}

// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled of the queue to execute (because they've already been removed).
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
pt.toPrune[identifier] = struct{}{}
}

// PruneTasks removes all tasks previously marked as prunable from the lists of
// tasks in the block
func (pt *TaskBlock) PruneTasks() {
newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune))
for _, task := range pt.Tasks {
if _, ok := pt.toPrune[task.Identifier]; !ok {
newTasks = append(newTasks, task)
}
// NewQueueTask creates a new QueueTask from the given Task.
func NewQueueTask(task Task, target peer.ID, created time.Time) *QueueTask {
t := &QueueTask{
Task: task,
Target: target,
created: created,
}
pt.Tasks = newTasks
t.Uuid = uuid.New()
return t
}

// Index implements pq.Elem.
func (pt *TaskBlock) Index() int {
func (pt *QueueTask) Index() int {
return pt.index
}

// SetIndex implements pq.Elem.
func (pt *TaskBlock) SetIndex(i int) {
func (pt *QueueTask) SetIndex(i int) {
pt.index = i
}
81 changes: 62 additions & 19 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
type hookFunc func(p peer.ID, event peerTaskQueueEvent)

// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// Tasks are added to the queue, then popped off alternately between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
Expand Down Expand Up @@ -120,10 +120,11 @@ func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
}
}

// PushBlock adds a new block of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
// PushTasks adds a new group of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to)
Expand All @@ -132,41 +133,84 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
ptq.callHooks(to, peerAdded)
}

peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
ptq.lock.Lock()
for _, task := range e {
peerTracker.TaskDone(task.Identifier)
}
ptq.pQueue.Update(peerTracker.Index())
ptq.lock.Unlock()
})
peerTracker.PushTasks(tasks)
ptq.pQueue.Update(peerTracker.Index())
}

// PopBlock 'pops' the next block of tasks to be performed. Returns nil if no block exists.
func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
// PopTasks pops the highest priority tasks from the peer off the queue, up to
// the given maximum size of those tasks.
// If peer is "", the highest priority peer is chosen.
func (ptq *PeerTaskQueue) PopTasks(from peer.ID, maxSize int) (peer.ID, []peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

if ptq.pQueue.Len() == 0 {
return nil
return "", nil
}

var peerTracker *peertracker.PeerTracker

// If the peer wasn't specified, choose the highest priority peer
popTracker := from == ""
if popTracker {
peerTracker = ptq.pQueue.Pop().(*peertracker.PeerTracker)
if peerTracker == nil {
return "", nil
}
} else {
// Get the requested peer tracker
var ok bool
peerTracker, ok = ptq.peerTrackers[from]
if !ok || peerTracker.IsIdle() {
return "", nil
}
}
peerTracker := ptq.pQueue.Pop().(*peertracker.PeerTracker)

out := peerTracker.PopBlock()
// Get the highest priority tasks for the given peer
out := peerTracker.PopTasks(maxSize)

// If the peer has no more tasks, remove its peer tracker
if peerTracker.IsIdle() {
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
} else if popTracker {
// If it does have more tasks, and we popped it, put it back into the
// peer queue
ptq.pQueue.Push(peerTracker)
}
return out

return peerTracker.Target(), out
}

// TasksDone is called to indicate that the given tasks have completed
// for the given peer
func (ptq *PeerTaskQueue) TasksDone(to peer.ID, tasks ...peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

// Get the peer tracker for the peer
peerTracker, ok := ptq.peerTrackers[to]
if !ok {
return
}

// Tell the peer tracker that the tasks have completed
for _, task := range tasks {
peerTracker.TaskDone(task)
}

// This may affect the peer's position in the peer queue, so update if
// necessary
ptq.pQueue.Update(peerTracker.Index())
}

// Remove removes a task from the queue.
func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
ptq.lock.Lock()
defer ptq.lock.Unlock()

peerTracker, ok := ptq.peerTrackers[p]
if ok {
if peerTracker.Remove(identifier) {
Expand All @@ -184,7 +228,6 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
ptq.pQueue.Update(peerTracker.Index())
}
}
ptq.lock.Unlock()
}

// FullThaw completely thaws all peers in the queue so they can execute tasks.
Expand Down