Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor websocket control frame handling #3241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
253 changes: 137 additions & 116 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
'use strict'

const { Writable } = require('node:stream')
const assert = require('node:assert')
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { channels } = require('../../core/diagnostics')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode } = require('./util')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util')
const { WebsocketFrameSend } = require('./frame')
const { CloseEvent } = require('./events')

Expand Down Expand Up @@ -53,30 +54,39 @@ class ByteParser extends Writable {
}

const buffer = this.consume(2)
const fin = (buffer[0] & 0x80) !== 0
const opcode = buffer[0] & 0x0F
const masked = (buffer[1] & 0x80) === 0x80

this.#info.fin = (buffer[0] & 0x80) !== 0
this.#info.opcode = buffer[0] & 0x0F
this.#info.masked = (buffer[1] & 0x80) === 0x80

if (this.#info.masked) {
if (masked) {
failWebsocketConnection(this.ws, 'Frame cannot be masked')
return callback()
}

// If we receive a fragmented message, we use the type of the first
// frame to parse the full message as binary/text, when it's terminated
this.#info.originalOpcode ??= this.#info.opcode

this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
const fragmented = !fin && opcode !== opcodes.CONTINUATION

if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
if (fragmented && opcode !== opcodes.BINARY && opcode !== opcodes.TEXT) {
// Only text and binary frames can be fragmented
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
return
}

const payloadLength = buffer[1] & 0x7F

if (isControlFrame(opcode)) {
const loop = this.parseControlFrame(callback, {
opcode,
fragmented,
payloadLength
})

if (loop) {
continue
} else {
return
}
}

if (payloadLength <= 125) {
this.#info.payloadLength = payloadLength
this.#state = parserStates.READ_DATA
Expand All @@ -86,114 +96,18 @@ class ByteParser extends Writable {
this.#state = parserStates.PAYLOADLENGTH_64
}

// TODO(@KhafraDev): handle continuation frames separately as their
// semantics are different from TEXT/BINARY frames.
this.#info.originalOpcode ??= opcode
this.#info.opcode = opcode
this.#info.masked = masked
this.#info.fin = fin
this.#info.fragmented = fragmented

if (this.#info.fragmented && payloadLength > 125) {
// A fragmented frame can't be fragmented itself
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
return
} else if (
(this.#info.opcode === opcodes.PING ||
this.#info.opcode === opcodes.PONG ||
this.#info.opcode === opcodes.CLOSE) &&
payloadLength > 125
) {
// Control frames can have a payload length of 125 bytes MAX
failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
return
} else if (this.#info.opcode === opcodes.CLOSE) {
if (payloadLength === 1) {
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
return
}

const body = this.consume(payloadLength)

this.#info.closeInfo = this.parseCloseBody(body)

if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

callback(new CloseEvent('close', { wasClean: false, reason, code }))
return
}

if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
// If an endpoint receives a Close frame and did not previously send a
// Close frame, the endpoint MUST send a Close frame in response. (When
// sending a Close frame in response, the endpoint typically echos the
// status code it received.)
let body = emptyBuffer
if (this.#info.closeInfo.code) {
body = Buffer.allocUnsafe(2)
body.writeUInt16BE(this.#info.closeInfo.code, 0)
}
const closeFrame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(
closeFrame.createFrame(opcodes.CLOSE),
(err) => {
if (!err) {
this.ws[kSentClose] = sentCloseFrameState.SENT
}
}
)
}

// Upon either sending or receiving a Close control frame, it is said
// that _The WebSocket Closing Handshake is Started_ and that the
// WebSocket connection is in the CLOSING state.
this.ws[kReadyState] = states.CLOSING
this.ws[kReceivedClose] = true

this.end()

return
} else if (this.#info.opcode === opcodes.PING) {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame.
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

const body = this.consume(payloadLength)

if (!this.ws[kReceivedClose]) {
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body
})
}
}

this.#state = parserStates.INFO

if (this.#byteOffset > 0) {
continue
} else {
callback()
return
}
} else if (this.#info.opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

const body = this.consume(payloadLength)

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: body
})
}

if (this.#byteOffset > 0) {
continue
} else {
callback()
return
}
}
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
if (this.#byteOffset < 2) {
Expand Down Expand Up @@ -303,6 +217,8 @@ class ByteParser extends Writable {
}

parseCloseBody (data) {
assert(data.length !== 1)

// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
/** @type {number|undefined} */
let code
Expand Down Expand Up @@ -336,6 +252,111 @@ class ByteParser extends Writable {
return { code, reason, error: false }
}

/**
* Parses control frames.
* @param {Buffer} data
* @param {(err?: Error) => void} callback
* @param {{ opcode: number, fragmented: boolean, payloadLength: number }} info
*/
parseControlFrame (callback, info) {
assert(!info.fragmented)

if (info.payloadLength > 125) {
// Control frames can have a payload length of 125 bytes MAX
callback(new Error('Payload length for control frame exceeded 125 bytes.'))
return false
}

const body = this.consume(info.payloadLength)

if (info.opcode === opcodes.CLOSE) {
if (info.payloadLength === 1) {
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
return
}

this.#info.closeInfo = this.parseCloseBody(body)

if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

callback(new CloseEvent('close', { wasClean: false, reason, code }))
return
}

if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
// If an endpoint receives a Close frame and did not previously send a
// Close frame, the endpoint MUST send a Close frame in response. (When
// sending a Close frame in response, the endpoint typically echos the
// status code it received.)
let body = emptyBuffer
if (this.#info.closeInfo.code) {
body = Buffer.allocUnsafe(2)
body.writeUInt16BE(this.#info.closeInfo.code, 0)
}
const closeFrame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(
closeFrame.createFrame(opcodes.CLOSE),
(err) => {
if (!err) {
this.ws[kSentClose] = sentCloseFrameState.SENT
}
}
)
}

// Upon either sending or receiving a Close control frame, it is said
// that _The WebSocket Closing Handshake is Started_ and that the
// WebSocket connection is in the CLOSING state.
this.ws[kReadyState] = states.CLOSING
this.ws[kReceivedClose] = true

this.end()

return
} else if (info.opcode === opcodes.PING) {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame.
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

if (!this.ws[kReceivedClose]) {
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body
})
}
}

if (this.#byteOffset <= 0) {
callback()
return false
}
} else if (info.opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: body
})
}

if (this.#byteOffset <= 0) {
callback()
return false
}
}

return true
}

get closingInfo () {
return this.#info.closeInfo
}
Expand Down
15 changes: 14 additions & 1 deletion lib/web/websocket/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ function failWebsocketConnection (ws, reason) {
}
}

/**
* @see https://datatracker.ietf.org/doc/html/rfc6455#section-5.5
* @param {number} opcode
*/
function isControlFrame (opcode) {
return (
opcode === opcodes.CLOSE ||
opcode === opcodes.PING ||
opcode === opcodes.PONG
)
}

// https://nodejs.org/api/intl.html#detecting-internationalization-support
const hasIntl = typeof process.versions.icu === 'string'
const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined
Expand Down Expand Up @@ -237,5 +249,6 @@ module.exports = {
isValidStatusCode,
failWebsocketConnection,
websocketMessageReceived,
utf8Decode
utf8Decode,
isControlFrame
}
38 changes: 38 additions & 0 deletions test/websocket/issue-2859.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict'

const { test } = require('node:test')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const diagnosticsChannel = require('node:diagnostics_channel')
const { tspl } = require('@matteo.collina/tspl')

test('Fragmented frame with a ping frame in the first of it', async (t) => {
const { completed, deepStrictEqual, strictEqual } = tspl(t, { plan: 2 })

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
const socket = ws._socket

socket.write(Buffer.from([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) // ping "Hello"
socket.write(Buffer.from([0x01, 0x03, 0x48, 0x65, 0x6c])) // Text frame "Hel"
socket.write(Buffer.from([0x80, 0x02, 0x6c, 0x6f])) // Text frame "lo"
})

t.after(() => {
server.close()
ws.close()
})

const ws = new WebSocket(`ws://127.0.0.1:${server.address().port}`)

diagnosticsChannel.channel('undici:websocket:ping').subscribe(
({ payload }) => deepStrictEqual(payload, Buffer.from('Hello'))
)

ws.addEventListener('message', ({ data }) => {
strictEqual(data, 'Hello')
})

await completed
})