diff --git a/packages/rsocket-core/src/RSocketMachine.js b/packages/rsocket-core/src/RSocketMachine.js index 3d228ac4..084edf20 100644 --- a/packages/rsocket-core/src/RSocketMachine.js +++ b/packages/rsocket-core/src/RSocketMachine.js @@ -598,8 +598,7 @@ class RSocketMachineImpl implements RSocketMachine { }; _handleConnectionError(error: Error): void { - this._handleError(error); - this._connection.close(); + this._connection.close(error); const errorHandler = this._errorHandler; if (errorHandler) { errorHandler(error); diff --git a/packages/rsocket-core/src/RSocketResumableTransport.js b/packages/rsocket-core/src/RSocketResumableTransport.js index cb503214..7a60d67f 100644 --- a/packages/rsocket-core/src/RSocketResumableTransport.js +++ b/packages/rsocket-core/src/RSocketResumableTransport.js @@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection { this._statusSubscribers = new Set(); } - close(): void { - this._close(); + close(error?: Error): void { + this._close(error); } connect(): void { @@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection { if (this._isTerminated()) { return; } - if (error) { - this._setConnectionStatus({error, kind: 'ERROR'}); - } else { - this._setConnectionStatus(CONNECTION_STATUS.CLOSED); - } + + const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED; + this._setConnectionStatus(status); + const receivers = this._receivers; - receivers.forEach(r => r.onComplete()); + receivers.forEach(subscriber => { + if (error) { + subscriber.onError(error); + } else { + subscriber.onComplete(); + } + }); receivers.clear(); const senders = this._senders; diff --git a/packages/rsocket-core/src/ReassemblyDuplexConnection.js b/packages/rsocket-core/src/ReassemblyDuplexConnection.js index d19b5973..7d6df25b 100644 --- a/packages/rsocket-core/src/ReassemblyDuplexConnection.js +++ b/packages/rsocket-core/src/ReassemblyDuplexConnection.js @@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection { .lift(actual => new ReassemblySubscriber(actual)); } - close(): void { - this._source.close(); + close(error?: Error): void { + this._source.close(error); } connect(): void { diff --git a/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js b/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js index 04558598..86593cc9 100644 --- a/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js +++ b/packages/rsocket-core/src/__mocks__/MockDuplexConnection.js @@ -28,8 +28,12 @@ export function genMockConnection() { let closed = false; const connection = { - close: jest.fn(() => { - connection.mock.close(); + close: jest.fn(error => { + if (error) { + connection.mock.closeWithError(error); + } else { + connection.mock.close(); + } }), connect: jest.fn(), connectionStatus: jest.fn(() => status), diff --git a/packages/rsocket-core/src/__tests__/RSocketClient-test.js b/packages/rsocket-core/src/__tests__/RSocketClient-test.js index 21c9e4c0..4768ced4 100644 --- a/packages/rsocket-core/src/__tests__/RSocketClient-test.js +++ b/packages/rsocket-core/src/__tests__/RSocketClient-test.js @@ -310,7 +310,7 @@ describe('RSocketClient', () => { expect(errors.values().next().value).toEqual( `No keep-alive acks for ${keepAliveTimeout} millis`, ); - expect(status.kind).toEqual('CLOSED'); + expect(status.kind).toEqual('ERROR'); jest.advanceTimersByTime(keepAliveTimeout); }); diff --git a/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js b/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js index 14815205..862dc4aa 100644 --- a/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js +++ b/packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js @@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => { expect(currentTransport.sendOne.mock.calls.length).toBe(0); }); }); + + describe('post-connect() APIs', () => { + beforeEach(() => { + resumableTransport.connect(); + currentTransport.mock.connect(); + }); + + describe('close()', () => { + describe('given an error', () => { + it('closes the transport', () => { + resumableTransport.close(new Error()); + expect(currentTransport.close.mock.calls.length).toBe(1); + }); + + it('sets the status to ERROR with the given error', () => { + const error = new Error(); + resumableTransport.close(error); + expect(resumableStatus.kind).toBe('ERROR'); + expect(resumableStatus.error).toBe(error); + }); + + it('calls receive.onError with the given error', () => { + const onError = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + resumableTransport.receive().subscribe({onError, onSubscribe}); + const error = new Error(); + resumableTransport.close(error); + expect(onError.mock.calls.length).toBe(1); + expect(onError.mock.calls[0][0]).toBe(error); + }); + }); + + describe('not given an error', () => { + it('closes the transport', () => { + resumableTransport.close(); + expect(currentTransport.close.mock.calls.length).toBe(1); + }); + + it('sets the status to CLOSED', () => { + resumableTransport.close(); + expect(resumableStatus.kind).toBe('CLOSED'); + }); + + it('calls receive.onComplete', () => { + const onComplete = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + resumableTransport.receive().subscribe({onComplete, onSubscribe}); + resumableTransport.close(); + expect(onComplete.mock.calls.length).toBe(1); + }); + }); + }); + }); }); diff --git a/packages/rsocket-tcp-client/src/RSocketTcpClient.js b/packages/rsocket-tcp-client/src/RSocketTcpClient.js index 8b981c0f..e2f3889f 100644 --- a/packages/rsocket-tcp-client/src/RSocketTcpClient.js +++ b/packages/rsocket-tcp-client/src/RSocketTcpClient.js @@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection { } } - close(): void { - this._close(); + close(error?: Error): void { + this._close(error); } connect(): void { diff --git a/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js b/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js index a54913cd..0d27118c 100644 --- a/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js +++ b/packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js @@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => { }); describe('close()', () => { - it('closes the socket', () => { - client.close(); - expect(socket.end.mock.calls.length).toBe(1); - }); + describe('given an error', () => { + it('closes the socket', () => { + client.close(new Error()); + expect(socket.end.mock.calls.length).toBe(1); + }); - it('sets the status to CLOSED', () => { - let status; - client.connectionStatus().subscribe({ - onNext: _status => (status = _status), - onSubscribe: subscription => - subscription.request(Number.MAX_SAFE_INTEGER), + it('sets the status to ERROR with the given error', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + const error = new Error(); + client.close(error); + expect(status.kind).toBe('ERROR'); + expect(status.error).toBe(error); + }); + + it('calls receive.onError with the given error', () => { + const onError = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onError, onSubscribe}); + const error = new Error(); + client.close(error); + expect(onError.mock.calls.length).toBe(1); + expect(onError.mock.calls[0][0]).toBe(error); }); - client.close(); - expect(status.kind).toBe('CLOSED'); }); - it('calls receive.onComplete', () => { - const onComplete = jest.fn(); - const onSubscribe = subscription => - subscription.request(Number.MAX_SAFE_INTEGER); - client.receive().subscribe({onComplete, onSubscribe}); - client.close(); - expect(onComplete.mock.calls.length).toBe(1); + describe('not given an error', () => { + it('closes the socket', () => { + client.close(); + expect(socket.end.mock.calls.length).toBe(1); + }); + + it('sets the status to CLOSED', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + client.close(); + expect(status.kind).toBe('CLOSED'); + }); + + it('calls receive.onComplete', () => { + const onComplete = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onComplete, onSubscribe}); + client.close(); + expect(onComplete.mock.calls.length).toBe(1); + }); }); }); diff --git a/packages/rsocket-types/src/ReactiveSocketTypes.js b/packages/rsocket-types/src/ReactiveSocketTypes.js index 34cdc99e..cca5c78d 100644 --- a/packages/rsocket-types/src/ReactiveSocketTypes.js +++ b/packages/rsocket-types/src/ReactiveSocketTypes.js @@ -118,10 +118,11 @@ export interface DuplexConnection { receive(): Flowable, /** - * Close the underlying connection, emitting `onComplete` on the receive() - * Publisher. + * Close the underlying connection, optionally providing an error as reason. + * If an error is passed, emits `onError` on the receive() Publisher. + * If no error is passed, emits `onComplete` on the receive() Publisher. */ - close(): void, + close(error?: Error): void, /** * Open the underlying connection. Throws if the connection is already in diff --git a/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js b/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js index 43649271..585b485f 100644 --- a/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js +++ b/packages/rsocket-websocket-client/src/RSocketWebSocketClient.js @@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection { this._statusSubscribers = new Set(); } - close(): void { - this._close(); + close(error?: Error): void { + this._close(error); } connect(): void { diff --git a/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js b/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js index 2f3a5b6a..454072be 100644 --- a/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js +++ b/packages/rsocket-websocket-client/src/__tests__/RSocketWebSocketClient-test.js @@ -93,29 +93,62 @@ describe('RSocketWebSocketClient', () => { }); describe('close()', () => { - it('closes the socket', () => { - client.close(); - expect(socket.close.mock.calls.length).toBe(1); - }); + describe('given an error', () => { + it('closes the socket', () => { + client.close(new Error()); + expect(socket.close.mock.calls.length).toBe(1); + }); - it('sets the status to CLOSED', () => { - let status; - client.connectionStatus().subscribe({ - onNext: _status => (status = _status), - onSubscribe: subscription => - subscription.request(Number.MAX_SAFE_INTEGER), + it('sets the status to ERROR with the given error', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + const error = new Error(); + client.close(error); + expect(status.kind).toBe('ERROR'); + expect(status.error).toBe(error); + }); + + it('calls receive.onError with the given error', () => { + const onError = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onError, onSubscribe}); + const error = new Error(); + client.close(error); + expect(onError.mock.calls.length).toBe(1); + expect(onError.mock.calls[0][0]).toBe(error); }); - client.close(); - expect(status.kind).toBe('CLOSED'); }); - it('calls receive.onComplete', () => { - const onComplete = jest.fn(); - const onSubscribe = subscription => - subscription.request(Number.MAX_SAFE_INTEGER); - client.receive().subscribe({onComplete, onSubscribe}); - client.close(); - expect(onComplete.mock.calls.length).toBe(1); + describe('not given an error', () => { + it('closes the socket', () => { + client.close(); + expect(socket.close.mock.calls.length).toBe(1); + }); + + it('sets the status to CLOSED', () => { + let status; + client.connectionStatus().subscribe({ + onNext: _status => (status = _status), + onSubscribe: subscription => + subscription.request(Number.MAX_SAFE_INTEGER), + }); + client.close(); + expect(status.kind).toBe('CLOSED'); + }); + + it('calls receive.onComplete', () => { + const onComplete = jest.fn(); + const onSubscribe = subscription => + subscription.request(Number.MAX_SAFE_INTEGER); + client.receive().subscribe({onComplete, onSubscribe}); + client.close(); + expect(onComplete.mock.calls.length).toBe(1); + }); }); }); diff --git a/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js b/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js index 4b1af0a5..a978a282 100644 --- a/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js +++ b/packages/rsocket-websocket-server/src/RSocketWebSocketServer.js @@ -237,8 +237,13 @@ class WSDuplexConnection implements DuplexConnection { }); } - close(): void { - this._socket.emit('close'); + close(error?: Error): void { + if (error) { + this._socket.emit('error', error); + } else { + this._socket.emit('close'); + } + this._socket.close(); } diff --git a/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js b/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js index 987dda3a..dbadb718 100644 --- a/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js +++ b/packages/rsocket-websocket-server/src/__tests__/RSocketWebSocketServer-test.js @@ -74,10 +74,18 @@ describe('RSocketWebSocketServer', () => { expect(status.error).toBe(error); }); - it('returns CLOSED if explicitly closed', () => { + it('returns CLOSED if explicitly closed with no error', () => { connection.receive().subscribe(() => {}); connection.close(); expect(status.kind).toBe('CLOSED'); }); + + it('returns ERROR if explicitly closed with an error', () => { + connection.receive().subscribe(() => {}); + const error = new Error(); + connection.close(error); + expect(status.kind).toBe('ERROR'); + expect(status.error).toBe(error); + }); }); });