From d51851a498ee7b57d6e48f4d8f1df2eee076a6ba Mon Sep 17 00:00:00 2001 From: Nathaniel Tucker Date: Tue, 30 Apr 2024 12:00:52 -0400 Subject: [PATCH] enhance: Improve stream manager against disconnects --- examples/coin-app/src/index.tsx | 2 +- .../coin-app/src/resources/StreamManager.ts | 55 +++++++++++++--- examples/nextjs/app/Provider.tsx | 2 +- examples/nextjs/resources/StreamManager.ts | 63 ++++++++++++++----- 4 files changed, 96 insertions(+), 26 deletions(-) diff --git a/examples/coin-app/src/index.tsx b/examples/coin-app/src/index.tsx index d605c3400d9..d1fb4a3ae67 100644 --- a/examples/coin-app/src/index.tsx +++ b/examples/coin-app/src/index.tsx @@ -31,7 +31,7 @@ const spouts = JSONSpout()( getManagers: () => { return [ new StreamManager( - new WebSocket('wss://ws-feed.exchange.coinbase.com'), + () => new WebSocket('wss://ws-feed.exchange.coinbase.com'), { ticker: getTicker }, ), ...getManagers(), diff --git a/examples/coin-app/src/resources/StreamManager.ts b/examples/coin-app/src/resources/StreamManager.ts index f4f5533f29c..a7ff07f1819 100644 --- a/examples/coin-app/src/resources/StreamManager.ts +++ b/examples/coin-app/src/resources/StreamManager.ts @@ -14,23 +14,41 @@ export default class StreamManager implements Manager { []; protected product_ids: string[] = []; + private attempts = 0; + protected declare connect: () => void; constructor( - evtSource: WebSocket, // | EventSource, + evtSource: () => WebSocket, // | EventSource, endpoints: Record, ) { - this.evtSource = evtSource; this.endpoints = endpoints; this.middleware = controller => { - this.evtSource.onmessage = event => { - try { - const msg = JSON.parse(event.data); - this.handleMessage(controller, msg); - } catch (e) { - console.error('Failed to handle message'); - console.error(e); - } + this.connect = () => { + this.evtSource = evtSource(); + this.evtSource.onmessage = event => { + try { + const msg = JSON.parse(event.data); + this.handleMessage(controller, msg); + } catch (e) { + console.error('Failed to handle message'); + console.error(e); + } + }; + this.evtSource.onopen = () => { + console.info('WebSocket connected'); + // Reset reconnection attempts after a successful connection + this.attempts = 0; + }; + this.evtSource.onclose = () => { + console.info('WebSocket disconnected'); + this.reconnect(); + }; + this.evtSource.onerror = error => { + console.error('WebSocket error:', error); + // Ensures that the onclose handler gets triggered for reconnection + this.evtSource.close(); + }; }; return next => async action => { switch (action.type) { @@ -110,13 +128,30 @@ export default class StreamManager implements Manager { } init() { + this.connect(); this.evtSource.addEventListener('open', event => { //this.msgQueue.forEach((msg) => this.evtSource.send(msg)); this.flushSubscribe(); }); } + reconnect() { + // Exponential backoff formula to gradually increase the reconnection time + setTimeout( + () => { + console.info( + `Attempting to reconnect... (Attempt: ${this.attempts + 1})`, + ); + this.attempts++; + this.connect(); + }, + Math.min(10000, (Math.pow(2, this.attempts) - 1) * 1000), + ); + } + cleanup() { + // remove our event handler that attempts reconnection + this.evtSource.onclose = null; this.evtSource.close(); } diff --git a/examples/nextjs/app/Provider.tsx b/examples/nextjs/app/Provider.tsx index 1f7419e5fbc..c0d178ab518 100644 --- a/examples/nextjs/app/Provider.tsx +++ b/examples/nextjs/app/Provider.tsx @@ -7,7 +7,7 @@ import { getTicker } from 'resources/Ticker'; const managers = typeof window === 'undefined' ? getDefaultManagers() : ( [ - new StreamManager(new WebSocket('wss://ws-feed.exchange.coinbase.com'), { + new StreamManager(() => new WebSocket('wss://ws-feed.exchange.coinbase.com'), { ticker: getTicker, }), ...getDefaultManagers(), diff --git a/examples/nextjs/resources/StreamManager.ts b/examples/nextjs/resources/StreamManager.ts index c9bffa4f440..acc2f73a8a6 100644 --- a/examples/nextjs/resources/StreamManager.ts +++ b/examples/nextjs/resources/StreamManager.ts @@ -7,30 +7,48 @@ import { ActionTypes, Controller, actionTypes } from '@data-client/react'; * https://docs.cloud.coinbase.com/advanced-trade-api/docs/ws-overview */ export default class StreamManager implements Manager { - protected middleware: Middleware; - protected evtSource: WebSocket; // | EventSource; - protected endpoints: Record; + protected declare middleware: Middleware; + protected declare evtSource: WebSocket; // | EventSource; + protected declare endpoints: Record; protected msgQueue: (string | ArrayBufferLike | Blob | ArrayBufferView)[] = []; protected product_ids: string[] = []; + private attempts = 0; + protected declare connect: () => void; constructor( - evtSource: WebSocket, // | EventSource, + evtSource: () => WebSocket, // | EventSource, endpoints: Record, ) { - this.evtSource = evtSource; this.endpoints = endpoints; this.middleware = controller => { - this.evtSource.onmessage = event => { - try { - const msg = JSON.parse(event.data); - this.handleMessage(controller, msg); - } catch (e) { - console.error('Failed to handle message'); - console.error(e); - } + this.connect = () => { + this.evtSource = evtSource(); + this.evtSource.onmessage = event => { + try { + const msg = JSON.parse(event.data); + this.handleMessage(controller, msg); + } catch (e) { + console.error('Failed to handle message'); + console.error(e); + } + }; + this.evtSource.onopen = () => { + console.info('WebSocket connected'); + // Reset reconnection attempts after a successful connection + this.attempts = 0; + }; + this.evtSource.onclose = () => { + console.info('WebSocket disconnected'); + this.reconnect(); + }; + this.evtSource.onerror = error => { + console.error('WebSocket error:', error); + // Ensures that the onclose handler gets triggered for reconnection + this.evtSource.close(); + }; }; return next => async action => { switch (action.type) { @@ -110,14 +128,31 @@ export default class StreamManager implements Manager { } init() { + this.connect(); this.evtSource.addEventListener('open', event => { //this.msgQueue.forEach((msg) => this.evtSource.send(msg)); this.flushSubscribe(); }); } + reconnect() { + // Exponential backoff formula to gradually increase the reconnection time + setTimeout( + () => { + console.info( + `Attempting to reconnect... (Attempt: ${this.attempts + 1})`, + ); + this.attempts++; + this.connect(); + }, + Math.min(10000, (Math.pow(2, this.attempts) - 1) * 1000), + ); + } + cleanup() { - //this.evtSource.close(); + // remove our event handler that attempts reconnection + this.evtSource.onclose = null; + this.evtSource.close(); } getMiddleware() {