Skip to content

Commit

Permalink
refactor(hub connection): rework connection status + spit into two st…
Browse files Browse the repository at this point in the history
…ates.

BREAKING CHANGE: ConnectionStatus `ready` has been renamed to `connecting`
  • Loading branch information
claylaut committed Dec 8, 2017
1 parent d303000 commit 37aece7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
10 changes: 8 additions & 2 deletions src/hub-connection.model.ts
Expand Up @@ -3,9 +3,15 @@ import { IHubConnectionOptions } from "@aspnet/signalr-client/dist/src/IHubConne
import { Dictionary } from "./utils/dictionary";

export enum ConnectionStatus {
disconnected,
connecting,
connected
}

export enum InternalConnectionStatus {
disconnected,
ready,
connected,
disconnected
connected
}

export interface ConnectionState {
Expand Down
69 changes: 42 additions & 27 deletions src/hub-connection.ts
@@ -1,21 +1,26 @@
import { tap, map, filter, switchMap, skipUntil, take, delay, first, retryWhen, scan, delayWhen, defaultIfEmpty } from "rxjs/operators";
import {
tap, map, filter, switchMap, skipUntil, take, delay, first,
retryWhen, scan, delayWhen, defaultIfEmpty, distinctUntilChanged
} from "rxjs/operators";
import { HubConnection as SignalRHubConnection } from "@aspnet/signalr-client";
import { fromPromise } from "rxjs/observable/fromPromise";
import { timer } from "rxjs/observable/timer";
import { BehaviorSubject } from "rxjs/BehaviorSubject";
import { Observable } from "rxjs/Observable";
import { Observer } from "rxjs/Observer";

import { ConnectionState, ConnectionStatus, HubConnectionOptions, ReconnectionStrategyOptions } from "./hub-connection.model";
import {
ConnectionState, ConnectionStatus, HubConnectionOptions,
ReconnectionStrategyOptions, InternalConnectionStatus
} from "./hub-connection.model";
import { getReconnectionDelay } from "./reconnection-strategy";
import { buildQueryString } from "./utils/query-string";
import { Dictionary } from "./utils/dictionary";
import { emptyNext } from "./utils/rxjs";

const errorReasonName = "error";
const connectedState: ConnectionState = { status: ConnectionStatus.connected };
const connectionReadyState: ConnectionState = { status: ConnectionStatus.ready };
const disconnectedState: ConnectionState = { status: ConnectionStatus.disconnected };
const connectedState: ConnectionState = { status: ConnectionStatus.connected };

export class HubConnection<THub> {

Expand All @@ -24,9 +29,14 @@ export class HubConnection<THub> {
private source: string;
private hubConnection: SignalRHubConnection;
private retry: ReconnectionStrategyOptions;
private waitUntilConnect$: Observable<ConnectionState>;
private hubConnectionOptions$: BehaviorSubject<HubConnectionOptions>;
private _connectionState$ = new BehaviorSubject<ConnectionState>(disconnectedState);
private internalConnStatus$ = new BehaviorSubject<InternalConnectionStatus>(InternalConnectionStatus.disconnected);

private waitUntilConnect$ = this.connectionState$.pipe(
distinctUntilChanged((x, y) => x.status === y.status),
skipUntil(this.connectionState$.pipe(filter(x => x.status === ConnectionStatus.connected)))
);

constructor(connectionOption: HubConnectionOptions) {
this.source = `[${connectionOption.key}] HubConnection ::`;
Expand All @@ -36,14 +46,14 @@ export class HubConnection<THub> {

const connection$ = this.hubConnectionOptions$.pipe(
// debounceTime(10),
map(connectionOpts => [connectionOpts, this._connectionState$.value.status] as [HubConnectionOptions, ConnectionStatus]),
map(connectionOpts => [connectionOpts, this.internalConnStatus$.value] as [HubConnectionOptions, InternalConnectionStatus]),
switchMap(([connectionOpts, prevConnectionStatus]) => this.disconnect().pipe(
map(() => buildQueryString(connectionOpts.data)),
tap(queryString =>
this.hubConnection = new SignalRHubConnection(`${connectionOpts.endpointUri}${queryString}`, connectionOpts.options)
),
tap(() => this._connectionState$.next(connectionReadyState)),
filter(() => prevConnectionStatus === ConnectionStatus.connected),
tap(() => this.internalConnStatus$.next(InternalConnectionStatus.ready)),
filter(() => prevConnectionStatus === InternalConnectionStatus.connected),
switchMap(() => this.openConnection())
))
);
Expand All @@ -55,27 +65,22 @@ export class HubConnection<THub> {

reconnect$.subscribe();
connection$.subscribe();

this.waitUntilConnect$ = this.connectionState$.pipe(
skipUntil(this.connectionState$.pipe(filter(x => x.status === ConnectionStatus.connected))),
first()
);
}

connect(): Observable<void> {
if (this._connectionState$.value.status === ConnectionStatus.connected) {
if (this.internalConnStatus$.value === InternalConnectionStatus.connected) {
console.warn(`${this.source} session already connected`);
return emptyNext();
}

return emptyNext().pipe(
switchMap(() => this._connectionState$.pipe(
switchMap(() => this.internalConnStatus$.pipe(
tap(x => {
if (x.status === ConnectionStatus.disconnected) {
if (x === InternalConnectionStatus.disconnected) {
this.hubConnectionOptions$.next(this.hubConnectionOptions$.value);
}
}),
skipUntil(this._connectionState$.pipe(filter(x => x.status === ConnectionStatus.ready))),
skipUntil(this.internalConnStatus$.pipe(filter(x => x === InternalConnectionStatus.ready))),
first()
)),
switchMap(() => this.openConnection())
Expand Down Expand Up @@ -106,7 +111,7 @@ export class HubConnection<THub> {
});

return emptyNext().pipe(
switchMap(() => this.connectionState$.pipe(
switchMap(() => this.waitUntilConnect$.pipe(
filter(() => this._connectionState$.value.status === ConnectionStatus.connected)
)),
switchMap(() => stream$)
Expand All @@ -128,18 +133,17 @@ export class HubConnection<THub> {
return () => {
emptyNext().pipe(
delay(1), // workaround - when connection disconnects, stream errors fires before `signalr.onClose`
filter(() => this._connectionState$.value.status === ConnectionStatus.connected),
switchMap(() => this.send("StreamUnsubscribe", methodName, ...args)),
first()
);
filter(() => this.internalConnStatus$.value === InternalConnectionStatus.connected),
switchMap(() => this.send("StreamUnsubscribe", methodName, ...args))
).subscribe();
};
});

return emptyNext().pipe(
switchMap(() => this.waitUntilConnect$),
return this.waitUntilConnect$.pipe(
switchMap(() => stream$.pipe(
retryWhen((errors: Observable<any>) => errors.pipe(
delay(1), // workaround - when connection disconnects, stream errors fires before `signalr.onClose`
tap(() => console.log(`${this.source} stream - retrying >> INIT !...`)),
delayWhen(() => this.waitUntilConnect$)
))
))
Expand All @@ -155,7 +159,7 @@ export class HubConnection<THub> {
}

disconnect() {
if (this._connectionState$.value.status === ConnectionStatus.disconnected) {
if (this.internalConnStatus$.value === InternalConnectionStatus.disconnected) {
return emptyNext();
}

Expand All @@ -175,16 +179,27 @@ export class HubConnection<THub> {
const delayRetries = getReconnectionDelay(this.retry, retryCount);
// tslint:disable-next-line:no-console
console.debug(`${this.source} connect :: retrying`, { retryCount, maximumAttempts: this.retry.maximumAttempts, delayRetries });
this._connectionState$.next({
status: ConnectionStatus.connecting,
reason: "reconnecting",
data: { retryCount, maximumAttempts: this.retry.maximumAttempts, delayRetries }
});
this.hubConnectionOptions$.next(this.hubConnectionOptions$.value);
return timer(delayRetries);
})
)),
tap(() => this.internalConnStatus$.next(InternalConnectionStatus.connected)),
tap(() => this._connectionState$.next(connectedState)),
tap(() => {
this.hubConnection.onclose(err => {
this.internalConnStatus$.next(InternalConnectionStatus.disconnected);
if (err) {
console.error(`${this.source} session disconnected with errors`, err);
this._connectionState$.next({ status: ConnectionStatus.disconnected, reason: errorReasonName, data: err });
console.error(`${this.source} session disconnected with errors`, { name: err.name, message: err.message});
this._connectionState$.next({
status: ConnectionStatus.disconnected,
reason: errorReasonName,
data: { name: err.name, message: err.message }
});
} else {
console.warn(`${this.source} session disconnected`);
this._connectionState$.next(disconnectedState);
Expand Down

0 comments on commit 37aece7

Please sign in to comment.