diff --git a/lib/gateway.ts b/lib/gateway.ts index 49b27aea..1ff55462 100644 --- a/lib/gateway.ts +++ b/lib/gateway.ts @@ -14,6 +14,7 @@ import { pong } from './messages/pong' export const handleGatewayEvent = (server: ServerClosure): ApiGatewayHandler => async (event) => { if (!event.requestContext) { + server.log('handleGatewayEvent unknown') return { statusCode: 200, body: '', @@ -21,6 +22,7 @@ export const handleGatewayEvent = (server: ServerClosure): ApiGatewayHandler { }) after(async () => { - tables.end() + await tables.end() }) it('is type compatible with aws-lambda handler', async () => { diff --git a/lib/makeServerClosure.ts b/lib/makeServerClosure.ts index 4e168335..0ebda6ee 100644 --- a/lib/makeServerClosure.ts +++ b/lib/makeServerClosure.ts @@ -3,9 +3,11 @@ import { ServerArgs, ServerClosure } from './types' import { createModel } from './model/createModel' import { Subscription } from './model/Subscription' import { Connection } from './model/Connection' +import { log } from './utils/logger' export function makeServerClosure(opts: ServerArgs): ServerClosure { return { + log: log, ...opts, model: { Subscription: createModel({ diff --git a/lib/messages/complete.ts b/lib/messages/complete.ts index 82acca74..32fd8d3d 100644 --- a/lib/messages/complete.ts +++ b/lib/messages/complete.ts @@ -3,7 +3,7 @@ import { parse } from 'graphql' import { CompleteMessage } from 'graphql-ws' import { buildExecutionContext } from 'graphql/execution/execute' import { collect } from 'streaming-iterables' -import { SubscribePseudoIterable, MessageHandler } from '../types' +import { SubscribePseudoIterable, MessageHandler, PubSubEvent } from '../types' import { deleteConnection } from '../utils/deleteConnection' import { constructContext } from '../utils/constructContext' import { getResolverAndArgs } from '../utils/getResolverAndArgs' @@ -37,7 +37,7 @@ export const complete: MessageHandler = const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) - const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete + const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete if (onComplete) { await onComplete(root, args, context, info) } diff --git a/lib/messages/disconnect.ts b/lib/messages/disconnect.ts index 6fb0092b..8c078f67 100644 --- a/lib/messages/disconnect.ts +++ b/lib/messages/disconnect.ts @@ -4,7 +4,7 @@ import { equals } from '@aws/dynamodb-expressions' import { buildExecutionContext } from 'graphql/execution/execute' import { constructContext } from '../utils/constructContext' import { getResolverAndArgs } from '../utils/getResolverAndArgs' -import { SubscribePseudoIterable, MessageHandler } from '../types' +import { SubscribePseudoIterable, MessageHandler, PubSubEvent } from '../types' import { isArray } from '../utils/isArray' import { collect } from 'streaming-iterables' import { Connection } from '../model/Connection' @@ -49,7 +49,7 @@ export const disconnect: MessageHandler = const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) - const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete + const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete if (onComplete) { await onComplete(root, args, context, info) } diff --git a/lib/messages/subscribe-test.ts b/lib/messages/subscribe-test.ts index 547418e7..ba963531 100644 --- a/lib/messages/subscribe-test.ts +++ b/lib/messages/subscribe-test.ts @@ -19,7 +19,7 @@ describe('messages/subscribe', () => { }) afterEach(async () => { - tables.end() + await tables.end() }) it('executes a query/mutation', async () => { diff --git a/lib/messages/subscribe.ts b/lib/messages/subscribe.ts index aa907b11..725bf6dd 100644 --- a/lib/messages/subscribe.ts +++ b/lib/messages/subscribe.ts @@ -6,7 +6,7 @@ import { assertValidExecutionArguments, execute, } from 'graphql/execution/execute' -import { APIGatewayWebSocketEvent, ServerClosure, SubscribeHandler, MessageHandler } from '../types' +import { APIGatewayWebSocketEvent, ServerClosure, MessageHandler, SubscribePseudoIterable, PubSubEvent } from '../types' import { constructContext } from '../utils/constructContext' import { getResolverAndArgs } from '../utils/getResolverAndArgs' import { sendMessage } from '../utils/sendMessage' @@ -25,9 +25,11 @@ export const subscribe: MessageHandler = } const setupSubscription: MessageHandler = async ({ server, event, message }) => { + const connectionId = event.requestContext.connectionId + const connection = await server.mapper.get( Object.assign(new server.model.Connection(), { - id: event.requestContext.connectionId, + id: connectionId, }), ) const connectionParams = connection.payload || {} @@ -39,7 +41,7 @@ const setupSubscription: MessageHandler = async ({ server, eve throw new AggregateError(errors) } - const contextValue = await constructContext({ server, connectionParams, connectionId: connection.id }) + const contextValue = await constructContext({ server, connectionParams, connectionId }) const execContext = buildExecutionContext( server.schema, @@ -74,33 +76,33 @@ const setupSubscription: MessageHandler = async ({ server, eve throw new Error('No field') } - const { topicDefinitions, onSubscribe, onAfterSubscribe } = await (field.subscribe as SubscribeHandler)( - root, - args, - context, - info, - ) + const { topicDefinitions, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable + server.log('onSubscribe', { onSubscribe: !!onSubscribe }) await onSubscribe?.(root, args, context, info) await Promise.all(topicDefinitions.map(async ({ topic, filter }) => { + const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter + const subscription = Object.assign(new server.model.Subscription(), { - id: `${event.requestContext.connectionId}|${message.id}`, + id: `${connectionId}|${message.id}`, topic, - filter: filter || {}, + filter: filterData || {}, subscriptionId: message.id, subscription: { variableValues: args, ...message.payload, }, - connectionId: event.requestContext.connectionId, + connectionId, connectionParams, requestContext: event.requestContext, ttl: connection.ttl, }) + server.log('subscribe:putSubscription %j', subscription) await server.mapper.put(subscription) })) + server.log('onAfterSubscribe', { onAfterSubscribe: !!onAfterSubscribe }) await onAfterSubscribe?.(root, args, context, info) } @@ -125,6 +127,8 @@ const validateMessage = (server: ServerClosure) => (message: SubscribeMessage) = // eslint-disable-next-line @typescript-eslint/no-explicit-any async function executeQuery(server: ServerClosure, message: SubscribeMessage, contextValue: any, event: APIGatewayWebSocketEvent) { + server.log('executeQuery', { connectionId: event.requestContext.connectionId }) + const result = await execute( server.schema, parse(message.payload.query), diff --git a/lib/model/createModel.ts b/lib/model/createModel.ts index 606ebb97..008e9ea7 100644 --- a/lib/model/createModel.ts +++ b/lib/model/createModel.ts @@ -1,7 +1,7 @@ import { DynamoDbTable } from '@aws/dynamodb-data-mapper' -import { Class } from '../types' -export const createModel = ({ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const createModel = ({ model, table, }: { diff --git a/lib/pubsub/complete-test.ts b/lib/pubsub/complete-test.ts new file mode 100644 index 00000000..1f3b5625 --- /dev/null +++ b/lib/pubsub/complete-test.ts @@ -0,0 +1,18 @@ +import { tables } from '@architect/sandbox' +import { mockServerContext } from '../test/mockServer' +import { complete } from './complete' + +describe('pubsub:complete', () => { + before(async () => { + await tables.start({ cwd: './mocks/arc-basic-events', quiet: true }) + }) + + after(async () => { + await tables.end() + }) + + it('takes a topic', async () => { + const server = await mockServerContext() + await complete(server)({ topic: 'Topic12' }) + }) +}) diff --git a/lib/pubsub/complete.ts b/lib/pubsub/complete.ts index d06d9cbc..91bd99fb 100644 --- a/lib/pubsub/complete.ts +++ b/lib/pubsub/complete.ts @@ -2,15 +2,17 @@ import AggregateError from 'aggregate-error' import { parse } from 'graphql' import { CompleteMessage, MessageType } from 'graphql-ws' import { buildExecutionContext } from 'graphql/execution/execute' -import { ServerClosure, PubSubEvent, SubscribePseudoIterable } from '../types' +import { ServerClosure, PubSubEvent, SubscribePseudoIterable, PartialBy } from '../types' import { sendMessage } from '../utils/sendMessage' import { constructContext } from '../utils/constructContext' import { getResolverAndArgs } from '../utils/getResolverAndArgs' import { isArray } from '../utils/isArray' import { getFilteredSubs } from './getFilteredSubs' -export const complete = (server: ServerClosure) => async (event: PubSubEvent): Promise => { +export const complete = (server: ServerClosure) => async (event: PartialBy): Promise => { const subscriptions = await getFilteredSubs({ server, event }) + server.log('pubsub:complete %j', { event, subscriptions }) + const iters = subscriptions.map(async (sub) => { const message: CompleteMessage = { id: sub.subscriptionId, @@ -38,7 +40,7 @@ export const complete = (server: ServerClosure) => async (event: PubSubEvent): P const [field, root, args, context, info] = getResolverAndArgs(server)(execContext) - const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete + const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete if (onComplete) { await onComplete(root, args, context, info) } diff --git a/lib/pubsub/getFilteredSubs-test.ts b/lib/pubsub/getFilteredSubs-test.ts new file mode 100644 index 00000000..d8e1a07b --- /dev/null +++ b/lib/pubsub/getFilteredSubs-test.ts @@ -0,0 +1,20 @@ +import { assert } from 'chai' +import { collapseKeys } from './getFilteredSubs' + +describe('collapseKeys', () => { + it('makes the deep objects into dots', () => { + assert.deepEqual(collapseKeys({}), {}) + assert.deepEqual(collapseKeys({ a: 4, b: { c: 5, d: 'hi', e: { f: false } } }), { + a: 4, + 'b.c': 5, + 'b.d': 'hi', + 'b.e.f': false, + }) + assert.deepEqual(collapseKeys({ a: [1,2,3, { b: 4, c: [], d: null, e: undefined }] }), { + 'a.0': 1, + 'a.1': 2, + 'a.2': 3, + 'a.3.b': 4, + }) + }) +}) diff --git a/lib/pubsub/getFilteredSubs.ts b/lib/pubsub/getFilteredSubs.ts index 23a18417..26a1f784 100644 --- a/lib/pubsub/getFilteredSubs.ts +++ b/lib/pubsub/getFilteredSubs.ts @@ -6,35 +6,44 @@ import { } from '@aws/dynamodb-expressions' import { collect } from 'streaming-iterables' import { Subscription } from '../model/Subscription' -import { ServerClosure, PubSubEvent } from '../types' +import { ServerClosure, PubSubEvent, PartialBy } from '../types' + +export const getFilteredSubs = async ({ server, event }: { server: Omit, event: PartialBy }): Promise => { + if (!event.payload || Object.keys(event.payload).length === 0) { + server.log('getFilteredSubs %j', { event }) + + const iterator = server.mapper.query( + server.model.Subscription, + { topic: equals(event.topic) }, + { indexName: 'TopicIndex' }, + ) + + return await collect(iterator) + } + const flattenPayload = collapseKeys(event.payload) + const conditions: ConditionExpression[] = Object.entries(flattenPayload).map(([key, value]) => ({ + type: 'Or', + conditions: [ + { + ...attributeNotExists(), + subject: `filter.${key}`, + }, + { + ...equals(value), + subject: `filter.${key}`, + }, + ], + })) + + server.log('getFilteredSubs %j', { event, conditions }) -export const getFilteredSubs = async ({ server, event }: { server: Omit, event: PubSubEvent }): Promise => { - const flattenPayload = flatten(event.payload) const iterator = server.mapper.query( server.model.Subscription, { topic: equals(event.topic) }, { filter: { type: 'And', - conditions: Object.entries(flattenPayload).reduce( - (p, [key, value]) => [ - ...p, - { - type: 'Or', - conditions: [ - { - ...attributeNotExists(), - subject: `filter.${key}`, - }, - { - ...equals(value), - subject: `filter.${key}`, - }, - ], - }, - ], - [] as ConditionExpression[], - ), + conditions, }, indexName: 'TopicIndex', }, @@ -43,33 +52,27 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit => { - if (obj === undefined || obj === null) { - return {} - } - return Object.entries(obj).reduce((p, [k1, v1]) => { + const record = {} + for (const [k1, v1] of Object.entries(obj)) { + if (typeof v1 === 'string' || typeof v1 === 'number' || typeof v1 === 'boolean') { + record[k1] = v1 + continue + } + if (v1 && typeof v1 === 'object') { - const next = Object.entries(v1).reduce( - (prev, [k2, v2]) => ({ - ...prev, - [`${k1}.${k2}`]: v2, - }), - {}, - ) - return { - ...p, - ...flatten(next), + const next = {} + + for (const [k2, v2] of Object.entries(v1)) { + next[`${k1}.${k2}`] = v2 } - } - if (typeof v1 === 'string' || - typeof v1 === 'number' || - typeof v1 === 'boolean') { - return { ...p, [k1]: v1 } + for (const [k1, v1] of Object.entries(collapseKeys(next))) { + record[k1] = v1 + } } - - return p - }, {}) + } + return record } diff --git a/lib/pubsub/publish.ts b/lib/pubsub/publish.ts index 8828d1cb..0b578b76 100644 --- a/lib/pubsub/publish.ts +++ b/lib/pubsub/publish.ts @@ -5,8 +5,11 @@ import { sendMessage } from '../utils/sendMessage' import { constructContext } from '../utils/constructContext' import { getFilteredSubs } from './getFilteredSubs' -export const publish = (server: ServerClosure) => async (event: PubSubEvent): Promise => { +export const publish = (server: ServerClosure) => async (event: T): Promise => { + server.log('pubsub:publish %j', { event }) const subscriptions = await getFilteredSubs({ server, event }) + server.log('pubsub:publish %j', { subscriptions: subscriptions.map(({ connectionId, filter, subscription }) => ({ connectionId, filter, subscription }) ) }) + const iters = subscriptions.map(async (sub) => { const payload = await execute( server.schema, diff --git a/lib/pubsub/subscribe-test.ts b/lib/pubsub/subscribe-test.ts new file mode 100644 index 00000000..4d2a0a5d --- /dev/null +++ b/lib/pubsub/subscribe-test.ts @@ -0,0 +1,40 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { assert } from 'chai' +import { subscribe } from './subscribe' + +describe('pubsub:subscribe', () => { + it('is type compatible with it\'s callbacks', async () => { + type GreetingEvent = { + topic: 'greetings' + payload: { message: 'hi!' } + } + + type SubTopicType = (root: null, args: { id: string}, context: { db: boolean }, info: any) => AsyncGenerator + + const foobar: SubTopicType = async function*() { + yield { + topic: 'greetings', + payload: { message: 'hi!' }, + } + } + + assert.ok(foobar) + + const subTopic: SubTopicType = subscribe('greetings', { + onSubscribe(root, args, context) { + if (root === null) { + throw new Error('impossible') + } + args.id + context.db + }, + filter(root, args) { + args.id + return { + message: 'hi!', + } + }, + }) + assert.ok(subTopic) + }) +}) diff --git a/lib/pubsub/subscribe.ts b/lib/pubsub/subscribe.ts index 927f412d..593dc406 100644 --- a/lib/pubsub/subscribe.ts +++ b/lib/pubsub/subscribe.ts @@ -1,36 +1,33 @@ -import { SubscribeArgs, SubscribeHandler, SubscribeOptions, SubscribePseudoIterable, SubscriptionDefinition } from '../types' +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { PubSubEvent, SubscribeArgs, SubscribeOptions, SubscribePseudoIterable, SubscriptionDefinition } from '../types' /** Creates subscribe handler */ -export const subscribe = ( - topic: string, +export const subscribe = = any, TContext extends any = any>( + topic: T['topic'], { filter, onSubscribe, onComplete, onAfterSubscribe, - }: SubscribeOptions = {}): (...args: SubscribeArgs) => SubscribePseudoIterable => { - return (...args: SubscribeArgs) => { - const handler = createHandler([{ - topic, - filter: typeof filter === 'function' ? filter(...args) : filter, - }]) + }: SubscribeOptions> = {}, +): SubscribePseudoIterable> => { + const handler = createHandler([{ + topic, + filter, + }]) - handler.onSubscribe = onSubscribe - handler.onComplete = onComplete - handler.onAfterSubscribe = onAfterSubscribe + handler.onSubscribe = onSubscribe + handler.onComplete = onComplete + handler.onAfterSubscribe = onAfterSubscribe - return handler - } + return handler } -/** Merge multiple subscribe handlers */ -export const concat = (...handlers: SubscribeHandler[]) => (...args: SubscribeArgs): SubscribePseudoIterable => createHandler(handlers.map((h) => h(...args).topicDefinitions).flat()) - -const createHandler = (topicDefinitions: SubscriptionDefinition[]) => { +const createHandler = (topicDefinitions: SubscriptionDefinition[]) => { // eslint-disable-next-line @typescript-eslint/no-explicit-any,require-yield const handler: any = async function* () { throw new Error('Subscription handler should not have been called') } handler.topicDefinitions = topicDefinitions - return handler as SubscribePseudoIterable + return handler as SubscribePseudoIterable } diff --git a/lib/test/execute-helper.ts b/lib/test/execute-helper.ts new file mode 100644 index 00000000..bd2baf24 --- /dev/null +++ b/lib/test/execute-helper.ts @@ -0,0 +1,54 @@ +import { createClient } from 'graphql-ws' +import WebSocket from 'ws' +import { deferGenerator } from 'inside-out-async' + +const url = `ws://localhost:${process.env.PORT}` + +export const executeQuery = async (query: string): Promise => { + const client = createClient({ + url, + webSocketImpl: WebSocket, + }) + + return new Promise((resolve, reject) => { + let result + client.subscribe( + { query }, + { + next: ({ data }) => (result = data), + error: reject, + complete: () => resolve(result), + }, + ) + }) +} + +type SubscriptionResult = Promise<{ + values: AsyncGenerator + unsubscribe: () => void +}> + +export const executeSubscription = async (query: string): SubscriptionResult => { + const client = createClient({ + url, + webSocketImpl: WebSocket, + }) + + const values = deferGenerator() + + const unsubscribe = client.subscribe( + { query }, + { + next: ({ data }) => { + // console.log({ data }) + values.queueValue(data) + }, + error: (error: Error) => { + values.queueError(error) + }, + complete: () => values.queueReturn(), + }, + ) + + return { values: values.generator, unsubscribe } +} diff --git a/lib/test/integration-basic-events-test.ts b/lib/test/integration-basic-events-test.ts deleted file mode 100644 index ae164639..00000000 --- a/lib/test/integration-basic-events-test.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { assert } from 'chai' -import { start as sandBoxStart, end as sandBoxStop } from '@architect/sandbox' -import { createClient } from 'graphql-ws' -import WebSocket from 'ws' -import { deferGenerator } from 'inside-out-async' -import { collect, map } from 'streaming-iterables' - -const url = `ws://localhost:${process.env.PORT}` - -const executeQuery = async (query: string) => { - const client = createClient({ - url, - webSocketImpl: WebSocket, - }) - - return new Promise((resolve, reject) => { - let result - client.subscribe( - { query }, - { - next: ({ data }) => (result = data), - error: reject, - complete: () => resolve(result), - }, - ) - }) -} - -const executeSubscription = async (query: string) => { - const client = createClient({ - url, - webSocketImpl: WebSocket, - }) - - const values = deferGenerator() - - const unsubscribe = client.subscribe( - { query }, - { - next: ({ data }) => { - values.queueValue(data) - }, - error: (error: Error) => { - values.queueError(error) - }, - complete: () => values.queueReturn(), - }, - ) - - return { values: values.generator, unsubscribe } -} - -describe('Basic Events', () => { - before(async () => { - await sandBoxStart({ cwd: './mocks/arc-basic-events', quiet: true }) - }) - - after(async () => { - await new Promise(resolve => setTimeout(resolve, 100)) // pending ddb writes need to finish - await sandBoxStop() - }) - it('queries', async () => { - const result = await executeQuery('{ hello }') - assert.deepEqual(result, { hello: 'Hello World!' }) - }) - - it('subscribes', async () => { - const { values } = await executeSubscription('subscription { greetings }') - const greetings = await collect(map((value: { greetings: string }) => value.greetings, values)) - assert.deepEqual(greetings, ['yoyo', 'hows it', 'howdy']) - }) -}) diff --git a/lib/test/integration-events-test.ts b/lib/test/integration-events-test.ts new file mode 100644 index 00000000..206a6d70 --- /dev/null +++ b/lib/test/integration-events-test.ts @@ -0,0 +1,35 @@ +import { assert } from 'chai' +import { start as sandBoxStart, end as sandBoxStop } from '@architect/sandbox' +import { collect, map } from 'streaming-iterables' +import { executeQuery, executeSubscription } from './execute-helper' + +describe('Events', () => { + before(async () => { + await sandBoxStart({ cwd: './mocks/arc-basic-events', quiet: true }) + }) + + after(async () => { + await new Promise(resolve => setTimeout(resolve, 100)) // pending ddb writes need to finish + await sandBoxStop() + }) + describe('Basic Events', () => { + it('queries', async () => { + const result = await executeQuery('{ hello }') + assert.deepEqual(result, { hello: 'Hello World!' }) + }) + + it('subscribes', async () => { + const { values } = await executeSubscription('subscription { greetings }') + const greetings = await collect(map((value: { greetings: string }) => value.greetings, values)) + assert.deepEqual(greetings, ['yoyo', 'hows it', 'howdy']) + }) + }) + + describe('Filter Events', () => { + it('subscribes', async () => { + const { values } = await executeSubscription('subscription { filterTest }') + const greetings = await collect(map((value: { filterTest: string }) => value.filterTest, values)) + assert.deepEqual(greetings, ['oh yes!', 'Missing fields also work']) + }) + }) +}) diff --git a/lib/types.ts b/lib/types.ts index d30ce0b8..b67bcb94 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -35,6 +35,7 @@ export type ServerArgs = { message: PongMessage }) => MaybePromise onError?: (error: any, context: any) => MaybePromise + log?: LoggerFunction } export type MaybePromise = T | Promise @@ -45,6 +46,7 @@ export type ServerClosure = { Subscription: typeof Subscription Connection: typeof Connection } + log: LoggerFunction } & Omit export interface ServerInstance { @@ -59,30 +61,46 @@ export type TableNames = { subscriptions: string } +export type LoggerFunction = (input: string, obj?: any) => void + export type WebsocketResponse = { statusCode: number headers?: Record body: string } -export type SubscriptionDefinition = { +export type SubscribeArgs, TContext = any> = [root: TRoot, args: TArgs, context: TContext, info: GraphQLResolveInfo] + +export type SubscriptionFilter< + TSubscribeArgs extends SubscribeArgs = SubscribeArgs, + TReturn extends Record = Record +> = Partial | void | ((...args: TSubscribeArgs) => MaybePromise> | MaybePromise>) + +export type SubscriptionDefinition< +T extends PubSubEvent, +TSubscribeArgs extends SubscribeArgs = SubscribeArgs, +> = { topic: string - filter?: object | (() => void) + filter?: SubscriptionFilter } -export type SubscribeHandler = (...args: any[]) => SubscribePseudoIterable +export type SubscribeHandler = (...args: any[]) => SubscribePseudoIterable -export type SubscribePseudoIterable = { - (...args: SubscribeArgs): AsyncGenerator - topicDefinitions: SubscriptionDefinition[] - onSubscribe?: (...args: SubscribeArgs) => MaybePromise - onComplete?: (...args: SubscribeArgs) => MaybePromise - onAfterSubscribe?: (...args: SubscribeArgs) => MaybePromise +export type SubscribePseudoIterable = { + (...args: TSubscribeArgs): AsyncGenerator + topicDefinitions: SubscriptionDefinition[] + onSubscribe?: (...args: TSubscribeArgs) => MaybePromise + onComplete?: (...args: TSubscribeArgs) => MaybePromise + onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise } -export type SubscribeArgs = [root: any, args: Record, context: any, info: GraphQLResolveInfo] -export type Class = { new(...args: any[]): any } +export interface SubscribeOptions { + filter?: SubscriptionFilter + onSubscribe?: (...args: TSubscribeArgs) => MaybePromise + onComplete?: (...args: TSubscribeArgs) => MaybePromise + onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise +} export type StateFunctionInput = { connectionId: string @@ -103,7 +121,7 @@ export interface APIGatewayWebSocketEvent extends APIGatewayProxyEvent { export type PubSubEvent = { topic: string - payload: any + payload: Record } export type MessageHandler = (arg: { server: ServerClosure, event: APIGatewayWebSocketEvent, message: T }) => Promise @@ -116,11 +134,6 @@ export interface ApiGatewayManagementApiSubset { deleteConnection(input: { ConnectionId: string }): { promise: () => Promise } } -export interface SubscribeOptions { - filter?: object | ((...args: SubscribeArgs) => object) - onSubscribe?: (...args: SubscribeArgs) => MaybePromise - onComplete?: (...args: SubscribeArgs) => MaybePromise - onAfterSubscribe?: (...args: SubscribeArgs) => MaybePromise -} - export type ApiGatewayHandler = (event: TEvent) => Promise + +export type PartialBy = Omit & Partial> diff --git a/lib/utils/deleteConnection.ts b/lib/utils/deleteConnection.ts index ec80a5bc..9016b5ca 100644 --- a/lib/utils/deleteConnection.ts +++ b/lib/utils/deleteConnection.ts @@ -1,7 +1,7 @@ import { ApiGatewayManagementApi } from 'aws-sdk' import { ServerClosure } from '../types' -export const deleteConnection = (c: ServerClosure) => +export const deleteConnection = (server: ServerClosure) => async ({ connectionId: ConnectionId, domainName, @@ -11,7 +11,8 @@ export const deleteConnection = (c: ServerClosure) => domainName: string stage: string }): Promise => { - const api = c.apiGatewayManagementApi ?? + server.log('deleteConnection', { connectionId: ConnectionId }) + const api = server.apiGatewayManagementApi ?? new ApiGatewayManagementApi({ apiVersion: 'latest', endpoint: `${domainName}/${stage}`, diff --git a/lib/utils/logger.ts b/lib/utils/logger.ts new file mode 100644 index 00000000..c9148350 --- /dev/null +++ b/lib/utils/logger.ts @@ -0,0 +1,8 @@ +import debug from 'debug' + +const logger = debug('graphql-lambda-subscriptions') + +// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/no-explicit-any +const log = (input: string, obj?: any): void => logger(input, obj) + +export { log } diff --git a/lib/utils/sendMessage.ts b/lib/utils/sendMessage.ts index 53540fa0..6c501852 100644 --- a/lib/utils/sendMessage.ts +++ b/lib/utils/sendMessage.ts @@ -11,7 +11,7 @@ import { ServerClosure } from '../types' type GraphqlWSMessages = ConnectionAckMessage | NextMessage | CompleteMessage | ErrorMessage | PingMessage | PongMessage -export const sendMessage = (c: ServerClosure) => +export const sendMessage = (server: ServerClosure) => async ({ connectionId: ConnectionId, domainName, @@ -23,7 +23,9 @@ export const sendMessage = (c: ServerClosure) => stage: string message: GraphqlWSMessages }): Promise => { - const api = c.apiGatewayManagementApi ?? + server.log('sendMessage %j', { connectionId: ConnectionId, message }) + + const api = server.apiGatewayManagementApi ?? new ApiGatewayManagementApi({ apiVersion: 'latest', endpoint: `${domainName}/${stage}`, diff --git a/mocks/arc-basic-events/lib/graphql.js b/mocks/arc-basic-events/lib/graphql.js index c0583958..a7050d3f 100644 --- a/mocks/arc-basic-events/lib/graphql.js +++ b/mocks/arc-basic-events/lib/graphql.js @@ -29,6 +29,7 @@ const typeDefs = ` } type Subscription { greetings: String + filterTest: String } ` @@ -40,14 +41,32 @@ const resolvers = { greetings:{ subscribe: subscribe('greetings', { async onAfterSubscribe(_, __, { publish, complete }) { - await publish({ topic: 'greetings', payload: 'yoyo' }) - await publish({ topic: 'greetings', payload: 'hows it' }) - await publish({ topic: 'greetings', payload: 'howdy' }) - await complete({ topic: 'greetings', payload: 'wtf' }) + await publish({ topic: 'greetings', payload: { message: 'yoyo' } }) + await publish({ topic: 'greetings', payload: { message: 'hows it' } }) + await publish({ topic: 'greetings', payload: { message: 'howdy' } }) + await complete({ topic: 'greetings', payload: { message: 'wtf' } }) }, }), resolve: ({ payload }) => { - return payload + return payload.message + }, + }, + filterTest:{ + subscribe: subscribe('filterTest', { + async filter() { + return { + error: false, + } + }, + async onAfterSubscribe(_, __, { publish, complete }) { + await publish({ topic: 'filterTest', payload: { error: true, message: 'oh no!' } }) + await publish({ topic: 'filterTest', payload: { error: false, message: 'oh yes!' } }) + await publish({ topic: 'filterTest', payload: { message: 'Missing fields also work' } }) + await complete({ topic: 'filterTest' }) + }, + }), + resolve: ({ payload }) => { + return payload.message }, }, }, diff --git a/package-lock.json b/package-lock.json index 521792b7..fb602827 100644 --- a/package-lock.json +++ b/package-lock.json @@ -955,6 +955,15 @@ "integrity": "sha512-yd+9qKmJxm496BOV9CMNaey8TWsikaZOwMRwPHQIjcOJM9oV+fi9ZMNw3JsVnbEEbo2gRTDnGEBv8pjyn67hNg==", "dev": true }, + "@types/debug": { + "version": "4.1.7", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.7.tgz", + "integrity": "sha512-9AonUzyTjXXhEOa0DnqpzZi6VHlqKMswga9EXjpXnnqxwLtdvPPtlO8evrI5D9S6asFRCQ6v+wpiUKbw+vKqyg==", + "dev": true, + "requires": { + "@types/ms": "*" + } + }, "@types/json-schema": { "version": "7.0.9", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.9.tgz", @@ -973,6 +982,12 @@ "integrity": "sha512-scN0hAWyLVAvLR9AyW7HoFF5sJZglyBsbPuHO4fv7JRvfmPBMfp1ozWqOf/e4wwPNxezBZXRfWzMb6iFLgEVRA==", "dev": true }, + "@types/ms": { + "version": "0.7.31", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz", + "integrity": "sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==", + "dev": true + }, "@types/node": { "version": "16.6.1", "resolved": "https://registry.npmjs.org/@types/node/-/node-16.6.1.tgz", @@ -2124,10 +2139,9 @@ "dev": true }, "debug": { - "version": "4.3.1", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz", - "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==", - "dev": true, + "version": "4.3.2", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.2.tgz", + "integrity": "sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw==", "requires": { "ms": "2.1.2" }, @@ -2135,8 +2149,7 @@ "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" } } }, @@ -4253,6 +4266,25 @@ "yargs": "16.2.0", "yargs-parser": "20.2.4", "yargs-unparser": "2.0.0" + }, + "dependencies": { + "debug": { + "version": "4.3.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz", + "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==", + "dev": true, + "requires": { + "ms": "2.1.2" + }, + "dependencies": { + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + } + } + } } }, "modify-values": { diff --git a/package.json b/package.json index 090302d4..67442f3e 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "@aws/dynamodb-data-mapper-annotations": "^0.7.3", "@aws/dynamodb-expressions": "^0.7.3", "aggregate-error": "^4.0.0", + "debug": "^4.3.2", "streaming-iterables": "^6.0.0" }, "peerDependencies": { @@ -51,21 +52,22 @@ "@types/architect__sandbox": "^3.3.1", "@types/aws-lambda": "^8.10.81", "@types/chai": "^4.2.19", + "@types/debug": "^4.1.7", "@types/mocha": "^9.0.0", "@types/node": "^16.6.0", "@typescript-eslint/eslint-plugin": "^4.27.0", "@typescript-eslint/parser": "^4.27.0", "aws-sdk": ">= 2.844.0", "chai": "^4.3.4", - "esbuild-register": "^3.0.0", "esbuild": "^0.12.20", + "esbuild-register": "^3.0.0", "eslint": "^7.29.0", - "graphql-ws": "^5.3.0", "graphql": ">= 14.0.0", + "graphql-ws": "^5.3.0", "inside-out-async": "^1.0.0", "mocha": "^9.0.1", - "rollup-plugin-node-resolve": "^5.2.0", "rollup": "^2.56.0", + "rollup-plugin-node-resolve": "^5.2.0", "semantic-release": "^17.4.4", "ts-node": "^10.2.0", "tslib": "^2.3.0", diff --git a/rollup.config.js b/rollup.config.js index deb8308b..2cbeccfb 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -18,6 +18,8 @@ export default { '@aws/dynamodb-data-mapper', '@aws/dynamodb-data-mapper-annotations', '@aws/dynamodb-expressions', + 'debug', + 'streaming-iterables', // dep from ts while we're using these decorator stuff, want to include it // 'tslib', // we only use a string and types from this package so lets import it