Skip to content

Commit

Permalink
WIP Start stop
Browse files Browse the repository at this point in the history
  • Loading branch information
overlookmotel committed Apr 7, 2019
1 parent 2788d36 commit e4058bb
Show file tree
Hide file tree
Showing 11 changed files with 711 additions and 24 deletions.
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
'use strict';

// Exports
module.exports = {};
module.exports = require('./lib/index');
5 changes: 5 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'use strict';

module.exports = {
setupTestFrameworkScriptFile: 'jest-extended'
};
15 changes: 15 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* --------------------
* dilli module
* Constants
* ------------------*/

'use strict';

// Exports
module.exports = {
STOPPED: Symbol('STOPPED'),
STARTING: Symbol('STARTING'),
STARTED: Symbol('STARTED'),
STOPPING: Symbol('STOPPING'),
ERRORED: Symbol('ERRORED')
};
80 changes: 80 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* --------------------
* dilli module
* ------------------*/

'use strict';

// Imports
const Worker = require('./worker'),
startStopMethods = require('./startStop'),
{prepareLogger, Logger} = require('./logger'),
{isFullString, isSemverVersion} = require('./utils'),
constants = require('./constants');

// Exports

class Dilli {
constructor(options) {
this.options = {...options};
this.workers = {};
this.jobs = new Map();
this.numJobs = 0;
this._initStartStop();
this.log = prepareLogger(options.logger);
}

addWorker(worker) {
// Check if worker is valid
if (!Worker.isWorkerClass(worker)) throw new Error('workers must be subclasses of Worker class');
const {name} = worker;
if (!isFullString(name)) throw new Error('Workers must have name defined as a string');
if (!isSemverVersion(worker.version)) throw new Error('Workers must have version defined as a valid semver version e.g. 2.12.0');

// Record reference to Dilli instance on worker
worker.server = this;
worker.numJobs = 0;
worker.log = this.log.child({worker: name});

// Add to workers store
const {workers} = this;
if (workers[name]) throw new Error(`A worker with name '${name}' has already been registered`);
workers[name] = worker;

worker.log.info('Attached worker');
}

async newJob(workerId, jobId, params) {
const worker = this.workers[workerId];
if (!worker) throw new Error(`Worker '${workerId}' not found`);
if (!jobId) throw new Error('jobId must be provided');

// Create job
const job = new worker(jobId, params); // eslint-disable-line new-cap

// Add job to store
const {jobs} = this;
jobs.set(jobId, job);
this.numJobs++;
worker.numJobs++;

// Run job
await job._run();

// Remove job from store
jobs.delete(jobId);
this.numJobs--;
worker.numJobs--;
}

// eslint-disable-next-line class-methods-use-this, no-unused-vars
async _message(obj, options) {
// TODO Write this!
}
}

Object.assign(Dilli.prototype, startStopMethods);
Dilli.Worker = Worker;
Dilli.Logger = Logger;
Object.assign(Dilli, constants);

module.exports = Dilli;
94 changes: 94 additions & 0 deletions lib/logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* --------------------
* dilli module
* Logger
* ------------------*/

'use strict';

// Imports
const {isFunction, isObject, isString} = require('./utils');

// Constants
const METHODS = {
fatal: 60,
error: 50,
warn: 40,
info: 30,
debug: 20,
trace: 10
};

// Exports

/*
* Logger class
*/
class Logger {
constructor(fn, parent, fields) {
this._fn = fn;
this._parent = parent;
this._fields = fields;
}

static fromFunction(fn) {
return new Logger(fn);
}

static fromObject(obj) {
// Check has all required methods
const missingMethods = [];
for (const name in METHODS) {
if (!isFunction(obj[name])) missingMethods.push(name);
}
if (missingMethods.length > 0) {
throw new Error(`Logger object missing method${missingMethods.length > 1 ? 's' : ''} ${missingMethods.join(', ')}`);
}

return new Logger(null, obj);
}

child(fields) {
return new Logger(null, this, fields);
}

_log(name, level, msg, obj) {
const out = {level, ...this._fields};

if (isString(msg)) {
Object.assign(out, obj);
out.msg = msg;
} else {
Object.assign(out, msg);
}

const parent = this._parent;
if (parent) {
parent[name](out);
} else {
const fn = this._fn;
fn(out);
}
}
}

for (const name in METHODS) { // eslint-disable-line guard-for-in
const level = METHODS[name];
Logger.prototype[name] = function(...args) {
this._log(name, level, ...args);
};
}

/*
* Convert input logger to Logger class instance
*/
function prepareLogger(logger) {
if (logger instanceof Logger) return logger;
if (isFunction(logger)) return Logger.fromFunction(logger);
if (isObject(logger)) return Logger.fromObject(logger);
return Logger.fromFunction(() => {});
}

module.exports = {
prepareLogger,
Logger
};
177 changes: 177 additions & 0 deletions lib/startStop.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/* --------------------
* dilli module
* Dilli class start/stop methods.
* To be merged into Dilli class prototype.
* ------------------*/

'use strict';

// Modules
const {promiseForIn, defer} = require('promise-methods');

// Imports
const {STOPPED, STARTING, STARTED, STOPPING, ERRORED} = require('./constants');

// Exports

/*
* At any one time, a Dilli instance is in one of 4 states:
* - Stopped
* - Started
* - Starting
* - Stopping
*
* If `.stop()` is called while starting, or `.start()` called while stopping,
* that next state is queued up to be initiated once the current state change completes.
*
* Current state is stored in `.state`.
* Next state is stored in `._nextState` (or null if no next state)
* Promise which will resolve when currently transitioning state change completes
* is stored in `._statePromise`.
* Deferred which will be resolved when queued state transition completes
* is stored in `._nextStateDeferred`.
*
* So when you call `.start()` the promise returned will only resolve when the state eventually
* reaches "started", even if it has to stop first. Ditto for calling `.stop()`.
*/

module.exports = {
_initStartStop() {
this.state = STOPPED;
this._statePromise = null;
this._nextState = null;
this._nextStateDeferred = null;
},

start() {
this.log.info('Start requested');
return this._moveToState(STARTED, STARTING, STOPPING, '_start');
},

async _start() {
this.log.info('Starting');
this.state = STARTING;

// Start workers
await promiseForIn(this.workers, async (worker) => {
await worker._start();
});

this.log.info('Started');
this._movedToState(STARTED, '_stop');
},

stop() {
this.log.info('Stop requested');
return this._moveToState(STOPPED, STOPPING, STARTING, '_stop');
},

async _stop() {
this.log.info('Stopping');
this.state = STOPPING;

// Stop workers
await promiseForIn(this.workers, async (worker) => {
await worker._stop();
});

this.log.info('Stopped');
this._movedToState(STOPPED, '_start');
},

restart() {
this.log.info('Restart requested');

if ([STARTING, STOPPED, STOPPING].includes(this.state)) return this.start();

return Promise.all(
this.stop(),
this.start()
);
},

/**
* Move to state i.e. start or stop
* @param {Symbol} targetState - State want to move to i.e. `STARTED` or `STOPPED`
* @param {Symbol} intermediateState - Intermediate state on the way i.e. `STARTING` or `STOPPING`
* @param {Symbol} oppositeIntermediateState - Opposite intermediate state
* i.e. `STOPPING` or `STARTING`
* @param {string} methodName - Method name to call to reach new state i.e. `'_start'` or `'_stop'`
* @returns {Promise<undefined>}
*/
async _moveToState(targetState, intermediateState, oppositeIntermediateState, methodName) {
// If has errored, cannot continue - throw error
const {state} = this;
if (state === ERRORED) throw new Error(`Error in ${state === STARTING ? 'starting' : 'stopping'} previously - cannot continue`);

// If already in desired state, exit
if (state === targetState) return;

// If already moving to desired state, clear next state and resolve
if (state === intermediateState) {
// Clear next state
if (this._nextState) {
const deferred = this._nextStateDeferred;
this._nextState = null;
this._nextStateDeferred = null;
deferred.resolve();
}

await this._statePromise;
return;
}

// If in process of moving to opposite state, create deferred
if (state === oppositeIntermediateState) {
if (this._nextState) {
// Already queued to move to desired state
await this._nextStateDeferred.promise;
return;
}

// Schedule to move to opposite state after this move complete
this._nextState = intermediateState;
const deferred = defer();
this._nextStateDeferred = deferred;
await deferred.promise;
return;
}

// Is in stable opposite state - initiate move
const promise = this[methodName]();
this._statePromise = promise;

try {
await promise;
} catch (err) {
this.state = ERRORED;
throw err;
}
},

/**
* State transition complete i.e. started or stopped.
* @param {Symbol} state - State reached i.e. `STARTED` or `STOPPED`
* @param {string} oppositeMethodName - Method name to call to reach opposite state
* i.e. `'_stop'` or `'_start'`
* @returns {undefined}
*/
_movedToState(state, oppositeMethodName) {
// If no next state queued, exit
if (!this._nextState) {
this.state = state;
this.statePromise = null;
return;
}

// There is a queued up transition to opposite state.
// Initiate move.
const deferred = this._nextStateDeferred;
this._nextState = null;
this._nextStateDeferred = null;

const promise = this[oppositeMethodName]();
this._statePromise = promise;
deferred.resolve(promise);
}
};
Loading

0 comments on commit e4058bb

Please sign in to comment.