Skip to content

Commit

Permalink
feat: Add setDirection and getDirection to paginator
Browse files Browse the repository at this point in the history
  • Loading branch information
neet committed Jul 27, 2023
1 parent 2864d17 commit 2ad2da7
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 40 deletions.
12 changes: 8 additions & 4 deletions src/adapters/action/paginator-http.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,20 @@ describe("PaginatorHttp", () => {
});
});

it("paginates with opposite direction when minId is specified", async () => {
it("paginates with opposite direction when prev direction was set", async () => {
http.request.mockReturnValue({
headers: new Headers({
link: '<https://mastodon.social/api/v1/timelines/home?max_id=109382006402042919>; rel="next", <https://mastodon.social/api/v1/timelines/home?min_id=109382039876197520>; rel="prev"',
}),
data: [],
});
const paginator = new PaginatorHttp(http, "/v1/api/timelines", {
minId: 1,
});

let paginator = new PaginatorHttp(http, "/v1/api/timelines");
expect(paginator.getDirection()).toBe("next");

paginator = paginator.setDirection("prev");
expect(paginator.getDirection()).toBe("prev");

await paginator.next();
await paginator.next();
expect(http.request).toBeCalledWith({
Expand Down
32 changes: 16 additions & 16 deletions src/adapters/action/paginator-http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@ import parseLinkHeader from "parse-link-header";
import { type Http, type HttpMetaParams } from "../../interfaces";
import { type mastodon } from "../../mastodon";

type Rel = "next" | "prev";

export class PaginatorHttp<Entity, Params = undefined>
implements mastodon.Paginator<Entity, Params>
{
private readonly rel: Rel;
private direction: mastodon.Direction = "next";

constructor(
private readonly http: Http,
private nextPath?: string,
private nextParams?: Params | string,
private readonly meta?: HttpMetaParams,
) {
const hasMinId =
nextParams && typeof nextParams === "object" && "minId" in nextParams;
this.rel = hasMinId ? "prev" : "next";
}
) {}

async next(): Promise<IteratorResult<Entity, undefined>> {
if (this.nextPath == undefined) {
Expand All @@ -38,15 +32,11 @@ export class PaginatorHttp<Entity, Params = undefined>
this.nextPath = nextUrl?.pathname;
this.nextParams = nextUrl?.search.replace(/^\?/, "");

const data = response.data as Entity | undefined;
const value =
this.rel === "prev" && Array.isArray(data)
? data.reverse()
: response.data;
const data = (await response.data) as Entity;

return {
done: false,
value: value as Entity,
value: data,
};
}

Expand Down Expand Up @@ -81,6 +71,16 @@ export class PaginatorHttp<Entity, Params = undefined>
return this[Symbol.asyncIterator]();
}

getDirection(): mastodon.Direction {
return this.direction;
}

setDirection(direction: mastodon.Direction): PaginatorHttp<Entity, Params> {
const that = this.clone();
that.direction = direction;
return that;
}

[Symbol.asyncIterator](): AsyncIterator<
Entity,
undefined,
Expand All @@ -98,7 +98,7 @@ export class PaginatorHttp<Entity, Params = undefined>
return;
}

const parsed = parseLinkHeader(value)?.[this.rel]?.url;
const parsed = parseLinkHeader(value)?.[this.direction]?.url;
if (parsed == undefined) {
return;
}
Expand All @@ -111,7 +111,7 @@ export class PaginatorHttp<Entity, Params = undefined>
this.nextParams = undefined;
}

clone(): mastodon.Paginator<Entity, Params> {
clone(): PaginatorHttp<Entity, Params> {
return new PaginatorHttp(
this.http,
this.nextPath,
Expand Down
24 changes: 16 additions & 8 deletions src/adapters/ws/web-socket-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { type mastodon } from "../../mastodon";
import { MastoUnexpectedError } from "../errors";
import { toAsyncIterable } from "./async-iterable";
import { waitForOpen } from "./wait-for-events";

export class WebSocketSubscription implements mastodon.streaming.Subscription {
private connection?: WebSocket;
Expand Down Expand Up @@ -37,7 +38,7 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
this.logger?.debug("↑ WEBSOCKET", data);
this.connection.send(data);

for await (const event of this.mapEvents(messages)) {
for await (const event of this.transformIntoEvents(messages)) {
if (!this.matches(event)) continue;
this.logger?.debug("↓ WEBSOCKET", event);
yield event;
Expand All @@ -59,20 +60,27 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
this.connection.send(data);
}

matches(event: mastodon.streaming.Event): boolean {
[Symbol.asyncIterator](): AsyncIterableIterator<mastodon.streaming.Event> {
return this.values();
}

async waitForOpen(): Promise<void> {
this.connection = await this.connector.acquire();
await waitForOpen(this.connection);
}

private matches(event: mastodon.streaming.Event): boolean {
// subscribe("hashtag", { tag: "foo" }) -> ["hashtag", "foo"]
// subscribe("list", { list: "foo" }) -> ["list", "foo"]
// subscribe("list", { list: "foo" }) -> ["list", "foo"]
const params = this.params ?? {};
const extra = Object.values(params) as string[];
const stream = [this.stream, ...extra];
return stream.every((s) => event.stream.includes(s));
}

[Symbol.asyncIterator](): AsyncIterableIterator<mastodon.streaming.Event> {
return this.values();
}

private async *mapEvents(messages: AsyncIterable<WebSocket.MessageEvent>) {
private async *transformIntoEvents(
messages: AsyncIterable<WebSocket.MessageEvent>,
) {
for await (const message of messages) {
const event = await this.parseMessage(message.data as string);
yield event;
Expand Down
5 changes: 5 additions & 0 deletions src/mastodon/paginator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
export type Direction = "next" | "prev";

// eslint-disable-next-line prettier/prettier
export interface Paginator<Entity, Params = undefined> extends PromiseLike<Entity> {
getDirection(): Direction;
setDirection(direction: Direction): void;

clone(): Paginator<Entity, Params>;

next(params?: Params | string): Promise<IteratorResult<Entity, undefined>>;
Expand Down
2 changes: 2 additions & 0 deletions src/mastodon/streaming/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export interface SubscribeHashtagParams {
export interface Subscription {
unsubscribe(): void;
values(): AsyncIterableIterator<Event>;
/** @internal */
waitForOpen(): Promise<void>;
[Symbol.asyncIterator](): AsyncIterator<Event, undefined>;
}

Expand Down
9 changes: 5 additions & 4 deletions tests/streaming/events.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ describe("events", () => {
return sessions.use(async (session) => {
const tag = `tag_${crypto.randomBytes(4).toString("hex")}`;
const subscription = session.ws.hashtag.local.subscribe({ tag });

await subscription.waitForOpen();
const eventsPromise = subscription.values().take(3).toArray();

await sleep(1000);
Expand Down Expand Up @@ -39,14 +39,15 @@ describe("events", () => {
it.concurrent("streams filters_changed event", () => {
return sessions.use(async (session) => {
const subscription = session.ws.user.subscribe();
await subscription.waitForOpen();
const eventsPromise = subscription.values().take(1).toArray();

await sleep(1000);
const filter = await session.rest.v2.filters.create({
title: "test",
context: ["public"],
keywordsAttributes: [{ keyword: "TypeScript" }],
});
await sleep(1000);
await session.rest.v2.filters.$select(filter.id).remove();

try {
Expand All @@ -62,9 +63,9 @@ describe("events", () => {
it.concurrent("streams notification", () => {
return sessions.use(2, async ([alice, bob]) => {
const subscription = alice.ws.user.notification.subscribe();
await subscription.waitForOpen();
const eventsPromise = subscription.values().take(1).toArray();

await sleep(1000);
await bob.rest.v1.accounts.$select(alice.id).follow();

try {
Expand All @@ -81,9 +82,9 @@ describe("events", () => {
it.concurrent("streams conversation", () => {
return sessions.use(2, async ([alice, bob]) => {
const subscription = alice.ws.direct.subscribe();
await subscription.waitForOpen();
const eventsPromise = subscription.values().take(1).toArray();

await sleep(1000);
const status = await bob.rest.v1.statuses.create({
status: `@${alice.acct} Hello there`,
visibility: "direct",
Expand Down
17 changes: 9 additions & 8 deletions tests/streaming/timelines.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ describe("websocket", () => {
return sessions.use(async (session) => {
const random = crypto.randomBytes(16).toString("hex");
const subscription = session.ws.public.subscribe();
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -21,7 +22,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
const status = await session.rest.v1.statuses.create({
status: random,
});
Expand All @@ -40,6 +40,7 @@ describe("websocket", () => {
return sessions.use(async (session) => {
const random = crypto.randomBytes(16).toString("hex");
const subscription = session.ws.public.media.subscribe();
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -50,7 +51,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
const media = await session.rest.v2.media.create({
file: TRANSPARENT_1X1_PNG,
});
Expand All @@ -74,6 +74,7 @@ describe("websocket", () => {
return sessions.use(async (session) => {
const random = crypto.randomBytes(16).toString("hex");
const subscription = session.ws.public.local.subscribe();
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -84,7 +85,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
const status = await session.rest.v1.statuses.create({
status: random,
visibility: "public",
Expand All @@ -104,6 +104,7 @@ describe("websocket", () => {
return sessions.use(async (session) => {
const random = crypto.randomBytes(16).toString("hex");
const subscription = session.ws.public.local.media.subscribe();
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -114,7 +115,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
const media = await session.rest.v2.media.create({
file: TRANSPARENT_1X1_PNG,
});
Expand All @@ -139,6 +139,7 @@ describe("websocket", () => {
return sessions.use(async (session) => {
const hashtag = `tag_${crypto.randomBytes(4).toString("hex")}`;
const subscription = session.ws.hashtag.subscribe({ tag: hashtag });
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -149,7 +150,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
const status = await session.rest.v1.statuses.create({
status: "#" + hashtag,
});
Expand All @@ -170,6 +170,7 @@ describe("websocket", () => {
const subscription = session.ws.hashtag.local.subscribe({
tag: hashtag,
});
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -180,7 +181,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
const status = await session.rest.v1.statuses.create({
status: "#" + hashtag,
});
Expand All @@ -198,6 +198,7 @@ describe("websocket", () => {
it("streams user", () => {
return sessions.use(2, async ([alice, bob]) => {
const subscription = alice.ws.user.subscribe();
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -208,7 +209,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
await bob.rest.v1.accounts.$select(alice.id).unfollow();
await bob.rest.v1.accounts.$select(alice.id).follow();

Expand All @@ -226,6 +226,7 @@ describe("websocket", () => {
it("streams user:notification", () => {
return sessions.use(2, async ([alice, bob]) => {
const subscription = alice.ws.user.notification.subscribe();
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand All @@ -237,7 +238,6 @@ describe("websocket", () => {
.take(1)
.toArray();

await sleep(1000);
await bob.rest.v1.accounts.$select(alice.id).follow();

try {
Expand All @@ -255,6 +255,7 @@ describe("websocket", () => {
return sessions.use(2, async ([alice, bob]) => {
const list = await alice.rest.v1.lists.create({ title: "test" });
const subscription = alice.ws.list.subscribe({ list: list.id });
await subscription.waitForOpen();

const eventsPromise = subscription
.values()
Expand Down

0 comments on commit 2ad2da7

Please sign in to comment.