Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module.exports = {
},
plugins: [
'@typescript-eslint',
'mocha-no-only',
],
rules: {
'@typescript-eslint/member-delimiter-style': ['error', {
Expand All @@ -32,6 +33,7 @@ module.exports = {
'object-curly-spacing': ['error', 'always'],
'quote-props': ['error', 'as-needed'],
'space-infix-ops': ['error'],
"mocha-no-only/mocha-no-only": ["error"],
indent: ['error', 2],
quotes: ['error', 'single'],
semi: 'off',
Expand Down
59 changes: 29 additions & 30 deletions lib/messages/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,37 @@ import { getResolverAndArgs } from '../utils/getResolverAndArgs'
import { isArray } from '../utils/isArray'

/** Handler function for 'complete' message. */
export const complete: MessageHandler<CompleteMessage> =
async ({ server, event, message }) => {
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
try {
const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` })
if (!subscription) {
return
}
const execContext = buildExecutionContext(
server.schema,
parse(subscription.subscription.query),
undefined,
await buildContext({ server, connectionInitPayload: subscription.connectionInitPayload, connectionId: subscription.connectionId }),
subscription.subscription.variables,
subscription.subscription.operationName,
undefined,
)
export const complete: MessageHandler<CompleteMessage> = async ({ server, event, message }) => {
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
try {
const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` })
if (!subscription) {
return
}
const execContext = buildExecutionContext(
server.schema,
parse(subscription.subscription.query),
undefined,
await buildContext({ server, connectionInitPayload: subscription.connectionInitPayload, connectionId: subscription.connectionId }),
subscription.subscription.variables,
subscription.subscription.operationName,
undefined,
)

if (isArray(execContext)) {
throw new AggregateError(execContext)
}
if (isArray(execContext)) {
throw new AggregateError(execContext)
}

const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })

const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
await onComplete?.(root, args, context, info)
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
await onComplete?.(root, args, context, info)

await server.models.subscription.delete({ id: subscription.id })
} catch (err) {
server.log('messages:complete:onError', { err, event })
await server.onError?.(err, { event, message })
await deleteConnection(server)(event.requestContext)
}
await server.models.subscription.delete({ id: subscription.id })
} catch (err) {
server.log('messages:complete:onError', { err, event })
await server.onError?.(err, { event, message })
await deleteConnection(server)(event.requestContext)
}
}
4 changes: 2 additions & 2 deletions lib/messages/disconnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const disconnect: MessageHandler<null> = async ({ server, event }) => {
throw new AggregateError(execContext)
}

const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })

const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
server.log('messages:disconnect:onComplete', { onComplete: !!onComplete })
Expand All @@ -47,7 +47,7 @@ export const disconnect: MessageHandler<null> = async ({ server, event }) => {
})

// do this first so that we don't create any more subscriptions for this connection
await server.models.connection.delete({ id: connectionId }),
await server.models.connection.delete({ id: connectionId })
await Promise.all(deletions)
} catch (err) {
server.log('messages:disconnect:onError', { err, event })
Expand Down
128 changes: 106 additions & 22 deletions lib/messages/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,15 @@ describe('messages/subscribe', () => {
assert.deepEqual(state, {
post: [
{ ConnectionId, Data: JSON.stringify({ type: 'connection_ack' }) },
{ ConnectionId, Data: JSON.stringify({ type: 'error', id: 'abcdefg', payload: [{
message: 'Cannot query field "HIHOWEAREYOU" on type "Query".',
locations: [{ line:1, column:3 }],
{
ConnectionId, Data: JSON.stringify({
type: 'error', id: 'abcdefg', payload: [{
message: 'Cannot query field "HIHOWEAREYOU" on type "Query".',
locations: [{ line: 1, column: 3 }],
},
],
}),
},
] }) },
],
delete: [],
})
Expand All @@ -102,7 +106,7 @@ describe('messages/subscribe', () => {
const server = await mockServerContext({
apiGatewayManagementApi: {
// eslint-disable-next-line @typescript-eslint/no-empty-function
postToConnection: () => ({ promise: async () => { if(sendErr) { throw new Error('postToConnection Error') } } }),
postToConnection: () => ({ promise: async () => { if (sendErr) { throw new Error('postToConnection Error') } } }),
// eslint-disable-next-line @typescript-eslint/no-empty-function
deleteConnection: () => ({ promise: async () => { } }),
},
Expand All @@ -112,7 +116,7 @@ describe('messages/subscribe', () => {
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
sendErr = true
await subscribe({ server, event, message: JSON.parse(event.body) })
assert.match(error.message, /postToConnection Error/ )
assert.match(error.message, /postToConnection Error/)
})

describe('callbacks', () => {
Expand All @@ -132,7 +136,7 @@ describe('messages/subscribe', () => {
hello: () => 'Hello World!',
},
Subscription: {
greetings:{
greetings: {
subscribe: pubsubSubscribe('greetings', {
onSubscribe() {
onSubscribe.push('We did it!')
Expand All @@ -146,13 +150,8 @@ describe('messages/subscribe', () => {
},
}

const schema = makeExecutableSchema({
typeDefs,
resolvers,
})
const server = await mockServerContext({
schema,
})
const schema = makeExecutableSchema({ typeDefs, resolvers })
const server = await mockServerContext({ schema })
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }

await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
Expand All @@ -172,6 +171,96 @@ describe('messages/subscribe', () => {
assert.isEmpty(subscriptions)
})

it('fires onSubscribe with variable args', async () => {
const collectedArgs: any[] = []

const typeDefs = `
type Query {
hello(name: String!): String
}
type Subscription {
greetings(name: String!): String
}
`
const resolvers = {
Query: {
hello: (_, { name }) => `Hello ${name}!`,
},
Subscription: {
greetings: {
subscribe: pubsubSubscribe('greetings', {
onSubscribe(_, args) {
collectedArgs.push(args)
},
}),
resolve: ({ payload }) => {
return payload
},
},
},
}

const schema = makeExecutableSchema({ typeDefs, resolvers })
const server = await mockServerContext({ schema })
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription($name: String!) { greetings(name: $name) }", "variables":{"name":"Jonas"}}}' }

await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
await subscribe({ server, event, message: JSON.parse(event.body) })
assert.deepEqual(collectedArgs[0], { name: 'Jonas' })
const [subscription] = await collect(server.models.subscription.query({
IndexName: 'ConnectionIndex',
ExpressionAttributeNames: { '#a': 'connectionId' },
ExpressionAttributeValues: { ':1': event.requestContext.connectionId },
KeyConditionExpression: '#a = :1',
}))
assert.containSubset(subscription, { connectionId, subscriptionId: '1234', subscription: JSON.parse(event.body).payload })
})

it('fires onSubscribe with inline args', async () => {
const collectedArgs: any[] = []

const typeDefs = `
type Query {
hello(name: String!): String
}
type Subscription {
greetings(name: String!): String
}
`
const resolvers = {
Query: {
hello: (_, { name }) => `Hello ${name}!`,
},
Subscription: {
greetings: {
subscribe: pubsubSubscribe('greetings', {
onSubscribe(_, args) {
collectedArgs.push(args)
},
}),
resolve: ({ payload }) => {
return payload
},
},
},
}

const schema = makeExecutableSchema({ typeDefs, resolvers })
const server = await mockServerContext({ schema })
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings(name: \\"Jonas\\") }"}}' }

await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
await subscribe({ server, event, message: JSON.parse(event.body) })
assert.deepEqual(collectedArgs[0], { name: 'Jonas' })
const [subscription] = await collect(server.models.subscription.query({
IndexName: 'ConnectionIndex',
ExpressionAttributeNames: { '#a': 'connectionId' },
ExpressionAttributeValues: { ':1': event.requestContext.connectionId },
KeyConditionExpression: '#a = :1',
}))
assert.containSubset(subscription, { connectionId, subscriptionId: '1234', subscription: JSON.parse(event.body).payload })
})

it('fires onAfterSubscribe after subscribing', async () => {
const events: string[] = []

Expand All @@ -188,7 +277,7 @@ describe('messages/subscribe', () => {
hello: () => 'Hello World!',
},
Subscription: {
greetings:{
greetings: {
subscribe: pubsubSubscribe('greetings', {
onSubscribe() {
events.push('onSubscribe')
Expand All @@ -204,13 +293,8 @@ describe('messages/subscribe', () => {
},
}

const schema = makeExecutableSchema({
typeDefs,
resolvers,
})
const server = await mockServerContext({
schema,
})
const schema = makeExecutableSchema({ typeDefs, resolvers })
const server = await mockServerContext({ schema })
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }

await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
Expand Down
15 changes: 8 additions & 7 deletions lib/messages/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
})
}

const subscriptionId = `${connection.id}|${message.id}`
if (await server.models.subscription.get({ id: subscriptionId })) {
throw new Error(`Subscriber for ${message.id} already exists`)
}

if (execContext.operation.operation !== 'subscription') {
await executeQuery(server, message, contextValue, event)
return
}

const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })
if (!field) {
throw new Error('No field')
}
Expand All @@ -98,24 +103,20 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve

const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter

const subscriptionId = `${connection.id}|${message.id}`
const subscription: Subscription = {
id: subscriptionId,
topic,
filter: filterData || {},
subscriptionId: message.id,
subscription: {
variableValues: args,
...message.payload,
},
subscription: message.payload,
connectionId: connection.id,
connectionInitPayload: connection.payload,
requestContext: event.requestContext,
ttl: connection.ttl,
createdAt: Date.now(),
}
server.log('subscribe:putSubscription', subscription)
try{
try {
await server.models.subscription.put(subscription, {
ConditionExpression: '#id <> :id',
ExpressionAttributeNames: {
Expand Down
2 changes: 1 addition & 1 deletion lib/pubsub/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const complete = (serverPromise: Promise<ServerClosure> | ServerClosure):
throw new AggregateError(execContext)
}

const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })

const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
server.log('pubsub:complete:onComplete', { onComplete: !!onComplete })
Expand Down
12 changes: 8 additions & 4 deletions lib/test/execute-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ export const executeDoubleQuery = async function* (query: string, {
stayConnected = false,
timeout = 20_000,
id = 1,
skipWaitingForFirstMessage = false,
}: {
url?: string
stayConnected?: boolean
timeout?: number
id?: number
skipWaitingForFirstMessage?: boolean
} = {}): AsyncGenerator<unknown, void, unknown> {
const ws = new WebSocket(url, 'graphql-transport-ws')

Expand Down Expand Up @@ -235,11 +237,13 @@ export const executeDoubleQuery = async function* (query: string, {
payload: { query },
})

const firstMessage = await incomingMessages.generator.next()
if (firstMessage.done) {
return
if (!skipWaitingForFirstMessage) {
const firstMessage = await incomingMessages.generator.next()
if (firstMessage.done) {
return
}
yield firstMessage.value
}
yield firstMessage.value

await send({
id: `${id}`,
Expand Down
3 changes: 3 additions & 0 deletions lib/test/graphql-ws-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const PORT = 4000
const typeDefs = `
type Query {
hello: String
dontResolve: String
}
type Subscription {
greetings: String
Expand All @@ -21,6 +22,8 @@ const typeDefs = `
const resolvers = {
Query: {
hello: () => 'Hello World!',
// eslint-disable-next-line @typescript-eslint/no-empty-function
dontResolve: () => new Promise(() => {}),
},
Subscription: {
greetings:{
Expand Down
Loading