diff --git a/README.md b/README.md index 80ad6b2a..5be86a11 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,28 @@ This is a fork of [subscriptionless](https://github.com/andyrichardson/subscriptionless) that is built to work with [Architect](https://arc.codes) and tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox). There's no reason why it wont work with Serverless or other deploy tools but their support is not a goal. +## API -# Old Readme +### `subscribe(topic: string, options?: SubscribeOptions): SubscribePseudoIterable` + +Subscribe is the most important method in the library. It's the primary difference between `graphql-ws` and `graphql-lambda-subscriptions`. It returns a `SubscribePseudoIterable` that pretends to be an async iterator that you put on the `subscribe` resolver for your Subscription. In reality it includes a few properties that we use to subscribe to events and fire lifecycle functions. + +```ts +interface SubscribeOptions { + filter?: (...args: TSubscribeArgs) => MaybePromise|void>; + onSubscribe?: (...args: TSubscribeArgs) => MaybePromise; + onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise; + onComplete?: (...args: TSubscribeArgs) => MaybePromise; +} +``` + +- `topic`: The you subscribe to the topic and can filter based upon the topics payload. +- `filter`: An object that the payload will be matched against (or a function that produces the object). If the payload's field matches the subscription will receive the event. If the payload is missing the field the subscription will receive the event. +- `onSubscribe`: A function that gets the subscription information (like arguments) it can throw if you don't want the subscription to subscribe. +- `onAfterSubscribe`: A function that gets the subscription information (like arguments) and can fire initial events or record information. +- `onComplete`: A function that fires at least once when a connection disconnects, a client sends a "complete" message, or the server sends a "complete" message. Because of the nature of aws lambda, it's possible for a client to send a "complete" message and disconnect and those events executing on lambda out of order. Which why this function can be called up to twice. + +## Old Readme ## About @@ -282,7 +302,7 @@ Wrap any `subscribe` function call in a `withFilter` to provide filter condition > Note: If a function is provided, it will be called **on subscription start** and must return a serializable object. ```ts -import { withFilter, subscribe } from 'subscriptionless/subscribe'; +import { subscribe } from 'subscriptionless/subscribe'; // Subscription agnostic filter withFilter(subscribe('MY_TOPIC'), { diff --git a/lib/messages/complete.ts b/lib/messages/complete.ts index 32fd8d3d..bd7c0e58 100644 --- a/lib/messages/complete.ts +++ b/lib/messages/complete.ts @@ -12,6 +12,7 @@ import { isArray } from '../utils/isArray' /** Handler function for 'complete' message. */ export const complete: MessageHandler = async ({ server, event, message }) => { + server.log('messages:complete', { connectionId: event.requestContext.connectionId }) try { const topicSubscriptions = await collect(server.mapper.query(server.model.Subscription, { id: `${event.requestContext.connectionId}|${message.id}`, @@ -38,12 +39,12 @@ export const complete: MessageHandler = const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete - if (onComplete) { - await onComplete(root, args, context, info) - } + server.log('messages:complete:onComplete', { onComplete: !!onComplete }) + await onComplete?.(root, args, context, info) await Promise.all(topicSubscriptions.map(sub => server.mapper.delete(sub))) } catch (err) { + server.log('messages:complete:onError', { err, event }) await server.onError?.(err, { event, message }) await deleteConnection(server)(event.requestContext) } diff --git a/lib/messages/disconnect.ts b/lib/messages/disconnect.ts index 8c078f67..5f4c46b7 100644 --- a/lib/messages/disconnect.ts +++ b/lib/messages/disconnect.ts @@ -10,67 +10,67 @@ import { collect } from 'streaming-iterables' import { Connection } from '../model/Connection' /** Handler function for 'disconnect' message. */ -export const disconnect: MessageHandler = - async ({ server, event }) => { - try { - await server.onDisconnect?.({ event }) +export const disconnect: MessageHandler = async ({ server, event }) => { + server.log('messages:disconnect', { connectionId: event.requestContext.connectionId }) + try { + await server.onDisconnect?.({ event }) - const topicSubscriptions = await collect(server.mapper.query( - server.model.Subscription, - { - connectionId: equals(event.requestContext.connectionId), - }, - { indexName: 'ConnectionIndex' }, - )) + const topicSubscriptions = await collect(server.mapper.query( + server.model.Subscription, + { + connectionId: equals(event.requestContext.connectionId), + }, + { indexName: 'ConnectionIndex' }, + )) - const completed = {} as Record - const deletions = [] as Promise[] - for (const sub of topicSubscriptions) { - deletions.push( - (async () => { - // only call onComplete per subscription - if (!completed[sub.subscriptionId]) { - completed[sub.subscriptionId] = true + const completed = {} as Record + const deletions = [] as Promise[] + for (const sub of topicSubscriptions) { + deletions.push( + (async () => { + // only call onComplete per subscription + if (!completed[sub.subscriptionId]) { + completed[sub.subscriptionId] = true - const execContext = buildExecutionContext( - server.schema, - parse(sub.subscription.query), - undefined, - await constructContext({ server, connectionParams: sub.connectionParams, connectionId: sub.connectionId }), - sub.subscription.variables, - sub.subscription.operationName, - undefined, - ) - - if (isArray(execContext)) { - throw new AggregateError(execContext) - } + const execContext = buildExecutionContext( + server.schema, + parse(sub.subscription.query), + undefined, + await constructContext({ server, connectionParams: sub.connectionParams, connectionId: sub.connectionId }), + sub.subscription.variables, + sub.subscription.operationName, + undefined, + ) + if (isArray(execContext)) { + throw new AggregateError(execContext) + } - const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) - const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete - if (onComplete) { - await onComplete(root, args, context, info) - } - } + const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) - await server.mapper.delete(sub) - })(), - ) - } + const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete + server.log('messages:disconnect:onComplete', { onComplete: !!onComplete }) + await onComplete?.(root, args, context, info) + } - await Promise.all([ - // Delete subscriptions - ...deletions, - // Delete connection - server.mapper.delete( - Object.assign(new server.model.Connection(), { - id: event.requestContext.connectionId, - }), - ), - ]) - } catch (err) { - await server.onError?.(err, { event }) + await server.mapper.delete(sub) + })(), + ) } + + await Promise.all([ + // Delete subscriptions + ...deletions, + // Delete connection + server.mapper.delete( + Object.assign(new server.model.Connection(), { + id: event.requestContext.connectionId, + }), + ), + ]) + } catch (err) { + server.log('messages:disconnect:onError', { err, event }) + await server.onError?.(err, { event }) } +} diff --git a/lib/pubsub/complete.ts b/lib/pubsub/complete.ts index eb3a0842..d02226e2 100644 --- a/lib/pubsub/complete.ts +++ b/lib/pubsub/complete.ts @@ -41,10 +41,8 @@ export const complete = (server: ServerClosure): ServerInstance['complete'] => a const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete - if (onComplete) { - await onComplete(root, args, context, info) - } - + server.log('pubsub:complete:onComplete', { onComplete: !!onComplete }) + await onComplete?.(root, args, context, info) }) await Promise.all(iters) } diff --git a/lib/test/execute-helper.ts b/lib/test/execute-helper.ts index bd2baf24..dff3a00d 100644 --- a/lib/test/execute-helper.ts +++ b/lib/test/execute-helper.ts @@ -23,15 +23,17 @@ export const executeQuery = async (query: string): Promise => { }) } -type SubscriptionResult = Promise<{ +type SubscriptionResult = { values: AsyncGenerator unsubscribe: () => void -}> + close: () => Promise | void +} -export const executeSubscription = async (query: string): SubscriptionResult => { +export const executeSubscription = (query: string, { lazy }: {lazy?: boolean} = {}): SubscriptionResult => { const client = createClient({ url, webSocketImpl: WebSocket, + lazy, }) const values = deferGenerator() @@ -50,5 +52,7 @@ export const executeSubscription = async (query: string): SubscriptionResult => }, ) - return { values: values.generator, unsubscribe } + const close = () => client.dispose() + + return { values: values.generator, unsubscribe, close } } diff --git a/lib/test/integration-events-test.ts b/lib/test/integration-events-test.ts index 206a6d70..c7d8a9f2 100644 --- a/lib/test/integration-events-test.ts +++ b/lib/test/integration-events-test.ts @@ -19,7 +19,7 @@ describe('Events', () => { }) it('subscribes', async () => { - const { values } = await executeSubscription('subscription { greetings }') + const { values } = executeSubscription('subscription { greetings }') const greetings = await collect(map((value: { greetings: string }) => value.greetings, values)) assert.deepEqual(greetings, ['yoyo', 'hows it', 'howdy']) }) @@ -27,9 +27,48 @@ describe('Events', () => { describe('Filter Events', () => { it('subscribes', async () => { - const { values } = await executeSubscription('subscription { filterTest }') + const { values } = executeSubscription('subscription { filterTest }') const greetings = await collect(map((value: { filterTest: string }) => value.filterTest, values)) assert.deepEqual(greetings, ['oh yes!', 'Missing fields also work']) }) }) + + describe('onComplete', () => { + it('fires when the client disconnects at least once', async () => { + const { values } = executeSubscription('subscription { onCompleteSideChannel }') + assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } }) + const { unsubscribe } = executeSubscription('subscription { onCompleteTestClientDisconnect }') + assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'subscribed' } }) + unsubscribe() + assert.deepEqual((await collect(values))[0], { onCompleteSideChannel: 'onComplete' }) + }) + it('fires when the client completes', async () => { + // non lazy connections don't disconnect when unsubscribed + const { values, close } = executeSubscription('subscription { onCompleteSideChannel }', { lazy: false }) + assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } }) + const { unsubscribe } = executeSubscription('subscription { onCompleteTestClientDisconnect }') + assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'subscribed' } }) + unsubscribe() + assert.deepEqual((await collect(values)), [{ onCompleteSideChannel: 'onComplete' }]) + await close() + }) + // confirm behavior with graphql-ws but we don't currently error + // it('fires when the resolver errors', async () => { + // const { values } = executeSubscription('subscription { onCompleteSideChannel }') + // assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } }) + // const { values: errorValuesGen } = executeSubscription('subscription { onCompleteTestResolverError }') + // assert.deepEqual(await collect(errorValuesGen), []) + // assert.deepEqual(await collect(values), [{ onCompleteSideChannel: 'onComplete' }]) + // }) + it('fires when the server completes', async () => { + const { values } = executeSubscription('subscription { onCompleteSideChannel }') + assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } }) + const { values: completeValuesGen } = executeSubscription('subscription { onCompleteServerComplete }') + assert.deepEqual(await collect(completeValuesGen), []) + assert.deepEqual((await collect(values)), [ + { onCompleteSideChannel: 'subscribed' }, + { onCompleteSideChannel: 'onComplete' }, + ]) + }) + }) }) diff --git a/lib/types.ts b/lib/types.ts index 55d76b53..1247d293 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -74,7 +74,7 @@ export type SubscribeArgs, TContext = a export type SubscriptionFilter< TSubscribeArgs extends SubscribeArgs = SubscribeArgs, TReturn extends Record = Record -> = Partial | void | ((...args: TSubscribeArgs) => MaybePromise> | MaybePromise>) +> = Partial | void | ((...args: TSubscribeArgs) => MaybePromise | void>) export type SubscriptionDefinition< T extends PubSubEvent, @@ -90,8 +90,8 @@ export type SubscribePseudoIterable topicDefinitions: SubscriptionDefinition[] onSubscribe?: (...args: TSubscribeArgs) => MaybePromise - onComplete?: (...args: TSubscribeArgs) => MaybePromise onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise + onComplete?: (...args: TSubscribeArgs) => MaybePromise } diff --git a/mocks/arc-basic-events/lib/graphql.js b/mocks/arc-basic-events/lib/graphql.js index a7050d3f..3f73f1a2 100644 --- a/mocks/arc-basic-events/lib/graphql.js +++ b/mocks/arc-basic-events/lib/graphql.js @@ -30,6 +30,10 @@ const typeDefs = ` type Subscription { greetings: String filterTest: String + onCompleteSideChannel: String + onCompleteTestClientDisconnect: String + onCompleteTestResolverError: String + onCompleteServerComplete: String } ` @@ -65,7 +69,60 @@ const resolvers = { await complete({ topic: 'filterTest' }) }, }), - resolve: ({ payload }) => { + resolve({ payload }) { + return payload.message + }, + }, + onCompleteTestClientDisconnect: { + subscribe: subscribe('onCompleteTestResolverError', { + async onComplete(_, __, { publish, complete }){ + await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } }) + await complete({ topic: 'onCompleteSideChannel' }) + }, + async onAfterSubscribe(_, __, { publish }) { + await publish({ topic: 'onCompleteSideChannel', payload: { message: 'subscribed' } }) + }, + }), + resolve({ payload }) { + return payload.message + }, + }, + onCompleteTestResolverError: { + subscribe: subscribe('onCompleteTestResolverError', { + async onComplete(_, __, { publish, complete }){ + await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } }) + await complete({ topic: 'onCompleteSideChannel' }) + }, + async onAfterSubscribe(_, __, { publish }) { + await publish({ topic: 'onCompleteTestResolverError', payload: { message: 'doesnt really matter does it' } }) + }, + }), + resolve() { + throw new Error('oh no!') + }, + }, + onCompleteServerComplete: { + subscribe: subscribe('onCompleteServerComplete', { + async onComplete(_, __, { publish, complete }){ + await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } }) + await complete({ topic: 'onCompleteSideChannel' }) + }, + async onAfterSubscribe(_, __, { publish, complete }) { + await publish({ topic: 'onCompleteSideChannel', payload: { message: 'subscribed' } }) + await complete({ topic: 'onCompleteServerComplete' }) + }, + }), + resolve({ payload }) { + return payload.message + }, + }, + onCompleteSideChannel: { + subscribe: subscribe('onCompleteSideChannel', { + async onAfterSubscribe(_, __, { publish }) { + await publish({ topic: 'onCompleteSideChannel', payload: { message: 'start' } }) + }, + }), + resolve({ payload }) { return payload.message }, },