Skip to content

Commit

Permalink
Merge pull request #1120 from neet/refactor-ws
Browse files Browse the repository at this point in the history
fix: Refactor WebSocketSubscription and remove unused code
  • Loading branch information
neet committed May 3, 2024
2 parents 409d2fb + 6870df2 commit e74ca97
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 20 deletions.
24 changes: 10 additions & 14 deletions src/adapters/ws/web-socket-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
while (this.connector.canAcquire()) {
this.connection = await this.connector.acquire();

const messages = toAsyncIterable(this.connection);

const data = this.serializer.serialize("json", {
type: "subscribe",
stream: this.stream,
Expand All @@ -37,8 +35,15 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
this.logger?.log("debug", "↑ WEBSOCKET", data);
this.connection.send(data);

for await (const event of this.transformIntoEvents(messages)) {
if (!this.matches(event)) continue;
const messages = toAsyncIterable(this.connection);

for await (const message of messages) {
const event = await this.parseMessage(message.data as string);

if (!this.test(event)) {
continue;
}

this.logger?.log("debug", "↓ WEBSOCKET", event);
yield event;
}
Expand Down Expand Up @@ -70,7 +75,7 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
this.unsubscribe();
}

private matches(event: mastodon.streaming.Event): boolean {
private test(event: mastodon.streaming.Event): boolean {
// subscribe("hashtag", { tag: "foo" }) -> ["hashtag", "foo"]
// subscribe("list", { list: "foo" }) -> ["list", "foo"]
const params = this.params ?? {};
Expand All @@ -79,15 +84,6 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
return stream.every((s) => event.stream.includes(s));
}

private async *transformIntoEvents(
messages: AsyncIterable<WebSocket.MessageEvent>,
) {
for await (const message of messages) {
const event = await this.parseMessage(message.data as string);
yield event;
}
}

private async parseMessage(
rawEvent: string,
): Promise<mastodon.streaming.Event> {
Expand Down
4 changes: 2 additions & 2 deletions src/utils/exponential-backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class ExponentialBackoff {
throw new ExponentialBackoffError(this.attempts);
}

await sleep(this.getTimeout());
await sleep(this.timeout);
this.attempts++;
}

Expand All @@ -45,7 +45,7 @@ export class ExponentialBackoff {
return this.props.maxAttempts ?? Number.POSITIVE_INFINITY;
}

getTimeout(): number {
private get timeout(): number {
return this.factor * this.base ** this.attempts;
}
}
4 changes: 0 additions & 4 deletions src/utils/noop.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
export function noop(): void {
//
}

export async function noopAsync(): Promise<void> {
//
}

0 comments on commit e74ca97

Please sign in to comment.