Skip to content

Commit

Permalink
fix: provide websocket instead of stream to avoid potential backpress…
Browse files Browse the repository at this point in the history
…ure issues (fastify#289)
  • Loading branch information
mat-sz committed Mar 2, 2024
1 parent 25d37d2 commit 6e7a088
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 319 deletions.
36 changes: 11 additions & 25 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,11 @@ function fastifyWebsocket (fastify, opts, next) {
wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => {
wss.emit('connection', socket, rawRequest)

const connection = WebSocket.createWebSocketStream(socket, opts.connectionOptions)
connection.socket = socket

connection.on('error', (error) => {
socket.on('error', (error) => {
fastify.log.error(error)
})

connection.socket.on('newListener', event => {
if (event === 'message') {
connection.resume()
}
})

callback(connection)
callback(socket)
})
}

Expand Down Expand Up @@ -187,20 +178,20 @@ function fastifyWebsocket (fastify, opts, next) {
// within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so
if (request.raw[kWs]) {
reply.hijack()
handleUpgrade(request.raw, connection => {
handleUpgrade(request.raw, socket => {
let result
try {
if (isWebsocketRoute) {
result = wsHandler.call(this, connection, request)
result = wsHandler.call(this, socket, request)
} else {
result = noHandle.call(this, connection, request)
result = noHandle.call(this, socket, request)
}
} catch (err) {
return errorHandler.call(this, err, connection, request, reply)
return errorHandler.call(this, err, socket, request, reply)
}

if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, connection, request, reply))
result.catch(err => errorHandler.call(this, err, socket, request, reply))
}
})
} else {
Expand Down Expand Up @@ -229,19 +220,14 @@ function fastifyWebsocket (fastify, opts, next) {
done()
}

function noHandle (connection, rawRequest) {
function noHandle (socket, rawRequest) {
this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler')
connection.socket.close()
socket.close()
}

function defaultErrorHandler (error, conn, request) {
// Before destroying the connection, we attach an error listener.
// Since we already handled the error, adding this listener prevents the ws
// library from emitting the error and causing an uncaughtException
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
conn.on('error', _ => { })
function defaultErrorHandler (error, socket, request) {
request.log.error(error)
conn.destroy(error)
socket.terminate()
}

next()
Expand Down
Loading

0 comments on commit 6e7a088

Please sign in to comment.