From 2ac187d955edcc1d78a4da51338848c459ef89a5 Mon Sep 17 00:00:00 2001 From: Francis Gulotta Date: Sun, 29 Aug 2021 23:11:51 -0400 Subject: [PATCH] fix: args that are inline weren't passed to callback functions eg ```graphql subscription { greetings(name: "Jonas") } ``` Will now work as expected. --- .eslintrc.js | 2 + lib/messages/complete.ts | 59 ++++++------ lib/messages/disconnect.ts | 4 +- lib/messages/subscribe-test.ts | 128 +++++++++++++++++++++----- lib/messages/subscribe.ts | 15 +-- lib/pubsub/complete.ts | 2 +- lib/test/execute-helper.ts | 12 ++- lib/test/graphql-ws-schema.ts | 3 + lib/test/integration-events-test.ts | 15 ++- lib/utils/getResolverAndArgs.ts | 36 +++++--- mocks/arc-basic-events/lib/graphql.js | 2 + package-lock.json | 15 +++ package.json | 1 + rollup.config.js | 1 + 14 files changed, 215 insertions(+), 80 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index a98af63e..3e3940d8 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -14,6 +14,7 @@ module.exports = { }, plugins: [ '@typescript-eslint', + 'mocha-no-only', ], rules: { '@typescript-eslint/member-delimiter-style': ['error', { @@ -32,6 +33,7 @@ module.exports = { 'object-curly-spacing': ['error', 'always'], 'quote-props': ['error', 'as-needed'], 'space-infix-ops': ['error'], + "mocha-no-only/mocha-no-only": ["error"], indent: ['error', 2], quotes: ['error', 'single'], semi: 'off', diff --git a/lib/messages/complete.ts b/lib/messages/complete.ts index e95975aa..61900068 100644 --- a/lib/messages/complete.ts +++ b/lib/messages/complete.ts @@ -9,38 +9,37 @@ import { getResolverAndArgs } from '../utils/getResolverAndArgs' import { isArray } from '../utils/isArray' /** Handler function for 'complete' message. */ -export const complete: MessageHandler = - async ({ server, event, message }) => { - server.log('messages:complete', { connectionId: event.requestContext.connectionId }) - try { - const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` }) - if (!subscription) { - return - } - const execContext = buildExecutionContext( - server.schema, - parse(subscription.subscription.query), - undefined, - await buildContext({ server, connectionInitPayload: subscription.connectionInitPayload, connectionId: subscription.connectionId }), - subscription.subscription.variables, - subscription.subscription.operationName, - undefined, - ) +export const complete: MessageHandler = async ({ server, event, message }) => { + server.log('messages:complete', { connectionId: event.requestContext.connectionId }) + try { + const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` }) + if (!subscription) { + return + } + const execContext = buildExecutionContext( + server.schema, + parse(subscription.subscription.query), + undefined, + await buildContext({ server, connectionInitPayload: subscription.connectionInitPayload, connectionId: subscription.connectionId }), + subscription.subscription.variables, + subscription.subscription.operationName, + undefined, + ) - if (isArray(execContext)) { - throw new AggregateError(execContext) - } + if (isArray(execContext)) { + throw new AggregateError(execContext) + } - const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) + const { field, root, args, context, info } = getResolverAndArgs({ server, execContext }) - const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete - server.log('messages:complete:onComplete', { onComplete: !!onComplete }) - await onComplete?.(root, args, context, info) + const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete + server.log('messages:complete:onComplete', { onComplete: !!onComplete }) + await onComplete?.(root, args, context, info) - await server.models.subscription.delete({ id: subscription.id }) - } catch (err) { - server.log('messages:complete:onError', { err, event }) - await server.onError?.(err, { event, message }) - await deleteConnection(server)(event.requestContext) - } + await server.models.subscription.delete({ id: subscription.id }) + } catch (err) { + server.log('messages:complete:onError', { err, event }) + await server.onError?.(err, { event, message }) + await deleteConnection(server)(event.requestContext) } +} diff --git a/lib/messages/disconnect.ts b/lib/messages/disconnect.ts index 2be7aa4b..e375247e 100644 --- a/lib/messages/disconnect.ts +++ b/lib/messages/disconnect.ts @@ -38,7 +38,7 @@ export const disconnect: MessageHandler = async ({ server, event }) => { throw new AggregateError(execContext) } - const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) + const { field, root, args, context, info } = getResolverAndArgs({ server, execContext }) const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete server.log('messages:disconnect:onComplete', { onComplete: !!onComplete }) @@ -47,7 +47,7 @@ export const disconnect: MessageHandler = async ({ server, event }) => { }) // do this first so that we don't create any more subscriptions for this connection - await server.models.connection.delete({ id: connectionId }), + await server.models.connection.delete({ id: connectionId }) await Promise.all(deletions) } catch (err) { server.log('messages:disconnect:onError', { err, event }) diff --git a/lib/messages/subscribe-test.ts b/lib/messages/subscribe-test.ts index a94733d4..f756d96f 100644 --- a/lib/messages/subscribe-test.ts +++ b/lib/messages/subscribe-test.ts @@ -85,11 +85,15 @@ describe('messages/subscribe', () => { assert.deepEqual(state, { post: [ { ConnectionId, Data: JSON.stringify({ type: 'connection_ack' }) }, - { ConnectionId, Data: JSON.stringify({ type: 'error', id: 'abcdefg', payload: [{ - message: 'Cannot query field "HIHOWEAREYOU" on type "Query".', - locations: [{ line:1, column:3 }], + { + ConnectionId, Data: JSON.stringify({ + type: 'error', id: 'abcdefg', payload: [{ + message: 'Cannot query field "HIHOWEAREYOU" on type "Query".', + locations: [{ line: 1, column: 3 }], + }, + ], + }), }, - ] }) }, ], delete: [], }) @@ -102,7 +106,7 @@ describe('messages/subscribe', () => { const server = await mockServerContext({ apiGatewayManagementApi: { // eslint-disable-next-line @typescript-eslint/no-empty-function - postToConnection: () => ({ promise: async () => { if(sendErr) { throw new Error('postToConnection Error') } } }), + postToConnection: () => ({ promise: async () => { if (sendErr) { throw new Error('postToConnection Error') } } }), // eslint-disable-next-line @typescript-eslint/no-empty-function deleteConnection: () => ({ promise: async () => { } }), }, @@ -112,7 +116,7 @@ describe('messages/subscribe', () => { await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) }) sendErr = true await subscribe({ server, event, message: JSON.parse(event.body) }) - assert.match(error.message, /postToConnection Error/ ) + assert.match(error.message, /postToConnection Error/) }) describe('callbacks', () => { @@ -132,7 +136,7 @@ describe('messages/subscribe', () => { hello: () => 'Hello World!', }, Subscription: { - greetings:{ + greetings: { subscribe: pubsubSubscribe('greetings', { onSubscribe() { onSubscribe.push('We did it!') @@ -146,13 +150,8 @@ describe('messages/subscribe', () => { }, } - const schema = makeExecutableSchema({ - typeDefs, - resolvers, - }) - const server = await mockServerContext({ - schema, - }) + const schema = makeExecutableSchema({ typeDefs, resolvers }) + const server = await mockServerContext({ schema }) const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' } await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) }) @@ -172,6 +171,96 @@ describe('messages/subscribe', () => { assert.isEmpty(subscriptions) }) + it('fires onSubscribe with variable args', async () => { + const collectedArgs: any[] = [] + + const typeDefs = ` + type Query { + hello(name: String!): String + } + type Subscription { + greetings(name: String!): String + } + ` + const resolvers = { + Query: { + hello: (_, { name }) => `Hello ${name}!`, + }, + Subscription: { + greetings: { + subscribe: pubsubSubscribe('greetings', { + onSubscribe(_, args) { + collectedArgs.push(args) + }, + }), + resolve: ({ payload }) => { + return payload + }, + }, + }, + } + + const schema = makeExecutableSchema({ typeDefs, resolvers }) + const server = await mockServerContext({ schema }) + const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription($name: String!) { greetings(name: $name) }", "variables":{"name":"Jonas"}}}' } + + await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) }) + await subscribe({ server, event, message: JSON.parse(event.body) }) + assert.deepEqual(collectedArgs[0], { name: 'Jonas' }) + const [subscription] = await collect(server.models.subscription.query({ + IndexName: 'ConnectionIndex', + ExpressionAttributeNames: { '#a': 'connectionId' }, + ExpressionAttributeValues: { ':1': event.requestContext.connectionId }, + KeyConditionExpression: '#a = :1', + })) + assert.containSubset(subscription, { connectionId, subscriptionId: '1234', subscription: JSON.parse(event.body).payload }) + }) + + it('fires onSubscribe with inline args', async () => { + const collectedArgs: any[] = [] + + const typeDefs = ` + type Query { + hello(name: String!): String + } + type Subscription { + greetings(name: String!): String + } + ` + const resolvers = { + Query: { + hello: (_, { name }) => `Hello ${name}!`, + }, + Subscription: { + greetings: { + subscribe: pubsubSubscribe('greetings', { + onSubscribe(_, args) { + collectedArgs.push(args) + }, + }), + resolve: ({ payload }) => { + return payload + }, + }, + }, + } + + const schema = makeExecutableSchema({ typeDefs, resolvers }) + const server = await mockServerContext({ schema }) + const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings(name: \\"Jonas\\") }"}}' } + + await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) }) + await subscribe({ server, event, message: JSON.parse(event.body) }) + assert.deepEqual(collectedArgs[0], { name: 'Jonas' }) + const [subscription] = await collect(server.models.subscription.query({ + IndexName: 'ConnectionIndex', + ExpressionAttributeNames: { '#a': 'connectionId' }, + ExpressionAttributeValues: { ':1': event.requestContext.connectionId }, + KeyConditionExpression: '#a = :1', + })) + assert.containSubset(subscription, { connectionId, subscriptionId: '1234', subscription: JSON.parse(event.body).payload }) + }) + it('fires onAfterSubscribe after subscribing', async () => { const events: string[] = [] @@ -188,7 +277,7 @@ describe('messages/subscribe', () => { hello: () => 'Hello World!', }, Subscription: { - greetings:{ + greetings: { subscribe: pubsubSubscribe('greetings', { onSubscribe() { events.push('onSubscribe') @@ -204,13 +293,8 @@ describe('messages/subscribe', () => { }, } - const schema = makeExecutableSchema({ - typeDefs, - resolvers, - }) - const server = await mockServerContext({ - schema, - }) + const schema = makeExecutableSchema({ typeDefs, resolvers }) + const server = await mockServerContext({ schema }) const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' } await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) }) diff --git a/lib/messages/subscribe.ts b/lib/messages/subscribe.ts index 70f2f044..669292da 100644 --- a/lib/messages/subscribe.ts +++ b/lib/messages/subscribe.ts @@ -70,12 +70,17 @@ const setupSubscription: MessageHandler = async ({ server, eve }) } + const subscriptionId = `${connection.id}|${message.id}` + if (await server.models.subscription.get({ id: subscriptionId })) { + throw new Error(`Subscriber for ${message.id} already exists`) + } + if (execContext.operation.operation !== 'subscription') { await executeQuery(server, message, contextValue, event) return } - const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) + const { field, root, args, context, info } = getResolverAndArgs({ server, execContext }) if (!field) { throw new Error('No field') } @@ -98,16 +103,12 @@ const setupSubscription: MessageHandler = async ({ server, eve const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter - const subscriptionId = `${connection.id}|${message.id}` const subscription: Subscription = { id: subscriptionId, topic, filter: filterData || {}, subscriptionId: message.id, - subscription: { - variableValues: args, - ...message.payload, - }, + subscription: message.payload, connectionId: connection.id, connectionInitPayload: connection.payload, requestContext: event.requestContext, @@ -115,7 +116,7 @@ const setupSubscription: MessageHandler = async ({ server, eve createdAt: Date.now(), } server.log('subscribe:putSubscription', subscription) - try{ + try { await server.models.subscription.put(subscription, { ConditionExpression: '#id <> :id', ExpressionAttributeNames: { diff --git a/lib/pubsub/complete.ts b/lib/pubsub/complete.ts index 2237621a..6be756f6 100644 --- a/lib/pubsub/complete.ts +++ b/lib/pubsub/complete.ts @@ -39,7 +39,7 @@ export const complete = (serverPromise: Promise | ServerClosure): throw new AggregateError(execContext) } - const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) + const { field, root, args, context, info } = getResolverAndArgs({ server, execContext }) const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete server.log('pubsub:complete:onComplete', { onComplete: !!onComplete }) diff --git a/lib/test/execute-helper.ts b/lib/test/execute-helper.ts index 0f331931..411ea2e7 100644 --- a/lib/test/execute-helper.ts +++ b/lib/test/execute-helper.ts @@ -185,11 +185,13 @@ export const executeDoubleQuery = async function* (query: string, { stayConnected = false, timeout = 20_000, id = 1, + skipWaitingForFirstMessage = false, }: { url?: string stayConnected?: boolean timeout?: number id?: number + skipWaitingForFirstMessage?: boolean } = {}): AsyncGenerator { const ws = new WebSocket(url, 'graphql-transport-ws') @@ -235,11 +237,13 @@ export const executeDoubleQuery = async function* (query: string, { payload: { query }, }) - const firstMessage = await incomingMessages.generator.next() - if (firstMessage.done) { - return + if (!skipWaitingForFirstMessage) { + const firstMessage = await incomingMessages.generator.next() + if (firstMessage.done) { + return + } + yield firstMessage.value } - yield firstMessage.value await send({ id: `${id}`, diff --git a/lib/test/graphql-ws-schema.ts b/lib/test/graphql-ws-schema.ts index 210d12d9..a713d1b3 100644 --- a/lib/test/graphql-ws-schema.ts +++ b/lib/test/graphql-ws-schema.ts @@ -9,6 +9,7 @@ const PORT = 4000 const typeDefs = ` type Query { hello: String + dontResolve: String } type Subscription { greetings: String @@ -21,6 +22,8 @@ const typeDefs = ` const resolvers = { Query: { hello: () => 'Hello World!', + // eslint-disable-next-line @typescript-eslint/no-empty-function + dontResolve: () => new Promise(() => {}), }, Subscription: { greetings:{ diff --git a/lib/test/integration-events-test.ts b/lib/test/integration-events-test.ts index 28418a1b..b3531fc9 100644 --- a/lib/test/integration-events-test.ts +++ b/lib/test/integration-events-test.ts @@ -65,7 +65,20 @@ describe('Events', () => { await stop() }) - it('errors when duplicating subscription ids', async () => { + // it('errors when duplicating subscription ids with queries', async () => { + // const { url, stop } = await startGqlWSServer() + + // const lambdaError = await collect(executeDoubleQuery('{ dontResolve }', { id: 1, skipWaitingForFirstMessage: true })) + // const gqlWSError = await collect(executeDoubleQuery('{ dontResolve }', { id: 1, skipWaitingForFirstMessage: true, url })) + // console.log({ lambdaError, gqlWSError }) + // assert.deepEqual(lambdaError[0], gqlWSError[0]) + // // This would be exactly equal but apigateway doesn't support close reasons *eye roll* + // assert.containSubset(lambdaError[1], { type: 'close' }) + // assert.containSubset(gqlWSError[1], { type: 'close' }) + // await stop() + // }) + + it('errors when duplicating subscription ids with subscriptions', async () => { const { url, stop } = await startGqlWSServer() const lambdaError = await collect(executeDoubleQuery('subscription { oneEvent }', { id: 1 })) diff --git a/lib/utils/getResolverAndArgs.ts b/lib/utils/getResolverAndArgs.ts index 179143a2..10b89810 100644 --- a/lib/utils/getResolverAndArgs.ts +++ b/lib/utils/getResolverAndArgs.ts @@ -5,14 +5,21 @@ import { ExecutionContext, getFieldDef, } from 'graphql/execution/execute' +import { getArgumentValues } from 'graphql/execution/values' import { addPath } from 'graphql/jsutils/Path' import { ServerClosure } from '../types' -type ResolverAndArgs = [ReturnType, null, ExecutionContext['variableValues'], ExecutionContext['contextValue'], ReturnType] +interface ResolverAndArgs { + field: ReturnType + root: null + args: ExecutionContext['variableValues'] + context: ExecutionContext['contextValue'] + info: ReturnType +} -export const getResolverAndArgs = (c: Omit) => (execContext: ExecutionContext): ResolverAndArgs => { - // Taken from graphql js - https://github.com/graphql/graphql-js/blob/main/src/subscription/subscribe.js#L190 - const type = getOperationRootType(c.schema, execContext.operation) +export const getResolverAndArgs = ({ server, execContext }: { server: ServerClosure, execContext: ExecutionContext }): ResolverAndArgs => { + // Taken from graphql-js - https://github.com/graphql/graphql-js/blob/main/src/subscription/subscribe.ts#L189 + const type = getOperationRootType(server.schema, execContext.operation) const fields = collectFields( execContext, type, @@ -25,26 +32,29 @@ export const getResolverAndArgs = (c: Omit) => (execCo const fieldNodes = fields[responseName] const fieldNode = fieldNodes[0] const fieldName = fieldNode.name.value - const fieldDef = getFieldDef(c.schema, type, fieldName) + const field = getFieldDef(server.schema, type, fieldName) const path = addPath(undefined, responseName, type.name) - if (!fieldDef) { + if (!field) { throw new Error('invalid schema, unknown field definition') } const info = buildResolveInfo( execContext, - fieldDef, + field, fieldNodes, type, path, ) - return [ - fieldDef, - null, - execContext.variableValues, - execContext.contextValue, + const args = getArgumentValues(field, fieldNode, execContext.variableValues) + const context = execContext.contextValue + + return { + field, + root: null, + args, + context, info, - ] + } } diff --git a/mocks/arc-basic-events/lib/graphql.js b/mocks/arc-basic-events/lib/graphql.js index 8ee6864a..f99b8dad 100644 --- a/mocks/arc-basic-events/lib/graphql.js +++ b/mocks/arc-basic-events/lib/graphql.js @@ -25,6 +25,7 @@ const makeManagementAPI = () => { const typeDefs = ` type Query { hello: String + dontResolve: String } type Subscription { greetings: String @@ -41,6 +42,7 @@ const typeDefs = ` const resolvers = { Query: { hello: () => 'Hello World!', + dontResolve: () => new Promise(() => {}), }, Subscription: { greetings:{ diff --git a/package-lock.json b/package-lock.json index 2de8911b..f819fdb4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2473,6 +2473,15 @@ } } }, + "eslint-plugin-mocha-no-only": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/eslint-plugin-mocha-no-only/-/eslint-plugin-mocha-no-only-1.1.1.tgz", + "integrity": "sha512-b+vgjJQ3SjRQCygBhomtjzvRQRpIP8Yd9cqwNSbcoVJREuNajao7M1Kl1aObAUc4wx98qsZyQyUSUxiAbMS9yA==", + "dev": true, + "requires": { + "requireindex": "~1.1.0" + } + }, "eslint-scope": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/eslint-scope/-/eslint-scope-5.1.1.tgz", @@ -6961,6 +6970,12 @@ "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==", "dev": true }, + "requireindex": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/requireindex/-/requireindex-1.1.0.tgz", + "integrity": "sha1-5UBLgVV+91225JxacgBIk/4D4WI=", + "dev": true + }, "requires-port": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", diff --git a/package.json b/package.json index fc9aec50..9dc2823e 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "esbuild": "^0.12.20", "esbuild-register": "^3.0.0", "eslint": "^7.29.0", + "eslint-plugin-mocha-no-only": "^1.1.1", "graphql": ">= 14.0.0", "graphql-ws": "^5.3.0", "inside-out-async": "^1.0.0", diff --git a/rollup.config.js b/rollup.config.js index e9f42a4c..c74b8ac7 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -16,6 +16,7 @@ export default { 'aws-sdk', 'graphql', 'graphql/execution/execute', + 'graphql/execution/values', // actual deps 'debug', 'streaming-iterables',