Skip to content

Commit

Permalink
jwa(front): Use the new Polling Service
Browse files Browse the repository at this point in the history
The frontend code of JWA will now be using the new Poller Service which
has a pure RxJS implementation underneath. This will make it simpler to
cancel in-flight requests and also moves the reset logic into the common
code.
  • Loading branch information
kimwnasptd committed Nov 8, 2022
1 parent 5f5e327 commit 118f434
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import {
ToolbarButtonConfig,
addColumn,
removeColumn,
PollerService,
} from 'kubeflow';
import { JWABackendService } from 'src/app/services/backend.service';
import { Observable, Subscription, of, forkJoin } from 'rxjs';
import { map, filter } from 'rxjs/operators';
import {
defaultConfig,
getDeleteDialogConfig,
Expand All @@ -25,6 +25,7 @@ import {
} from './config';
import { isEqual } from 'lodash';
import { NotebookResponseObject, NotebookProcessedObject } from 'src/app/types';
import { map } from 'rxjs/operators';
import { Router } from '@angular/router';

@Component({
Expand All @@ -34,15 +35,12 @@ import { Router } from '@angular/router';
})
export class IndexDefaultComponent implements OnInit, OnDestroy {
env = environment;
poller: ExponentialBackoff;

currNamespace: string | null = null;
allNamespaces: string[] | null = null;
subs = new Subscription();
currRequest = new Subscription();
nsSub = new Subscription();
pollSub = new Subscription();

currNamespace: string | string[];
config = defaultConfig;
rawData: NotebookResponseObject[] = [];
processedData: NotebookProcessedObject[] = [];

buttons: ToolbarButton[] = [this.newNotebookButton];
Expand All @@ -57,7 +55,7 @@ export class IndexDefaultComponent implements OnInit, OnDestroy {
},
};

if (this.currNamespace === null) {
if (Array.isArray(this.currNamespace)) {
config.disabled = true;
}

Expand All @@ -70,96 +68,57 @@ export class IndexDefaultComponent implements OnInit, OnDestroy {
public confirmDialog: ConfirmDialogService,
public snackBar: SnackBarService,
public router: Router,
public poller: PollerService,
) {}

ngOnInit(): void {
this.poller = new ExponentialBackoff({ interval: 1000, retries: 3 });

// Poll for new data and reset the poller if different data is found
this.subs.add(
this.poller.start().subscribe(() => {
this.currRequest = this.getNotebooksObservable().subscribe(
notebooks => {
this.updateNotebooks(notebooks);
},
);
}),
);

// Reset the poller whenever the selected namespace changes
this.subs.add(
this.ns.getSelectedNamespace2().subscribe(ns => {
if (Array.isArray(ns)) {
this.currNamespace = null;
this.allNamespaces = ns;
addColumn(this.config, NAMESPACE_COLUMN, 'name');
} else {
this.currNamespace = ns;
this.allNamespaces = null;
removeColumn(this.config, 'namespace');
}
this.resetPolling();
this.updateButtons();
}),
);
this.nsSub = this.ns.getSelectedNamespace2().subscribe(ns => {
this.currNamespace = ns;

// update the table columns
if (Array.isArray(ns)) {
addColumn(this.config, NAMESPACE_COLUMN, 'name');
} else {
removeColumn(this.config, 'namespace');
}

this.poll(ns);
this.updateButtons();
});
}

ngOnDestroy() {
this.subs.unsubscribe();
this.poller.stop();
this.nsSub.unsubscribe();
this.pollSub.unsubscribe();
}

resetPolling() {
this.currRequest.unsubscribe();
this.rawData = [];
public poll(ns: string | string[]) {
this.pollSub.unsubscribe();
this.processedData = [];
this.poller.reset();
}

correctNamespace(notebooks: NotebookResponseObject[]): boolean {
// Only update the current data when they are intended for the current
// namespace. Ideally the canceled requests should not be updating the
// state
if (this.allNamespaces) {
return true;
}
const request = this.getNotebooksObservable(ns);

return notebooks.every((nb: NotebookResponseObject, i) => {
return nb.namespace === this.currNamespace;
this.pollSub = this.poller.exponential(request).subscribe(notebooks => {
this.processedData = this.processIncomingData(notebooks);
});
}

updateNotebooks(notebooks) {
// FIXME: This check is required because the ExponentialBackoff is not
// canceling all inflight request when we reset it.
if (!this.correctNamespace(notebooks)) {
return;
}

if (isEqual(this.rawData, notebooks)) {
return;
}

this.rawData = notebooks;

// Update the frontend's state
this.processedData = this.processIncomingData(notebooks);
this.poller.reset();
}

getNotebooksObservable(): Observable<NotebookResponseObject[]> {
if (!this.currNamespace && !this.allNamespaces) {
getNotebooksObservable(
ns: string | string[],
): Observable<NotebookResponseObject[]> {
if (!ns) {
return of([]);
}

if (this.currNamespace) {
return this.backend.getNotebooks(this.currNamespace);
if (!Array.isArray(ns)) {
return this.backend.getNotebooks(ns);
}

// make a request for each namespace and gather all Notebooks
const requests: Observable<NotebookResponseObject[]>[] = [];
for (const ns of this.allNamespaces) {
requests.push(this.backend.getNotebooks(ns));
for (const namespace of ns) {
requests.push(this.backend.getNotebooks(namespace));
}

// wait until all requests complete
Expand Down Expand Up @@ -210,9 +169,11 @@ export class IndexDefaultComponent implements OnInit, OnDestroy {
}

// Close the open dialog only if the DELETE request succeeded
this.backend.deleteNotebook(this.currNamespace, notebook.name).subscribe({
this.backend.deleteNotebook(notebook.namespace, notebook.name).subscribe({
next: _ => {
this.poller.reset();
// NOTE: We don't want to reset the polling based on the Notebook's
// namespace, since the user might have selected all-namespaces
this.poll(this.currNamespace);
ref.close(DIALOG_RESP.ACCEPT);
},
error: err => {
Expand Down Expand Up @@ -261,7 +222,9 @@ export class IndexDefaultComponent implements OnInit, OnDestroy {
this.updateNotebookFields(notebook);

this.backend.startNotebook(notebook).subscribe(() => {
this.poller.reset();
// NOTE: We don't want to reset the polling based on the Notebook's
// namespace, since the user might have selected all-namespaces
this.poll(this.currNamespace);
});
}

Expand All @@ -276,7 +239,9 @@ export class IndexDefaultComponent implements OnInit, OnDestroy {
// Close the open dialog only if the request succeeded
this.backend.stopNotebook(notebook).subscribe({
next: _ => {
this.poller.reset();
// NOTE: We don't want to reset the polling based on the Notebook's
// namespace, since the user might have selected all-namespaces
this.poll(this.currNamespace);
ref.close(DIALOG_RESP.ACCEPT);
},
error: err => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
NamespaceService,
SnackBarService,
ConfirmDialogService,
PollerService,
} from 'kubeflow';
import { JWABackendService } from 'src/app/services/backend.service';
import { Router } from '@angular/router';
Expand All @@ -23,8 +24,9 @@ export class IndexRokComponent extends IndexDefaultComponent implements OnInit {
public confirmDialog: ConfirmDialogService,
public popup: SnackBarService,
public router: Router,
public poller: PollerService,
) {
super(ns, backend, confirmDialog, popup, router);
super(ns, backend, confirmDialog, popup, router, poller);

this.rok.initCSRF();
}
Expand Down

0 comments on commit 118f434

Please sign in to comment.