Skip to content

Commit

Permalink
New "subscription" helper exported from library with abortSignal and …
Browse files Browse the repository at this point in the history
…abortController, specially useful for GraphQL Usage
  • Loading branch information
PabloSzx committed Jul 28, 2022
1 parent f189504 commit 17a8cf7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 82 deletions.
5 changes: 5 additions & 0 deletions .changeset/thirty-bugs-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@soundxyz/redis-pubsub": minor
---

New "subscription" helper exported from library with abortSignal and abortController, specially useful for GraphQL Usage
24 changes: 24 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,27 @@ export function RedisPubSub({
publisher.disconnect();
}
}

export function subscription<T>(
asyncGenerator: (args: {
abortSignal: AbortSignal;
abortController: AbortController;
}) => AsyncGenerator<T>
) {
const abortController = new AbortController();

const asyncIterator = asyncGenerator({
abortController,
abortSignal: abortController.signal,
});

const asyncReturn = asyncIterator.return;

asyncIterator.return = () => {
abortController.abort();

return asyncReturn.call(asyncIterator, undefined);
};

return asyncIterator;
}
106 changes: 24 additions & 82 deletions test/graphql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import SchemaBuilder from "@pothos/core";
import { ezWebSockets } from "@graphql-ez/plugin-websockets";
import { getPubsub } from "./helpers";
import { z } from "zod";
import * as timers from "timers/promises";
test("hello", async (t) => {
import { subscription } from "../src";
import { createDeferredPromise } from "../src/promise";

test("graphql subscription", async (t) => {
const pubsub = getPubsub();
const builder = new SchemaBuilder({});

Expand All @@ -26,69 +28,41 @@ test("hello", async (t) => {
},
});

const withCancel = <T>(
asyncGenerator: (args: { abortSignal: AbortSignal }) => AsyncGenerator<T>
) => {
const abortController = new AbortController();

const asyncIterator = asyncGenerator({
abortSignal: abortController.signal,
});
const asyncReturn = asyncIterator.return;

asyncIterator.return = () => {
console.log("cancel", asyncReturn);
abortController.abort();
const backendCleanedPromise = createDeferredPromise(2000);

return asyncReturn
? asyncReturn.call(asyncIterator, undefined)
: Promise.resolve({ value: undefined, done: true });
};

return asyncIterator;
};
const backendDataPromise = createDeferredPromise<number>(2000);

builder.subscriptionType({
fields(t) {
return {
test: t.int({
subscribe() {
return withCancel(async function* subscribe({ abortSignal }) {
return subscription(async function* ({ abortSignal }) {
try {
for await (const data of channel.subscribe({
abortSignal,
})) {
console.log({ data });
yield data;
backendDataPromise.resolve(data);
}
} finally {
console.log("cleaned");
backendCleanedPromise.resolve();
}
});
},

resolve(t) {
return t;
},
}),
};
},
});
const { assertedQuery, websockets } = await CreateTestClient(
const { assertedQuery, websockets, cleanup } = await CreateTestClient(
{
schema: builder.toSchema({}),
ez: {
plugins: [
ezWebSockets({
graphQLWS: {
onComplete() {
console.log("on complete");
},
onNext() {
console.log("on next");
},
},
}),
],
plugins: [ezWebSockets("new")],
},
},
{
Expand All @@ -100,61 +74,29 @@ test("hello", async (t) => {
}
);

const { __typename } = await assertedQuery("{__typename}");
t.teardown(cleanup);

console.log({
__typename,
});
const { __typename } = await assertedQuery("{__typename}");

t.true(true);
t.is(__typename, "Query");

const wsClient = await websockets.client;

const asd = wsClient.subscribe(
{
query: "subscription{test}",
},
{
complete() {
console.log("complete");
},
error(err) {
console.log("error", err);
},
next(value) {
console.log("Next", value);
},
}
);

// const subscribe = websockets.subscribe("subscription{test}");
const { iterator, unsubscribe } = websockets.subscribe<{ test: number }>("subscription{test}");

await channel.isReady();

await timers.setTimeout(100);

await channel.publish({ value: 1 });

await timers.setTimeout(100);

await asd();
// setTimeout(() => {
// subscribe.unsubscribe();
// }, 50);

// for await (const data of subscribe.iterator) {
// console.log({
// data,
// });
// }
let iteratorData: number | undefined;

// await timers.setTimeout(100);
for await (const { data } of iterator) {
iteratorData = data?.test;

// await subscribe.unsubscribe();
unsubscribe().catch(t.fail);
}

// const client = await websockets.client;
t.is(iteratorData, 1);

// await client.dispose();
await backendCleanedPromise.promise;

await timers.setTimeout(2000);
t.is(await backendDataPromise.promise, 1);
});

0 comments on commit 17a8cf7

Please sign in to comment.