Skip to content
This repository has been archived by the owner on May 29, 2023. It is now read-only.

Commit

Permalink
fix: add termination of unused workers
Browse files Browse the repository at this point in the history
If pipe is destroyed a worker will be terminated
  • Loading branch information
IKatsuba committed Nov 26, 2020
1 parent 8acee38 commit f181bbd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
14 changes: 14 additions & 0 deletions projects/workers/src/worker/pipes/worker.pipe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ describe('WorkerPipe', () => {

expect(worker).not.toEqual(differentWorker);
});

it('should terminate a previous worker', async () => {
const worker = await pipe.transform('a', (data: unknown) => data);

await pipe.transform('a', (data: unknown) => data);
await expectAsync(worker.toPromise()).toBeResolved();
});

it('should terminate a worker then a pipe is destroyed', async () => {
const worker = await pipe.transform('a', (data: unknown) => data);

pipe.ngOnDestroy();
await expectAsync(worker.toPromise()).toBeResolved();
});
});
36 changes: 25 additions & 11 deletions projects/workers/src/worker/pipes/worker.pipe.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Pipe, PipeTransform} from '@angular/core';
import {OnDestroy, Pipe, PipeTransform} from '@angular/core';
import {Observable} from 'rxjs';
import {WebWorker} from '../classes/web-worker';
import {toData} from '../operators/to-data';
Expand All @@ -7,21 +7,35 @@ import {WorkerFunction} from '../types/worker-function';
@Pipe({
name: 'waWorker',
})
export class WorkerPipe implements PipeTransform {
private workers = new WeakMap<WorkerFunction, WebWorker>();
private observers = new WeakMap<WebWorker, Observable<any>>();
export class WorkerPipe implements PipeTransform, OnDestroy {
private fn!: WorkerFunction;
private worker!: WebWorker;
private observer!: Observable<any>;

transform<T, R>(value: T, fn: WorkerFunction<T, R>): Observable<R> {
const worker: WebWorker<T, R> =
this.workers.get(fn) || WebWorker.fromFunction(fn);
if (this.fn !== fn) {
this.terminate();
this.initNewWorker(fn);

this.workers.set(fn, worker);
worker.postMessage(value);
this.worker.postMessage(value);
}

const observer = this.observers.get(worker) || worker.pipe(toData());
return this.observer;
}

ngOnDestroy(): void {
this.terminate();
}

this.observers.set(worker, observer);
private terminate() {
if (this.worker) {
this.worker.terminate();
}
}

return observer;
private initNewWorker<T, R>(fn: WorkerFunction<T, R>) {
this.fn = fn;
this.worker = WebWorker.fromFunction(fn);
this.observer = this.worker.pipe(toData());
}
}

0 comments on commit f181bbd

Please sign in to comment.