Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
egorras committed Jan 18, 2018
1 parent b84c6ae commit 92bb020
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 350 deletions.
1 change: 0 additions & 1 deletion lib/queue/actions/client_actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@ module.exports.stop = function () {
module.exports.publishAndStop = function () {
return new PublishAndStopAction();
};

100 changes: 49 additions & 51 deletions lib/queue/implementation_runner_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,55 @@

var ConsoleAuditStream = require('../audit/console_audit_stream');

class ImplementationRunnerConfig {
constructor () {
this._hostname = 61616;
this._requestTimeoutMillis = 500;
this._auditStream = new ConsoleAuditStream();
}

setHostname(hostname) {
this._hostname = hostname;
return this;
}

setPort(port) {
this._port = port;
return this;
}

setUniqueId(uniqueId) {
this._uniqueId = uniqueId;
return this;
}

setTimeToWaitForRequest(timeToWaitForRequest) {
this._requestTimeoutMillis = timeToWaitForRequest;
return this;
}

setAuditStream(auditStream) {
this._auditStream = auditStream;
return this;
}

getHostName() {
return this._hostname;
}

getPort() {
return this._port;
}

getUniqueId() {
return this._uniqueId;
}

getTimeToWaitForRequest() {
return this._requestTimeoutMillis;
}

getAuditStream() {
return this._auditStream;
}
function ImplementationRunnerConfig() {
this._hostname = 61616;
this._requestTimeoutMillis = 500;
this._auditStream = new ConsoleAuditStream();
}

ImplementationRunnerConfig.prototype.setHostname = function(hostname) {
this._hostname = hostname;
return this;
}

ImplementationRunnerConfig.prototype.setPort = function(port) {
this._port = port;
return this;
}

ImplementationRunnerConfig.prototype.setUniqueId = function(uniqueId) {
this._uniqueId = uniqueId;
return this;
}

ImplementationRunnerConfig.prototype.setTimeToWaitForRequest = function(timeToWaitForRequest) {
this._requestTimeoutMillis = timeToWaitForRequest;
return this;
}

ImplementationRunnerConfig.prototype.setAuditStream = function(auditStream) {
this._auditStream = auditStream;
return this;
}

ImplementationRunnerConfig.prototype.getHostName = function() {
return this._hostname;
}

ImplementationRunnerConfig.prototype.getPort = function() {
return this._port;
}

ImplementationRunnerConfig.prototype.getUniqueId = function() {
return this._uniqueId;
}

ImplementationRunnerConfig.prototype.getTimeToWaitForRequest = function() {
return this._requestTimeoutMillis;
}

ImplementationRunnerConfig.prototype.getAuditStream = function() {
return this._auditStream;
}

module.exports = ImplementationRunnerConfig;
54 changes: 26 additions & 28 deletions lib/queue/queue_based_implementation_runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,34 @@ var ImplementationRunnerConfig = require('./implementation_runner_config');
var AuditStream = require('../audit/audit_stream');
var RemoteBroker = require('./transport/remote_broker');

class QueueBasedImplementationRunner {
constructor(config, deployProcessingRules) {
this._config = config;
this._deployProcessingRules = deployProcessingRules;
this._audit = new AuditStream();
}
function QueueBasedImplementationRunner(config, deployProcessingRules) {
this._config = config;
this._deployProcessingRules = deployProcessingRules;
this._audit = new AuditStream();
}

getRequestTimeoutMillisecond() {
return this._config.getTimeToWaitForRequest();
}
QueueBasedImplementationRunner.prototype.getRequestTimeoutMillisecond = function() {
return this._config.getTimeToWaitForRequest();
}

run() {
let config = this._config;
let processingRules = this._deployProcessingRules;

return new RemoteBroker(config.getUniqueId())
.connect(config.getHostName(), config.getPort())
.then(function (remoteBroker) {
console.log("Starting client.");
return remoteBroker.subscribeAndProcess(new ApplyProcessingRules(processingRules), config.getTimeToWaitForRequest());
})
.then(function (remoteBroker) {
console.log("Stopping client.");
return remoteBroker.close()
})
.catch(function (error) {
console.error("There was a problem processing messages. " + error.message);
console.error(error.stack)
});
}
QueueBasedImplementationRunner.prototype.run = function() {
let config = this._config;
let processingRules = this._deployProcessingRules;

return new RemoteBroker(config.getUniqueId())
.connect(config.getHostName(), config.getPort())
.then(function (remoteBroker) {
console.log("Starting client.");
return remoteBroker.subscribeAndProcess(new ApplyProcessingRules(processingRules), config.getTimeToWaitForRequest());
})
.then(function (remoteBroker) {
console.log("Stopping client.");
return remoteBroker.close()
})
.catch(function (error) {
console.error("There was a problem processing messages. " + error.message);
console.error(error.stack)
});
}

//~~~~ Queue handling policies
Expand Down
42 changes: 20 additions & 22 deletions lib/queue/queue_based_implementation_runner_builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,30 @@ var QueueBasedImplementationRunner = require('./queue_based_implementation_runne
var ProcessingRules = require('./processing_rules');
var PublishAction = require('./actions/publish_action');

class QueueBasedImplementationRunnerBuilder {
constructor() {
this._deployProcessingRules = new ProcessingRules();
function QueueBasedImplementationRunnerBuilder() {
this._deployProcessingRules = new ProcessingRules();

this._deployProcessingRules
.on('display_description')
.call(function() { return 'OK'; })
.then(new PublishAction());
}
this._deployProcessingRules
.on('display_description')
.call(function() { return 'OK'; })
.then(new PublishAction());
}

setConfig(config) {
this._config = config;
return this;
}
QueueBasedImplementationRunnerBuilder.prototype.setConfig = function(config) {
this._config = config;
return this;
}

withSolutionFor(methodName, userImplementation, action) {
this._deployProcessingRules
.on(methodName)
.call(userImplementation)
.then(action || new PublishAction());
return this;
}
QueueBasedImplementationRunnerBuilder.prototype.withSolutionFor = function(methodName, userImplementation, action) {
this._deployProcessingRules
.on(methodName)
.call(userImplementation)
.then(action || new PublishAction());
return this;
}

create() {
return new QueueBasedImplementationRunner(this._config, this._deployProcessingRules);
}
QueueBasedImplementationRunnerBuilder.prototype.create = function() {
return new QueueBasedImplementationRunner(this._config, this._deployProcessingRules);
}

module.exports = QueueBasedImplementationRunnerBuilder;
167 changes: 0 additions & 167 deletions lib/runner/client_runner.js

This file was deleted.

0 comments on commit 92bb020

Please sign in to comment.