Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 103 additions & 4 deletions lib/messages/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import { mockServerContext } from '../test/mockServer'
import { connection_init } from './connection_init'
import { equals } from '@aws/dynamodb-expressions'
import { collect } from 'streaming-iterables'
import { subscribe as pubsubSubscribe } from '../pubsub/subscribe'
import { makeExecutableSchema } from '@graphql-tools/schema'

const connectionId = '7rWmyMbMr'
const ConnectionId = connectionId
const connectionInitEvent: any = { requestContext: { connectedAt: 1628905962601, connectionId, domainName: 'localhost:6001', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'Pn6evkpk2', requestId: 'gN1MPybyL', requestTimeEpoch: 1628905962602, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"type":"connection_init"}' }

describe('messages/subscribe', () => {
before(async () => {
beforeEach(async () => {
await tables.start({ cwd: './mocks/arc-basic-events', quiet: true })
})

after(async () => {
afterEach(async () => {
tables.end()
})

Expand Down Expand Up @@ -101,7 +103,104 @@ describe('messages/subscribe', () => {
assert.match(error.message, /Cannot query field "HIHOWEAREYOU" on type "Query"/ )
})
describe('callbacks', () => {
it('fires onSubscribe before subscribing')
it('fires onAfterSubscribe after subscribing')
it('fires onSubscribe before subscribing', async () => {

const onSubscribe: string[] = []

const typeDefs = `
type Query {
hello: String
}
type Subscription {
greetings: String
}
`
const resolvers = {
Query: {
hello: () => 'Hello World!',
},
Subscription: {
greetings:{
subscribe: pubsubSubscribe('greetings', {
onSubscribe() {
onSubscribe.push('We did it!')
throw new Error('don\'t subscribe!')
},
}),
resolve: ({payload}) => {
return payload
},
},
},
}

const schema = makeExecutableSchema({
typeDefs,
resolvers,
})
const server = await mockServerContext({
schema,
})
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }

await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
try {
await subscribe({ server, event, message: JSON.parse(event.body) })
throw new Error('should not have subscribed')
} catch (error) {
assert.equal(error.message, 'don\'t subscribe!')
}
assert.deepEqual(onSubscribe, ['We did it!'])
const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
assert.isEmpty(subscriptions)

})
it('fires onAfterSubscribe after subscribing', async () => {
const events: string[] = []

const typeDefs = `
type Query {
hello: String
}
type Subscription {
greetings: String
}
`
const resolvers = {
Query: {
hello: () => 'Hello World!',
},
Subscription: {
greetings:{
subscribe: pubsubSubscribe('greetings', {
onSubscribe() {
events.push('onSubscribe')
},
onAfterSubscribe() {
events.push('onAfterSubscribe')
},
}),
resolve: ({payload}) => {
return payload
},
},
},
}

const schema = makeExecutableSchema({
typeDefs,
resolvers,
})
const server = await mockServerContext({
schema,
})
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }

await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
await subscribe({ server, event, message: JSON.parse(event.body) })
assert.deepEqual(events, ['onSubscribe', 'onAfterSubscribe'])
const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
assert.isNotEmpty(subscriptions)
})
})
})
4 changes: 2 additions & 2 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export type SubscribePsuedoIterable = {
topicDefinitions: SubscriptionDefinition[]
onSubscribe?: (...args: SubscribeArgs) => void | Promise<void>
onComplete?: (...args: SubscribeArgs) => void | Promise<void>
onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | undefined | Promise<undefined>
onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | void | Promise<void>
}

export type SubscribeArgs = [root: any, args: Record<string, any>, context: any, info: GraphQLResolveInfo]
Expand Down Expand Up @@ -121,7 +121,7 @@ export interface SubscribeOptions {
filter?: object | ((...args: SubscribeArgs) => object)
onSubscribe?: (...args: SubscribeArgs) => void | Promise<void>
onComplete?: (...args: SubscribeArgs) => void | Promise<void>
onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | undefined | Promise<undefined>
onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | void | Promise<void>
}

export type ApiGatewayHandler<TEvent = any, TResult = any> = (event: TEvent) => void | Promise<TResult>