Skip to content

Commit

Permalink
feat: switch from WebSocket to EventSource (#34)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: /ws endpoint changed to /sse
  • Loading branch information
Atinux authored and Pooya Parsa committed Aug 5, 2019
1 parent 14cc6be commit 68ea10e
Show file tree
Hide file tree
Showing 10 changed files with 1,615 additions and 1,457 deletions.
14 changes: 7 additions & 7 deletions app/app.vue
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import fetch from 'unfetch'
import capitalizeMixin from './mixins/capitalize'
import logMixin from './mixins/log'
import wsMixin from './mixins/ws'
import sseMixin from './mixins/sse'
const waitFor = ms => new Promise(resolve => setTimeout(resolve, ms))
Expand All @@ -52,7 +52,7 @@ export default {
mixins: [
capitalizeMixin,
logMixin,
wsMixin
sseMixin
],
data() {
Expand All @@ -69,17 +69,17 @@ export default {
mounted() {
this.onData(window.$STATE)
this.wsConnect(`${this.baseURL}_loading/ws`)
this.sseConnect(`${this.baseURL}_loading/sse`)
this.setTimeout()
},
methods: {
onWSData(data) {
onSseData(data) {
if (this._reloading) {
return
}
// We have data from ws. Delay timeout!
// We have data from sse. Delay timeout!
this.setTimeout()
this.onData(data)
Expand Down Expand Up @@ -161,8 +161,8 @@ export default {
// Stop timers
this.clearTimeout()
// Close websockets connection
this.wsClose()
// Close EventSource connection
this.sseClose()
// Clear console
this.clearConsole()
Expand Down
2 changes: 1 addition & 1 deletion app/mixins/capitalize.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export default {
filters: {
capitalize(value) {
capitalize (value) {
if (!value) {
return ''
}
Expand Down
6 changes: 3 additions & 3 deletions app/mixins/log.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
export default {
methods: {
log(...args) {
log (...args) {
console.log(...args) // eslint-disable-line no-console
},

logError(...args) {
logError (...args) {
console.error(...args) // eslint-disable-line no-console
},

clearConsole() {
clearConsole () {
if (typeof console.clear === 'function') { // eslint-disable-line no-console
console.clear() // eslint-disable-line no-console
}
Expand Down
31 changes: 31 additions & 0 deletions app/mixins/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export default {
methods: {
logSse (...args) {
this.log('[SSE]', ...args)
},

logSseError (...args) {
this.logError('[SSE]', ...args)
},

sseConnect (path) {
this.logSse(`Connecting to ${path}...`)

this.$sse = new EventSource(path)
this.$sse.addEventListener('message', event => this.onSseMessage(event))
},

onSseMessage (message) {
const data = JSON.parse(message.data)

data && this.onSseData && this.onSseData(data)
},

sseClose () {
if (this.$sse) {
this.$sse.close()
delete this.$sse
}
}
}
}
94 changes: 0 additions & 94 deletions app/mixins/ws.js

This file was deleted.

80 changes: 28 additions & 52 deletions lib/loading.js
Original file line number Diff line number Diff line change
@@ -1,100 +1,76 @@
const path = require('path')
const { resolve } = require('path')
const connect = require('connect')
const serveStatic = require('serve-static')
const WebSocket = require('ws')
const fs = require('fs-extra')
const { json, end } = require('node-res')

const SSE = require('./sse')

class LoadingUI {
constructor({ baseURL }) {
this.baseURL = baseURL
constructor ({ baseURL }) {
// Create a connect middleware stack
this.app = connect()

this.serveIndex = this.serveIndex.bind(this)
this.serveWS = this.serveWS.bind(this)
this.serveJSON = this.serveJSON.bind(this)
// Create an SSE handler instance
this.sse = new SSE()

this.baseURL = baseURL
this._lastBroadCast = 0

this.states = []
this.allDone = true
this.hasErrors = false
}

async init() {
// Create a connect middleware stack
this.app = connect()
this.serveIndex = this.serveIndex.bind(this)
}

// Create and serve ws
this.wss = new WebSocket.Server({ noServer: true })
this.app.use('/ws', this.serveWS)
async init () {
// Subscribe to SSR channel
this.app.use('/sse', (req, res) => this.sse.subscribe(req, res))

// Serve dist
this.app.use('/json', this.serveJSON)
// Serve state with JSON
this.app.use('/json', (req, res) => json(req, res, this.state))

// Serve dist
const distPath = path.resolve(__dirname, '..', 'app-dist')
const distPath = resolve(__dirname, '..', 'app-dist')
this.app.use('/', serveStatic(distPath))

// Serve index.html
const indexPath = path.resolve(distPath, 'index.html')
const indexPath = resolve(distPath, 'index.html')
this.indexTemplate = await fs.readFile(indexPath, 'utf-8')
this.app.use('/', this.serveIndex)
}

get state() {
get state () {
return {
states: this.states,
allDone: this.allDone,
hasErrors: this.hasErrors
}
}

setStates(states) {
setStates (states) {
this.states = states
this.allDone = this.states.every(state => state.progress === 0 || state.progress === 100)
this.hasErrors = this.states.some(state => state.hasErrors === true)
this.broadcastState()
}

broadcastState() {
broadcastState () {
const now = new Date()
if ((now - this._lastBroadCast > 500) || this.allDone) {
this._broadcastState()
this._lastBroadCast = now
}
}

_broadcastState() {
const data = JSON.stringify(this.state)

for (const client of this.wss.clients) {
try {
client.send(data)
} catch (err) {
// Ignore error (if client not ready to receive event)
}
if ((now - this._lastBroadCast > 500) || this.allDone || this.hasErrors) {
this.sse.broadcast('state', this.state)
this._lastBroadCast = now
}
}

serveIndex(req, res) {
res.writeHead(200, { 'Content-Type': 'text/html' })
res.end(this.indexTemplate
serveIndex (req, res) {
const html = this.indexTemplate
.replace('{STATE}', JSON.stringify(this.state))
.replace(/{BASE_URL}/g, this.baseURL)
)
}

serveJSON(req, res) {
res.end(JSON.stringify(this.state))
}

serveWS(req) {
this.handleUpgrade(req, req.socket, undefined)
}

handleUpgrade(request, socket, head) {
return this.wss.handleUpgrade(request, socket, head, (client) => {
this.wss.emit('connection', client, request)
})
end(res, html)
}
}

Expand Down
13 changes: 2 additions & 11 deletions lib/module.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
module.exports = async function NuxtLoadingScreen() {
module.exports = async function NuxtLoadingScreen () {
if (!this.options.dev) {
return
}
const LoadingUI = require('./loading')

const baseURL = this.options.router.base

const LoadingUI = require('./loading')
const loading = new LoadingUI({ baseURL })
await loading.init()

Expand All @@ -21,12 +20,4 @@ module.exports = async function NuxtLoadingScreen() {
this.nuxt.hook('server:nuxt:renderLoading', (req, res) => {
loading.serveIndex(req, res)
})

this.nuxt.hook('listen', (server) => {
server.on('upgrade', (request, socket, head) => {
if (request.url === `${baseURL}_loading/ws`) {
loading.handleUpgrade(request, socket, head)
}
})
})
}
40 changes: 40 additions & 0 deletions lib/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const { status, header } = require('node-res')

class SSE {
constructor () {
this.subscriptions = new Set()
this.counter = 0
}

// Subscribe to a channel and set initial headers
subscribe (req, res) {
req.socket.setTimeout(0)

status(res, 200)
header(res, 'Content-Type', 'text/event-stream')
header(res, 'Cache-Control', 'no-cache')
header(res, 'Connection', 'keep-alive')

this.subscriptions.add(res)
res.on('close', () => this.subscriptions.delete(res))
this.broadcast('ready', {})
}

// Publish event and data to all connected clients
broadcast (event, data) {
this.counter++
// Do console.log(this.subscriptions.size) to see, if there are any memory leaks
for (const res of this.subscriptions) {
this.clientBroadcast(res, event, data)
}
}

// Publish event and data to a given response object
clientBroadcast (res, event, data) {
res.write(`id: ${this.counter}\n`)
res.write(`event: message\n`)
res.write(`data: ${JSON.stringify({ event, ...data })}\n\n`)
}
}

module.exports = SSE

0 comments on commit 68ea10e

Please sign in to comment.