diff --git a/docs/api/options.md b/docs/api/options.md index 980b25d4..7a2e93a0 100644 --- a/docs/api/options.md +++ b/docs/api/options.md @@ -59,6 +59,7 @@ - `subscription.context`: `Function` Result of function is passed to subscription resolvers as a custom GraphQL context. The function receives the `connection` and `request` as parameters. - `subscription.onConnect`: `Function` A function which can be used to validate the `connection_init` payload. If defined it should return a truthy value to authorize the connection. If it returns an object the subscription context will be extended with the returned object. - `subscription.onDisconnect`: `Function` A function which is called with the subscription context of the connection after the connection gets disconnected. + - `subscription.keepAlive`: `Integer` Optional interval in ms to send the `GQL_CONNECTION_KEEP_ALIVE` message. - `federationMetadata`: Boolean. Enable federation metadata support so the service can be deployed behind an Apollo Gateway - `gateway`: Object. Run the GraphQL server in gateway mode. diff --git a/index.d.ts b/index.d.ts index 64629d2f..4bb55b68 100644 --- a/index.d.ts +++ b/index.d.ts @@ -516,6 +516,7 @@ export interface MercuriusCommonOptions { payload: any; }) => Record | Promise>; onDisconnect?: (context: MercuriusContext) => void | Promise; + keepAlive?: number, }; /** * Enable federation metadata support so the service can be deployed behind an Apollo Gateway diff --git a/index.js b/index.js index e979b3f0..0e4d11f7 100644 --- a/index.js +++ b/index.js @@ -122,6 +122,7 @@ const plugin = fp(async function (app, opts) { let subscriptionContextFn let onConnect let onDisconnect + let keepAlive if (typeof subscriptionOpts === 'object') { if (subscriptionOpts.pubsub) { @@ -134,6 +135,7 @@ const plugin = fp(async function (app, opts) { subscriptionContextFn = subscriptionOpts.context onConnect = subscriptionOpts.onConnect onDisconnect = subscriptionOpts.onDisconnect + keepAlive = subscriptionOpts.keepAlive } else if (subscriptionOpts === true) { emitter = mq() subscriber = new PubSub(emitter) @@ -246,7 +248,8 @@ const plugin = fp(async function (app, opts) { onDisconnect, lruGatewayResolvers, entityResolversFactory, - subscriptionContextFn + subscriptionContextFn, + keepAlive }) } diff --git a/lib/routes.js b/lib/routes.js index 88461306..429d077c 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -182,7 +182,8 @@ module.exports = async function (app, opts) { lruGatewayResolvers, entityResolversFactory, persistedQueryProvider, - allowBatchedQueries + allowBatchedQueries, + keepAlive } = opts // Load the persisted query settings @@ -306,7 +307,8 @@ module.exports = async function (app, opts) { onDisconnect, lruGatewayResolvers, entityResolversFactory, - subscriptionContextFn + subscriptionContextFn, + keepAlive }) } else { app.route(getOptions) diff --git a/lib/subscription-connection.js b/lib/subscription-connection.js index e787f9c0..662e39e8 100644 --- a/lib/subscription-connection.js +++ b/lib/subscription-connection.js @@ -19,7 +19,8 @@ module.exports = class SubscriptionConnection { context = {}, onConnect, onDisconnect, - resolveContext + resolveContext, + keepAlive }) { this.fastify = fastify this.socket = socket @@ -33,6 +34,7 @@ module.exports = class SubscriptionConnection { this.context = context this.isReady = false this.resolveContext = resolveContext + this.keepAlive = keepAlive this.headers = {} this.protocolMessageTypes = getProtocolByName(socket.protocol) @@ -130,6 +132,16 @@ module.exports = class SubscriptionConnection { } this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ACK) + + if (this.keepAlive) { + this.sendKeepAlive() + + /* istanbul ignore next */ + this.keepAliveTimer = setInterval(() => { + this.sendKeepAlive() + }, this.keepAlive) + } + this.isReady = true } @@ -258,6 +270,8 @@ module.exports = class SubscriptionConnection { .map((subIter) => subIter.return && subIter.return()) this.socket.close() + if (this.keepAliveTimer) clearInterval(this.keepAliveTimer) + if (typeof this.onDisconnect === 'function') { Promise.resolve() .then(() => this.onDisconnect(this.context)) @@ -300,4 +314,8 @@ module.exports = class SubscriptionConnection { close () { this.handleConnectionClose() } + + sendKeepAlive () { + this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_KEEP_ALIVE) + } } diff --git a/lib/subscription.js b/lib/subscription.js index 009cc289..246470a2 100644 --- a/lib/subscription.js +++ b/lib/subscription.js @@ -6,7 +6,7 @@ const { kHooks } = require('./symbols') const SubscriptionConnection = require('./subscription-connection') const { getProtocolByName } = require('./subscription-protocol') -function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn }) { +function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn, keepAlive }) { return async (connection, request) => { const { socket } = connection @@ -44,7 +44,8 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect lruGatewayResolvers, entityResolversFactory, context, - resolveContext + resolveContext, + keepAlive }) /* istanbul ignore next */ @@ -58,7 +59,7 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect } module.exports = function (fastify, opts, next) { - const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn } = opts + const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn, keepAlive } = opts // If `fastify.websocketServer` exists, it means `fastify-websocket` already registered. // Without this check, fastify-websocket will be registered multiple times and raises FST_ERR_DEC_ALREADY_PRESENT. @@ -80,7 +81,8 @@ module.exports = function (fastify, opts, next) { onDisconnect, lruGatewayResolvers, entityResolversFactory, - subscriptionContextFn + subscriptionContextFn, + keepAlive }) }) diff --git a/test/subscription.js b/test/subscription.js index 0841f2b7..4b6b2a08 100644 --- a/test/subscription.js +++ b/test/subscription.js @@ -52,6 +52,55 @@ test('subscription server replies with connection_ack', t => { }) }) +test('subscription server replies with keep alive when enabled', t => { + const app = Fastify() + t.teardown(() => app.close()) + + const schema = ` + type Query { + add(x: Int, y: Int): Int + } + ` + + const resolvers = { + Query: { + add: (parent, { x, y }) => x + y + } + } + + app.register(GQL, { + schema, + resolvers, + subscription: { + keepAlive: 10000 + } + }) + + app.listen(0, err => { + t.error(err) + + const url = 'ws://localhost:' + (app.server.address()).port + '/graphql' + const ws = new WebSocket(url, 'graphql-ws') + const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true }) + t.teardown(client.destroy.bind(client)) + + client.setEncoding('utf8') + client.write(JSON.stringify({ + type: 'connection_init' + })) + client.on('data', chunk => { + const payload = JSON.parse(chunk) + + // keep alive only comes after the ack + if (payload.type === 'connection_ack') return + + t.equal(payload.type, 'ka') + client.end() + t.end() + }) + }) +}) + test('subscription server sends update to subscriptions', t => { const app = Fastify() t.teardown(() => app.close())