/
peermessagemanager.go
48 lines (39 loc) · 1.84 KB
/
peermessagemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package peermanager
import (
"context"
blocks "github.com/ipfs/fs-repo-migrations/ipfs-10-to-11/_vendor/github.com/ipfs/go-block-format"
"github.com/ipfs/fs-repo-migrations/ipfs-10-to-11/_vendor/github.com/libp2p/go-libp2p-core/peer"
gsmsg "github.com/ipfs/fs-repo-migrations/ipfs-10-to-11/_vendor/github.com/ipfs/go-graphsync/message"
"github.com/ipfs/fs-repo-migrations/ipfs-10-to-11/_vendor/github.com/ipfs/go-graphsync/notifications"
)
// PeerQueue is a process that sends messages to a peer.
//
// It embeds PeerProcess, so every PeerQueue can be used wherever a
// PeerProcess is expected, and adds two methods for handing GraphSync
// traffic to the queue.
type PeerQueue interface {
PeerProcess
// AddRequest passes a single GraphSync request to the queue, together with
// zero or more notifees.
// NOTE(review): how and when notifees are invoked is up to the
// implementation — confirm against the concrete queue type.
AddRequest(graphSyncRequest gsmsg.GraphSyncRequest, notifees ...notifications.Notifee)
// AddResponses passes a batch of GraphSync responses and their accompanying
// blocks to the queue, together with zero or more notifees.
AddResponses(responses []gsmsg.GraphSyncResponse, blks []blocks.Block, notifees ...notifications.Notifee)
}
// PeerQueueFactory provides a function that will create a PeerQueue.
//
// It receives a context and the ID of the peer the queue is for, and returns
// the PeerQueue to use for that peer.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
// PeerMessageManager manages message queues for peers.
//
// It embeds *PeerManager, so the methods of PeerManager are promoted onto
// PeerMessageManager; the methods declared on this type add typed helpers
// for sending GraphSync requests and responses.
type PeerMessageManager struct {
// Embedded manager that tracks the per-peer processes.
*PeerManager
}
// NewMessageManager generates a new manager for sending messages.
//
// The returned PeerMessageManager wraps a PeerManager built with New, using
// createPeerQueue as the source of per-peer processes.
func NewMessageManager(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerMessageManager {
	// Adapt the PeerQueue-returning factory to a function that returns the
	// more general PeerProcess, which is the shape handed to New.
	newProcess := func(ctx context.Context, p peer.ID) PeerProcess {
		return createPeerQueue(ctx, p)
	}

	manager := New(ctx, newProcess)
	return &PeerMessageManager{PeerManager: manager}
}
// SendRequest sends the given GraphSyncRequest to the given peer.
//
// It looks up the process for p with GetProcess and forwards the request and
// any notifees to that process's AddRequest method.
func (pmm *PeerMessageManager) SendRequest(p peer.ID, request gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
	process := pmm.GetProcess(p)

	// Unchecked assertion: panics if the process is not a PeerQueue.
	queue := process.(PeerQueue)
	queue.AddRequest(request, notifees...)
}
// SendResponse sends the given GraphSyncResponses and blocks to the given peer.
//
// It looks up the process for p with GetProcess and forwards the responses,
// blocks and any notifees to that process's AddResponses method.
func (pmm *PeerMessageManager) SendResponse(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block, notifees ...notifications.Notifee) {
	process := pmm.GetProcess(p)

	// Unchecked assertion: panics if the process is not a PeerQueue.
	queue := process.(PeerQueue)
	queue.AddResponses(responses, blks, notifees...)
}