Skip to content

Commit

Permalink
enhance: Improve stream manager against disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
ntucker committed Apr 30, 2024
1 parent 655e48b commit 9370a1b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
2 changes: 1 addition & 1 deletion examples/coin-app/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const spouts = JSONSpout()(
getManagers: () => {
return [
new StreamManager(
new WebSocket('wss://ws-feed.exchange.coinbase.com'),
() => new WebSocket('wss://ws-feed.exchange.coinbase.com'),
{ ticker: getTicker },
),
...getDefaultManagers().filter(
Expand Down
55 changes: 45 additions & 10 deletions examples/coin-app/src/resources/StreamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,43 @@ export default class StreamManager implements Manager {
[];

protected product_ids: string[] = [];
private attempts = 0;
protected declare connect: () => void;

constructor(
evtSource: WebSocket, // | EventSource,
evtSource: () => WebSocket, // | EventSource,
endpoints: Record<string, EndpointInterface>,
) {
this.evtSource = evtSource;
this.endpoints = endpoints;

this.middleware = controller => {
this.evtSource.onmessage = event => {
try {
const msg = JSON.parse(event.data);
this.handleMessage(controller, msg);
} catch (e) {
console.error('Failed to handle message');
console.error(e);
}
this.connect = () => {
this.evtSource = evtSource();
this.evtSource.onmessage = event => {
try {
const msg = JSON.parse(event.data);
this.handleMessage(controller, msg);
} catch (e) {
console.error('Failed to handle message');
console.error(e);
}
};
this.evtSource.onopen = () => {
console.info('WebSocket connected');
// Reset reconnection attempts after a successful connection
this.attempts = 0;
};
this.evtSource.onclose = () => {
console.info('WebSocket disconnected');
this.reconnect();
};
this.evtSource.onerror = error => {
console.error('WebSocket error:', error);
// Ensures that the onclose handler gets triggered for reconnection
this.evtSource.close();
};
};
this.connect();
return next => async action => {
switch (action.type) {
case actionTypes.SUBSCRIBE_TYPE:
Expand Down Expand Up @@ -116,7 +135,23 @@ export default class StreamManager implements Manager {
});
}

reconnect() {
// Exponential backoff formula to gradually increase the reconnection time
setTimeout(
() => {
console.info(
`Attempting to reconnect... (Attempt: ${this.attempts + 1})`,
);
this.attempts++;
this.connect();
},
Math.min(10000, (Math.pow(2, this.attempts) - 1) * 1000),
);
}

cleanup() {
// remove our event handler that attempts reconnection
this.evtSource.onclose = null;
this.evtSource.close();
}

Expand Down

0 comments on commit 9370a1b

Please sign in to comment.