Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
Better handling of subscription cancelation for portmaster api
Browse files Browse the repository at this point in the history
  • Loading branch information
ppacher committed Aug 1, 2023
1 parent eb9cbf8 commit 343c9c3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,22 +403,21 @@ export class PortapiService {
request<M extends RequestType, R extends Record = any>(method: M, attrs: Partial<Requestable<M>>, { forwardDone }: { forwardDone?: boolean } = {}): Observable<DataReply<R>> {
return new Observable(observer => {
const id = `${++uniqueRequestId}`;

if (!this.ws$) {
observer.error("No websocket connection");
return
}

let unsub: RequestMessage | null = null;

// some methods are cancellable and we MUST send
// a `cancel` message or the backend will not stop
// streaming data for that request id.
if (isCancellable(method)) {
unsub = {
id: id,
type: 'cancel'
let shouldCancel = isCancellable(method);
let unsub: (() => RequestMessage | null) = () => {
if (shouldCancel) {
return {
id: id,
type: 'cancel'
}
}

return null
}

const request: any = {
Expand Down Expand Up @@ -463,6 +462,9 @@ export class PortapiService {
// in all cases, an `error` message type
// terminates the data flow.
if (data.type === 'error') {
console.error(data.message);
shouldCancel = false;

observer.error(data.message);
return
}
Expand Down Expand Up @@ -493,6 +495,7 @@ export class PortapiService {
if (data.type === 'done') {
if (method === 'query') {
// done ends the query but does not end sub or qsub
shouldCancel = false;
observer.complete();
return;
}
Expand All @@ -519,6 +522,7 @@ export class PortapiService {
// for a `get` method the first `ok` message
// also marks the end of the stream.
if (method === 'get' && data.type === 'ok') {
shouldCancel = false
observer.complete();
}
},
Expand Down Expand Up @@ -547,7 +551,7 @@ export class PortapiService {
});
}

private multiplex(req: RequestMessage, cancel: RequestMessage | null): Observable<ReplyMessage> {
private multiplex(req: RequestMessage, cancel: (() => RequestMessage | null) | null): Observable<ReplyMessage> {
return new Observable(observer => {
if (this.connectedSubject.getValue()) {
// Try to directly send the request to the backend
Expand All @@ -569,7 +573,10 @@ export class PortapiService {
// any errors here.
try {
if (cancel !== null) {
this.ws$!.next(cancel);
const cancelMsg = cancel();
if (!!cancelMsg) {
this.ws$!.next(cancelMsg);
}
}
} catch (err) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ export type Requestable<M extends RequestType> = RequestMessage & { type: M };
export function isCancellable(m: MessageType): boolean {
switch (m) {
case 'qsub':
case 'query':
case 'sub':
return true;
default:
Expand Down

0 comments on commit 343c9c3

Please sign in to comment.