Skip to content

Commit

Permalink
NAS-128449 / 24.10 / Fix issues with subscriptions (#9995)
Browse files Browse the repository at this point in the history
  • Loading branch information
RehanY147 committed Apr 28, 2024
1 parent 3f42d5e commit 7b2e738
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
Expand Up @@ -6,7 +6,7 @@ import { TinyColor } from '@ctrl/tinycolor';
import { Store } from '@ngrx/store';
import { TranslateService } from '@ngx-translate/core';
import { ChartData, ChartOptions } from 'chart.js';
import { map, take } from 'rxjs/operators';
import { map } from 'rxjs/operators';
import { GiB } from 'app/constants/bytes.constant';
import { WidgetResourcesService } from 'app/pages/dashboard/services/widget-resources.service';
import { SlotSize } from 'app/pages/dashboard/types/widget.interface';
Expand All @@ -27,14 +27,12 @@ export class WidgetMemoryComponent {

protected memory = toSignal(this.resources.realtimeUpdates$.pipe(
map((update) => update.fields.virtual_memory),
take(1),
));

protected isLoading = computed(() => !this.memory());

protected arcSize = toSignal(this.resources.realtimeUpdates$.pipe(
map((update) => update.fields.zfs?.arc_size),
take(1),
));

stats = computed(() => {
Expand Down
2 changes: 1 addition & 1 deletion src/app/services/websocket-connection.service.ts
Expand Up @@ -157,7 +157,7 @@ export class WebSocketConnectionService {
return this.ws$.multiplex(
() => ({ id, name, msg: OutgoingApiMessageType.Sub }),
() => ({ id, msg: OutgoingApiMessageType.UnSub }),
(message: R) => (message.collection === name),
(message: R) => (message.collection === name && message.msg !== IncomingApiMessageType.NoSub),
) as Observable<R>;
}

Expand Down
4 changes: 2 additions & 2 deletions src/app/services/ws.service.spec.ts
Expand Up @@ -45,8 +45,8 @@ describe('WebSocketService', () => {
jest.spyOn(service.clearSubscriptions$, 'next');

(service as unknown as {
subscriptions: Map<string, Observable<ApiEvent>>;
}).subscriptions = mockEventSubscriptions;
eventSubscribers: Map<string, Observable<ApiEvent>>;
}).eventSubscribers = mockEventSubscriptions;

jest.clearAllMocks();
});
Expand Down
24 changes: 15 additions & 9 deletions src/app/services/ws.service.ts
Expand Up @@ -4,7 +4,7 @@ import { TranslateService } from '@ngx-translate/core';
import { UUID } from 'angular2-uuid';
import { environment } from 'environments/environment';
import {
merge, Observable, of, Subject, throwError,
merge, Observable, of, Subject, Subscriber, throwError,
} from 'rxjs';
import {
filter, map, share, startWith, switchMap, take, takeUntil, takeWhile, tap,
Expand Down Expand Up @@ -37,7 +37,7 @@ import { WebSocketConnectionService } from 'app/services/websocket-connection.se
providedIn: 'root',
})
export class WebSocketService {
private readonly subscriptions = new Map<ApiEventMethod, Observable<ApiEventTyped>>();
private readonly eventSubscribers = new Map<ApiEventMethod, Observable<ApiEventTyped>>();
clearSubscriptions$ = new Subject<void>();
mockUtils: MockEnclosureUtils;

Expand Down Expand Up @@ -116,11 +116,16 @@ export class WebSocketService {
}

subscribe<K extends ApiEventMethod = ApiEventMethod>(method: K): Observable<ApiEventTyped<K>> {
if (this.subscriptions.has(method)) {
return this.subscriptions.get(method);
if (this.eventSubscribers.has(method)) {
return this.eventSubscribers.get(method);
}

const subscription$ = this.wsManager.buildSubscriber<K, ApiEventTyped<K>>(method).pipe(
const observable$ = new Observable((trigger: Subscriber<ApiEventTyped<K>>) => {
const subscription = this.wsManager.buildSubscriber<K, ApiEventTyped<K>>(method).subscribe(trigger);
return () => {
subscription.unsubscribe();
this.eventSubscribers.delete(method);
};
}).pipe(
switchMap((apiEvent) => {
const erroredEvent = apiEvent as unknown as ResultMessage;
if (erroredEvent?.error) {
Expand All @@ -132,8 +137,9 @@ export class WebSocketService {
share(),
takeUntil(this.clearSubscriptions$),
);
this.subscriptions.set(method, subscription$);
return subscription$;

this.eventSubscribers.set(method, observable$);
return observable$;
}

subscribeToLogs(name: string): Observable<ApiEvent<{ data: string }>> {
Expand All @@ -142,7 +148,7 @@ export class WebSocketService {

clearSubscriptions(): void {
this.clearSubscriptions$.next();
this.subscriptions.clear();
this.eventSubscribers.clear();
}

private callMethod<M extends ApiCallMethod>(method: M, params?: ApiCallParams<M>): Observable<ApiCallResponse<M>>;
Expand Down

0 comments on commit 7b2e738

Please sign in to comment.