-
Notifications
You must be signed in to change notification settings - Fork 38
/
listeners.go
204 lines (166 loc) · 7.73 KB
/
listeners.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package listeners
import (
"github.com/hannahhoward/go-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/ipfs/go-graphsync"
)
// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
pubSub *pubsub.PubSub
}
type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
}
func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
return nil
}
// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
}
// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
}
// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
}
// RequestorCancelledListeners is a set of listeners for when requestors cancel
type RequestorCancelledListeners struct {
pubSub *pubsub.PubSub
}
type internalRequestorCancelledEvent struct {
p peer.ID
request graphsync.RequestData
}
func requestorCancelledDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestorCancelledEvent)
listener := subscriberFn.(graphsync.OnRequestorCancelledListener)
listener(ie.p, ie.request)
return nil
}
// NewRequestorCancelledListeners returns a new list of listeners for when requestors cancel
func NewRequestorCancelledListeners() *RequestorCancelledListeners {
return &RequestorCancelledListeners{pubSub: pubsub.New(requestorCancelledDispatcher)}
}
// Register registers an listener for completed responses
func (rcl *RequestorCancelledListeners) Register(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(rcl.pubSub.Subscribe(listener))
}
// NotifyCancelledListeners notifies all listeners that a requestor cancelled a response
func (rcl *RequestorCancelledListeners) NotifyCancelledListeners(p peer.ID, request graphsync.RequestData) {
_ = rcl.pubSub.Publish(internalRequestorCancelledEvent{p, request})
}
// RequestProcessingListeners is a set of listeners for when requests begin processing
type RequestProcessingListeners struct {
pubSub *pubsub.PubSub
}
type internalRequestProcessingEvent struct {
p peer.ID
request graphsync.RequestData
inProgressRequestCount int
}
func requestProcessingDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestProcessingEvent)
listener := subscriberFn.(graphsync.OnRequestProcessingListener)
listener(ie.p, ie.request, ie.inProgressRequestCount)
return nil
}
// NewRequestProcessingListeners returns a new list of listeners for when requestors cancel
func NewRequestProcessingListeners() *RequestProcessingListeners {
return &RequestProcessingListeners{pubSub: pubsub.New(requestProcessingDispatcher)}
}
// Register registers an listener for responses that are processing
func (rpl *RequestProcessingListeners) Register(listener graphsync.OnRequestProcessingListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(rpl.pubSub.Subscribe(listener))
}
// NotifyRequestProcessingListeners notifies all listeners that a requestor cancelled a response
func (rpl *RequestProcessingListeners) NotifyRequestProcessingListeners(p peer.ID, request graphsync.RequestData, inProgressRequestCount int) {
_ = rpl.pubSub.Publish(internalRequestProcessingEvent{p, request, inProgressRequestCount})
}
// BlockSentListeners is a set of listeners for when requestors cancel
type BlockSentListeners struct {
pubSub *pubsub.PubSub
}
type internalBlockSentEvent struct {
p peer.ID
request graphsync.RequestData
block graphsync.BlockData
}
func blockSentDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockSentEvent)
listener := subscriberFn.(graphsync.OnBlockSentListener)
listener(ie.p, ie.request, ie.block)
return nil
}
// NewBlockSentListeners returns a new list of listeners for when requestors cancel
func NewBlockSentListeners() *BlockSentListeners {
return &BlockSentListeners{pubSub: pubsub.New(blockSentDispatcher)}
}
// Register registers an listener for completed responses
func (bsl *BlockSentListeners) Register(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(bsl.pubSub.Subscribe(listener))
}
// NotifyBlockSentListeners notifies all listeners that a requestor cancelled a response
func (bsl *BlockSentListeners) NotifyBlockSentListeners(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) {
_ = bsl.pubSub.Publish(internalBlockSentEvent{p, request, block})
}
// NetworkErrorListeners is a set of listeners for when requestors cancel
type NetworkErrorListeners struct {
pubSub *pubsub.PubSub
}
type internalNetworkErrorEvent struct {
p peer.ID
request graphsync.RequestData
err error
}
func networkErrorDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalNetworkErrorEvent)
listener := subscriberFn.(graphsync.OnNetworkErrorListener)
listener(ie.p, ie.request, ie.err)
return nil
}
// NewNetworkErrorListeners returns a new list of listeners for when requestors cancel
func NewNetworkErrorListeners() *NetworkErrorListeners {
return &NetworkErrorListeners{pubSub: pubsub.New(networkErrorDispatcher)}
}
// Register registers an listener for completed responses
func (nel *NetworkErrorListeners) Register(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(nel.pubSub.Subscribe(listener))
}
// NotifyNetworkErrorListeners notifies all listeners that a requestor cancelled a response
func (nel *NetworkErrorListeners) NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error) {
_ = nel.pubSub.Publish(internalNetworkErrorEvent{p, request, err})
}
// NetworkReceiverErrorListeners is a set of listeners for network errors on the receiving side
type NetworkReceiverErrorListeners struct {
pubSub *pubsub.PubSub
}
type receiverNetworkErrorEvent struct {
p peer.ID
err error
}
func receiverNetworkErrorDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(receiverNetworkErrorEvent)
listener := subscriberFn.(graphsync.OnReceiverNetworkErrorListener)
listener(ie.p, ie.err)
return nil
}
// NewReceiverNetworkErrorListeners returns a new list of listeners for receiving errors
func NewReceiverNetworkErrorListeners() *NetworkReceiverErrorListeners {
return &NetworkReceiverErrorListeners{pubSub: pubsub.New(receiverNetworkErrorDispatcher)}
}
// Register registers an listener for completed responses
func (nel *NetworkReceiverErrorListeners) Register(listener graphsync.OnReceiverNetworkErrorListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(nel.pubSub.Subscribe(listener))
}
// NotifyReceiverNetworkErrorListeners notifies all listeners that a receive connection failed
func (nel *NetworkReceiverErrorListeners) NotifyNetworkErrorListeners(p peer.ID, err error) {
_ = nel.pubSub.Publish(receiverNetworkErrorEvent{p, err})
}