From a03b173ee6d6edf86fae68637fef10a15493adb6 Mon Sep 17 00:00:00 2001 From: Stephen Cohen Date: Sun, 25 Apr 2021 21:59:30 -0400 Subject: [PATCH] Set connection status to ERROR when closed due to protocol error Amends DuplexConnection#close to accept an optional error indicating that the connection is being closed due to that error. Updates implementations to handle this error and report it to consumers. Updates RSocketMachine to pass protocol-level connection errors to close Adds/updates tests to check for handling this parameter Signed-off-by: Stephen Cohen --- packages/rsocket-core/src/RSocketMachine.js | 3 +- .../src/RSocketResumableTransport.js | 21 +++--- .../src/ReassemblyDuplexConnection.js | 4 +- .../src/__mocks__/MockDuplexConnection.js | 8 ++- .../src/__tests__/RSocketClient-test.js | 2 +- .../RSocketResumableTransport-test.js | 55 ++++++++++++++ .../src/RSocketTcpClient.js | 4 +- .../src/__tests__/RSocketTcpClient-test.js | 71 ++++++++++++++----- .../rsocket-types/src/ReactiveSocketTypes.js | 7 +- .../src/RSocketWebSocketClient.js | 4 +- .../__tests__/RSocketWebSocketClient-test.js | 71 ++++++++++++++----- .../src/RSocketWebSocketServer.js | 9 ++- .../__tests__/RSocketWebSocketServer-test.js | 10 ++- 13 files changed, 206 insertions(+), 63 deletions(-) diff --git a/packages/rsocket-core/src/RSocketMachine.js b/packages/rsocket-core/src/RSocketMachine.js index 3d228ac4..084edf20 100644 --- a/packages/rsocket-core/src/RSocketMachine.js +++ b/packages/rsocket-core/src/RSocketMachine.js @@ -598,8 +598,7 @@ class RSocketMachineImpl implements RSocketMachine { }; _handleConnectionError(error: Error): void { - this._handleError(error); - this._connection.close(); + this._connection.close(error); const errorHandler = this._errorHandler; if (errorHandler) { errorHandler(error); diff --git a/packages/rsocket-core/src/RSocketResumableTransport.js b/packages/rsocket-core/src/RSocketResumableTransport.js index cb503214..7a60d67f 100644 --- a/packages/rsocket-core/src/RSocketResumableTransport.js +++ b/packages/rsocket-core/src/RSocketResumableTransport.js @@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection { this._statusSubscribers = new Set(); } - close(): void { - this._close(); + close(error?: Error): void { + this._close(error); } connect(): void { @@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection { if (this._isTerminated()) { return; } - if (error) { - this._setConnectionStatus({error, kind: 'ERROR'}); - } else { - this._setConnectionStatus(CONNECTION_STATUS.CLOSED); - } + + const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED; + this._setConnectionStatus(status); + const receivers = this._receivers; - receivers.forEach(r => r.onComplete()); + receivers.forEach(subscriber => { + if (error) { + subscriber.onError(error); + } else { + subscriber.onComplete(); + } + }); receivers.clear(); const senders = this._senders; diff --git a/packages/rsocket-core/src/ReassemblyDuplexConnection.js b/packages/rsocket-core/src/ReassemblyDuplexConnection.js index d19b5973..7d6df25b 100644 --- a/packages/rsocket-core/src/ReassemblyDuplexConnection.js +++ b/packages/rsocket-core/src/ReassemblyDuplexConnection.js @@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection { .lift(actual => new ReassemblySubscriber(actual)); } - close(): void { - this._source.close(); + close(error?: Error): void { + this._source.close(error); } connect(): void { diff --git a/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js b/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js index 04558598..86593cc9 100644 --- a/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js +++ b/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js @@ -28,8 +28,12 @@ export function genMockConnection() { let closed = false; const connection = { - close: jest.fn(() => { - connection.mock.close(); + close: jest.fn(error => { + if (error) { + connection.mock.closeWithError(error); + } else { + connection.mock.close(); + } }), connect: jest.fn(), connectionStatus: jest.fn(() => status), diff --git a/packages/rsocket-core/src/__tests__/RSocketClient-test.js b/packages/rsocket-core/src/__tests__/RSocketClient-test.js index 21c9e4c0..4768ced4 100644 --- a/packages/rsocket-core/src/__tests__/RSocketClient-test.js +++ b/packages/rsocket-core/src/__tests__/RSocketClient-test.js @@ -310,7 +310,7 @@ describe('RSocketClient', () => { expect(errors.values().next().value).toEqual( `No keep-alive acks for ${keepAliveTimeout} millis`, ); - expect(status.kind).toEqual('CLOSED'); + expect(status.kind).toEqual('ERROR'); jest.advanceTimersByTime(keepAliveTimeout); }); diff --git a/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js b/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js index 14815205..862dc4aa 100644 --- a/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js +++ b/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js @@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => { expect(currentTransport.sendOne.mock.calls.length).toBe(0); }); }); + + describe('post-connect() APIs', () => { + beforeEach(() => { + resumableTransport.connect(); + currentTransport.mock.connect(); + }); + + describe('close()', () => { + describe('given an error', () => { + it('closes the transport', () => { + resumableTransport.close(new Error()); + expect(currentTransport.close.mock.calls.length).toBe(1); + }); + + it('sets the status to ERROR with the given error', () => { + const error = new Error(); + resumableTransport.close(error); + expect(resumableStatus.kind).toBe('ERROR'); + expect(resumableStatus.error).toBe(error); + }); + + it('calls receive.onError with the given error', () => { + const onError = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + resumableTransport.receive().subscribe({onError, onSubscribe}); + const error = new Error(); + resumableTransport.close(error); + expect(onError.mock.calls.length).toBe(1); + expect(onError.mock.calls[0][0]).toBe(error); + }); + }); + + describe('not given an error', () => { + it('closes the transport', () => { + resumableTransport.close(); + expect(currentTransport.close.mock.calls.length).toBe(1); + }); + + it('sets the status to CLOSED', () => { + resumableTransport.close(); + expect(resumableStatus.kind).toBe('CLOSED'); + }); + + it('calls receive.onComplete', () => { + const onComplete = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + resumableTransport.receive().subscribe({onComplete, onSubscribe}); + resumableTransport.close(); + expect(onComplete.mock.calls.length).toBe(1); + }); + }); + }); + }); }); diff --git a/packages/rsocket-tcp-client/src/RSocketTcpClient.js b/packages/rsocket-tcp-client/src/RSocketTcpClient.js index 8b981c0f..e2f3889f 100644 --- a/packages/rsocket-tcp-client/src/RSocketTcpClient.js +++ b/packages/rsocket-tcp-client/src/RSocketTcpClient.js @@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection { } } - close(): void { - this._close(); + close(error?: Error): void { + this._close(error); } connect(): void { diff --git a/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js b/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js index a54913cd..0d27118c 100644 --- a/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js +++ b/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js @@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => { }); describe('close()', () => { - it('closes the socket', () => { - client.close(); - expect(socket.end.mock.calls.length).toBe(1); - }); + describe('given an error', () => { + it('closes the socket', () => { + client.close(new Error()); + expect(socket.end.mock.calls.length).toBe(1); + }); - it('sets the status to CLOSED', () => { - let status; - client.connectionStatus().subscribe({ - onNext: _status => (status = _status), - onSubscribe: subscription => - subscription.request(Number.MAX_SAFE_INTEGER), + it('sets the status to ERROR with the given error', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + const error = new Error(); + client.close(error); + expect(status.kind).toBe('ERROR'); + expect(status.error).toBe(error); + }); + + it('calls receive.onError with the given error', () => { + const onError = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onError, onSubscribe}); + const error = new Error(); + client.close(error); + expect(onError.mock.calls.length).toBe(1); + expect(onError.mock.calls[0][0]).toBe(error); }); - client.close(); - expect(status.kind).toBe('CLOSED'); }); - it('calls receive.onComplete', () => { - const onComplete = jest.fn(); - const onSubscribe = subscription => - subscription.request(Number.MAX_SAFE_INTEGER); - client.receive().subscribe({onComplete, onSubscribe}); - client.close(); - expect(onComplete.mock.calls.length).toBe(1); + describe('not given an error', () => { + it('closes the socket', () => { + client.close(); + expect(socket.end.mock.calls.length).toBe(1); + }); + + it('sets the status to CLOSED', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + client.close(); + expect(status.kind).toBe('CLOSED'); + }); + + it('calls receive.onComplete', () => { + const onComplete = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onComplete, onSubscribe}); + client.close(); + expect(onComplete.mock.calls.length).toBe(1); + }); }); }); diff --git a/packages/rsocket-types/src/ReactiveSocketTypes.js b/packages/rsocket-types/src/ReactiveSocketTypes.js index 34cdc99e..cca5c78d 100644 --- a/packages/rsocket-types/src/ReactiveSocketTypes.js +++ b/packages/rsocket-types/src/ReactiveSocketTypes.js @@ -118,10 +118,11 @@ export interface DuplexConnection { receive(): Flowable, /** - * Close the underlying connection, emitting `onComplete` on the receive() - * Publisher. + * Close the underlying connection, optionally providing an error as reason. + * If an error is passed, emits `onError` on the receive() Publisher. + * If no error is passed, emits `onComplete` on the receive() Publisher. */ - close(): void, + close(error?: Error): void, /** * Open the underlying connection. Throws if the connection is already in diff --git a/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js b/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js index 43649271..585b485f 100644 --- a/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js +++ b/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js @@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection { this._statusSubscribers = new Set(); } - close(): void { - this._close(); + close(error?: Error): void { + this._close(error); } connect(): void { diff --git a/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js b/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js index 2f3a5b6a..454072be 100644 --- a/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js +++ b/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js @@ -93,29 +93,62 @@ describe('RSocketWebSocketClient', () => { }); describe('close()', () => { - it('closes the socket', () => { - client.close(); - expect(socket.close.mock.calls.length).toBe(1); - }); + describe('given an error', () => { + it('closes the socket', () => { + client.close(new Error()); + expect(socket.close.mock.calls.length).toBe(1); + }); - it('sets the status to CLOSED', () => { - let status; - client.connectionStatus().subscribe({ - onNext: _status => (status = _status), - onSubscribe: subscription => - subscription.request(Number.MAX_SAFE_INTEGER), + it('sets the status to ERROR with the given error', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + const error = new Error(); + client.close(error); + expect(status.kind).toBe('ERROR'); + expect(status.error).toBe(error); + }); + + it('calls receive.onError with the given error', () => { + const onError = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onError, onSubscribe}); + const error = new Error(); + client.close(error); + expect(onError.mock.calls.length).toBe(1); + expect(onError.mock.calls[0][0]).toBe(error); }); - client.close(); - expect(status.kind).toBe('CLOSED'); }); - it('calls receive.onComplete', () => { - const onComplete = jest.fn(); - const onSubscribe = subscription => - subscription.request(Number.MAX_SAFE_INTEGER); - client.receive().subscribe({onComplete, onSubscribe}); - client.close(); - expect(onComplete.mock.calls.length).toBe(1); + describe('not given an error', () => { + it('closes the socket', () => { + client.close(); + expect(socket.close.mock.calls.length).toBe(1); + }); + + it('sets the status to CLOSED', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + client.close(); + expect(status.kind).toBe('CLOSED'); + }); + + it('calls receive.onComplete', () => { + const onComplete = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onComplete, onSubscribe}); + client.close(); + expect(onComplete.mock.calls.length).toBe(1); + }); }); }); diff --git a/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js b/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js index 4b1af0a5..a978a282 100644 --- a/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js +++ b/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js @@ -237,8 +237,13 @@ class WSDuplexConnection implements DuplexConnection { }); } - close(): void { - this._socket.emit('close'); + close(error?: Error): void { + if (error) { + this._socket.emit('error', error); + } else { + this._socket.emit('close'); + } + this._socket.close(); } diff --git a/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js b/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js index 987dda3a..dbadb718 100644 --- a/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js +++ b/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js @@ -74,10 +74,18 @@ describe('RSocketWebSocketServer', () => { expect(status.error).toBe(error); }); - it('returns CLOSED if explicitly closed', () => { + it('returns CLOSED if explicitly closed with no error', () => { connection.receive().subscribe(() => {}); connection.close(); expect(status.kind).toBe('CLOSED'); }); + + it('returns ERROR if explicitly closed with an error', () => { + connection.receive().subscribe(() => {}); + const error = new Error(); + connection.close(error); + expect(status.kind).toBe('ERROR'); + expect(status.error).toBe(error); + }); }); });