Skip to content

Commit

Permalink
chore: Add detectOpenHandles option
Browse files Browse the repository at this point in the history
  • Loading branch information
neet committed Jul 27, 2023
1 parent 4b06ae1 commit d8625b1
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 110 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"scripts": {
"test": "npm-run-all test:*",
"test:unit": "jest --coverage --config=jest.config.cjs --selectProjects unit",
"test:e2e": "jest --coverage --runInBand --config=jest.config.cjs --selectProjects e2e",
"test:e2e": "jest --coverage --runInBand --detectOpenHandles --config=jest.config.cjs --selectProjects e2e",
"lint": "npm-run-all lint:*",
"lint:eslint": "eslint -c .eslintrc.json ./src/**/*.ts --cache",
"lint:spellcheck": "cspell '{src,examples}/**/*.{ts,tsx,js,json,md}'",
Expand Down
12 changes: 10 additions & 2 deletions src/utils/web-socket/async-iterable.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import { on } from 'events-to-async';
import type WebSocket from 'isomorphic-ws';

import { MastoUnexpectedError } from '../../errors';

export async function* toAsyncIterable(
ws: WebSocket,
): AsyncIterable<WebSocket.MessageEvent> {
const handleClose = (e: WebSocket.CloseEvent) => {
events.return?.(e);
if (events.return == undefined) {
throw new MastoUnexpectedError('events.return is undefined');
}
events.return(e);
};

const handleError = (e: WebSocket.ErrorEvent) => {
events.throw?.(e);
if (events.throw == undefined) {
throw new MastoUnexpectedError('events.return is undefined');
}
events.throw(e);
};

const events = on<[WebSocket.MessageEvent]>((handler) => {
Expand Down
24 changes: 16 additions & 8 deletions src/ws/web-socket-connector.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type WebSocket from 'ws';

import type { MastoWebSocketConfig } from '../config';
import { MastoInvalidArgumentError } from '../errors';
import type { Logger } from '../logger';
import { ExponentialBackoff } from '../utils';
import { waitForAsyncIterableToEnd } from '../utils/wait-for-async-iterable-to-end';
Expand All @@ -16,7 +15,7 @@ export type WebSocketConnection = {

export class WebSocketConnector {
private ws?: WebSocket;
private isClosed = false;
private disableRetry = false;

constructor(
private readonly params: ConstructorParameters<typeof WebSocket>,
Expand All @@ -27,10 +26,10 @@ export class WebSocketConnector {
async *getConnections(): AsyncGenerator<WebSocketConnection> {
const backoff = new ExponentialBackoff(2);

while (backoff.attempts < this.config.maxAttempts && !this.isClosed) {
while (this.shouldRetry(backoff)) {
try {
this.ws = await webSocket.promises.connect(this.params);
this.logger.debug('WebSocket connection established');
this.logger.info('WebSocket connection established');
const messages = webSocket.toAsyncIterable(this.ws);

yield {
Expand All @@ -41,23 +40,32 @@ export class WebSocketConnector {
};

await waitForAsyncIterableToEnd(messages);
this.logger.debug('WebSocket connection closed');
this.logger.info('WebSocket connection closed');
backoff.clear();
} catch (error) {
this.logger.warn('WebSocket error occurred', error);
} finally {
this.logger.debug(`Reconnecting in ${backoff.timeout}ms...`);
this.logger.info(`Reconnecting in ${backoff.timeout}ms...`);
await backoff.sleep();
}
}
}

close(): void {
// It can be undefined if client is closed before connection is established
if (this.ws == undefined) {
throw new MastoInvalidArgumentError('WebSocket is not connected');
return;
}

this.isClosed = true;
this.disableRetry = true;
this.ws.close();
}

private shouldRetry(backoff: ExponentialBackoff): boolean {
if (this.disableRetry) {
return false;
}

return backoff.attempts < this.config.maxAttempts;
}
}
1 change: 1 addition & 0 deletions test-utils/pools/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class SessionPoolImpl extends BasePool<Session> {
};

protected releaseOne = async (session: Session): Promise<void> => {
session.ws.close();
const token = this.sessionToToken.get(session);
if (token == undefined) {
return;
Expand Down
36 changes: 18 additions & 18 deletions tests/websocket/events.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe('events', () => {
try {
const events = session.ws.subscribe('public:local');

setTimeout(async () => {
const dispatch = async () => {
const status = await session.rest.v1.statuses.create({
status: 'test',
});
Expand All @@ -21,9 +21,12 @@ describe('events', () => {
});
await delay(1000);
await session.rest.v1.statuses.select(status.id).remove();
}, 500);
};

const [e1, e2, e3] = await events.take(3).toArray();
const [[e1, e2, e3]] = await Promise.all([
events.take(3).toArray(),
dispatch(),
]);

assert(e1?.event === 'update');
expect(e1.payload.content).toBe('<p>test</p>');
Expand All @@ -33,7 +36,6 @@ describe('events', () => {
expect(e3.payload).toBe(id);
} finally {
session.ws.unsubscribe('public:local');
session.ws.close();
}
});
});
Expand All @@ -43,22 +45,21 @@ describe('events', () => {
try {
const events = session.ws.subscribe('user');

setTimeout(async () => {
const dispatch = async () => {
const filter = await session.rest.v2.filters.create({
title: 'test',
context: ['public'],
keywordsAttributes: [{ keyword: 'TypeScript' }],
});
await session.rest.v2.filters.select(filter.id).remove();
}, 500);
};

const [e] = await events.take(1).toArray();
const [[e]] = await Promise.all([events.take(1).toArray(), dispatch()]);

assert(e?.event === 'filters_changed');
expect(e.payload).toBeUndefined();
} finally {
session.ws.unsubscribe('user');
session.ws.close();
}
});
});
Expand All @@ -70,18 +71,17 @@ describe('events', () => {
try {
const events = alice.ws.subscribe('user:notification');

setTimeout(async () => {
await bob.rest.v1.accounts.select(alice.id).follow();
}, 500);
const [[e]] = await Promise.all([
events.take(1).toArray(),
bob.rest.v1.accounts.select(alice.id).follow(),
]);

const [e] = await events.take(1).toArray();
assert(e?.event === 'notification');
expect(e.payload.account?.id).toBe(bob.id);
expect(e.payload.status?.id).toBe(id);
} finally {
await bob.rest.v1.accounts.select(alice.id).unfollow();
alice.ws.unsubscribe('user:notification');
alice.ws.close();
}
});
});
Expand All @@ -91,24 +91,24 @@ describe('events', () => {
let id!: string;

try {
setTimeout(async () => {
const events = alice.ws.subscribe('direct');

const dispatch = async () => {
const status = await bob.rest.v1.statuses.create({
status: `@${alice.acct} Hello there`,
visibility: 'direct',
});
id = status.id;
await delay(1000);
}, 500);
};

const events = alice.ws.subscribe('direct');
const [e] = await events.take(1).toArray();
const [[e]] = await Promise.all([events.take(1).toArray(), dispatch()]);

assert(e?.event === 'conversation');
expect(e.payload.lastStatus?.id).toBe(id);
} finally {
await bob.rest.v1.statuses.select(id).remove();
alice.ws.unsubscribe('direct');
alice.ws.close();
}
});
});
Expand Down
Loading

0 comments on commit d8625b1

Please sign in to comment.