Skip to content

Commit

Permalink
DuplexConnection API simplification
Browse files Browse the repository at this point in the history
Summary:
Merges TransportClient and DuplexConnection to simplify composition. Adds new APIs:
- connect(): moved from TransportClient to the connection itself
- connectionStatus(): allows observing changes to the connection status over time

Reviewed By: mitermayer

Differential Revision: D5333595

fbshipit-source-id: 3e19aae783c0b5c96a01e38c30a89845c5bb05ca
  • Loading branch information
josephsavona authored and facebook-github-bot committed Mar 16, 2018
1 parent f9cc25a commit 06ccead
Show file tree
Hide file tree
Showing 12 changed files with 648 additions and 550 deletions.
43 changes: 36 additions & 7 deletions packages/ReactiveSocketTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,46 @@ export interface DuplexConnection {
close(): void,

/**
* Returns a Promise that resolves when the connection is closed. Implementations
* must resolve the promise as follows:
* - Resolve when the connection is closed explicitly with close().
* - Resolve when the underlying connection is closed by the peer.
* - Resolve when the underlying connection is closed due to an error.
*
* The promise should never be rejected, only resolved.
* Open the underlying connection. Throws if the connection is already in
* the CLOSED or ERROR state.
*/
connect(): void,

// DEPRECATED, Use connectionStatus() instead
onClose(): Promise<void>,

/**
* Returns a Flowable that immediately publishes the current connection
* status and thereafter updates as it changes. Once a connection is in
* the CLOSED or ERROR state, it may not be connected again.
* Implementations must publish values per the comments on ConnectionStatus.
*/
connectionStatus(): Flowable<ConnectionStatus>,
}

/**
* Describes the connection status of a ReactiveSocket/DuplexConnection.
* - NOT_CONNECTED: (only) until `connect()` has been called for the first time.
* - CONNECTING: when `connect()` has been called but a connection is not yet
* established.
* - CONNECTED: when a connection is established.
* - CLOSED: when the connection has been explicitly closed via `close()`.
* - ERROR: when the connection has been closed for any other reason.
*/
export type ConnectionStatus =
| {kind: 'NOT_CONNECTED'}
| {kind: 'CONNECTING'}
| {kind: 'CONNECTED'}
| {kind: 'CLOSED'}
| {kind: 'ERROR', error: Error};

export const CONNECTION_STATUS = {
CLOSED: Object.freeze({kind: 'CLOSED'}),
CONNECTED: Object.freeze({kind: 'CONNECTED'}),
CONNECTING: Object.freeze({kind: 'CONNECTING'}),
NOT_CONNECTED: Object.freeze({kind: 'NOT_CONNECTED'}),
};

/**
* A type that can be written to a buffer.
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-core/src/RSocketBinaryFraming.js
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ function serializeResumeFrame(
offset + resumeTokenLength,
);
offset = writeUInt64BE(buffer, frame.clientPosition, offset);
offset = writeUInt64BE(buffer, frame.serverPosition, offset);
writeUInt64BE(buffer, frame.serverPosition, offset);
return buffer;
}

Expand Down
64 changes: 33 additions & 31 deletions packages/rsocket-core/src/RSocketClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ import {
import {MAJOR_VERSION, MINOR_VERSION} from './RSocketVersion';
import {IdentitySerializers} from './RSocketSerialization';

export interface TransportClient {
connect(): Single<DuplexConnection>,
}
export type ClientConfig<D, M> = {|
serializers?: {
data: Serializer<D>,
Expand All @@ -55,7 +52,7 @@ export type ClientConfig<D, M> = {|
lifetime: number,
metadataMimeType: string,
|},
transport: TransportClient,
transport: DuplexConnection,
|};

/**
Expand All @@ -82,14 +79,7 @@ export default class RSocketClient<D, M> {
}

close(): void {
if (this._cancel) {
this._cancel();
this._cancel = null;
}
if (this._socket) {
this._socket.close();
this._socket = null;
}
this._config.transport.close();
}

connect(): Single<ReactiveSocket<D, M>> {
Expand All @@ -98,18 +88,30 @@ export default class RSocketClient<D, M> {
'RSocketClient: Unexpected call to connect(), already connected.',
);
this._connection = new Single(subscriber => {
this._config.transport.connect().subscribe({
onComplete: connection => {
const socket = new RSocketClientSocket(this._config, connection);
this._socket = socket;
subscriber.onComplete(socket);
const transport = this._config.transport;
let subscription;
transport.connectionStatus().subscribe({
onNext: status => {
if (status.kind === 'CONNECTED') {
subscription && subscription.cancel();
subscriber.onComplete(
new RSocketClientSocket(this._config, transport),
);
} else if (status.kind === 'ERROR') {
subscription && subscription.cancel();
subscriber.onError(status.error);
} else if (status.kind === 'CLOSED') {
subscription && subscription.cancel();
subscriber.onError(new Error('RSocketClient: Connection closed.'));
}
},
onError: error => subscriber.onError(error),
onSubscribe: cancel => {
this._cancel = cancel;
subscriber.onSubscribe(cancel);
onSubscribe: _subscription => {
subscriber.onSubscribe(() => _subscription.cancel());
subscription = _subscription;
subscription.request(Number.MAX_SAFE_INTEGER);
},
});
transport.connect();
});
return this._connection;
}
Expand Down Expand Up @@ -141,8 +143,6 @@ class RSocketClientSocket<D, M> implements ReactiveSocket<D, M> {

// Subscribe to completion/errors before sending anything
this._connection.receive().subscribe({
onComplete: this._handleConnectionClose,
onError: this._handleConnectionError,
onNext: this._handleFrame,
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
Expand All @@ -165,7 +165,7 @@ class RSocketClientSocket<D, M> implements ReactiveSocket<D, M> {
// Cleanup when the connection closes
this._connection
.onClose()
.then(this._handleConnectionClose, this._handleConnectionError);
.then(this._handleTransportClose, this._handleError);
}

close(): void {
Expand Down Expand Up @@ -309,27 +309,29 @@ class RSocketClientSocket<D, M> implements ReactiveSocket<D, M> {
/**
* Handle the connection closing normally: this is an error for any open streams.
*/
_handleConnectionClose = (): void => {
this._handleConnectionError(
new Error('RSocketClient: The connection was closed.'),
);
_handleTransportClose = (): void => {
this._close.resolve();
this._handleError(new Error('RSocketClient: The connection was closed.'));
};

/**
* Handle the transport connection closing abnormally or a connection-level protocol error.
*/
_handleConnectionError = (error: Error) => {
_handleError = (error: Error) => {
// Error any open request streams
this._receivers.forEach(receiver => {
receiver.onError(error);
});
this._receivers.clear();
// In case of a protocol-level error, close the stream.
this._connection.close();
// Resolve onClose()
this._close.reject(error);
};

_handleConnectionError(error: Error): void {
this._handleError(error);
this._connection.close();
}

/**
* Handle a frame received from the transport client.
*/
Expand Down
36 changes: 31 additions & 5 deletions packages/rsocket-core/src/__mocks__/MockDuplexConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ import {genMockPublisher} from 'MockFlowableSubscription';
export function genMockConnection() {
const deferred = new Deferred();
const receiver = genMockPublisher();
const status = genMockPublisher();
let closed = false;

const connection = {
close: jest.fn(() => {
deferred.resolve();
connection.mock.close();
}),
connect: jest.fn(),
connectionStatus: jest.fn(() => status),
onClose: jest.fn(() => {
return deferred.getPromise();
}),
Expand All @@ -39,22 +43,44 @@ export function genMockConnection() {
// Convenience methods to terminate the connection
connection.mock = {
close: () => {
if (closed) {
return;
}
closed = true;
status.onNext({kind: 'CLOSED'});
receiver.onComplete();
deferred.resolve();
},
closeWithError: error => {
if (closed) {
return;
}
closed = true;
status.onNext({
error,
kind: 'ERROR',
});
receiver.onError(error);
deferred.resolve();
deferred.reject(error);
},
connect: () => {
if (closed) {
return;
}
status.onNext({kind: 'CONNECTING'});
status.onNext({kind: 'CONNECTED'});
},
};

// Convenience to call mockClear() on all instance methods
connection.mockClear = () => {
connection.send.mockClear();
connection.sendOne.mockClear();
connection.close.mockClear();
connection.connect.mockClear();
connection.onClose.mockClear();
connection.receive.mockClear();
connection.receive.mock.publisher = receiver;
connection.close.mockClear();
connection.send.mockClear();
connection.sendOne.mockClear();
};
return connection;
}
Loading

0 comments on commit 06ccead

Please sign in to comment.