Skip to content

Commit

Permalink
fix(core): allow parallel schedules (#3485)
Browse files Browse the repository at this point in the history
Fix a race condition in the `Pool` class. Previously, parallel calls to `.schedule()` were not supported, this PR fixes it by adding some bookkeeping to keep track of work being done in other `schedule` calls.
  • Loading branch information
nicojs committed May 2, 2022
1 parent 1103958 commit bbbd514
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 63 deletions.
137 changes: 104 additions & 33 deletions packages/core/src/concurrent/pool.ts
@@ -1,11 +1,10 @@
import { lastValueFrom, Observable, Subject, merge, zip } from 'rxjs';
import { mergeMap, filter, shareReplay, tap } from 'rxjs/operators';
import { TestRunner } from '@stryker-mutator/api/test-runner';
import { notEmpty } from '@stryker-mutator/util';
import { BehaviorSubject, filter, ignoreElements, lastValueFrom, mergeMap, Observable, ReplaySubject, Subject, takeUntil, tap, zip } from 'rxjs';
import { Disposable, tokens } from 'typed-inject';
import { TestRunner } from '@stryker-mutator/api/test-runner';

import { coreTokens } from '../di/index.js';
import { CheckerFacade } from '../checker/index.js';
import { coreTokens } from '../di/index.js';

const MAX_CONCURRENT_INIT = 2;

Expand All @@ -28,41 +27,116 @@ export function createCheckerPool(factory: () => CheckerFacade, concurrencyToken
return new Pool<CheckerFacade>(factory, concurrencyToken$);
}

/**
* Represents a work item: an input with a task and with a `result$` observable where the result (exactly one) will be streamed to.
*/
class WorkItem<TResource extends Resource, TIn, TOut> {
private readonly resultSubject = new Subject<TOut>();
public readonly result$ = this.resultSubject.asObservable();

/**
* @param input The input to the ask
* @param task The task, where a resource and input is presented
*/
constructor(private readonly input: TIn, private readonly task: (resource: TResource, input: TIn) => Promise<TOut> | TOut) {}

public async execute(resource: TResource) {
try {
const output = await this.task(resource, this.input);
this.resultSubject.next(output);
this.resultSubject.complete();
} catch (err) {
this.resultSubject.error(err);
}
}

public reject(error: unknown) {
this.resultSubject.error(error);
}

public complete() {
this.resultSubject.complete();
}
}

/**
* Represents a pool of resources. Use `schedule` to schedule work to be executed on the resources.
* The pool will automatically recycle the resources, but will make sure only one task is executed
* on one resource at any one time. Creates as many resources as the concurrency tokens allow.
* Also takes care of the initialing of the resources (with `init()`)
*/
export class Pool<TResource extends Resource> implements Disposable {
// The init subject. Using an RxJS subject instead of a promise, so errors are silently ignored when nobody is listening
private readonly initSubject = new ReplaySubject<void>();

// The disposedSubject emits true when it is disposed, and false when not disposed yet
private readonly disposedSubject = new BehaviorSubject(false);

// The dispose$ only emits one `true` value when disposed (never emits `false`). Useful for `takeUntil`
private readonly dispose$ = this.disposedSubject.pipe(filter((isDisposed) => isDisposed));

private readonly createdResources: TResource[] = [];
private readonly resource$: Observable<TResource>;
// The queued work items. This is a replay subject, so scheduled work items can easily be rejected after it was picked up
private readonly todoSubject = new ReplaySubject<WorkItem<TResource, any, any>>();

constructor(factory: () => TResource, concurrencyToken$: Observable<number>) {
this.resource$ = concurrencyToken$.pipe(
mergeMap(async () => {
if (this.isDisposed) {
return null;
} else {
// Stream resources that are ready to pick up work
const resourcesSubject = new Subject<TResource>();

// Stream ongoing work.
zip(resourcesSubject, this.todoSubject)
.pipe(
mergeMap(async ([resource, workItem]) => {
await workItem.execute(resource);
resourcesSubject.next(resource); // recycle resource so it can pick up more work
}),
ignoreElements(),
takeUntil(this.dispose$)
)
.subscribe({
error: (error) => {
this.todoSubject.subscribe((workItem) => workItem.reject(error));
},
});

// Create resources
concurrencyToken$
.pipe(
takeUntil(this.dispose$),
mergeMap(async () => {
if (this.disposedSubject.value) {
// Don't create new resources when disposed
return;
}
const resource = factory();
this.createdResources.push(resource);
await resource.init?.();
return resource;
}
}, MAX_CONCURRENT_INIT),
filter(notEmpty),
// We use share replay here. This way the dry run can use a test runner that is later reused during mutation testing
// https://www.learnrxjs.io/learn-rxjs/operators/multicasting/sharereplay
shareReplay()
);
}, MAX_CONCURRENT_INIT),
filter(notEmpty),
tap({
complete: () => {
// Signal init complete
this.initSubject.next();
this.initSubject.complete();
},
error: (err) => {
this.initSubject.error(err);
},
})
)
.subscribe({
next: (resource) => resourcesSubject.next(resource),
error: (err) => resourcesSubject.error(err),
});
}

/**
* Returns a promise that resolves if all concurrency tokens have resulted in initialized resources.
* This is optional, resources will get initialized either way.
*/
public async init(): Promise<void> {
await lastValueFrom(this.resource$);
await lastValueFrom(this.initSubject);
}

/**
Expand All @@ -71,27 +145,24 @@ export class Pool<TResource extends Resource> implements Disposable {
* @param task The task to execute on each resource
*/
public schedule<TIn, TOut>(input$: Observable<TIn>, task: (resource: TResource, input: TIn) => Promise<TOut> | TOut): Observable<TOut> {
const recycleBin = new Subject<TResource>();
const resource$ = merge(recycleBin, this.resource$);

return zip(resource$, input$).pipe(
mergeMap(async ([resource, input]) => {
const output = await task(resource, input);
// Recycles a resource so its re-emitted from the `resource$` observable.
recycleBin.next(resource);
return output;
}),
tap({ complete: () => recycleBin.complete() })
return input$.pipe(
mergeMap((input) => {
const workItem = new WorkItem(input, task);
this.todoSubject.next(workItem);
return workItem.result$;
})
);
}

private isDisposed = false;

/**
* Dispose the pool
*/
public async dispose(): Promise<void> {
this.isDisposed = true;
await Promise.all(this.createdResources.map((resource) => resource.dispose?.()));
if (!this.disposedSubject.value) {
this.disposedSubject.next(true);
this.todoSubject.subscribe((workItem) => workItem.complete());
this.todoSubject.complete();
await Promise.all(this.createdResources.map((resource) => resource.dispose?.()));
}
}
}

0 comments on commit bbbd514

Please sign in to comment.