From 7b2e7380a4ecc35217e277a6545ac86e7d89085d Mon Sep 17 00:00:00 2001 From: RehanY147 Date: Mon, 29 Apr 2024 03:52:47 +0500 Subject: [PATCH] NAS-128449 / 24.10 / Fix issues with subscriptions (#9995) --- .../widget-memory/widget-memory.component.ts | 4 +--- .../services/websocket-connection.service.ts | 2 +- src/app/services/ws.service.spec.ts | 4 ++-- src/app/services/ws.service.ts | 24 ++++++++++++------- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/app/pages/dashboard/widgets/memory/widget-memory/widget-memory.component.ts b/src/app/pages/dashboard/widgets/memory/widget-memory/widget-memory.component.ts index 46b6058a608..06ca8c20256 100644 --- a/src/app/pages/dashboard/widgets/memory/widget-memory/widget-memory.component.ts +++ b/src/app/pages/dashboard/widgets/memory/widget-memory/widget-memory.component.ts @@ -6,7 +6,7 @@ import { TinyColor } from '@ctrl/tinycolor'; import { Store } from '@ngrx/store'; import { TranslateService } from '@ngx-translate/core'; import { ChartData, ChartOptions } from 'chart.js'; -import { map, take } from 'rxjs/operators'; +import { map } from 'rxjs/operators'; import { GiB } from 'app/constants/bytes.constant'; import { WidgetResourcesService } from 'app/pages/dashboard/services/widget-resources.service'; import { SlotSize } from 'app/pages/dashboard/types/widget.interface'; @@ -27,14 +27,12 @@ export class WidgetMemoryComponent { protected memory = toSignal(this.resources.realtimeUpdates$.pipe( map((update) => update.fields.virtual_memory), - take(1), )); protected isLoading = computed(() => !this.memory()); protected arcSize = toSignal(this.resources.realtimeUpdates$.pipe( map((update) => update.fields.zfs?.arc_size), - take(1), )); stats = computed(() => { diff --git a/src/app/services/websocket-connection.service.ts b/src/app/services/websocket-connection.service.ts index 42397877480..ef0160ed757 100644 --- a/src/app/services/websocket-connection.service.ts +++ b/src/app/services/websocket-connection.service.ts @@ -157,7 +157,7 @@ export class WebSocketConnectionService { return this.ws$.multiplex( () => ({ id, name, msg: OutgoingApiMessageType.Sub }), () => ({ id, msg: OutgoingApiMessageType.UnSub }), - (message: R) => (message.collection === name), + (message: R) => (message.collection === name && message.msg !== IncomingApiMessageType.NoSub), ) as Observable; } diff --git a/src/app/services/ws.service.spec.ts b/src/app/services/ws.service.spec.ts index efba6fa6883..d8b63ad5045 100644 --- a/src/app/services/ws.service.spec.ts +++ b/src/app/services/ws.service.spec.ts @@ -45,8 +45,8 @@ describe('WebSocketService', () => { jest.spyOn(service.clearSubscriptions$, 'next'); (service as unknown as { - subscriptions: Map>; - }).subscriptions = mockEventSubscriptions; + eventSubscribers: Map>; + }).eventSubscribers = mockEventSubscriptions; jest.clearAllMocks(); }); diff --git a/src/app/services/ws.service.ts b/src/app/services/ws.service.ts index a674c8c6375..b6bfe31ca68 100644 --- a/src/app/services/ws.service.ts +++ b/src/app/services/ws.service.ts @@ -4,7 +4,7 @@ import { TranslateService } from '@ngx-translate/core'; import { UUID } from 'angular2-uuid'; import { environment } from 'environments/environment'; import { - merge, Observable, of, Subject, throwError, + merge, Observable, of, Subject, Subscriber, throwError, } from 'rxjs'; import { filter, map, share, startWith, switchMap, take, takeUntil, takeWhile, tap, @@ -37,7 +37,7 @@ import { WebSocketConnectionService } from 'app/services/websocket-connection.se providedIn: 'root', }) export class WebSocketService { - private readonly subscriptions = new Map>(); + private readonly eventSubscribers = new Map>(); clearSubscriptions$ = new Subject(); mockUtils: MockEnclosureUtils; @@ -116,11 +116,16 @@ export class WebSocketService { } subscribe(method: K): Observable> { - if (this.subscriptions.has(method)) { - return this.subscriptions.get(method); + if (this.eventSubscribers.has(method)) { + return this.eventSubscribers.get(method); } - - const subscription$ = this.wsManager.buildSubscriber>(method).pipe( + const observable$ = new Observable((trigger: Subscriber>) => { + const subscription = this.wsManager.buildSubscriber>(method).subscribe(trigger); + return () => { + subscription.unsubscribe(); + this.eventSubscribers.delete(method); + }; + }).pipe( switchMap((apiEvent) => { const erroredEvent = apiEvent as unknown as ResultMessage; if (erroredEvent?.error) { @@ -132,8 +137,9 @@ export class WebSocketService { share(), takeUntil(this.clearSubscriptions$), ); - this.subscriptions.set(method, subscription$); - return subscription$; + + this.eventSubscribers.set(method, observable$); + return observable$; } subscribeToLogs(name: string): Observable> { @@ -142,7 +148,7 @@ export class WebSocketService { clearSubscriptions(): void { this.clearSubscriptions$.next(); - this.subscriptions.clear(); + this.eventSubscribers.clear(); } private callMethod(method: M, params?: ApiCallParams): Observable>;