Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,28 @@

This is a fork of [subscriptionless](https://github.com/andyrichardson/subscriptionless) that is built to work with [Architect](https://arc.codes) and tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox). There's no reason why it wont work with Serverless or other deploy tools but their support is not a goal.

## API

# Old Readme
### `subscribe(topic: string, options?: SubscribeOptions): SubscribePseudoIterable`

Subscribe is the most important method in the library. It's the primary difference between `graphql-ws` and `graphql-lambda-subscriptions`. It returns a `SubscribePseudoIterable` that pretends to be an async iterator that you put on the `subscribe` resolver for your Subscription. In reality it includes a few properties that we use to subscribe to events and fire lifecycle functions.

```ts
interface SubscribeOptions {
filter?: (...args: TSubscribeArgs) => MaybePromise<Partial<Payload>|void>;
onSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>;
onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>;
onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>;
}
```

- `topic`: The you subscribe to the topic and can filter based upon the topics payload.
- `filter`: An object that the payload will be matched against (or a function that produces the object). If the payload's field matches the subscription will receive the event. If the payload is missing the field the subscription will receive the event.
- `onSubscribe`: A function that gets the subscription information (like arguments) it can throw if you don't want the subscription to subscribe.
- `onAfterSubscribe`: A function that gets the subscription information (like arguments) and can fire initial events or record information.
- `onComplete`: A function that fires at least once when a connection disconnects, a client sends a "complete" message, or the server sends a "complete" message. Because of the nature of aws lambda, it's possible for a client to send a "complete" message and disconnect and those events executing on lambda out of order. Which why this function can be called up to twice.

## Old Readme

## About

Expand Down Expand Up @@ -282,7 +302,7 @@ Wrap any `subscribe` function call in a `withFilter` to provide filter condition
> Note: If a function is provided, it will be called **on subscription start** and must return a serializable object.

```ts
import { withFilter, subscribe } from 'subscriptionless/subscribe';
import { subscribe } from 'subscriptionless/subscribe';

// Subscription agnostic filter
withFilter(subscribe('MY_TOPIC'), {
Expand Down
7 changes: 4 additions & 3 deletions lib/messages/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { isArray } from '../utils/isArray'
/** Handler function for 'complete' message. */
export const complete: MessageHandler<CompleteMessage> =
async ({ server, event, message }) => {
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
try {
const topicSubscriptions = await collect(server.mapper.query(server.model.Subscription, {
id: `${event.requestContext.connectionId}|${message.id}`,
Expand All @@ -38,12 +39,12 @@ export const complete: MessageHandler<CompleteMessage> =
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)

const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
if (onComplete) {
await onComplete(root, args, context, info)
}
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
await onComplete?.(root, args, context, info)

await Promise.all(topicSubscriptions.map(sub => server.mapper.delete(sub)))
} catch (err) {
server.log('messages:complete:onError', { err, event })
await server.onError?.(err, { event, message })
await deleteConnection(server)(event.requestContext)
}
Expand Down
108 changes: 54 additions & 54 deletions lib/messages/disconnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,67 +10,67 @@ import { collect } from 'streaming-iterables'
import { Connection } from '../model/Connection'

/** Handler function for 'disconnect' message. */
export const disconnect: MessageHandler<null> =
async ({ server, event }) => {
try {
await server.onDisconnect?.({ event })
export const disconnect: MessageHandler<null> = async ({ server, event }) => {
server.log('messages:disconnect', { connectionId: event.requestContext.connectionId })
try {
await server.onDisconnect?.({ event })

const topicSubscriptions = await collect(server.mapper.query(
server.model.Subscription,
{
connectionId: equals(event.requestContext.connectionId),
},
{ indexName: 'ConnectionIndex' },
))
const topicSubscriptions = await collect(server.mapper.query(
server.model.Subscription,
{
connectionId: equals(event.requestContext.connectionId),
},
{ indexName: 'ConnectionIndex' },
))

const completed = {} as Record<string, boolean>
const deletions = [] as Promise<void|Connection>[]
for (const sub of topicSubscriptions) {
deletions.push(
(async () => {
// only call onComplete per subscription
if (!completed[sub.subscriptionId]) {
completed[sub.subscriptionId] = true
const completed = {} as Record<string, boolean>
const deletions = [] as Promise<void|Connection>[]
for (const sub of topicSubscriptions) {
deletions.push(
(async () => {
// only call onComplete per subscription
if (!completed[sub.subscriptionId]) {
completed[sub.subscriptionId] = true

const execContext = buildExecutionContext(
server.schema,
parse(sub.subscription.query),
undefined,
await constructContext({ server, connectionParams: sub.connectionParams, connectionId: sub.connectionId }),
sub.subscription.variables,
sub.subscription.operationName,
undefined,
)

if (isArray(execContext)) {
throw new AggregateError(execContext)
}
const execContext = buildExecutionContext(
server.schema,
parse(sub.subscription.query),
undefined,
await constructContext({ server, connectionParams: sub.connectionParams, connectionId: sub.connectionId }),
sub.subscription.variables,
sub.subscription.operationName,
undefined,
)

if (isArray(execContext)) {
throw new AggregateError(execContext)
}

const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)

const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
if (onComplete) {
await onComplete(root, args, context, info)
}
}
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)

await server.mapper.delete(sub)
})(),
)
}
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
server.log('messages:disconnect:onComplete', { onComplete: !!onComplete })
await onComplete?.(root, args, context, info)
}

await Promise.all([
// Delete subscriptions
...deletions,
// Delete connection
server.mapper.delete(
Object.assign(new server.model.Connection(), {
id: event.requestContext.connectionId,
}),
),
])
} catch (err) {
await server.onError?.(err, { event })
await server.mapper.delete(sub)
})(),
)
}

await Promise.all([
// Delete subscriptions
...deletions,
// Delete connection
server.mapper.delete(
Object.assign(new server.model.Connection(), {
id: event.requestContext.connectionId,
}),
),
])
} catch (err) {
server.log('messages:disconnect:onError', { err, event })
await server.onError?.(err, { event })
}
}
6 changes: 2 additions & 4 deletions lib/pubsub/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ export const complete = (server: ServerClosure): ServerInstance['complete'] => a
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)

const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
if (onComplete) {
await onComplete(root, args, context, info)
}

server.log('pubsub:complete:onComplete', { onComplete: !!onComplete })
await onComplete?.(root, args, context, info)
})
await Promise.all(iters)
}
12 changes: 8 additions & 4 deletions lib/test/execute-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ export const executeQuery = async (query: string): Promise<unknown> => {
})
}

type SubscriptionResult = Promise<{
type SubscriptionResult = {
values: AsyncGenerator<unknown, unknown, unknown>
unsubscribe: () => void
}>
close: () => Promise<void> | void
}

export const executeSubscription = async (query: string): SubscriptionResult => {
export const executeSubscription = (query: string, { lazy }: {lazy?: boolean} = {}): SubscriptionResult => {
const client = createClient({
url,
webSocketImpl: WebSocket,
lazy,
})

const values = deferGenerator()
Expand All @@ -50,5 +52,7 @@ export const executeSubscription = async (query: string): SubscriptionResult =>
},
)

return { values: values.generator, unsubscribe }
const close = () => client.dispose()

return { values: values.generator, unsubscribe, close }
}
43 changes: 41 additions & 2 deletions lib/test/integration-events-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,56 @@ describe('Events', () => {
})

it('subscribes', async () => {
const { values } = await executeSubscription('subscription { greetings }')
const { values } = executeSubscription('subscription { greetings }')
const greetings = await collect(map((value: { greetings: string }) => value.greetings, values))
assert.deepEqual(greetings, ['yoyo', 'hows it', 'howdy'])
})
})

describe('Filter Events', () => {
it('subscribes', async () => {
const { values } = await executeSubscription('subscription { filterTest }')
const { values } = executeSubscription('subscription { filterTest }')
const greetings = await collect(map((value: { filterTest: string }) => value.filterTest, values))
assert.deepEqual(greetings, ['oh yes!', 'Missing fields also work'])
})
})

describe('onComplete', () => {
it('fires when the client disconnects at least once', async () => {
const { values } = executeSubscription('subscription { onCompleteSideChannel }')
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
const { unsubscribe } = executeSubscription('subscription { onCompleteTestClientDisconnect }')
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'subscribed' } })
unsubscribe()
assert.deepEqual((await collect(values))[0], { onCompleteSideChannel: 'onComplete' })
})
it('fires when the client completes', async () => {
// non lazy connections don't disconnect when unsubscribed
const { values, close } = executeSubscription('subscription { onCompleteSideChannel }', { lazy: false })
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
const { unsubscribe } = executeSubscription('subscription { onCompleteTestClientDisconnect }')
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'subscribed' } })
unsubscribe()
assert.deepEqual((await collect(values)), [{ onCompleteSideChannel: 'onComplete' }])
await close()
})
// confirm behavior with graphql-ws but we don't currently error
// it('fires when the resolver errors', async () => {
// const { values } = executeSubscription('subscription { onCompleteSideChannel }')
// assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
// const { values: errorValuesGen } = executeSubscription('subscription { onCompleteTestResolverError }')
// assert.deepEqual(await collect(errorValuesGen), [])
// assert.deepEqual(await collect(values), [{ onCompleteSideChannel: 'onComplete' }])
// })
it('fires when the server completes', async () => {
const { values } = executeSubscription('subscription { onCompleteSideChannel }')
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
const { values: completeValuesGen } = executeSubscription('subscription { onCompleteServerComplete }')
assert.deepEqual(await collect(completeValuesGen), [])
assert.deepEqual((await collect(values)), [
{ onCompleteSideChannel: 'subscribed' },
{ onCompleteSideChannel: 'onComplete' },
])
})
})
})
4 changes: 2 additions & 2 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export type SubscribeArgs<TRoot = any, TArgs = Record<string, any>, TContext = a
export type SubscriptionFilter<
TSubscribeArgs extends SubscribeArgs = SubscribeArgs,
TReturn extends Record<string, any> = Record<string, any>
> = Partial<TReturn> | void | ((...args: TSubscribeArgs) => MaybePromise<Partial<TReturn>> | MaybePromise<Partial<void>>)
> = Partial<TReturn> | void | ((...args: TSubscribeArgs) => MaybePromise<Partial<TReturn> | void>)

export type SubscriptionDefinition<
T extends PubSubEvent,
Expand All @@ -90,8 +90,8 @@ export type SubscribePseudoIterable<T extends PubSubEvent, TSubscribeArgs extend
(...args: TSubscribeArgs): AsyncGenerator<T, never, unknown>
topicDefinitions: SubscriptionDefinition<T, TSubscribeArgs>[]
onSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>
onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>
onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>
onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>
}


Expand Down
59 changes: 58 additions & 1 deletion mocks/arc-basic-events/lib/graphql.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const typeDefs = `
type Subscription {
greetings: String
filterTest: String
onCompleteSideChannel: String
onCompleteTestClientDisconnect: String
onCompleteTestResolverError: String
onCompleteServerComplete: String
}
`

Expand Down Expand Up @@ -65,7 +69,60 @@ const resolvers = {
await complete({ topic: 'filterTest' })
},
}),
resolve: ({ payload }) => {
resolve({ payload }) {
return payload.message
},
},
onCompleteTestClientDisconnect: {
subscribe: subscribe('onCompleteTestResolverError', {
async onComplete(_, __, { publish, complete }){
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } })
await complete({ topic: 'onCompleteSideChannel' })
},
async onAfterSubscribe(_, __, { publish }) {
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'subscribed' } })
},
}),
resolve({ payload }) {
return payload.message
},
},
onCompleteTestResolverError: {
subscribe: subscribe('onCompleteTestResolverError', {
async onComplete(_, __, { publish, complete }){
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } })
await complete({ topic: 'onCompleteSideChannel' })
},
async onAfterSubscribe(_, __, { publish }) {
await publish({ topic: 'onCompleteTestResolverError', payload: { message: 'doesnt really matter does it' } })
},
}),
resolve() {
throw new Error('oh no!')
},
},
onCompleteServerComplete: {
subscribe: subscribe('onCompleteServerComplete', {
async onComplete(_, __, { publish, complete }){
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } })
await complete({ topic: 'onCompleteSideChannel' })
},
async onAfterSubscribe(_, __, { publish, complete }) {
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'subscribed' } })
await complete({ topic: 'onCompleteServerComplete' })
},
}),
resolve({ payload }) {
return payload.message
},
},
onCompleteSideChannel: {
subscribe: subscribe('onCompleteSideChannel', {
async onAfterSubscribe(_, __, { publish }) {
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'start' } })
},
}),
resolve({ payload }) {
return payload.message
},
},
Expand Down