Skip to content

Commit

Permalink
feat!: Change WebSocket to AsyncIterator
Browse files Browse the repository at this point in the history
* feat!: Change WebSocket to AsyncIterator

wip

Add tests

fix types

fix unit tests

improve tests

fix CI errors

Update streaming example

Retry with an exponential backoff

prettify

improve tests

* disable public:media temporarily
  • Loading branch information
neet committed Jul 25, 2023
1 parent 6e61c3d commit fcdc5ec
Show file tree
Hide file tree
Showing 73 changed files with 1,425 additions and 771 deletions.
1 change: 1 addition & 0 deletions cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"words": [
"AGPL",
"asynckit",
"Backoff",
"blurhash",
"builtins",
"carryforward",
Expand Down
80 changes: 65 additions & 15 deletions examples/timeline-with-streaming.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,69 @@
import { login } from 'masto';
import { createWebSocketClient } from 'masto';

const masto = await login({
url: 'https://example.com',
accessToken: 'YOUR TOKEN',
});
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Connect to the streaming api
const stream = await masto.v1.stream.streamPublicTimeline();
// https://en.wikipedia.org/wiki/Exponential_backoff
class ExponentialBackoff {
private errorCount = 0;

// Subscribe to updates
stream.on('update', (status) => {
console.log(`${status.account.username}: ${status.content}`);
});
constructor(private readonly baseSeconds: number) {}

// Subscribe to notifications
stream.on('notification', (notification) => {
console.log(`${notification.account.username}: ${notification.type}`);
});
get timeout() {
return this.baseSeconds ** this.errorCount * 1000;
}

clear() {
this.errorCount = 0;
}

async sleep() {
await sleep(this.timeout);
this.errorCount++;
}
}

// ========================================

const backoff = new ExponentialBackoff(2);

const subscribe = async (): Promise<void> => {
try {
const masto = createWebSocketClient({
url: '<API URL>',
accessToken: '<TOKEN>',
streamingApiUrl: '<STREAMING API URL>',
});

console.log('subscribed to #mastojs');

for await (const event of masto.subscribe('hashtag', { tag: 'mastojs' })) {
switch (event.event) {
case 'update': {
console.log('new post', event.payload.content);
break;
}
case 'delete': {
console.log('deleted post', event.payload);
break;
}
default: {
break;
}
}
}

backoff.clear();
} catch (error) {
console.error(error);
} finally {
console.log('Reconnecting in ' + backoff.timeout + 'ms');
await backoff.sleep();
await subscribe();
}
};

try {
await subscribe();
} catch (error) {
console.error(error);
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@mastojs/ponyfills": "^1.0.4",
"change-case": "^4.1.2",
"eventemitter3": "^5.0.0",
"events-to-async": "^2.0.0",
"isomorphic-ws": "^5.0.0",
"qs": "^6.11.0",
"ws": "^8.13.0"
Expand Down
1 change: 0 additions & 1 deletion src/__mocks__/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export * from './http-mock-impl';
export * from './ws-mock-impl';
export * from './logger-mock-impl';
20 changes: 0 additions & 20 deletions src/__mocks__/ws-mock-impl.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe('Config', () => {
new SerializerNativeImpl(),
);

const url = config.resolveWebsocketPath('/path/to/somewhere');
const url = config.resolveWebsocketPath('/path/to/somewhere').toString();
expect(url).toEqual('wss://mastodon.social/path/to/somewhere');
});

Expand Down Expand Up @@ -123,7 +123,7 @@ describe('Config', () => {
new SerializerNativeImpl(),
);

const url = config.resolveWebsocketPath('/path/to/somewhere');
const url = config.resolveWebsocketPath('/path/to/somewhere').toString();
expect(url).toEqual(
'wss://mastodon.social/path/to/somewhere?access_token=token',
);
Expand Down
6 changes: 3 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ export class MastoConfig {
resolveWebsocketPath(
path: string,
params: Record<string, unknown> = {},
): string {
): URL {
if (this.props.streamingApiUrl == undefined) {
throw new MastoInvalidArgumentError(
'You need to specify `streamingApiUrl` to use this feature',
);
}

const url = new URL(this.props.streamingApiUrl.replace(/\/$/, '') + path);
const url = new URL(path, this.props.streamingApiUrl);
if (this.props.useInsecureWebSocketToken) {
params.accessToken = this.props.accessToken;
}

url.search = this.serializer.serializeQueryString(params);
return url.toString();
return url;
}

createTimeout(): Timeout {
Expand Down
11 changes: 11 additions & 0 deletions src/login.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { HttpNativeImpl } from './http';
import { LoggerConsoleImpl } from './logger';
import type { OAuthAPIClient, RestAPIClient } from './mastodon';
import { SerializerNativeImpl } from './serializers';
import { WebSocketAPIConnector } from './ws';

export const createClient = (props: MastoConfigProps): RestAPIClient => {
const serializer = new SerializerNativeImpl();
Expand All @@ -23,3 +24,13 @@ export const createOAuthClient = (props: MastoConfigProps): OAuthAPIClient => {
const builder = createBuilder(http, ['oauth']) as OAuthAPIClient;
return builder;
};

export function createWebSocketClient(
props: MastoConfigProps,
): WebSocketAPIConnector {
const serializer = new SerializerNativeImpl();
const config = new MastoConfig(props, serializer);
const logger = new LoggerConsoleImpl(config.getLogLevel());
const connector = new WebSocketAPIConnector(config, serializer, logger);
return connector;
}
1 change: 1 addition & 0 deletions src/mastodon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * as v1 from './v1';
export * as v2 from './v2';
export * from './client';
export * from './repository';
export * from './websocket';
2 changes: 1 addition & 1 deletion src/mastodon/v2/repositories/filter-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface CreateFilterParams {
/** Integer. How many seconds from now should the filter expire? */
readonly expiresIn?: number | null;

readonly keywordsAttributes?: readonly {
readonly keywordsAttributes?: {
/** String. A keyword to be added to the newly-created filter group. */
readonly keyword?: string | null;
/** Boolean. Whether the keyword should consider word boundaries. */
Expand Down
47 changes: 47 additions & 0 deletions src/mastodon/websocket/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { Event } from './event';

export type Stream =
| 'public'
| 'public:media'
| 'public:local'
| 'public:local:media'
| 'public:remote'
| 'public:remote:media'
| 'hashtag'
| 'hashtag:local'
| 'user'
| 'user:notification'
| 'list'
| 'direct';

export type SubscribeListParams = {
readonly list: string;
};

export type SubscribeHashtagParams = {
readonly tag: string;
};

export interface WebSocketAPIConnection {
readonly events: AsyncIterableIterator<Event>;
readonly readyState: number;

subscribe(
stream: 'list',
params: SubscribeListParams,
): AsyncIterableIterator<Event>;
subscribe(
stream: 'hashtag' | 'hashtag:local',
params: SubscribeHashtagParams,
): AsyncIterableIterator<Event>;
subscribe(stream: Stream): AsyncIterableIterator<Event>;

unsubscribe(stream: 'list', params: SubscribeListParams): void;
unsubscribe(
stream: 'hashtag' | 'hashtag:local',
params: SubscribeHashtagParams,
): void;
unsubscribe(stream: Stream): void;

close(): void;
}
60 changes: 60 additions & 0 deletions src/mastodon/websocket/event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { mastodon } from '../..';

export type RawEventOk = {
stream: string[];
event: string;
payload: string;
};

export type RawEventError = {
error: string;
};

export type RawEvent = RawEventOk | RawEventError;

type BaseEvent<T, U> = {
stream: string[];
event: T;
payload: U;
};

export type UpdateEvent = BaseEvent<'update', mastodon.v1.Status>;

export type DeleteEvent = BaseEvent<'delete', string>;

export type NotificationEvent = BaseEvent<
'notification',
mastodon.v1.Notification
>;

export type FiltersChangedEvent = BaseEvent<'filters_changed', undefined>;

export type ConversationEvent = BaseEvent<
'conversation',
mastodon.v1.Conversation
>;

export type AnnouncementEvent = BaseEvent<
'announcement',
mastodon.v1.Announcement
>;

export type AnnouncementReactionEvent = BaseEvent<
'announcement.reaction',
mastodon.v1.Reaction
>;

export type AnnouncementDeleteEvent = BaseEvent<'announcement.delete', string>;

export type StatusUpdateEvent = BaseEvent<'status.update', mastodon.v1.Status>;

export type Event =
| UpdateEvent
| DeleteEvent
| NotificationEvent
| FiltersChangedEvent
| ConversationEvent
| AnnouncementEvent
| AnnouncementReactionEvent
| AnnouncementDeleteEvent
| StatusUpdateEvent;
2 changes: 2 additions & 0 deletions src/mastodon/websocket/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './client';
export * from './event';
20 changes: 16 additions & 4 deletions src/paginator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,24 @@ import qs from 'qs';

import type { Http } from './http';

/* eslint-disable @typescript-eslint/no-explicit-any */
const mixins =
(globalThis as any).AsyncIterator == undefined
? class {}
: (globalThis as any).AsyncIterator;
/* eslint-enable @typescript-eslint/no-explicit-any */

export class Paginator<Entity, Params = never>
implements AsyncIterableIterator<Entity>, PromiseLike<Entity>
extends mixins
implements PromiseLike<Entity>
{
constructor(
private readonly http: Http,
private nextPath?: string,
private nextParams?: Params,
) {}
) {
super();
}

async next(): Promise<IteratorResult<Entity, undefined>> {
if (this.nextPath == undefined) {
Expand Down Expand Up @@ -61,12 +71,14 @@ export class Paginator<Entity, Params = never>
return this.next().then((value) => onfulfilled(value.value!), onrejected);
}

[Symbol.asyncIterator](): AsyncGenerator<
[Symbol.asyncIterator](): AsyncIterator<
Entity,
undefined,
Params | undefined
> {
return this;
// TODO: Use polyfill on demand
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return this as any as AsyncIterator<Entity, undefined, Params | undefined>;
}

private clear() {
Expand Down
3 changes: 2 additions & 1 deletion src/serializers/serializer-native-impl.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { Serializer } from './serializer';
import { SerializerNativeImpl } from './serializer-native-impl';

describe('SerializerNativeImpl', () => {
const serializer = new SerializerNativeImpl();
const serializer: Serializer = new SerializerNativeImpl();

it('encodes an object to JSON', () => {
const data = serializer.serialize('json', {
Expand Down
1 change: 1 addition & 0 deletions src/serializers/serializer-native-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { Encoding, Serializer } from './serializer';
import { transformKeys } from './transform-keys';

export class SerializerNativeImpl implements Serializer {
serialize(type: 'json', rawData: unknown): string | undefined;
serialize(type: Encoding, rawData: unknown): BodyInit | undefined {
if (rawData == undefined) {
return;
Expand Down
1 change: 1 addition & 0 deletions src/serializers/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { BodyInit } from '@mastojs/ponyfills';
export type Encoding = 'none' | 'json' | 'form-url-encoded' | 'multipart-form';

export interface Serializer {
serialize(type: 'json', data: unknown): string | undefined;
serialize(type: Encoding, data: unknown): BodyInit | undefined;
serializeQueryString(data: unknown): string;
deserialize<T = Record<string, unknown>>(type: Encoding, data: unknown): T;
Expand Down

0 comments on commit fcdc5ec

Please sign in to comment.