diff --git a/src/hub-connection.model.ts b/src/hub-connection.model.ts index 115db09..c5c766b 100644 --- a/src/hub-connection.model.ts +++ b/src/hub-connection.model.ts @@ -3,9 +3,15 @@ import { IHubConnectionOptions } from "@aspnet/signalr-client/dist/src/IHubConne import { Dictionary } from "./utils/dictionary"; export enum ConnectionStatus { + disconnected, + connecting, + connected +} + +export enum InternalConnectionStatus { + disconnected, ready, - connected, - disconnected + connected } export interface ConnectionState { diff --git a/src/hub-connection.ts b/src/hub-connection.ts index 1a19376..8492ccc 100644 --- a/src/hub-connection.ts +++ b/src/hub-connection.ts @@ -1,4 +1,7 @@ -import { tap, map, filter, switchMap, skipUntil, take, delay, first, retryWhen, scan, delayWhen, defaultIfEmpty } from "rxjs/operators"; +import { + tap, map, filter, switchMap, skipUntil, take, delay, first, + retryWhen, scan, delayWhen, defaultIfEmpty, distinctUntilChanged +} from "rxjs/operators"; import { HubConnection as SignalRHubConnection } from "@aspnet/signalr-client"; import { fromPromise } from "rxjs/observable/fromPromise"; import { timer } from "rxjs/observable/timer"; @@ -6,16 +9,18 @@ import { BehaviorSubject } from "rxjs/BehaviorSubject"; import { Observable } from "rxjs/Observable"; import { Observer } from "rxjs/Observer"; -import { ConnectionState, ConnectionStatus, HubConnectionOptions, ReconnectionStrategyOptions } from "./hub-connection.model"; +import { + ConnectionState, ConnectionStatus, HubConnectionOptions, + ReconnectionStrategyOptions, InternalConnectionStatus +} from "./hub-connection.model"; import { getReconnectionDelay } from "./reconnection-strategy"; import { buildQueryString } from "./utils/query-string"; import { Dictionary } from "./utils/dictionary"; import { emptyNext } from "./utils/rxjs"; const errorReasonName = "error"; -const connectedState: ConnectionState = { status: ConnectionStatus.connected }; -const connectionReadyState: ConnectionState = { status: ConnectionStatus.ready }; const disconnectedState: ConnectionState = { status: ConnectionStatus.disconnected }; +const connectedState: ConnectionState = { status: ConnectionStatus.connected }; export class HubConnection { @@ -24,9 +29,14 @@ export class HubConnection { private source: string; private hubConnection: SignalRHubConnection; private retry: ReconnectionStrategyOptions; - private waitUntilConnect$: Observable; private hubConnectionOptions$: BehaviorSubject; private _connectionState$ = new BehaviorSubject(disconnectedState); + private internalConnStatus$ = new BehaviorSubject(InternalConnectionStatus.disconnected); + + private waitUntilConnect$ = this.connectionState$.pipe( + distinctUntilChanged((x, y) => x.status === y.status), + skipUntil(this.connectionState$.pipe(filter(x => x.status === ConnectionStatus.connected))) + ); constructor(connectionOption: HubConnectionOptions) { this.source = `[${connectionOption.key}] HubConnection ::`; @@ -36,14 +46,14 @@ export class HubConnection { const connection$ = this.hubConnectionOptions$.pipe( // debounceTime(10), - map(connectionOpts => [connectionOpts, this._connectionState$.value.status] as [HubConnectionOptions, ConnectionStatus]), + map(connectionOpts => [connectionOpts, this.internalConnStatus$.value] as [HubConnectionOptions, InternalConnectionStatus]), switchMap(([connectionOpts, prevConnectionStatus]) => this.disconnect().pipe( map(() => buildQueryString(connectionOpts.data)), tap(queryString => this.hubConnection = new SignalRHubConnection(`${connectionOpts.endpointUri}${queryString}`, connectionOpts.options) ), - tap(() => this._connectionState$.next(connectionReadyState)), - filter(() => prevConnectionStatus === ConnectionStatus.connected), + tap(() => this.internalConnStatus$.next(InternalConnectionStatus.ready)), + filter(() => prevConnectionStatus === InternalConnectionStatus.connected), switchMap(() => this.openConnection()) )) ); @@ -55,27 +65,22 @@ export class HubConnection { reconnect$.subscribe(); connection$.subscribe(); - - this.waitUntilConnect$ = this.connectionState$.pipe( - skipUntil(this.connectionState$.pipe(filter(x => x.status === ConnectionStatus.connected))), - first() - ); } connect(): Observable { - if (this._connectionState$.value.status === ConnectionStatus.connected) { + if (this.internalConnStatus$.value === InternalConnectionStatus.connected) { console.warn(`${this.source} session already connected`); return emptyNext(); } return emptyNext().pipe( - switchMap(() => this._connectionState$.pipe( + switchMap(() => this.internalConnStatus$.pipe( tap(x => { - if (x.status === ConnectionStatus.disconnected) { + if (x === InternalConnectionStatus.disconnected) { this.hubConnectionOptions$.next(this.hubConnectionOptions$.value); } }), - skipUntil(this._connectionState$.pipe(filter(x => x.status === ConnectionStatus.ready))), + skipUntil(this.internalConnStatus$.pipe(filter(x => x === InternalConnectionStatus.ready))), first() )), switchMap(() => this.openConnection()) @@ -106,7 +111,7 @@ export class HubConnection { }); return emptyNext().pipe( - switchMap(() => this.connectionState$.pipe( + switchMap(() => this.waitUntilConnect$.pipe( filter(() => this._connectionState$.value.status === ConnectionStatus.connected) )), switchMap(() => stream$) @@ -128,18 +133,17 @@ export class HubConnection { return () => { emptyNext().pipe( delay(1), // workaround - when connection disconnects, stream errors fires before `signalr.onClose` - filter(() => this._connectionState$.value.status === ConnectionStatus.connected), - switchMap(() => this.send("StreamUnsubscribe", methodName, ...args)), - first() - ); + filter(() => this.internalConnStatus$.value === InternalConnectionStatus.connected), + switchMap(() => this.send("StreamUnsubscribe", methodName, ...args)) + ).subscribe(); }; }); - return emptyNext().pipe( - switchMap(() => this.waitUntilConnect$), + return this.waitUntilConnect$.pipe( switchMap(() => stream$.pipe( retryWhen((errors: Observable) => errors.pipe( delay(1), // workaround - when connection disconnects, stream errors fires before `signalr.onClose` + tap(() => console.log(`${this.source} stream - retrying >> INIT !...`)), delayWhen(() => this.waitUntilConnect$) )) )) @@ -155,7 +159,7 @@ export class HubConnection { } disconnect() { - if (this._connectionState$.value.status === ConnectionStatus.disconnected) { + if (this.internalConnStatus$.value === InternalConnectionStatus.disconnected) { return emptyNext(); } @@ -175,16 +179,27 @@ export class HubConnection { const delayRetries = getReconnectionDelay(this.retry, retryCount); // tslint:disable-next-line:no-console console.debug(`${this.source} connect :: retrying`, { retryCount, maximumAttempts: this.retry.maximumAttempts, delayRetries }); + this._connectionState$.next({ + status: ConnectionStatus.connecting, + reason: "reconnecting", + data: { retryCount, maximumAttempts: this.retry.maximumAttempts, delayRetries } + }); this.hubConnectionOptions$.next(this.hubConnectionOptions$.value); return timer(delayRetries); }) )), + tap(() => this.internalConnStatus$.next(InternalConnectionStatus.connected)), tap(() => this._connectionState$.next(connectedState)), tap(() => { this.hubConnection.onclose(err => { + this.internalConnStatus$.next(InternalConnectionStatus.disconnected); if (err) { - console.error(`${this.source} session disconnected with errors`, err); - this._connectionState$.next({ status: ConnectionStatus.disconnected, reason: errorReasonName, data: err }); + console.error(`${this.source} session disconnected with errors`, { name: err.name, message: err.message}); + this._connectionState$.next({ + status: ConnectionStatus.disconnected, + reason: errorReasonName, + data: { name: err.name, message: err.message } + }); } else { console.warn(`${this.source} session disconnected`); this._connectionState$.next(disconnectedState);