diff --git a/examples/coin-app/src/index.tsx b/examples/coin-app/src/index.tsx index 448a4d6ace9c..6185b24ff3d5 100644 --- a/examples/coin-app/src/index.tsx +++ b/examples/coin-app/src/index.tsx @@ -29,7 +29,7 @@ const spouts = JSONSpout()( getManagers: () => { return [ new StreamManager( - new WebSocket('wss://ws-feed.exchange.coinbase.com'), + () => new WebSocket('wss://ws-feed.exchange.coinbase.com'), { ticker: getTicker }, ), ...getDefaultManagers().filter( diff --git a/examples/coin-app/src/resources/StreamManager.ts b/examples/coin-app/src/resources/StreamManager.ts index f4f5533f29c5..f9dfedf223f3 100644 --- a/examples/coin-app/src/resources/StreamManager.ts +++ b/examples/coin-app/src/resources/StreamManager.ts @@ -14,24 +14,43 @@ export default class StreamManager implements Manager { []; protected product_ids: string[] = []; + private attempts = 0; + protected declare connect: () => void; constructor( - evtSource: WebSocket, // | EventSource, + evtSource: () => WebSocket, // | EventSource, endpoints: Record, ) { - this.evtSource = evtSource; this.endpoints = endpoints; this.middleware = controller => { - this.evtSource.onmessage = event => { - try { - const msg = JSON.parse(event.data); - this.handleMessage(controller, msg); - } catch (e) { - console.error('Failed to handle message'); - console.error(e); - } + this.connect = () => { + this.evtSource = evtSource(); + this.evtSource.onmessage = event => { + try { + const msg = JSON.parse(event.data); + this.handleMessage(controller, msg); + } catch (e) { + console.error('Failed to handle message'); + console.error(e); + } + }; + this.evtSource.onopen = () => { + console.info('WebSocket connected'); + // Reset reconnection attempts after a successful connection + this.attempts = 0; + }; + this.evtSource.onclose = () => { + console.info('WebSocket disconnected'); + this.reconnect(); + }; + this.evtSource.onerror = error => { + console.error('WebSocket error:', error); + // Ensures that the onclose handler gets triggered for reconnection + this.evtSource.close(); + }; }; + this.connect(); return next => async action => { switch (action.type) { case actionTypes.SUBSCRIBE_TYPE: @@ -116,7 +135,23 @@ export default class StreamManager implements Manager { }); } + reconnect() { + // Exponential backoff formula to gradually increase the reconnection time + setTimeout( + () => { + console.info( + `Attempting to reconnect... (Attempt: ${this.attempts + 1})`, + ); + this.attempts++; + this.connect(); + }, + Math.min(10000, (Math.pow(2, this.attempts) - 1) * 1000), + ); + } + cleanup() { + // remove our event handler that attempts reconnection + this.evtSource.onclose = null; this.evtSource.close(); }