Skip to content

Commit

Permalink
[client] Always reconnect listeners, even when auto-reconnect fails
Browse files Browse the repository at this point in the history
  • Loading branch information
rexxars authored and bjoerge committed Feb 16, 2018
1 parent c1e0ca3 commit 292ed3b
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 109 deletions.
74 changes: 52 additions & 22 deletions packages/@sanity/client/src/data/listen.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
const assign = require('object-assign')
const Observable = require('@sanity/observable/minimal')
const encodeQueryString = require('./encodeQueryString')
const polyfilledEventSource = require('@sanity/eventsource')
const pick = require('../util/pick')
const defaults = require('../util/defaults')
const encodeQueryString = require('./encodeQueryString')

const EventSource = typeof window !== 'undefined' && window.EventSource
? window.EventSource // Native browser EventSource
: require('@sanity/eventsource') // Node.js, IE etc
const EventSource =
typeof window !== 'undefined' && window.EventSource
? window.EventSource // Native browser EventSource
: polyfilledEventSource // Node.js, IE etc

const possibleOptions = ['includePreviousRevision', 'includeResult']
const defaultOptions = {
Expand Down Expand Up @@ -35,18 +37,31 @@ module.exports = function listen(query, params, opts = {}) {
}

return new Observable(observer => {
const es = new EventSource(uri, esOptions)

es.addEventListener('error', onError, false)
es.addEventListener('channelError', onChannelError, false)
es.addEventListener('disconnect', onDisconnect, false)
listenFor.forEach(type => es.addEventListener(type, onMessage, false))
let es = getEventSource()
let reconnectTimer
let stopped = false

function onError() {
if (stopped) {
return
}

emitReconnect()

// Allow event handlers of `emitReconnect` to cancel/close the reconnect attempt
if (stopped) {
return
}

// Unless we've explicitly stopped the ES (in which case `stopped` should be true),
// we should never be in a disconnected state. By default, EventSource will reconnect
// automatically, in which case it sets readyState to `CONNECTING`, but in some cases
// (like when a laptop lid is closed), it closes the connection. In these cases we need
// to explicitly reconnect.
if (es.readyState === EventSource.CLOSED) {
observer.complete()
} else if (es.readyState === EventSource.CONNECTING) {
emitReconnect()
unsubscribe()
clearTimeout(reconnectTimer)
reconnectTimer = setTimeout(open, 100)
}
}

Expand All @@ -56,21 +71,20 @@ module.exports = function listen(query, params, opts = {}) {

function onMessage(evt) {
const event = parseEvent(evt)
return event instanceof Error
? observer.error(event)
: observer.next(event)
return event instanceof Error ? observer.error(event) : observer.next(event)
}

function onDisconnect(evt) {
observer.complete()
stopped = true
unsubscribe()
observer.complete()
}

function unsubscribe() {
listenFor.forEach(type => es.removeEventListener(type, onMessage, false))
es.removeEventListener('error', onError, false)
es.removeEventListener('channelError', onChannelError, false)
es.removeEventListener('disconnect', onDisconnect, false)
listenFor.forEach(type => es.removeEventListener(type, onMessage, false))
es.close()
}

Expand All @@ -80,7 +94,25 @@ module.exports = function listen(query, params, opts = {}) {
}
}

return unsubscribe
function getEventSource() {
const evs = new EventSource(uri, esOptions)
evs.addEventListener('error', onError, false)
evs.addEventListener('channelError', onChannelError, false)
evs.addEventListener('disconnect', onDisconnect, false)
listenFor.forEach(type => evs.addEventListener(type, onMessage, false))
return evs
}

function open() {
es = getEventSource()
}

function stop() {
stopped = true
unsubscribe()
}

return stop
})
}

Expand Down Expand Up @@ -111,7 +143,5 @@ function extractErrorMessage(err) {
return err.error.description
}

return typeof err.error === 'string'
? err.error
: JSON.stringify(err.error, null, 2)
return typeof err.error === 'string' ? err.error : JSON.stringify(err.error, null, 2)
}
5 changes: 4 additions & 1 deletion packages/@sanity/client/test/helpers/sseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ const SseChannel = require('sse-channel')
module.exports = (onRequest, cb) => {
const server = http.createServer((request, response) => {
let channel
if (request.url.indexOf('/v1/data/listen/') === 0 || request.url.indexOf('/listen/beerns?query=') === 0) {
if (
request.url.indexOf('/v1/data/listen/') === 0 ||
request.url.indexOf('/listen/beerns?query=') === 0
) {
channel = new SseChannel({jsonEncode: true})
channel.addClient(request, response)
}
Expand Down
207 changes: 121 additions & 86 deletions packages/@sanity/client/test/listen.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,31 @@ const assign = require('xtend')
const sanityClient = require('../src/sanityClient')
const sseServer = require('./helpers/sseServer')

const getClient = options => sanityClient(assign({
dataset: 'prod',
namespace: 'beerns',
apiHost: `http://localhost:${options.port}`,
useProjectHostname: false
}, options))
const getClient = options =>
sanityClient(
assign(
{
dataset: 'prod',
namespace: 'beerns',
apiHost: `http://localhost:${options.port}`,
useProjectHostname: false,
useCdn: false
},
options
)
)

const testSse = (onRequest, options) => new Promise((resolve, reject) => {
sseServer(onRequest, (err, server) => {
if (err) {
return reject(err)
}
const testSse = (onRequest, options) =>
new Promise((resolve, reject) => {
sseServer(onRequest, (err, server) => {
if (err) {
return reject(err)
}

const client = getClient(assign({port: server.address().port}, options))
return resolve({server, client})
const client = getClient(assign({port: server.address().port}, options))
return resolve({server, client})
})
})
})

/*****************
* LISTENER *
Expand All @@ -36,116 +44,143 @@ test('[listener] can listen for mutations', t => {
identity: 'uid',
mutations: [{patch: {id: 'beer-123', set: {abv: 8}}}],
previousRev: 'MOmofa',
result: {_id: 'beer-123', _type: 'beer', brewery: 'Trillium', title: 'Headroom Double IPA', abv: 8},
result: {
_id: 'beer-123',
_type: 'beer',
brewery: 'Trillium',
title: 'Headroom Double IPA',
abv: 8
},
resultRev: 'Blatti',
timestamp: '2017-03-29T12:36:20.506516Z',
transactionId: 'foo',
transition: 'update',
transition: 'update'
}

testSse(({request, channel}) => {
t.equal(request.url, [
'/v1/data/listen/prod',
'?query=*%5Bis%20%22beer%22%20%26%26%20title%20%3D%3D%20%24beerName%5D',
'&%24beerName=%22Headroom%20Double%20IPA%22&includeResult=true'
].join(''), 'url should be correct')
t.equal(
request.url,
[
'/v1/data/listen/prod',
'?query=*%5Bis%20%22beer%22%20%26%26%20title%20%3D%3D%20%24beerName%5D',
'&%24beerName=%22Headroom%20Double%20IPA%22&includeResult=true'
].join(''),
'url should be correct'
)

channel.send({event: 'mutation', data: eventData})
process.nextTick(() => channel.close())
}).then(({server, client}) => {
const query = '*[is "beer" && title == $beerName]'
const params = {beerName: 'Headroom Double IPA'}
})
.then(({server, client}) => {
const query = '*[is "beer" && title == $beerName]'
const params = {beerName: 'Headroom Double IPA'}

const subscription = client.listen(query, params).subscribe({
next: msg => {
t.deepEqual(msg, assign({}, eventData, {type: 'mutation'}), 'event data should be correct')
subscription.unsubscribe()
server.close()
t.end()
},
error: err => {
subscription.unsubscribe()
server.close()
t.end(err)
}
const subscription = client.listen(query, params).subscribe({
next: msg => {
t.deepEqual(
msg,
assign({}, eventData, {type: 'mutation'}),
'event data should be correct'
)
subscription.unsubscribe()
server.close()
t.end()
},
error: err => {
subscription.unsubscribe()
server.close()
t.end(err)
}
})
})
}).catch(t.end)
.catch(t.end)
})

test('[listener] listener sends auth token if given (node)', t => {
let httpServer = null
testSse(({request, channel}) => {
t.equal(request.headers.authorization, 'Bearer foobar', 'should send token')
channel.send({event: 'disconnect'})
process.nextTick(() => {
channel.close()
httpServer.close()
t.end()
})
}, {token: 'foobar'}).then(({server, client}) => {
httpServer = server
const subscription = client.listen('*').subscribe({
error: err => {
subscription.unsubscribe()
server.close()
t.end(err)
}
testSse(
({request, channel}) => {
t.equal(request.headers.authorization, 'Bearer foobar', 'should send token')
channel.send({event: 'disconnect'})
process.nextTick(() => {
channel.close()
httpServer.close()
t.end()
})
},
{token: 'foobar'}
)
.then(({server, client}) => {
httpServer = server
const subscription = client.listen('*').subscribe({
error: err => {
subscription.unsubscribe()
server.close()
t.end(err)
}
})
})
}).catch(t.end)
.catch(t.end)
})

test('[listener] reconnects if disconnected', t => {
testSse(({request, channel}) => {
channel.send({event: 'welcome'})
channel.close()
process.nextTick(() => channel.close())
}).then(({server, client}) => {
const subscription = client.listen('*', {}, {events: ['reconnect']}).subscribe({
next: msg => {
t.equal(msg.type, 'reconnect', 'emits reconnect events if told to')
subscription.unsubscribe()
server.close()
t.end()
},
error: err => {
subscription.unsubscribe()
server.close()
t.end(err)
}
})
.then(({server, client}) => {
const subscription = client.listen('*', {}, {events: ['reconnect']}).subscribe({
next: msg => {
t.equal(msg.type, 'reconnect', 'emits reconnect events if told to')
subscription.unsubscribe()
server.close()
t.end()
},
error: err => {
subscription.unsubscribe()
server.close()
t.end(err)
}
})
})
}).catch(t.end)
.catch(t.end)
})

test('[listener] emits channel errors', t => {
testSse(({request, channel}) => {
channel.send({event: 'channelError', data: {message: 'Unfortunate error'}})
channel.close()
process.nextTick(() => channel.close())
}).then(({server, client}) => {
const subscription = client.listen('*').subscribe({
error: err => {
t.equal(err.message, 'Unfortunate error', 'should have passed error message')
subscription.unsubscribe()
server.close()
t.end()
}
})
.then(({server, client}) => {
const subscription = client.listen('*').subscribe({
error: err => {
t.equal(err.message, 'Unfortunate error', 'should have passed error message')
subscription.unsubscribe()
server.close()
t.end()
}
})
})
}).catch(t.end)
.catch(t.end)
})

test('[listener] emits channel errors with deep error description', t => {
testSse(({request, channel}) => {
channel.send({event: 'channelError', data: {error: {description: 'Expected error'}}})
channel.close()
process.nextTick(() => channel.close())
}).then(({server, client}) => {
const subscription = client.listen('*').subscribe({
error: err => {
t.equal(err.message, 'Expected error', 'should have passed error message')
subscription.unsubscribe()
server.close()
t.end()
}
})
.then(({server, client}) => {
const subscription = client.listen('*').subscribe({
error: err => {
t.equal(err.message, 'Expected error', 'should have passed error message')
subscription.unsubscribe()
server.close()
t.end()
}
})
})
}).catch(t.end)
.catch(t.end)
})

0 comments on commit 292ed3b

Please sign in to comment.