Skip to content

Commit

Permalink
Use buffered queue simplify the queue operations (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshkothari410 authored and iamigo committed Oct 27, 2017
1 parent ce41a49 commit 2eee49a
Show file tree
Hide file tree
Showing 18 changed files with 438 additions and 453 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"homepage": "https://github.com/salesforce/refocus-collector#readme",
"dependencies": {
"bluebird": "^3.5.0",
"buffered-queue": "^0.1.4",
"coveralls": "^2.13.1",
"debug": "^2.6.8",
"errors": "^0.3.0",
Expand Down
8 changes: 0 additions & 8 deletions src/commands/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const debug = require('debug')('refocus-collector:commands');
const logger = require('winston');
const configModule = require('../config/config');
const repeater = require('../repeater/repeater');
const flush = require('../sampleQueue/sampleQueueOps').flush;
const sendHeartbeat = require('../heartbeat/heartbeat').sendHeartbeat;
const registryFile = require('../constants').registryLocation;
const registryFileUtils = require('../utils/registryFileUtils');
Expand Down Expand Up @@ -45,13 +44,6 @@ function execute(name) {
onProgress: debug,
});

// flush function does not return anything, hence no event functions
repeater.create({
name: 'SampleQueueFlush',
interval: config.collectorConfig.sampleUpsertQueueTime,
func: () => flush(config.collectorConfig.maxSamplesPerBulkRequest,
regObj),
});
logger.info({ activity: 'cmdStart' });
debug('Exiting start.execute');
} // execute
Expand Down
3 changes: 3 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ module.exports = {
SERVICE_UNAVAILABLE: 503,
},

// Bulk upsert Sample Queue Name
bulkUpsertSampleQueue: 'bulkUpsertSampleQueue',

// exported for the purpose of testing
mockRegistryLocation: './test/config/testRegistry.json',

Expand Down
4 changes: 2 additions & 2 deletions src/heartbeat/heartbeat.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ function sendHeartbeat(refocusInstanceObj) {
};

return buildMockResponse(generatorsDir)
.then(res => handleHeartbeatResponse(null, res))
.catch(err => handleHeartbeatResponse(err, null));
.then(res => handleHeartbeatResponse(null, res, refocusInstanceObj))
.catch(err => handleHeartbeatResponse(err, null, refocusInstanceObj));

//TODO: send the real request and handle the response once the api can handle it
//debug(`sendHeartbeat sending request. url: ${url} body: %o`, body);
Expand Down
27 changes: 26 additions & 1 deletion src/heartbeat/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const debug = require('debug')('refocus-collector:heartbeat');
const logger = require('winston');
const utils = require('./utils');
const configModule = require('../config/config');
const queueUtils = require('../utils/queueUtils');
const httpUtils = require('../utils/httpUtils');
const bulkUpsertSampleQueue = require('../constants').bulkUpsertSampleQueue;

/**
* Handles the heartbeat response:
Expand All @@ -24,14 +27,36 @@ const configModule = require('../config/config');
* @returns {Object} - config object. An error object is returned if this
* function is called with the error as the first argument.
*/
function handleHeartbeatResponse(err, res) {
function handleHeartbeatResponse(err, res, refocusInstanceObj=null) {
debug('entered handleHeartbeatResponse');
if (err) {
logger.error('The handleHeartbeatResponse function was called with an ' +
'error:', err);
return err;
}

// queue generation
// get queue
const _bulkUpsertSampleQueue = queueUtils.getQueue(bulkUpsertSampleQueue);
if (_bulkUpsertSampleQueue) {
if (res.collectorConfig) {
_bulkUpsertSampleQueue._size = res.collectorConfig.maxSamplesPerBulkRequest;
_bulkUpsertSampleQueue._flushTimeout =
res.collectorConfig.sampleUpsertQueueTime;
}
} else {
const config = configModule.getConfig();
const queueParams = {
name: bulkUpsertSampleQueue,
size: config.collectorConfig.maxSamplesPerBulkRequest,
flushTimeout: config.collectorConfig.sampleUpsertQueueTime,
verbose: false,
flushFunction: httpUtils.doBulkUpsert,
refocusInstanceObj: refocusInstanceObj,
};
queueUtils.createQueue(queueParams);
}

if (res) {
utils.updateCollectorConfig(res);
utils.addGenerator(res);
Expand Down
13 changes: 9 additions & 4 deletions src/remoteCollection/handleCollectResponse.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ const evalUtils = require('../utils/evalUtils');
const errors = require('../errors');
const errorSamples = require('./errorSamples');
const logger = require('winston');
const enqueue = require('../sampleQueue/sampleQueueOps').enqueue;
const queueUtils = require('../utils/queueUtils');
const httpStatus = require('../constants').httpStatus;
const bulkUpsertSampleQueue = require('../constants').bulkUpsertSampleQueue;
const commonUtils = require('../utils/commonUtils');

/**
* Validates the response from the collect function. Confirms that it is an
Expand Down Expand Up @@ -95,7 +97,8 @@ function handleCollectResponse(collectResponse) {
url: ${collectRes.preparedUrl},
numSamples: ${samplesToEnqueue.length},
}`);
return enqueue(samplesToEnqueue);
queueUtils.enqueueFromArray(bulkUpsertSampleQueue, samplesToEnqueue,
commonUtils.validateSample);
} else {
/*
* The transform is *not* a string, so handle the response based on the
Expand Down Expand Up @@ -130,7 +133,8 @@ function handleCollectResponse(collectResponse) {
url: ${collectRes.preparedUrl},
numSamples: ${samplesToEnqueue.length},
}`);
return enqueue(samplesToEnqueue);
queueUtils.enqueueFromArray(bulkUpsertSampleQueue, samplesToEnqueue,
commonUtils.validateSample);
} else {
/*
* If there is no transform designated for this HTTP status code, just
Expand All @@ -145,7 +149,8 @@ function handleCollectResponse(collectResponse) {
error: ${errorMessage},
numSamples: ${samplesToEnqueue.length},
}`);
return enqueue(samplesToEnqueue);
queueUtils.enqueueFromArray(bulkUpsertSampleQueue, samplesToEnqueue,
commonUtils.validateSample);
}
}
})
Expand Down
111 changes: 0 additions & 111 deletions src/sampleQueue/sampleQueueOps.js

This file was deleted.

75 changes: 0 additions & 75 deletions src/sampleQueue/sampleUpsertUtils.js

This file was deleted.

17 changes: 17 additions & 0 deletions src/utils/commonUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const debug = require('debug')('refocus-collector:commonUtils');
const path = require('path');
const os = require('os');
const crypto = require('crypto');
const sampleSchema = require('./schema').sample;

module.exports = {

Expand Down Expand Up @@ -75,6 +76,22 @@ module.exports = {
return fileContents;
},

/**
* Validates the sample.
*
* @param {Object} sample - The sample to validate
* @returns {Object} the valid sample
* @throws {ValidationError} if the object does not look like a sample
*/
validateSample(sample) {
const val = sampleSchema.validate(sample);
if (val.error) {
throw new errors.ValidationError(val.error.message);
}

return sample;
},

/**
* Get current os and process metadata
* @returns {Object} - metadata object
Expand Down

0 comments on commit 2eee49a

Please sign in to comment.