Skip to content

Commit 73b83c0

Browse files
authored
feat(client): Abort resends when OrderMessages stops (#1571)
When `OrderMessages` stops, it aborts any ongoing resend HTTP `fetch` requests. Before this PR it stopped the iteration of resend streams but didn't stop the `fetch` requests. ## Changes The `OrderMessages` now uses `AbortController` to store the `done` state. That signal is provided to the `fetchHttpStream` as an argument, and therefore the all `fetch` operations are stopped when `OrderMessages.stop()` is called. Manual handling the list of ongoing resends is no longer needed in `OrderMessages`. Added also `this.outBuffer.endWrite()` call to `OrderMessages.stop()` and `this.isDone()` check to `OrderMessages#addToOrderingUtil`. These are just optimizations and not strictly needed. In the current use case where `OrderMessages.stop()` is called only for stopped pipelines (see`messageStream.onBeforeFinally`) these optimizations don't have any effect. ### ComposedAbortSignal Enhanced `composeAbortSignals` utility function so that the returned object contains a `destroy()` function. To avoid memory leaks it would be good to call the method when the composed signal is no longer used so that we can remove the listeners associated to the source signals. (If all source signals have short lifespan or there are just few listeners, the cleanup is not needed in practice.) ### Future improvements - Is `stream.once('close', () => { abortController.abort() })` actually needed, or could we remove it?
1 parent 7110f28 commit 73b83c0

File tree

8 files changed

+106
-61
lines changed

8 files changed

+106
-61
lines changed

packages/client/src/subscribe/OrderMessages.ts

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ import OrderingUtil from './ordering/OrderingUtil'
1717
*/
1818
export class OrderMessages {
1919

20-
private done = false
20+
private abortController: AbortController = new AbortController()
2121
private inputClosed = false
2222
private enabled = true
23-
private readonly resendStreams = new Set<PushPipeline<StreamMessage, StreamMessage>>() // holds outstanding resends for cleanup
2423
private readonly outBuffer = new PushBuffer<StreamMessage>()
2524
private readonly orderingUtil: OrderingUtil
2625
private readonly resends: Resends
@@ -57,7 +56,7 @@ export class OrderMessages {
5756
}
5857

5958
async onGap(from: MessageRef, to: MessageRef, context: MsgChainContext): Promise<void> {
60-
if (this.done || !this.enabled) { return }
59+
if (this.isDone() || !this.enabled) { return }
6160
this.logger.debug('Encountered gap', {
6261
streamPartId: this.streamPartId,
6362
context,
@@ -74,19 +73,15 @@ export class OrderMessages {
7473
publisherId: context.publisherId,
7574
msgChainId: context.msgChainId,
7675
raw: true
77-
}, this.getStorageNodes)
78-
resendMessageStream.onFinally.listen(() => {
79-
this.resendStreams.delete(resendMessageStream)
80-
})
81-
this.resendStreams.add(resendMessageStream)
82-
if (this.done) { return }
76+
}, this.getStorageNodes, this.abortController.signal)
77+
if (this.isDone()) { return }
8378

8479
for await (const streamMessage of resendMessageStream) {
85-
if (this.done) { return }
80+
if (this.isDone()) { return }
8681
this.orderingUtil.add(streamMessage)
8782
}
8883
} catch (err) {
89-
if (this.done) { return }
84+
if (this.isDone()) { return }
9085

9186
if (err.code === 'NO_STORAGE_NODES') {
9287
// ignore NO_STORAGE_NODES errors
@@ -96,23 +91,24 @@ export class OrderMessages {
9691
} else {
9792
throw err
9893
}
99-
} finally {
100-
if (resendMessageStream != null) {
101-
this.resendStreams.delete(resendMessageStream)
102-
}
10394
}
10495
}
10596

10697
onOrdered(orderedMessage: StreamMessage): void {
107-
if (this.outBuffer.isDone() || this.done) {
98+
if (this.outBuffer.isDone() || this.isDone()) {
10899
return
109100
}
110101

111102
this.outBuffer.push(orderedMessage)
112103
}
113104

114105
stop(): void {
115-
this.done = true
106+
this.outBuffer.endWrite()
107+
this.abortController.abort()
108+
}
109+
110+
private isDone() {
111+
return this.abortController.signal.aborted
116112
}
117113

118114
private maybeClose(): void {
@@ -131,7 +127,9 @@ export class OrderMessages {
131127
private async addToOrderingUtil(src: AsyncGenerator<StreamMessage>): Promise<void> {
132128
try {
133129
for await (const msg of src) {
134-
this.orderingUtil.add(msg)
130+
if (!this.isDone()) {
131+
this.orderingUtil.add(msg)
132+
}
135133
}
136134
this.inputClosed = true
137135
this.maybeClose()
@@ -146,10 +144,9 @@ export class OrderMessages {
146144
this.addToOrderingUtil(src)
147145
yield* this.outBuffer
148146
} finally {
149-
this.done = true
147+
this.stop()
150148
this.orderingUtil.clearGaps()
151-
this.resendStreams.forEach((s) => s.end())
152-
this.resendStreams.clear()
149+
// TODO why there are two clearGaps() calls?
153150
this.orderingUtil.clearGaps()
154151
}
155152
}.bind(this)

packages/client/src/subscribe/Resends.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ export class Resends {
121121
async resend(
122122
streamPartId: StreamPartID,
123123
options: ResendOptions & { raw?: boolean },
124-
getStorageNodes?: (streamId: StreamID) => Promise<EthereumAddress[]>
124+
getStorageNodes?: (streamId: StreamID) => Promise<EthereumAddress[]>,
125+
abortSignal?: AbortSignal
125126
): Promise<PushPipeline<StreamMessage, StreamMessage>> {
126127
const raw = options.raw ?? false
127128
if (isResendLast(options)) {
@@ -132,7 +133,7 @@ export class Resends {
132133
}
133134
return this.fetchStream('last', streamPartId, {
134135
count: options.last
135-
}, raw, getStorageNodes)
136+
}, raw, getStorageNodes, abortSignal)
136137
} else if (isResendRange(options)) {
137138
return this.fetchStream('range', streamPartId, {
138139
fromTimestamp: new Date(options.from.timestamp).getTime(),
@@ -141,13 +142,13 @@ export class Resends {
141142
toSequenceNumber: options.to.sequenceNumber,
142143
publisherId: options.publisherId !== undefined ? toEthereumAddress(options.publisherId) : undefined,
143144
msgChainId: options.msgChainId
144-
}, raw, getStorageNodes)
145+
}, raw, getStorageNodes, abortSignal)
145146
} else if (isResendFrom(options)) {
146147
return this.fetchStream('from', streamPartId, {
147148
fromTimestamp: new Date(options.from.timestamp).getTime(),
148149
fromSequenceNumber: options.from.sequenceNumber,
149150
publisherId: options.publisherId !== undefined ? toEthereumAddress(options.publisherId) : undefined
150-
}, raw, getStorageNodes)
151+
}, raw, getStorageNodes, abortSignal)
151152
} else {
152153
throw new StreamrClientError(
153154
`can not resend without valid resend options: ${JSON.stringify({ streamPartId, options })}`,
@@ -161,7 +162,8 @@ export class Resends {
161162
streamPartId: StreamPartID,
162163
query: QueryDict,
163164
raw: boolean,
164-
getStorageNodes?: (streamId: StreamID) => Promise<EthereumAddress[]>
165+
getStorageNodes?: (streamId: StreamID) => Promise<EthereumAddress[]>,
166+
abortSignal?: AbortSignal
165167
): Promise<PushPipeline<StreamMessage, StreamMessage>> {
166168
const traceId = randomString(5)
167169
this.logger.debug('Fetch resend data', {
@@ -191,7 +193,7 @@ export class Resends {
191193
getStorageNodes: async () => without(nodeAddresses, nodeAddress),
192194
config: (nodeAddresses.length === 1) ? { ...this.config, orderMessages: false } : this.config
193195
})
194-
const lines = transformError(fetchHttpStream(url), getHttpErrorTransform())
196+
const lines = transformError(fetchHttpStream(url, abortSignal), getHttpErrorTransform())
195197
setImmediate(async () => {
196198
let count = 0
197199
const messages = map(lines, (line: string) => StreamMessage.deserialize(line))

packages/client/src/utils/utils.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { ContractReceipt } from '@ethersproject/contracts'
22
import { StreamID, toStreamID } from '@streamr/protocol'
3-
import { Logger, TheGraphClient, merge, randomString, toEthereumAddress } from '@streamr/utils'
3+
import { Logger, TheGraphClient, composeAbortSignals, merge, randomString, toEthereumAddress } from '@streamr/utils'
4+
import compact from 'lodash/compact'
45
import fetch, { Response } from 'node-fetch'
5-
import { AbortSignal } from 'node-fetch/externals'
6+
import { AbortSignal as FetchAbortSignal } from 'node-fetch/externals'
67
import split2 from 'split2'
78
import { Readable } from 'stream'
89
import LRU from '../../vendor/quick-lru'
@@ -160,12 +161,14 @@ export class FetchHttpStreamResponseError extends Error {
160161

161162
export const fetchHttpStream = async function*(
162163
url: string,
163-
abortController = new AbortController()
164+
abortSignal?: AbortSignal
164165
): AsyncGenerator<string, void, undefined> {
165-
logger.debug('Send HTTP request', { url })
166+
logger.debug('Send HTTP request', { url })
167+
const abortController = new AbortController()
168+
const fetchAbortSignal = composeAbortSignals(...compact([abortController.signal, abortSignal]))
166169
const response: Response = await fetch(url, {
167170
// cast is needed until this is fixed: https://github.com/node-fetch/node-fetch/issues/1652
168-
signal: abortController.signal as AbortSignal
171+
signal: fetchAbortSignal as FetchAbortSignal
169172
})
170173
logger.debug('Received HTTP response', {
171174
url,
@@ -193,5 +196,6 @@ export const fetchHttpStream = async function*(
193196
throw err
194197
} finally {
195198
stream?.destroy()
199+
fetchAbortSignal.destroy()
196200
}
197201
}

packages/client/test/unit/OrderMessages.test.ts

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import { MessageID, StreamMessage, StreamPartIDUtils, toStreamID } from '@streamr/protocol'
1+
import { MessageID, StreamID, StreamMessage, StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol'
22
import { randomEthereumAddress } from '@streamr/test-utils'
3-
import { collect } from '@streamr/utils'
3+
import { EthereumAddress, collect } from '@streamr/utils'
44
import range from 'lodash/range'
55
import without from 'lodash/without'
6-
import { OrderMessages } from '../../src/subscribe/OrderMessages'
7-
import { Resends } from '../../src/subscribe/Resends'
8-
import { fromArray } from '../../src/utils/GeneratorUtils'
9-
import { PushPipeline } from '../../src/utils/PushPipeline'
10-
import { mockLoggerFactory } from '../test-utils/utils'
6+
import { OrderMessages } from './../../src/subscribe/OrderMessages'
7+
import { ResendOptions, Resends } from './../../src/subscribe/Resends'
8+
import { fromArray } from './../../src/utils/GeneratorUtils'
9+
import { PushPipeline } from './../../src/utils/PushPipeline'
10+
import { mockLoggerFactory } from './../test-utils/utils'
1111

1212
const MESSAGE_COUNT = 7
1313
const STREAM_PART_ID = StreamPartIDUtils.parse('stream#0')
@@ -22,13 +22,13 @@ const CONFIG = {
2222
gapFillTimeout: 50
2323
}
2424

25-
const createTransform = (resends: Pick<Resends, 'resend'>, config = CONFIG) => {
25+
const createOrderMessages = (resends: Pick<Resends, 'resend'>, config = CONFIG) => {
2626
return new OrderMessages(
2727
config as any,
2828
resends as any,
2929
STREAM_PART_ID,
3030
mockLoggerFactory()
31-
).transform()
31+
)
3232
}
3333

3434
const createMockMessages = async (): Promise<StreamMessage[]> => {
@@ -58,7 +58,7 @@ describe('OrderMessages', () => {
5858

5959
it('no gaps', async () => {
6060
const msgs = await createMockMessages()
61-
const transform = createTransform(undefined as any)
61+
const transform = createOrderMessages(undefined as any).transform()
6262
const output = transform(fromArray(msgs))
6363
expect(await collect(output)).toEqual(msgs)
6464
})
@@ -69,7 +69,7 @@ describe('OrderMessages', () => {
6969
const resends = {
7070
resend: jest.fn().mockResolvedValue(createMessageStream(...missing))
7171
}
72-
const transform = createTransform(resends)
72+
const transform = createOrderMessages(resends).transform()
7373
const output = transform(fromArray(without(msgs, ...missing)))
7474
expect(await collect(output)).toEqual(msgs)
7575
expect(resends.resend).toBeCalledWith(
@@ -87,7 +87,8 @@ describe('OrderMessages', () => {
8787
msgChainId: MSG_CHAIN_ID,
8888
raw: true
8989
},
90-
undefined
90+
undefined,
91+
expect.anything()
9192
)
9293
})
9394

@@ -97,7 +98,7 @@ describe('OrderMessages', () => {
9798
const resends = {
9899
resend: jest.fn().mockResolvedValue(createMessageStream(...missing))
99100
}
100-
const transform = createTransform(resends)
101+
const transform = createOrderMessages(resends).transform()
101102
const output = transform(fromArray(without(msgs, ...missing)))
102103
expect(await collect(output)).toEqual(msgs)
103104
expect(resends.resend).toBeCalledWith(
@@ -115,7 +116,8 @@ describe('OrderMessages', () => {
115116
msgChainId: MSG_CHAIN_ID,
116117
raw: true
117118
},
118-
undefined
119+
undefined,
120+
expect.anything()
119121
)
120122
})
121123

@@ -128,7 +130,7 @@ describe('OrderMessages', () => {
128130
.mockResolvedValueOnce(createMessageStream(...missing1))
129131
.mockResolvedValueOnce(createMessageStream(...missing2))
130132
}
131-
const transform = createTransform(resends)
133+
const transform = createOrderMessages(resends).transform()
132134
const output = transform(fromArray(without(msgs, ...missing1.concat(missing2))))
133135
expect(await collect(output)).toEqual(msgs)
134136
expect(resends.resend).toHaveBeenNthCalledWith(1,
@@ -146,7 +148,8 @@ describe('OrderMessages', () => {
146148
msgChainId: MSG_CHAIN_ID,
147149
raw: true
148150
},
149-
undefined
151+
undefined,
152+
expect.anything()
150153
)
151154
expect(resends.resend).toHaveBeenNthCalledWith(2,
152155
STREAM_PART_ID,
@@ -163,7 +166,8 @@ describe('OrderMessages', () => {
163166
msgChainId: MSG_CHAIN_ID,
164167
raw: true
165168
},
166-
undefined
169+
undefined,
170+
expect.anything()
167171
)
168172
})
169173

@@ -173,7 +177,7 @@ describe('OrderMessages', () => {
173177
const resends = {
174178
resend: jest.fn().mockImplementation(() => createMessageStream())
175179
}
176-
const transform = createTransform(resends)
180+
const transform = createOrderMessages(resends).transform()
177181
const output = transform(fromArray(without(msgs, ...missing)))
178182
expect(await collect(output)).toEqual(without(msgs, ...missing))
179183
expect(resends.resend).toBeCalledTimes(CONFIG.maxGapRequests)
@@ -185,9 +189,9 @@ describe('OrderMessages', () => {
185189
const resends = {
186190
resend: jest.fn()
187191
}
188-
const transform = createTransform(resends, {
192+
const transform = createOrderMessages(resends, {
189193
orderMessages: false
190-
} as any)
194+
} as any).transform()
191195
const output = transform(fromArray(without(msgs, ...missing)))
192196
expect(await collect(output)).toEqual(without(msgs, ...missing))
193197
expect(resends.resend).toBeCalledTimes(0)
@@ -209,9 +213,33 @@ describe('OrderMessages', () => {
209213
.mockRejectedValueOnce(new Error('mock-error'))
210214
.mockResolvedValueOnce(createMessageStream(...missing3))
211215
}
212-
const transform = createTransform(resends)
216+
const transform = createOrderMessages(resends).transform()
213217
const output = transform(fromArray(without(msgs, ...missing1.concat(missing2).concat(missing3))))
214218
expect(await collect(output)).toEqual(without(msgs, ...missing2))
215219
expect(resends.resend).toBeCalledTimes(2 + CONFIG.maxGapRequests)
216220
})
221+
222+
it('aborts resends when stopped', async () => {
223+
const msgs = await createMockMessages()
224+
const missing = msgs.filter((m) => m.getTimestamp() === 3000)
225+
let orderedMessages: OrderMessages | undefined = undefined
226+
let resendAborted = false
227+
const resends = {
228+
resend: jest.fn().mockImplementation((
229+
_streamPartId: StreamPartID,
230+
_options: ResendOptions & { raw?: boolean },
231+
_getStorageNodes?: (streamId: StreamID) => Promise<EthereumAddress[]>,
232+
abortSignal?: AbortSignal
233+
) => {
234+
abortSignal!.addEventListener('abort', () => resendAborted = true)
235+
orderedMessages!.stop()
236+
return createMessageStream(...missing)
237+
})
238+
}
239+
orderedMessages = createOrderMessages(resends)
240+
const transform = orderedMessages.transform()
241+
const output = transform(fromArray(without(msgs, ...missing)))
242+
expect(await collect(output)).toEqual(msgs.filter((msg) => msg.getTimestamp() < missing[0].getTimestamp()))
243+
expect(resendAborted).toBe(true)
244+
})
217245
})

packages/client/test/unit/utils.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ describe('utils', () => {
5050
res.write(`foobar\n`)
5151
})
5252
const abortController = new AbortController()
53-
const iterator = fetchHttpStream(server.url, abortController)[Symbol.asyncIterator]()
53+
const iterator = fetchHttpStream(server.url, abortController.signal)[Symbol.asyncIterator]()
5454
const line = await nextValue(iterator)
5555
expect(line).toBe('foobar')
5656
abortController.abort()

0 commit comments

Comments
 (0)