Skip to content

Commit 291a54a

Browse files
authored
fix(store-sync): backoff on failed connection, close websockets (#3752)
1 parent e7012e7 commit 291a54a

3 files changed

Lines changed: 15 additions & 4 deletions

File tree

.changeset/blue-gorillas-buy.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@latticexyz/store-sync": patch
3+
---
4+
5+
The preconfirmed logs stream now waits before reconnecting if a previous connection attempt failed.

packages/store-sync/src/createPreconfirmedBlockStream.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
merge,
1515
filter,
1616
startWith,
17+
delay,
1718
} from "rxjs";
1819
import { StorageAdapterBlock, StoreEventsLog, SyncFilter } from "./common";
1920
import { watchLogs } from "./watchLogs";
@@ -66,12 +67,15 @@ export function createPreconfirmedBlockStream(opts: PreconfirmedBlockStreamOptio
6667

6768
let processedBlockLogs: { [blockNumber: string]: { [logIndex: number]: boolean } } = {};
6869
let preconfirmedLogsState: "initializing" | "initialized" | "waiting" = "waiting";
70+
let attempt = 0;
6971
const preconfirmedLogs$ = recreatePreconfirmedStream$.pipe(
7072
tap(() => {
71-
debug("initializing preconfirmed logs stream");
73+
debug(`initializing preconfirmed logs stream in ${attempt * 500}ms`);
7274
preconfirmedLogsState = "initializing";
7375
processedBlockLogs = {};
7476
}),
77+
delay(attempt * 500),
78+
tap(() => attempt++),
7579
switchMap(() =>
7680
watchLogs({
7781
...opts,
@@ -81,13 +85,15 @@ export function createPreconfirmedBlockStream(opts: PreconfirmedBlockStreamOptio
8185
catchError((e) => {
8286
debug("Error in preconfirmed logs stream, recreating", e);
8387
recreatePreconfirmedStream$.next();
84-
return throwError(() => e);
88+
return of(null);
8589
}),
8690
),
8791
),
92+
filter((block) => block != null),
8893
tap((block) => {
8994
debug("preconfirmed block", block.blockNumber, "with", block.logs.length, "logs");
9095
preconfirmedLogsState = "initialized";
96+
attempt = 0;
9197
restartBlockNumber = block.blockNumber;
9298
const seenLogs = (processedBlockLogs[String(block.blockNumber)] ??= {});
9399
block.logs.forEach((log) => {

packages/store-sync/src/watchLogs.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
4242

4343
client = await getWebSocketRpcClient(getUncachedUrl(url), {
4444
keepAlive: true,
45-
reconnect: { attempts: 100, delay: 1_000 },
4645
});
4746

4847
// Start watching pending logs
@@ -109,8 +108,9 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
109108
debug("logs$ subscription closed, closing client");
110109
try {
111110
client?.close();
111+
client?.socket?.close();
112112
} catch (e) {
113-
debug("failed to close client", e);
113+
debug("failed to close client/socket", e);
114114
}
115115
};
116116
});

0 commit comments

Comments
 (0)