From 049992266d104cf5ac860a5625d67c1d5c1b998d Mon Sep 17 00:00:00 2001 From: Francis Gulotta Date: Sat, 28 Aug 2021 16:59:41 -0400 Subject: [PATCH] fix: issues with disconnecting and completing Part of the DDB refactor messed up which keys we needed to remove a record. Upon investigation we had a "topic" as a range key because we used to allow multiple topics per subscription. I'm pulling that out for the time being. - add more logging and fixup the logging function's input - Better DDB logging - The logger function now always gets the object, the type change is additive so it's not a breaking change (I kind of want to move pino for logging and add a concept of log level) - DDB can now support range keys even though we don't use them BREAKING CHANGE: The subscriptions Table has has its range key removed. This will require a migration. --- README.md | 4 - docs/README.md | 27 +--- docs/interfaces/ServerArgs.md | 4 +- docs/interfaces/SubscribePseudoIterable.md | 13 +- ...ServerClosure.ts => buildServerClosure.ts} | 8 +- lib/ddb/DDB.ts | 146 +++++++++++------- lib/handleStepFunctionEvent.ts | 4 +- lib/handleWebSocketEvent.ts | 2 +- lib/index.ts | 5 +- lib/messages/complete.ts | 4 +- lib/messages/disconnect.ts | 68 ++++---- lib/messages/pong.ts | 2 +- lib/messages/subscribe.ts | 46 +++--- lib/pubsub/complete.ts | 4 +- lib/pubsub/getFilteredSubs-test.ts | 2 +- lib/pubsub/getFilteredSubs.ts | 4 +- lib/pubsub/publish.ts | 4 +- lib/pubsub/subscribe.ts | 13 +- lib/test/mockServer.ts | 4 +- lib/types.ts | 19 +-- lib/utils/logger.ts | 2 +- lib/utils/postToConnection.ts | 2 +- mocks/arc-basic-events/app.arc | 1 - 23 files changed, 187 insertions(+), 201 deletions(-) rename lib/{makeServerClosure.ts => buildServerClosure.ts} (51%) diff --git a/README.md b/README.md index 83dd5efc..d5722470 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,6 @@ Connection ttl TTL Subscription id *String - topic **String ttl TTL @indexes @@ -209,8 +208,6 @@ resources: KeySchema: - AttributeName: id KeyType: HASH - - AttributeName: topic - KeyType: RANGE GlobalSecondaryIndexes: - IndexName: ConnectionIndex KeySchema: @@ -259,7 +256,6 @@ resource "aws_dynamodb_table" "subscriptions-table" { read_capacity = 1 write_capacity = 1 hash_key = "id" - range_key = "topic" attribute { name = "id" diff --git a/docs/README.md b/docs/README.md index 3dcdca14..e5c90035 100644 --- a/docs/README.md +++ b/docs/README.md @@ -21,7 +21,6 @@ graphql-lambda-subscriptions - [LoggerFunction](README.md#loggerfunction) - [MaybePromise](README.md#maybepromise) - [SubscribeArgs](README.md#subscribeargs) -- [SubscriptionDefinition](README.md#subscriptiondefinition) - [SubscriptionFilter](README.md#subscriptionfilter) - [WebSocketResponse](README.md#websocketresponse) @@ -34,11 +33,11 @@ graphql-lambda-subscriptions ### LoggerFunction -Ƭ **LoggerFunction**: (`message`: `string`, `obj?`: `any`) => `void` +Ƭ **LoggerFunction**: (`message`: `string`, `obj`: `Record`<`string`, `any`\>) => `void` #### Type declaration -▸ (`message`, `obj?`): `void` +▸ (`message`, `obj`): `void` Log operational events with a logger of your choice. It will get a message and usually object with relevant data @@ -47,7 +46,7 @@ Log operational events with a logger of your choice. It will get a message and u | Name | Type | | :------ | :------ | | `message` | `string` | -| `obj?` | `any` | +| `obj` | `Record`<`string`, `any`\> | ##### Returns @@ -81,26 +80,6 @@ ___ ___ -### SubscriptionDefinition - -Ƭ **SubscriptionDefinition**<`T`, `TSubscribeArgs`\>: `Object` - -#### Type parameters - -| Name | Type | -| :------ | :------ | -| `T` | extends [`PubSubEvent`](interfaces/PubSubEvent.md) | -| `TSubscribeArgs` | extends [`SubscribeArgs`](README.md#subscribeargs)[`SubscribeArgs`](README.md#subscribeargs) | - -#### Type declaration - -| Name | Type | -| :------ | :------ | -| `filter?` | [`SubscriptionFilter`](README.md#subscriptionfilter)<`TSubscribeArgs`, `T`[``"payload"``]\> | -| `topic` | `string` | - -___ - ### SubscriptionFilter Ƭ **SubscriptionFilter**<`TSubscribeArgs`, `TReturn`\>: `Partial`<`TReturn`\> \| `void` \| (...`args`: `TSubscribeArgs`) => [`MaybePromise`](README.md#maybepromise)<`Partial`<`TReturn`\> \| `void`\> diff --git a/docs/interfaces/ServerArgs.md b/docs/interfaces/ServerArgs.md index 4f3097ae..31f2ee93 100644 --- a/docs/interfaces/ServerArgs.md +++ b/docs/interfaces/ServerArgs.md @@ -108,7 +108,7 @@ ___ ### onConnectionInit -▸ `Optional` **onConnectionInit**(`e`): [`MaybePromise`](../README.md#maybepromise)<`object`\> +▸ `Optional` **onConnectionInit**(`e`): [`MaybePromise`](../README.md#maybepromise)<`Record`<`string`, `any`\>\> #### Parameters @@ -120,7 +120,7 @@ ___ #### Returns -[`MaybePromise`](../README.md#maybepromise)<`object`\> +[`MaybePromise`](../README.md#maybepromise)<`Record`<`string`, `any`\>\> ___ diff --git a/docs/interfaces/SubscribePseudoIterable.md b/docs/interfaces/SubscribePseudoIterable.md index 8bfdd1f1..ecde16a9 100644 --- a/docs/interfaces/SubscribePseudoIterable.md +++ b/docs/interfaces/SubscribePseudoIterable.md @@ -29,7 +29,8 @@ ### Properties -- [topicDefinitions](SubscribePseudoIterable.md#topicdefinitions) +- [filter](SubscribePseudoIterable.md#filter) +- [topic](SubscribePseudoIterable.md#topic) ### Methods @@ -39,9 +40,15 @@ ## Properties -### topicDefinitions +### filter -• **topicDefinitions**: [`SubscriptionDefinition`](../README.md#subscriptiondefinition)<`T`, `TSubscribeArgs`\>[] +• `Optional` **filter**: [`SubscriptionFilter`](../README.md#subscriptionfilter)<`TSubscribeArgs`, `T`[``"payload"``]\> + +___ + +### topic + +• **topic**: `string` ## Methods diff --git a/lib/makeServerClosure.ts b/lib/buildServerClosure.ts similarity index 51% rename from lib/makeServerClosure.ts rename to lib/buildServerClosure.ts index 8ea58274..4666a433 100644 --- a/lib/makeServerClosure.ts +++ b/lib/buildServerClosure.ts @@ -1,8 +1,8 @@ -import { ServerArgs, ServerClosure, Connection, Subscription } from './types' +import { ServerArgs, ServerClosure } from './types' import { DDB } from './ddb/DDB' import { log as debugLogger } from './utils/logger' -export const makeServerClosure = async (opts: ServerArgs): Promise => { +export const buildServerClosure = async (opts: ServerArgs): Promise => { const { tableNames, log = debugLogger, @@ -19,8 +19,8 @@ export const makeServerClosure = async (opts: ServerArgs): Promise({ dynamodb, tableName: (await tableNames)?.subscriptions || 'graphql_subscriptions', log }), - connection: DDB({ dynamodb, tableName: (await tableNames)?.connections || 'graphql_connections', log }), + subscription: DDB({ dynamodb, tableName: (await tableNames)?.subscriptions || 'graphql_subscriptions', log }), + connection: DDB({ dynamodb, tableName: (await tableNames)?.connections || 'graphql_connections', log }), }, } } diff --git a/lib/ddb/DDB.ts b/lib/ddb/DDB.ts index 1d263dfd..43f5abf7 100644 --- a/lib/ddb/DDB.ts +++ b/lib/ddb/DDB.ts @@ -1,15 +1,15 @@ import { DynamoDB } from 'aws-sdk' import { LoggerFunction, DDBType } from '../types' -export interface DDBClient { - get: (id: string) => Promise - put: (Item: T) => Promise - update: (id: string, obj: Partial) => Promise - delete: (id: string) => Promise +export interface DDBClient { + get: (Key: TKey) => Promise + put: (obj: T) => Promise + update: (Key: TKey, obj: Partial) => Promise + delete: (Key: TKey) => Promise query: (options: Omit) => AsyncGenerator } -export const DDB = ({ +export const DDB = ({ dynamodb, tableName, log, @@ -17,77 +17,109 @@ export const DDB = ({ dynamodb: DynamoDB tableName: string log: LoggerFunction -}): DDBClient => { +}): DDBClient => { const documentClient = new DynamoDB.DocumentClient({ service: dynamodb }) - const get = async (id: string): Promise => { - log('get', { tableName: tableName, id }) - const { Item } = await documentClient.get({ - TableName: tableName, - Key: { id }, - }).promise() - return (Item as T) ?? null + const get = async (Key: TKey): Promise => { + log('get', { tableName: tableName, Key }) + try { + const { Item } = await documentClient.get({ + TableName: tableName, + Key, + }).promise() + log('get:result', { Item }) + return (Item as T) ?? null + } catch (e) { + log('get:error', e) + throw e + } } const put = async (Item: T): Promise => { log('put', { tableName: tableName, Item }) - const { Attributes } = await documentClient.put({ - TableName: tableName, - Item, - ReturnValues: 'ALL_OLD', - }).promise() - return Attributes as T + try { + const { Attributes } = await documentClient.put({ + TableName: tableName, + Item, + ReturnValues: 'ALL_OLD', + }).promise() + return Attributes as T + } catch (e) { + log('put:error', e) + throw e + } } - const update = async (id: string, obj: Partial) => { - const AttributeUpdates = Object.entries(obj) - .map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } })) - .reduce((memo, val) => ({ ...memo, ...val })) + const update = async (Key: TKey, obj: Partial) => { + log('update', { tableName: tableName, Key, obj }) + try { + const AttributeUpdates = Object.entries(obj) + .map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } })) + .reduce((memo, val) => ({ ...memo, ...val })) - const { Attributes } = await documentClient.update({ - TableName: tableName, - Key: { id }, - AttributeUpdates, - ReturnValues: 'ALL_NEW', - }).promise() - return Attributes as T + const { Attributes } = await documentClient.update({ + TableName: tableName, + Key, + AttributeUpdates, + ReturnValues: 'ALL_NEW', + }).promise() + return Attributes as T + } catch (e) { + log('update:error', e) + throw e + } } - const deleteFunction = async (id: string): Promise => { - const { Attributes } = await documentClient.delete({ - TableName: tableName, - Key: { id }, - ReturnValues: 'ALL_OLD', - }).promise() - return Attributes as T + const deleteFunction = async (Key: TKey): Promise => { + log('delete', { tableName: tableName, Key }) + try { + const { Attributes } = await documentClient.delete({ + TableName: tableName, + Key, + ReturnValues: 'ALL_OLD', + }).promise() + return Attributes as T + } catch (e) { + log('delete:error', e) + throw e + } } const queryOnce = async (options: Omit) => { - log('queryOnce', options) - - const response = await documentClient.query({ - TableName: tableName, - Select: 'ALL_ATTRIBUTES', - ...options, - }).promise() + log('queryOnce', { tableName: tableName, options }) + try { + const response = await documentClient.query({ + TableName: tableName, + Select: 'ALL_ATTRIBUTES', + ...options, + }).promise() - const { Items, LastEvaluatedKey, Count } = response - return { - items: (Items ?? []) as T[], - lastEvaluatedKey: LastEvaluatedKey, - count: Count ?? 0, + const { Items, LastEvaluatedKey, Count } = response + return { + items: (Items ?? []) as T[], + lastEvaluatedKey: LastEvaluatedKey, + count: Count ?? 0, + } + } catch (e) { + log('queryOnce:error', e) + throw e } } async function* query(options: Omit) { - log('query', options) - const results = await queryOnce(options) - yield* results.items - let lastEvaluatedKey = results.lastEvaluatedKey - while (lastEvaluatedKey) { - const results = await queryOnce({ ...options, ExclusiveStartKey: lastEvaluatedKey }) + log('query', { tableName: tableName, options }) + try { + const results = await queryOnce(options) yield* results.items - lastEvaluatedKey = results.lastEvaluatedKey + let lastEvaluatedKey = results.lastEvaluatedKey + while (lastEvaluatedKey) { + const results = await queryOnce({ ...options, ExclusiveStartKey: lastEvaluatedKey }) + yield* results.items + lastEvaluatedKey = results.lastEvaluatedKey + } + } catch (e) { + log('query:error', e) + throw e } } diff --git a/lib/handleStepFunctionEvent.ts b/lib/handleStepFunctionEvent.ts index 4251b0ef..3bdc3059 100644 --- a/lib/handleStepFunctionEvent.ts +++ b/lib/handleStepFunctionEvent.ts @@ -12,7 +12,7 @@ export const handleStepFunctionEvent = (serverPromise: Promise): // Initial state - send ping message if (input.state === 'PING') { await postToConnection(server)({ ...input, message: { type: MessageType.Ping } }) - await server.models.connection.update(input.connectionId, { hasPonged: false }) + await server.models.connection.update({ id: input.connectionId }, { hasPonged: false }) return { ...input, state: 'REVIEW', @@ -21,7 +21,7 @@ export const handleStepFunctionEvent = (serverPromise: Promise): } // Follow up state - check if pong was returned - const conn = await server.models.connection.get(input.connectionId) + const conn = await server.models.connection.get({ id: input.connectionId }) if (conn?.hasPonged) { return { ...input, diff --git a/lib/handleWebSocketEvent.ts b/lib/handleWebSocketEvent.ts index 5b07978b..69652999 100644 --- a/lib/handleWebSocketEvent.ts +++ b/lib/handleWebSocketEvent.ts @@ -10,7 +10,7 @@ import { pong } from './messages/pong' export const handleWebSocketEvent = (serverPromise: Promise): SubscriptionServer['webSocketHandler'] => async (event) => { const server = await serverPromise if (!event.requestContext) { - server.log('handleWebSocketEvent unknown') + server.log('handleWebSocketEvent unknown', { event }) return { statusCode: 200, body: '', diff --git a/lib/index.ts b/lib/index.ts index dc98ae37..7d912309 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -3,10 +3,10 @@ import { publish } from './pubsub/publish' import { complete } from './pubsub/complete' import { handleWebSocketEvent } from './handleWebSocketEvent' import { handleStepFunctionEvent } from './handleStepFunctionEvent' -import { makeServerClosure } from './makeServerClosure' +import { buildServerClosure } from './buildServerClosure' export const makeServer = (opts: ServerArgs): SubscriptionServer => { - const closure: Promise = makeServerClosure(opts) + const closure: Promise = buildServerClosure(opts) return { webSocketHandler: handleWebSocketEvent(closure), @@ -32,7 +32,6 @@ export { WebSocketResponse, StateFunctionInput, PubSubEvent, - SubscriptionDefinition, SubscriptionFilter, Connection, Subscription, diff --git a/lib/messages/complete.ts b/lib/messages/complete.ts index a9202211..e95975aa 100644 --- a/lib/messages/complete.ts +++ b/lib/messages/complete.ts @@ -13,7 +13,7 @@ export const complete: MessageHandler = async ({ server, event, message }) => { server.log('messages:complete', { connectionId: event.requestContext.connectionId }) try { - const subscription = await server.models.subscription.get(`${event.requestContext.connectionId}|${message.id}`) + const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` }) if (!subscription) { return } @@ -37,7 +37,7 @@ export const complete: MessageHandler = server.log('messages:complete:onComplete', { onComplete: !!onComplete }) await onComplete?.(root, args, context, info) - await server.models.subscription.delete(subscription.id) + await server.models.subscription.delete({ id: subscription.id }) } catch (err) { server.log('messages:complete:onError', { err, event }) await server.onError?.(err, { event, message }) diff --git a/lib/messages/disconnect.ts b/lib/messages/disconnect.ts index e24b3678..2be7aa4b 100644 --- a/lib/messages/disconnect.ts +++ b/lib/messages/disconnect.ts @@ -6,63 +6,49 @@ import { getResolverAndArgs } from '../utils/getResolverAndArgs' import { SubscribePseudoIterable, MessageHandler, PubSubEvent } from '../types' import { isArray } from '../utils/isArray' import { collect } from 'streaming-iterables' -import { Connection } from '../types' /** Handler function for 'disconnect' message. */ export const disconnect: MessageHandler = async ({ server, event }) => { - server.log('messages:disconnect', { connectionId: event.requestContext.connectionId }) + const { connectionId } = event.requestContext + server.log('messages:disconnect', { connectionId }) try { await server.onDisconnect?.({ event }) const topicSubscriptions = await collect(server.models.subscription.query({ IndexName: 'ConnectionIndex', ExpressionAttributeNames: { '#a': 'connectionId' }, - ExpressionAttributeValues: { ':1': event.requestContext.connectionId }, + ExpressionAttributeValues: { ':1': connectionId }, KeyConditionExpression: '#a = :1', })) - const completed = {} as Record - const deletions = [] as Promise[] - for (const sub of topicSubscriptions) { - deletions.push( - (async () => { - // only call onComplete per subscription - if (!completed[sub.subscriptionId]) { - completed[sub.subscriptionId] = true - - const execContext = buildExecutionContext( - server.schema, - parse(sub.subscription.query), - undefined, - await buildContext({ server, connectionInitPayload: sub.connectionInitPayload, connectionId: sub.connectionId }), - sub.subscription.variables, - sub.subscription.operationName, - undefined, - ) - - if (isArray(execContext)) { - throw new AggregateError(execContext) - } - + const deletions = topicSubscriptions.map(async (sub) => { + const queryContext = await buildContext({ server, connectionInitPayload: sub.connectionInitPayload, connectionId }) + + const execContext = buildExecutionContext( + server.schema, + parse(sub.subscription.query), + undefined, + queryContext, + sub.subscription.variables, + sub.subscription.operationName, + undefined, + ) - const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) + if (isArray(execContext)) { + throw new AggregateError(execContext) + } - const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete - server.log('messages:disconnect:onComplete', { onComplete: !!onComplete }) - await onComplete?.(root, args, context, info) - } + const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) - await server.models.subscription.delete(sub.id) - })(), - ) - } + const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete + server.log('messages:disconnect:onComplete', { onComplete: !!onComplete }) + await onComplete?.(root, args, context, info) + await server.models.subscription.delete({ id: sub.id }) + }) - await Promise.all([ - // Delete subscriptions - ...deletions, - // Delete connection - server.models.connection.delete(event.requestContext.connectionId), - ]) + // do this first so that we don't create any more subscriptions for this connection + await server.models.connection.delete({ id: connectionId }), + await Promise.all(deletions) } catch (err) { server.log('messages:disconnect:onError', { err, event }) await server.onError?.(err, { event }) diff --git a/lib/messages/pong.ts b/lib/messages/pong.ts index d75a1497..b366f47b 100644 --- a/lib/messages/pong.ts +++ b/lib/messages/pong.ts @@ -7,7 +7,7 @@ export const pong: MessageHandler = async ({ server, event, message }) => { try { await server.onPong?.({ event, message }) - await server.models.connection.update(event.requestContext.connectionId, { + await server.models.connection.update({ id: event.requestContext.connectionId }, { hasPonged: true, }) } catch (err) { diff --git a/lib/messages/subscribe.ts b/lib/messages/subscribe.ts index 237b81fc..45be0a9d 100644 --- a/lib/messages/subscribe.ts +++ b/lib/messages/subscribe.ts @@ -25,9 +25,9 @@ export const subscribe: MessageHandler = const setupSubscription: MessageHandler = async ({ server, event, message }) => { const connectionId = event.requestContext.connectionId - server.log('subscribe %j', { connectionId, query: message.payload.query }) + server.log('subscribe', { connectionId, query: message.payload.query }) - const connection = await server.models.connection.get(connectionId) + const connection = await server.models.connection.get({ id: connectionId }) if (!connection) { throw new Error('missing subscription record') } @@ -79,7 +79,7 @@ const setupSubscription: MessageHandler = async ({ server, eve throw new Error('No field') } - const { topicDefinitions, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable + const { topic, filter, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable server.log('onSubscribe', { onSubscribe: !!onSubscribe }) const onSubscribeErrors = await onSubscribe?.(root, args, context, info) @@ -95,27 +95,25 @@ const setupSubscription: MessageHandler = async ({ server, eve }) } - await Promise.all(topicDefinitions.map(async ({ topic, filter }) => { - const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter - - const subscription: Subscription = { - id: `${connection.id}|${message.id}`, - topic, - filter: filterData || {}, - subscriptionId: message.id, - subscription: { - variableValues: args, - ...message.payload, - }, - connectionId: connection.id, - connectionInitPayload: connection.payload, - requestContext: event.requestContext, - ttl: connection.ttl, - createdAt: Date.now(), - } - server.log('subscribe:putSubscription %j', subscription) - await server.models.subscription.put(subscription) - })) + const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter + + const subscription: Subscription = { + id: `${connection.id}|${message.id}`, + topic, + filter: filterData || {}, + subscriptionId: message.id, + subscription: { + variableValues: args, + ...message.payload, + }, + connectionId: connection.id, + connectionInitPayload: connection.payload, + requestContext: event.requestContext, + ttl: connection.ttl, + createdAt: Date.now(), + } + server.log('subscribe:putSubscription', subscription) + await server.models.subscription.put(subscription) server.log('onAfterSubscribe', { onAfterSubscribe: !!onAfterSubscribe }) await onAfterSubscribe?.(root, args, context, info) diff --git a/lib/pubsub/complete.ts b/lib/pubsub/complete.ts index cd17878a..2237621a 100644 --- a/lib/pubsub/complete.ts +++ b/lib/pubsub/complete.ts @@ -12,7 +12,7 @@ import { getFilteredSubs } from './getFilteredSubs' export const complete = (serverPromise: Promise | ServerClosure): SubscriptionServer['complete'] => async event => { const server = await serverPromise const subscriptions = await getFilteredSubs({ server, event }) - server.log('pubsub:complete %j', { event, subscriptions }) + server.log('pubsub:complete', { event, subscriptions }) const iters = subscriptions.map(async (sub) => { const message: CompleteMessage = { @@ -23,7 +23,7 @@ export const complete = (serverPromise: Promise | ServerClosure): ...sub.requestContext, message, }) - await server.models.subscription.delete(sub.id) + await server.models.subscription.delete({ id: sub.id }) const execContext = buildExecutionContext( server.schema, diff --git a/lib/pubsub/getFilteredSubs-test.ts b/lib/pubsub/getFilteredSubs-test.ts index 603a727a..ca7fd382 100644 --- a/lib/pubsub/getFilteredSubs-test.ts +++ b/lib/pubsub/getFilteredSubs-test.ts @@ -27,7 +27,7 @@ describe('collapseKeys', () => { let count = 1 const makeTopic = () => `topic-${count++}` -describe.only('getFilteredSubs', () => { +describe('getFilteredSubs', () => { before(async () => { await tables.start({ cwd: './mocks/arc-basic-events', quiet: true }) }) diff --git a/lib/pubsub/getFilteredSubs.ts b/lib/pubsub/getFilteredSubs.ts index 3cf69a52..ccf7c0aa 100644 --- a/lib/pubsub/getFilteredSubs.ts +++ b/lib/pubsub/getFilteredSubs.ts @@ -4,7 +4,7 @@ import { ServerClosure, Subscription } from '../types' export const getFilteredSubs = async ({ server, event }: { server: Omit, event: { topic: string, payload?: Record } }): Promise => { if (!event.payload || Object.keys(event.payload).length === 0) { - server.log('getFilteredSubs %j', { event }) + server.log('getFilteredSubs', { event }) const iterator = server.models.subscription.query({ IndexName: 'TopicIndex', @@ -29,7 +29,7 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit | ServerClosure): SubscriptionServer['publish'] => async event => { const server = await serverPromise - server.log('pubsub:publish %j', { event }) + server.log('pubsub:publish', { event }) const subscriptions = await getFilteredSubs({ server, event }) - server.log('pubsub:publish %j', { subscriptions: subscriptions.map(({ connectionId, filter, subscription }) => ({ connectionId, filter, subscription }) ) }) + server.log('pubsub:publish', { subscriptions: subscriptions.map(({ connectionId, filter, subscription }) => ({ connectionId, filter, subscription }) ) }) const iters = subscriptions.map(async (sub) => { const payload = await execute( diff --git a/lib/pubsub/subscribe.ts b/lib/pubsub/subscribe.ts index 27cde402..af01f3d4 100644 --- a/lib/pubsub/subscribe.ts +++ b/lib/pubsub/subscribe.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { PubSubEvent, SubscribeArgs, SubscribeOptions, SubscribePseudoIterable, SubscriptionDefinition } from '../types' +import { PubSubEvent, SubscribeArgs, SubscribeOptions, SubscribePseudoIterable } from '../types' /** * Creates subscribe handler for use in your graphql schema. @@ -19,11 +19,9 @@ export const subscribe = ([{ - topic, - filter, - }]) - + const handler = createHandler() + handler.topic = topic + handler.filter = filter handler.onSubscribe = onSubscribe handler.onComplete = onComplete handler.onAfterSubscribe = onAfterSubscribe @@ -31,11 +29,10 @@ export const subscribe = (topicDefinitions: SubscriptionDefinition[]) => { +const createHandler = () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any,require-yield const handler: any = async function* () { throw new Error('Subscription handler should not have been called') } - handler.topicDefinitions = topicDefinitions return handler as SubscribePseudoIterable } diff --git a/lib/test/mockServer.ts b/lib/test/mockServer.ts index b7778caf..5199a7fb 100644 --- a/lib/test/mockServer.ts +++ b/lib/test/mockServer.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-empty-function */ import { makeExecutableSchema } from '@graphql-tools/schema' import { tables as arcTables } from '@architect/functions' -import { makeServerClosure } from '../makeServerClosure' +import { buildServerClosure } from '../buildServerClosure' import { ServerArgs, ServerClosure } from '../types' import { subscribe } from '../pubsub/subscribe' @@ -63,5 +63,5 @@ export const mockServerArgs = async (args: Partial = {}): Promise): Promise => { - return makeServerClosure(await mockServerArgs(args)) + return buildServerClosure(await mockServerArgs(args)) } diff --git a/lib/types.ts b/lib/types.ts index f8f763ce..c2835695 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -80,8 +80,8 @@ export type MaybePromise = T | Promise export type ServerClosure = { dynamodb: DynamoDB models: { - subscription: DDBClient - connection: DDBClient + subscription: DDBClient + connection: DDBClient } log: LoggerFunction apiGatewayManagementApi?: ApiGatewayManagementApiSubset @@ -118,7 +118,7 @@ export interface SubscriptionServer { /** * Log operational events with a logger of your choice. It will get a message and usually object with relevant data */ -export type LoggerFunction = (message: string, obj?: any) => void +export type LoggerFunction = (message: string, obj: Record) => void export type WebSocketResponse = { statusCode: number @@ -133,19 +133,12 @@ export type SubscriptionFilter< TReturn extends Record = Record > = Partial | void | ((...args: TSubscribeArgs) => MaybePromise | void>) -export type SubscriptionDefinition< - T extends PubSubEvent, - TSubscribeArgs extends SubscribeArgs = SubscribeArgs, - > = { - topic: string - filter?: SubscriptionFilter - } - export type SubscribeHandler = (...args: any[]) => SubscribePseudoIterable export interface SubscribePseudoIterable { (...args: TSubscribeArgs): AsyncGenerator - topicDefinitions: SubscriptionDefinition[] + topic: string + filter?: SubscriptionFilter onSubscribe?: (...args: TSubscribeArgs) => MaybePromise onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise onComplete?: (...args: TSubscribeArgs) => MaybePromise @@ -234,8 +227,8 @@ export interface Subscription { * connectionId|subscriptionId */ id: string - createdAt: number topic: string + createdAt: number filter: Record connectionId: string subscriptionId: string diff --git a/lib/utils/logger.ts b/lib/utils/logger.ts index 9a027f4a..9db5dbac 100644 --- a/lib/utils/logger.ts +++ b/lib/utils/logger.ts @@ -3,6 +3,6 @@ import debug from 'debug' const logger = debug('graphql-lambda-subscriptions') // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/no-explicit-any -const log = (message: string, obj?: any): void => logger(message, obj) +const log = (message: string, obj: Record): void => logger(`${message} %j`, obj) export { log } diff --git a/lib/utils/postToConnection.ts b/lib/utils/postToConnection.ts index 9ad879e6..5ff35389 100644 --- a/lib/utils/postToConnection.ts +++ b/lib/utils/postToConnection.ts @@ -23,7 +23,7 @@ export const postToConnection = (server: ServerClosure) => stage: string message: GraphqlWSMessages }): Promise => { - server.log('sendMessage %j', { connectionId: ConnectionId, message }) + server.log('sendMessage', { connectionId: ConnectionId, message }) const api = server.apiGatewayManagementApi ?? new ApiGatewayManagementApi({ diff --git a/mocks/arc-basic-events/app.arc b/mocks/arc-basic-events/app.arc index 2903baf7..27147474 100644 --- a/mocks/arc-basic-events/app.arc +++ b/mocks/arc-basic-events/app.arc @@ -9,7 +9,6 @@ Connection ttl TTL Subscription id *String - topic **String ttl TTL @indexes