/
client.go
310 lines (278 loc) · 10.6 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
package responsemanager
import (
"context"
"errors"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue/peertask"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/trace"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/panics"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/queryexecutor"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
"github.com/ipfs/go-graphsync/taskqueue"
)
// The code in this file implements the public interface of the response manager.
// Functions in this file operate outside the internal thread and should
// NOT modify the internal state of the ResponseManager.

// log is the package-level logger, created under the name "graphsync".
var log = logging.Logger("graphsync")
// inProgressResponseStatus is the per-request record stored as the value type
// of ResponseManager.inProgressResponses, keyed by request ID.
//
// NOTE(review): the fields are populated and consumed by code outside this
// file, so their exact lifecycle is not described here — confirm against the
// message handlers before relying on any of them.
type inProgressResponseStatus struct {
ctx context.Context
span trace.Span
cancelFn func()
peer peer.ID
request gsmsg.GraphSyncRequest
linkSystem ipld.LinkSystem
customChooser traversal.LinkTargetNodePrototypeChooser
maxLinks uint64
traverser ipldutil.Traverser
signals queryexecutor.ResponseSignals
updates []gsmsg.GraphSyncRequest
state graphsync.RequestState
startTime time.Time
responseStream responseassembler.ResponseStream
}
// RequestHooks is an interface for processing request hooks.
//
// ProcessRequestHooks is given the requesting peer, the request data and a
// context, and returns a hooks.RequestResult. Note that ctx is the last
// parameter here rather than the conventional first one.
type RequestHooks interface {
ProcessRequestHooks(p peer.ID, request graphsync.RequestData, ctx context.Context) hooks.RequestResult
}
// UpdateHooks is an interface for processing update hooks.
//
// ProcessUpdateHooks is given the peer, the original request data and the
// update's request data, and returns a hooks.UpdateResult.
type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}
// CompletedListeners is an interface for notifying listeners that a response
// is complete, passing the peer, the request data and the final response
// status code.
type CompletedListeners interface {
NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode)
}
// CancelledListeners is an interface for notifying listeners that the
// requestor cancelled a request.
type CancelledListeners interface {
NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
}
// BlockSentListeners is an interface for notifying listeners of a block send
// occurring over the wire.
type BlockSentListeners interface {
NotifyBlockSentListeners(p peer.ID, request graphsync.RequestData, block graphsync.BlockData)
}
// RequestProcessingListeners is an interface for notifying listeners that a
// request has begun processing. The notification also carries a count of
// in-progress requests.
type RequestProcessingListeners interface {
NotifyRequestProcessingListeners(p peer.ID, request graphsync.RequestData, inProgressRequestCount int)
}
// NetworkErrorListeners is an interface for notifying listeners that an error
// occurred while sending data on the wire.
type NetworkErrorListeners interface {
NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error)
}
// ResponseAssembler is an interface that returns sender interfaces for peer
// responses: NewStream yields a responseassembler.ResponseStream for the
// given peer and request ID, and is also handed a notifications.Subscriber.
type ResponseAssembler interface {
NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
}
// responseManagerMessage is the element type of ResponseManager.messages.
// Each message implements handle, which receives the manager it applies to.
// NOTE(review): handle is presumably invoked from the manager's run loop,
// which is defined outside this file — confirm.
type responseManagerMessage interface {
handle(rm *ResponseManager)
}
// ResponseManager handles incoming requests from the network, initiates selector
// traversals, and transmits responses.
//
// The public methods in this file talk to the manager by queueing a
// responseManagerMessage on the messages channel (see send).
type ResponseManager struct {
// ctx is the manager's own context, derived in New from the caller's
// context; cancelFn cancels it and is called by Shutdown.
ctx context.Context
cancelFn context.CancelFunc
responseAssembler ResponseAssembler
requestHooks RequestHooks
linkSystem ipld.LinkSystem
requestProcessingListeners RequestProcessingListeners
updateHooks UpdateHooks
cancelledListeners CancelledListeners
completedListeners CompletedListeners
blockSentListeners BlockSentListeners
networkErrorListeners NetworkErrorListeners
// messages carries requests from the public methods to the manager; New
// creates it with a buffer of 16.
messages chan responseManagerMessage
// inProgressResponses maps a request ID to its tracked response state.
inProgressResponses map[graphsync.RequestID]*inProgressResponseStatus
connManager network.ConnManager
// maxLinksPerRequest is the maximum number of links to traverse per
// request. A value of zero = infinity, or no limit.
maxLinksPerRequest uint64
panicCallback panics.CallBackFn
responseQueue taskqueue.TaskQueue
}
// New creates a new response manager for responding to requests.
//
// The returned manager owns a cancellable context derived from ctx (cancelled
// by Shutdown), a message channel buffered to 16 entries, and an empty table
// of in-progress responses. All other arguments are stored as given. New does
// not start any processing; call Startup for that.
func New(ctx context.Context,
	linkSystem ipld.LinkSystem,
	responseAssembler ResponseAssembler,
	requestProcessingListeners RequestProcessingListeners,
	requestHooks RequestHooks,
	updateHooks UpdateHooks,
	completedListeners CompletedListeners,
	cancelledListeners CancelledListeners,
	blockSentListeners BlockSentListeners,
	networkErrorListeners NetworkErrorListeners,
	connManager network.ConnManager,
	maxLinksPerRequest uint64,
	panicCallback panics.CallBackFn,
	responseQueue taskqueue.TaskQueue,
) *ResponseManager {
	managerCtx, cancel := context.WithCancel(ctx)
	return &ResponseManager{
		// lifecycle
		ctx:      managerCtx,
		cancelFn: cancel,

		// internally created state
		messages:            make(chan responseManagerMessage, 16),
		inProgressResponses: make(map[graphsync.RequestID]*inProgressResponseStatus),

		// collaborators supplied by the caller
		linkSystem:        linkSystem,
		responseAssembler: responseAssembler,
		connManager:       connManager,
		responseQueue:     responseQueue,
		panicCallback:     panicCallback,

		// hooks and listeners
		requestHooks:               requestHooks,
		updateHooks:                updateHooks,
		requestProcessingListeners: requestProcessingListeners,
		completedListeners:         completedListeners,
		cancelledListeners:         cancelledListeners,
		blockSentListeners:         blockSentListeners,
		networkErrorListeners:      networkErrorListeners,

		// limits
		maxLinksPerRequest: maxLinksPerRequest,
	}
}
// ProcessRequests processes incoming requests for the given peer by queueing
// them for the manager. It does not wait for the requests to be handled.
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
	msg := &processRequestsMessage{p, requests}
	// Fire and forget: a failure here only means ctx or the manager was
	// cancelled before the message could be queued.
	_ = rm.send(msg, ctx.Done())
}
// UnpauseResponse unpauses a response that was previously paused.
//
// The unpause message (with any extensions) is queued for the manager and the
// call then blocks until a result is reported. It returns an error early if
// ctx is cancelled — either before the message could be queued or while
// waiting for the result — or if the ResponseManager's own context is done.
// If ctx is cancelled after the message was queued, the unpause may still be
// applied by the manager.
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	response := make(chan error, 1)
	if err := rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done()); err != nil {
		return err
	}
	select {
	case <-ctx.Done():
		// The caller stopped waiting. response has capacity 1, so a single
		// result can still be sent on it without blocking the sender.
		return ctx.Err()
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case err := <-response:
		return err
	}
}
// PauseResponse pauses an in progress response (may take 1 or more blocks to process).
//
// The pause message is queued for the manager and the call then blocks until
// a result is reported. It returns an error early if ctx is cancelled —
// either before the message could be queued or while waiting for the result —
// or if the ResponseManager's own context is done. If ctx is cancelled after
// the message was queued, the pause may still be applied by the manager.
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	if err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done()); err != nil {
		return err
	}
	select {
	case <-ctx.Done():
		// The caller stopped waiting. response has capacity 1, so a single
		// result can still be sent on it without blocking the sender.
		return ctx.Err()
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case err := <-response:
		return err
	}
}
// CancelResponse cancels an in progress response.
//
// It queues an error message carrying queryexecutor.ErrCancelledByCommand for
// the given request and then blocks until a result is reported. It returns
// an error early if ctx is cancelled — either before the message could be
// queued or while waiting for the result — or if the ResponseManager's own
// context is done. If ctx is cancelled after the message was queued, the
// cancellation may still be applied by the manager.
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	if err := rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done()); err != nil {
		return err
	}
	select {
	case <-ctx.Done():
		// The caller stopped waiting. response has capacity 1, so a single
		// result can still be sent on it without blocking the sender.
		return ctx.Err()
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case err := <-response:
		return err
	}
}
// UpdateResponse updates an in progress response with the given extensions.
//
// The update message is queued for the manager and the call then blocks until
// a result is reported. It returns an error early if ctx is cancelled —
// either before the message could be queued or while waiting for the result —
// or if the ResponseManager's own context is done. If ctx is cancelled after
// the message was queued, the update may still be applied by the manager.
func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	response := make(chan error, 1)
	if err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done()); err != nil {
		return err
	}
	select {
	case <-ctx.Done():
		// The caller stopped waiting. response has capacity 1, so a single
		// result can still be sent on it without blocking the sender.
		return ctx.Err()
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case err := <-response:
		return err
	}
}
// synchronize is a utility method that blocks until all current messages are
// processed, or until the manager's context is done.
func (rm *ResponseManager) synchronize() {
	processed := make(chan error)
	if err := rm.send(&synchronizeMessage{processed}, nil); err != nil {
		// With a nil done channel, send fails only when rm.ctx is already
		// done, so there is nothing left to wait for.
		return
	}
	select {
	case <-processed:
	case <-rm.ctx.Done():
	}
}
// StartTask starts the given task from the peer task queue. The task, peer and
// responseTaskChan are handed to the manager in a single message; the call
// does not wait for the message to be handled.
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
	msg := &startTaskRequest{task, p, responseTaskChan}
	// With a nil done channel, send fails only once the manager's context is
	// done, in which case there is nobody left to report to.
	_ = rm.send(msg, nil)
}
// GetUpdates is called to read pending updates for a task and clear them.
// The request is queued for the manager together with updatesChan; the call
// itself does not wait for a reply.
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
	msg := &responseUpdateRequest{requestID, updatesChan}
	// With a nil done channel, send fails only once the manager's context is
	// done, in which case there is nobody left to report to.
	_ = rm.send(msg, nil)
}
// FinishTask marks a task from the task queue as done, passing along the
// task's error (if any). It blocks until the manager acknowledges the message
// or the manager's context is done.
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
	finished := make(chan struct{}, 1)
	if sendErr := rm.send(&finishTaskRequest{task, p, err, finished}, nil); sendErr != nil {
		// With a nil done channel, send fails only when rm.ctx is already
		// done, so no acknowledgement will ever arrive.
		return
	}
	select {
	case <-finished:
	case <-rm.ctx.Done():
	}
}
// CloseWithNetworkError closes a request due to a network error. It queues an
// error message carrying queryexecutor.ErrNetworkError and blocks until the
// manager replies or the manager's context is done; the reply value itself is
// discarded.
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
	reply := make(chan error, 1)
	if err := rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, reply}, nil); err != nil {
		// With a nil done channel, send fails only when rm.ctx is already
		// done, so no reply will ever arrive.
		return
	}
	select {
	case <-reply:
	case <-rm.ctx.Done():
	}
}
// TerminateRequest indicates a request has finished sending data and should no
// longer be tracked. It blocks until the manager acknowledges the message or
// the manager's context is done.
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
	terminated := make(chan struct{}, 1)
	if err := rm.send(&terminateRequestMessage{requestID, terminated}, nil); err != nil {
		// With a nil done channel, send fails only when rm.ctx is already
		// done, so no acknowledgement will ever arrive.
		return
	}
	select {
	case <-terminated:
	case <-rm.ctx.Done():
	}
}
// PeerState gets current state of the outgoing responses for a given peer.
// It returns the zero peerstate.PeerState if the manager's context is done
// before a reply arrives.
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
	reply := make(chan peerstate.PeerState)
	if err := rm.send(&peerStateMessage{p, reply}, nil); err != nil {
		// With a nil done channel, send fails only when rm.ctx is already
		// done; report the zero value just as the wait below would.
		return peerstate.PeerState{}
	}
	select {
	case state := <-reply:
		return state
	case <-rm.ctx.Done():
		return peerstate.PeerState{}
	}
}
// send queues message on rm.messages for the manager to handle.
//
// done is an optional cancellation channel supplied by the caller; a nil done
// never fires, so in that case only the manager's own context can abort the
// send. It returns nil once the message is queued, rm.ctx.Err() if the
// manager's context finishes first, or a cancellation error if done fires.
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) error {
	const cancelledBeforeSend = "unable to send message before cancellation"

	// Check done on its own first so that an already-cancelled caller never
	// queues a message, even when the channel has room.
	select {
	case <-done:
		return errors.New(cancelledBeforeSend)
	default:
	}

	select {
	case rm.messages <- message:
		return nil
	case <-done:
		return errors.New(cancelledBeforeSend)
	case <-rm.ctx.Done():
		return rm.ctx.Err()
	}
}
// Startup starts processing for the ResponseManager by launching its run
// loop in a new goroutine.
func (rm *ResponseManager) Startup() {
go rm.run()
}
// Shutdown ends processing for the ResponseManager by cancelling the
// manager's context.
func (rm *ResponseManager) Shutdown() {
rm.cancelFn()
}