Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulkhead #7

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions examples/bulkhead.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { bulkhead } from '../lib';

class Service {

@bulkhead(2, { size: 1 })
public async get(value: number) {
return new Promise(resolve => setTimeout(
() => resolve(value + 1),
100,
));
}

}

const instance = new Service();

instance.get(1) // start execution immediately
.then(result => console.log(`call 1 result: ${result}`));
instance.get(2) // start execution immediately
.then(result => console.log(`call 2 result: ${result}`));

instance.get(3) // start execution after one of first 2 executions ends
.then(result => console.log(`call 3 result: ${result}`));

instance.get(4) // throws because are executed to much calls and queue limit is reached
.catch(() => console.log('call 4 fails'));
34 changes: 0 additions & 34 deletions lib/bulkhead.ts

This file was deleted.

57 changes: 57 additions & 0 deletions lib/bulkhead/Bulkhead/Bulkhead.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { ExecutionQueue } from '../ExecutionQueue/ExecutionQueue';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bulkhead/Bulkhead.ts doesn't sound nice. Maybe just bulkhead/index.ts ?


export class Bulkhead<T = any> {

private inExecution: number = 0;

constructor(
private readonly threshold: number,
private readonly executionQueue: ExecutionQueue<T>,
) { }

public async run(method: (...args: any[]) => Promise<T>, args: any[]): Promise<T> {
if (this.inExecution < this.threshold) {
return this.execute(method, args);
}

return this.addToQueue(method, args);
}

private async execute(method: (...args: any[]) => Promise<T>, args: any[]) {
this.inExecution += 1;

const result = method(...args);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided method might throw an error, thus breaking our queue execution.


const afterExecution = async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if async is needed here.

this.inExecution -= 1;
this.callNext();
};

result.then(afterExecution, afterExecution);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the method never resolves? It will stay forever in the queue and potentially might block the queue. I think we might need an additional option to setup a timeout for the execution. Could we use the @timeout from current library? Either as a decorator, or directly as a method.


return result;
}

private async addToQueue(method: (...args: any[]) => Promise<T>, args: any[]) {
return new Promise<T>(
(resolve, reject) => this.executionQueue.store({ method, args, resolve, reject }),
);
}

private async callNext() {
const next = this.executionQueue.next();
if (!next) {
return;
}

const { method, args, resolve, reject } = next;

try {
const result = await this.execute(method, args);
resolve(result);
} catch (error) {
reject(error);
}
}

}
18 changes: 18 additions & 0 deletions lib/bulkhead/Bulkhead/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Factory } from '../../interfaces/factory';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as for queue implementation, this factory is not of much use taking into account there won't be another bulkhead implementations. Now it looks like an added complexity. I'd suggest that we implement a static create method directly in the bulkhead class.

import { ExecutionQueueFactory } from '../ExecutionQueue/factory';
import { Bulkhead } from './Bulkhead';

export class BulkheadFactory<T = any> implements Factory<Bulkhead<T>> {

constructor(
private readonly threshold: number,
private readonly executionQueueFactory: ExecutionQueueFactory<T>,
) { }

public create(): Bulkhead<T> {
const executionQueue = this.executionQueueFactory.create();

return new Bulkhead(this.threshold, executionQueue);
}

}
27 changes: 27 additions & 0 deletions lib/bulkhead/BulkheadOptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
export type BulkheadOptions = {
/**
* The scope of limiter.
* The `class` (default) scope defines a single method scope for all class instances.
* The `instance` scope defines a per-instance method scope.
*/
scope?: 'class' | 'instance',

/**
* The max size of the pending queue. By default not limited.
*/
size?: number,

/**
* Sets the behavior of handling the case when queue limit is reached.
* When `reject` (default) then returns a rejected promise with error
* When `ignoreAsync` then doesn't throw any error and immediately
* returns a resolved promise.
*/
onError?: 'reject' | 'ignoreAsync',
};

export const DEFAULT_OPTIONS: Readonly<BulkheadOptions> = {
scope: 'class',
onError: 'reject',
size: undefined,
};
5 changes: 5 additions & 0 deletions lib/bulkhead/BulkheadProvider/BulkheadProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Bulkhead } from '../Bulkhead/Bulkhead';

export interface BulkheadProvider<T = any> {
get(instance: any): Bulkhead<T>;
}
21 changes: 21 additions & 0 deletions lib/bulkhead/BulkheadProvider/ClassBulkheadProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Bulkhead } from '../Bulkhead/Bulkhead';
import { BulkheadFactory } from '../Bulkhead/factory';
import { BulkheadProvider } from './BulkheadProvider';

export class ClassBulkheadProvider<T = any> implements BulkheadProvider<T> {

private bulkhead: Bulkhead<T> = null;

constructor(
private readonly bulkheadFactory: BulkheadFactory<T>,
) { }

public get(): Bulkhead<T> {
if (!this.bulkhead) {
this.bulkhead = this.bulkheadFactory.create();
}

return this.bulkhead;
}

}
23 changes: 23 additions & 0 deletions lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Bulkhead } from '../Bulkhead/Bulkhead';
import { BulkheadFactory } from '../Bulkhead/factory';
import { BulkheadProvider } from './BulkheadProvider';

export class InstanceBulkheadProvider<T = any> implements BulkheadProvider<T> {

private readonly instancesBulkeads = new WeakMap<any, Bulkhead<T>>();

constructor(
private readonly bulkheadFactory: BulkheadFactory<T>,
) { }

public get(instance: any): Bulkhead<T> {
const hasBulkhead = this.instancesBulkeads.has(instance);
if (!hasBulkhead) {
const bulkheadService = this.bulkheadFactory.create();
this.instancesBulkeads.set(instance, bulkheadService);
}

return this.instancesBulkeads.get(instance);
}

}
27 changes: 27 additions & 0 deletions lib/bulkhead/BulkheadProvider/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Factory } from '../../interfaces/factory';
import { BulkheadFactory } from '../Bulkhead/factory';
import { BulkheadProvider } from './BulkheadProvider';
import { ClassBulkheadProvider } from './ClassBulkheadProvider';
import { InstanceBulkheadProvider } from './InstanceBulkheadProvider';

export class BulkheadProviderFactory<T = any>
implements Factory<BulkheadProvider<T>, ['class' | 'instance']> {

constructor(
private readonly bulkheadFactory: BulkheadFactory<T>,
) { }

public create(scope: 'class' | 'instance'): BulkheadProvider<T> {
switch (scope) {
case 'class':
return new ClassBulkheadProvider(this.bulkheadFactory);

case 'instance':
return new InstanceBulkheadProvider(this.bulkheadFactory);

default:
throw new Error(`@bulkhead unsuported scope type: ${scope}.`);
}
}

}
29 changes: 29 additions & 0 deletions lib/bulkhead/ExecutionQueue/ExecutionQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { ExecutionMetaData } from '../types';

export class ExecutionQueue<T = any> {

private readonly queue: ExecutionMetaData<T>[] = [];

constructor(
private readonly limit: number = Infinity,
) { }

public store(data: ExecutionMetaData<T>): this {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use standard queue method names, like push and pop instead of store and next

this.checkLimit();

this.queue.push(data);

return this;
}

public next(): ExecutionMetaData<T> {
return this.queue.shift();
}

private checkLimit() {
if (this.queue.length >= this.limit) {
throw new Error('@bulkhead execution queue limit reached.');
}
}

}
14 changes: 14 additions & 0 deletions lib/bulkhead/ExecutionQueue/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Factory } from '../../interfaces/factory';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chances are low we will ever have another execution queue implementation than the current one. Therefore the factory here seems like an added complexity. I'd suggest rather to delete this file, and implement a static method static create<T>(limit: number): ExecutionQueue<T> directly in the ExecutionQueue class.

import { ExecutionQueue } from './ExecutionQueue';

export class ExecutionQueueFactory<T = any> implements Factory<ExecutionQueue<T>> {

constructor(
private readonly limit: number,
) { }

public create(): ExecutionQueue<T> {
return new ExecutionQueue(this.limit);
}

}
44 changes: 44 additions & 0 deletions lib/bulkhead/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { BulkheadOptions, DEFAULT_OPTIONS } from './BulkheadOptions';
import { ExecutionQueueFactory } from './ExecutionQueue/factory';
import { BulkheadFactory } from './Bulkhead/factory';
import { BulkheadProviderFactory } from './BulkheadProvider/factory';
import { BulkheadProvider } from './BulkheadProvider/BulkheadProvider';
import { raiseStrategy } from '../utils';

export { BulkheadOptions };

/**
* Limits the number of queued concurrent executions of a method.
* When the limit is reached the execution is delayed and queued.
* @param threshold the max number of concurrent executions.
*/
export function bulkhead(threshold: number, options: BulkheadOptions): MethodDecorator {
const bulkheadProvided = createBulkheadProvider(threshold, options);
const raise = raiseStrategy(options, 'reject');

return function (_: any, __: any, descriptor: PropertyDescriptor) {
const method = descriptor.value;

descriptor.value = async function (...args) {
const bulkhead = bulkheadProvided.get(this);

try {
return await bulkhead.run(method.bind(this), args);
} catch (error) {
return raise(error);
}
};

return descriptor;
};
}

function createBulkheadProvider(
threshold: number,
{ size = undefined, scope = 'class' }: BulkheadOptions = DEFAULT_OPTIONS,
): BulkheadProvider {

const executionQueueFactory = new ExecutionQueueFactory(size);
const bulkheadFactory = new BulkheadFactory(threshold, executionQueueFactory);
return new BulkheadProviderFactory(bulkheadFactory).create(scope);
}
6 changes: 6 additions & 0 deletions lib/bulkhead/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface ExecutionMetaData<T = any> {
args: any[];
method: (...args: any[]) => Promise<T>;
resolve: (value: T) => void;
reject: (error?: Error) => void;
}
3 changes: 3 additions & 0 deletions lib/interfaces/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export interface Factory<T, Args extends any[]= any[] | never> {
create(...args: Args): T;
}
22 changes: 22 additions & 0 deletions lib/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
type RaiseStrategies = 'throw' | 'reject' | 'ignore' | 'ignoreAsync';

type StrategyOptions = {
onError?: RaiseStrategies;
};

export function raiseStrategy(options: StrategyOptions, defaultStrategy: RaiseStrategies) {
const value = options && options.onError || defaultStrategy;

switch (value) {
case 'reject':
return err => Promise.reject(err);
case 'throw':
return (err) => { throw err; };
case 'ignore':
return () => { };
case 'ignoreAsync':
return () => Promise.resolve();
default:
throw new Error(`Option ${value} is not supported for 'behavior'.`);
}
}
Loading