diff --git a/package.json b/package.json index e594e53..10af83a 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "homepage": "https://github.com/salesforce/refocus-collector#readme", "dependencies": { "bluebird": "^3.5.0", + "buffered-queue": "^0.1.4", "coveralls": "^2.13.1", "debug": "^2.6.8", "errors": "^0.3.0", diff --git a/src/commands/start.js b/src/commands/start.js index dd1ba62..b8d10f5 100644 --- a/src/commands/start.js +++ b/src/commands/start.js @@ -16,7 +16,6 @@ const debug = require('debug')('refocus-collector:commands'); const logger = require('winston'); const configModule = require('../config/config'); const repeater = require('../repeater/repeater'); -const flush = require('../sampleQueue/sampleQueueOps').flush; const sendHeartbeat = require('../heartbeat/heartbeat').sendHeartbeat; const registryFile = require('../constants').registryLocation; const registryFileUtils = require('../utils/registryFileUtils'); @@ -45,13 +44,6 @@ function execute(name) { onProgress: debug, }); - // flush function does not return anything, hence no event functions - repeater.create({ - name: 'SampleQueueFlush', - interval: config.collectorConfig.sampleUpsertQueueTime, - func: () => flush(config.collectorConfig.maxSamplesPerBulkRequest, - regObj), - }); logger.info({ activity: 'cmdStart' }); debug('Exiting start.execute'); } // execute diff --git a/src/constants.js b/src/constants.js index d38d32e..658b7d0 100644 --- a/src/constants.js +++ b/src/constants.js @@ -32,6 +32,9 @@ module.exports = { SERVICE_UNAVAILABLE: 503, }, + // Bulk upsert Sample Queue Name + bulkUpsertSampleQueue: 'bulkUpsertSampleQueue', + // exported for the purpose of testing mockRegistryLocation: './test/config/testRegistry.json', diff --git a/src/heartbeat/heartbeat.js b/src/heartbeat/heartbeat.js index 9f71434..29e001e 100644 --- a/src/heartbeat/heartbeat.js +++ b/src/heartbeat/heartbeat.js @@ -73,8 +73,8 @@ function sendHeartbeat(refocusInstanceObj) { }; return buildMockResponse(generatorsDir) - .then(res => handleHeartbeatResponse(null, res)) - .catch(err => handleHeartbeatResponse(err, null)); + .then(res => handleHeartbeatResponse(null, res, refocusInstanceObj)) + .catch(err => handleHeartbeatResponse(err, null, refocusInstanceObj)); //TODO: send the real request and handle the response once the api can handle it //debug(`sendHeartbeat sending request. url: ${url} body: %o`, body); diff --git a/src/heartbeat/listener.js b/src/heartbeat/listener.js index 4f07744..991061a 100644 --- a/src/heartbeat/listener.js +++ b/src/heartbeat/listener.js @@ -13,6 +13,9 @@ const debug = require('debug')('refocus-collector:heartbeat'); const logger = require('winston'); const utils = require('./utils'); const configModule = require('../config/config'); +const queueUtils = require('../utils/queueUtils'); +const httpUtils = require('../utils/httpUtils'); +const bulkUpsertSampleQueue = require('../constants').bulkUpsertSampleQueue; /** * Handles the heartbeat response: @@ -24,7 +27,7 @@ const configModule = require('../config/config'); * @returns {Object} - config object. An error object is returned if this * function is called with the error as the first argument. */ -function handleHeartbeatResponse(err, res) { +function handleHeartbeatResponse(err, res, refocusInstanceObj=null) { debug('entered handleHeartbeatResponse'); if (err) { logger.error('The handleHeartbeatResponse function was called with an ' + @@ -32,6 +35,28 @@ function handleHeartbeatResponse(err, res) { return err; } + // queue generation + // get queue + const _bulkUpsertSampleQueue = queueUtils.getQueue(bulkUpsertSampleQueue); + if (_bulkUpsertSampleQueue) { + if (res.collectorConfig) { + _bulkUpsertSampleQueue._size = res.collectorConfig.maxSamplesPerBulkRequest; + _bulkUpsertSampleQueue._flushTimeout = + res.collectorConfig.sampleUpsertQueueTime; + } + } else { + const config = configModule.getConfig(); + const queueParams = { + name: bulkUpsertSampleQueue, + size: config.collectorConfig.maxSamplesPerBulkRequest, + flushTimeout: config.collectorConfig.sampleUpsertQueueTime, + verbose: false, + flushFunction: httpUtils.doBulkUpsert, + refocusInstanceObj: refocusInstanceObj, + }; + queueUtils.createQueue(queueParams); + } + if (res) { utils.updateCollectorConfig(res); utils.addGenerator(res); diff --git a/src/remoteCollection/handleCollectResponse.js b/src/remoteCollection/handleCollectResponse.js index f5d1733..eed3e35 100644 --- a/src/remoteCollection/handleCollectResponse.js +++ b/src/remoteCollection/handleCollectResponse.js @@ -14,8 +14,10 @@ const evalUtils = require('../utils/evalUtils'); const errors = require('../errors'); const errorSamples = require('./errorSamples'); const logger = require('winston'); -const enqueue = require('../sampleQueue/sampleQueueOps').enqueue; +const queueUtils = require('../utils/queueUtils'); const httpStatus = require('../constants').httpStatus; +const bulkUpsertSampleQueue = require('../constants').bulkUpsertSampleQueue; +const commonUtils = require('../utils/commonUtils'); /** * Validates the response from the collect function. Confirms that it is an @@ -95,7 +97,8 @@ function handleCollectResponse(collectResponse) { url: ${collectRes.preparedUrl}, numSamples: ${samplesToEnqueue.length}, }`); - return enqueue(samplesToEnqueue); + queueUtils.enqueueFromArray(bulkUpsertSampleQueue, samplesToEnqueue, + commonUtils.validateSample); } else { /* * The transform is *not* a string, so handle the response based on the @@ -130,7 +133,8 @@ function handleCollectResponse(collectResponse) { url: ${collectRes.preparedUrl}, numSamples: ${samplesToEnqueue.length}, }`); - return enqueue(samplesToEnqueue); + queueUtils.enqueueFromArray(bulkUpsertSampleQueue, samplesToEnqueue, + commonUtils.validateSample); } else { /* * If there is no transform designated for this HTTP status code, just @@ -145,7 +149,8 @@ function handleCollectResponse(collectResponse) { error: ${errorMessage}, numSamples: ${samplesToEnqueue.length}, }`); - return enqueue(samplesToEnqueue); + queueUtils.enqueueFromArray(bulkUpsertSampleQueue, samplesToEnqueue, + commonUtils.validateSample); } } }) diff --git a/src/sampleQueue/sampleQueueOps.js b/src/sampleQueue/sampleQueueOps.js deleted file mode 100644 index 6f772a2..0000000 --- a/src/sampleQueue/sampleQueueOps.js +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or - * https://opensource.org/licenses/BSD-3-Clause - */ - -/** - * /src/sampleQueue/sampleQueueOps.js - */ -const debug = require('debug')('refocus-collector:sampleQueue'); -const logger = require('winston'); -const sampleUpsertUtils = require('./sampleUpsertUtils'); -const errors = require('../errors'); -const sampleSchema = require('../utils/schema').sample; -const sampleQueue = []; - -/** - * Validates the sample. - * - * @param {Object} sample - The sample to validate - * @returns {Object} the valid sample - * @throws {ValidationError} if the object does not look like a sample - */ -function validateSample(sample) { - const val = sampleSchema.validate(sample); - if (val.error) { - throw new errors.ValidationError(val.error.message); - } - - return sample; -} - -/** - * Enqueue samples in sample queue. - * - * @param {Array} samples - Array of samples - * @throws {ValidationError} - If Invalid sample - */ -function enqueue(samples) { - try { - debug(`Starting to push ${samples.length} samples in sampleQueue.`); - samples.forEach((sample) => { - validateSample(sample); - sampleQueue.push(sample); - }); - logger.info(`Enqueue successful for ${samples.length} samples`); - } catch (err) { - logger.error(`Enqueue failed: ${err}`); - } -} - -/** - * Bulk upsert samples and log the success or failure. - * - * @param {Array} samples - Array of samples - * @param {Object} refocusInst - The Refocus instance - * @throws {ValidationError} - If missing refocusInst - */ -function bulkUpsertAndLog(samples, refocusInst) { - debug('Entered: bulkUpsertAndLog'); - if (!refocusInst) { - throw new errors.ValidationError('Missing refocusInst'); - } - - debug(`Starting bulk upsert of ${samples.length} samples.`); - sampleUpsertUtils.doBulkUpsert(refocusInst, samples) - .then(() => logger.info({ - activity: 'bulkUpsertSamples', - sampleCount: samples.length, - })) - .catch((err) => logger.error(`doBulkUpsert failed for ${samples.length} ` + - `samples: ${JSON.stringify(err)}`)); -} // bulkUpsertAndLog - -/** - * Flush the queue. If maxSamplesPerBulkRequest is set, bulk upsert the samples - * in batches of maxSamplesPerBulkRequest count. - * - * @param {Number} maxSamplesPerBulkRequest - the maximum batch size; unlimited - * @param {Object} refocusInst - The Refocus instance - * @returns {Number} - number of samples flushed - */ -function flush(maxSamplesPerBulkRequest, refocusInst) { - debug('Entered: flush', maxSamplesPerBulkRequest, refocusInst.name); - const max = new Number(maxSamplesPerBulkRequest) || Number.MAX_SAFE_INTEGER; - const totSamplesCnt = sampleQueue.length; - let samples = sampleQueue; - let startIdx = 0; - while ((startIdx + max) < totSamplesCnt) { - const endIdx = startIdx + max; - samples = sampleQueue.slice(startIdx, endIdx); - bulkUpsertAndLog(samples, refocusInst); - startIdx = endIdx; - } - - samples = sampleQueue.slice(startIdx, totSamplesCnt); - bulkUpsertAndLog(samples, refocusInst); - sampleQueue.splice(0, totSamplesCnt); // remove these samples from queue. - debug(`Flushed ${totSamplesCnt} samples.`); - return totSamplesCnt; -} - -module.exports = { - enqueue, - flush, - sampleQueue, // for testing purposes - bulkUpsertAndLog, // for testing purposes - validateSample, // for testing purposes -}; diff --git a/src/sampleQueue/sampleUpsertUtils.js b/src/sampleQueue/sampleUpsertUtils.js deleted file mode 100644 index 50c5fe2..0000000 --- a/src/sampleQueue/sampleUpsertUtils.js +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or - * https://opensource.org/licenses/BSD-3-Clause - */ - -/** - * /src/sampleQueue/sampleUpsertUtils.js - */ -const debug = require('debug')('refocus-collector:sampleQueue'); -const errors = require('../errors'); -const request = require('superagent'); -const bulkUpsertEndpoint = require('../constants').bulkUpsertEndpoint; -const logger = require('winston'); - -module.exports = { - - /** - * Send the upsert and handle any errors in the response. - * - * @param {Object} refocusInstance contains Refocus url and token, - * @param {Array} arr is the array of samples to upsert; - * @throws {ValidationError} if argument(s) is missing, - * or in a wrong format. - * @returns {Promise} contains a successful response, or failed error - */ - doBulkUpsert(refocusInstance, arr) { - const { url, token } = refocusInstance; - return new Promise((resolve, reject) => { - if (!url) { - // Throw error if url is not present in registry. - debug('Error: refocus url not found. Supplied %s', url); - reject(new errors.ValidationError( - 'Refocus instance should have a url property.' - )); - } - - if (!token) { - // Throw error if token is not present in registry. - debug('Error: refocus url not found. Supplied %s', token); - reject(new errors.ValidationError( - 'Refocus instance should have a token property.' - )); - } - - if (!Array.isArray(arr)) { - // Throw error if no array is supplied - debug('Error: array of samples to post not found. Supplied %s', arr); - reject(new errors.ValidationError('bulk upsert needs an array of ' + - 'samples to send. No samples array found.' - )); - } - - const upsertUrl = url + bulkUpsertEndpoint; - debug('Bulk upserting to: %s', upsertUrl); - - request - .post(upsertUrl) - .send(arr) - .set('Authorization', token) - .set('Accept', 'application/json') - .end((err, res) => { - if (err) { - logger.error('bulkUpsert returned an error: %o', err); - return reject(err); - } - - debug('bulkUpsert returned an OK response: %o', res); - return resolve(res); - }); - }); - }, -}; diff --git a/src/utils/commonUtils.js b/src/utils/commonUtils.js index af8c66e..3e54303 100644 --- a/src/utils/commonUtils.js +++ b/src/utils/commonUtils.js @@ -17,6 +17,7 @@ const debug = require('debug')('refocus-collector:commonUtils'); const path = require('path'); const os = require('os'); const crypto = require('crypto'); +const sampleSchema = require('./schema').sample; module.exports = { @@ -75,6 +76,22 @@ module.exports = { return fileContents; }, + /** + * Validates the sample. + * + * @param {Object} sample - The sample to validate + * @returns {Object} the valid sample + * @throws {ValidationError} if the object does not look like a sample + */ + validateSample(sample) { + const val = sampleSchema.validate(sample); + if (val.error) { + throw new errors.ValidationError(val.error.message); + } + + return sample; + }, + /** * Get current os and process metadata * @returns {Object} - metadata object diff --git a/src/utils/httpUtils.js b/src/utils/httpUtils.js new file mode 100644 index 0000000..0c71e7d --- /dev/null +++ b/src/utils/httpUtils.js @@ -0,0 +1,77 @@ +/** + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or + * https://opensource.org/licenses/BSD-3-Clause + */ + +/** + * src/utils/httpUtils.js + */ +'use strict'; // eslint-disable-line strict +const debug = require('debug')('refocus-collector:httpUtils'); +const errors = require('../errors'); +const request = require('superagent'); +const bulkUpsertEndpoint = require('../constants').bulkUpsertEndpoint; +const logger = require('winston'); + +/** + * Send the upsert and handle any errors in the response. + * + * @param {Object} refocusInstance contains Refocus url and token, + * @param {Array} arr is the array of samples to upsert; + * @throws {ValidationError} if argument(s) is missing, + * or in a wrong format. + * @returns {Promise} contains a successful response, or failed error + */ +function doBulkUpsert(refocusInstance, arr) { + const { url, token } = refocusInstance; + return new Promise((resolve, reject) => { + if (!url) { + // Throw error if url is not present in registry. + debug('Error: refocus url not found. Supplied %s', url); + reject(new errors.ValidationError( + 'Refocus instance should have a url property.' + )); + } + + if (!token) { + // Throw error if token is not present in registry. + debug('Error: refocus url not found. Supplied %s', token); + reject(new errors.ValidationError( + 'Refocus instance should have a token property.' + )); + } + + if (!Array.isArray(arr)) { + // Throw error if no array is supplied + debug('Error: array of samples to post not found. Supplied %s', arr); + reject(new errors.ValidationError('bulk upsert needs an array of ' + + 'samples to send. No samples array found.' + )); + } + + const upsertUrl = url + bulkUpsertEndpoint; + debug('Bulk upserting to: %s', upsertUrl); + + request + .post(upsertUrl) + .send(arr) + .set('Authorization', token) + .set('Accept', 'application/json') + .end((err, res) => { + if (err) { + logger.error('bulkUpsert returned an error: %o', err); + return reject(err); + } + + debug('bulkUpsert returned an OK response: %o', res); + return resolve(res); + }); + }); +} + +module.exports = { + doBulkUpsert, +}; diff --git a/src/utils/queueUtils.js b/src/utils/queueUtils.js new file mode 100644 index 0000000..82cd97c --- /dev/null +++ b/src/utils/queueUtils.js @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or + * https://opensource.org/licenses/BSD-3-Clause + */ + +/** + * src/utils/queueUtils.js + */ +'use strict'; // eslint-disable-line strict +const logger = require('winston'); +const Queue = require('buffered-queue'); +const errors = require('../errors'); +let queueObject; +const queueListObject = {}; + +/** + * Create Queue using Buffered Queue Package + * + * @param {Object} queueParams Queue Parameter Object + * name, size, flushTimeout, + * verbose, flushFunction, + * refocusInstanceObj + * @param {Object} refocusInstanceObj Refocus Instance Object to send Data + */ +function createQueue(queueParams) { + queueObject = new Queue(queueParams.name, { + size: queueParams.size, + flushTimeout: queueParams.flushTimeout, + verbose: queueParams.verbose, + }); + + queueListObject[queueParams.name] = queueObject; + + queueObject.on('flush', (data, name) => { + queueParams.flushFunction(queueParams.refocusInstanceObj, data); + }); +} + +/** + * Get queue based on Name + * @param {String} name Name of Queue + * @return {Object} Return Queue object + */ +function getQueue(name) { + return queueListObject[name]; +} + +/** + * Enqueue Data to queue from Array + * @param {String} name Queue Name + * @param {Array} arrayData Array of Data + * @param {Function} validationFunction Validation function to validate + * individiual element + * @throws {ValidationError} if the object does not look like a sample + */ +function enqueueFromArray(name, arrayData, validationFunction=null) { + const queue = queueListObject[name]; + try { + arrayData.forEach((data) => { + if (validationFunction) { + validationFunction(data); + } + + queue.add(data); + }); + } catch (err) { + logger.error(`Enqueue failed: ${err}`); + throw new errors.ValidationError(err.error); + } +} + +module.exports = { + createQueue, + getQueue, + enqueueFromArray, +}; diff --git a/test/commands/start.js b/test/commands/start.js index 20f1ddf..174328a 100644 --- a/test/commands/start.js +++ b/test/commands/start.js @@ -30,7 +30,6 @@ describe('test/commands/start >', () => { it('ok', (done) => { start.execute(); expect(repeater.tracker).to.have.property('Heartbeat'); - expect(repeater.tracker).to.have.property('SampleQueueFlush'); done(); }); diff --git a/test/heartbeat/listener.js b/test/heartbeat/listener.js index de4df53..063bcc5 100644 --- a/test/heartbeat/listener.js +++ b/test/heartbeat/listener.js @@ -44,6 +44,7 @@ describe('test/heartbeat/listener.js >', () => { const hbResponse = { collectorConfig: { heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, }, generatorsAdded: [ { @@ -85,7 +86,10 @@ describe('test/heartbeat/listener.js >', () => { it('added generators should be added to the config and the repeat tracker ' + 'should be setup', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'Core_Trust2', @@ -113,7 +117,10 @@ describe('test/heartbeat/listener.js >', () => { it('updated generators should be updated in the config', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'Core_Trust3', @@ -157,7 +164,10 @@ describe('test/heartbeat/listener.js >', () => { it('SGT with bulk= false should be handled', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'Core_Trust_nonBulk_NA1_NA2', @@ -187,7 +197,10 @@ describe('test/heartbeat/listener.js >', () => { it('SGT update from bulk=true to bulk=false', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'bulktrueToBulkFalse_1', @@ -212,6 +225,10 @@ describe('test/heartbeat/listener.js >', () => { .to.deep.equal(res.generatorsAdded[0]); expect(tracker.bulktrueToBulkFalse_1._bulk).not.equal(undefined); const updatedRes = { + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 50, + }, generatorsUpdated: [ { name: 'bulktrueToBulkFalse_1', @@ -242,7 +259,10 @@ describe('test/heartbeat/listener.js >', () => { it('SGT update from bulk=false to bulk=true', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'bulktrueToBulkFalse_2', @@ -268,6 +288,10 @@ describe('test/heartbeat/listener.js >', () => { expect(tracker.bulktrueToBulkFalse_2.NA1).not.equal(undefined); expect(tracker.bulktrueToBulkFalse_2.NA2).not.equal(undefined); const updatedRes = { + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsUpdated: [ { name: 'bulktrueToBulkFalse_2', @@ -297,7 +321,10 @@ describe('test/heartbeat/listener.js >', () => { it('deleted generators information should be deleted in the config', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'ABC_DATA', @@ -332,6 +359,10 @@ describe('test/heartbeat/listener.js >', () => { const updatedConfig = listener.handleHeartbeatResponse(null, res); expect(updatedConfig.generators.ABC_DATA).to.not.equal(undefined); const resDel = { + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsDeleted: [ { name: 'ABC_DATA', }, ], @@ -350,6 +381,7 @@ describe('test/heartbeat/listener.js >', () => { const res = { collectorConfig: { heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, }, generatorsAdded: { name: 'Fghijkl_Mnopq', @@ -380,7 +412,10 @@ describe('test/heartbeat/listener.js >', () => { it('added generators with encrypted context attributed should be ' + 'decrypted before the repeats are created', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'Core_Trust2_With_Encryption', @@ -429,7 +464,10 @@ describe('test/heartbeat/listener.js >', () => { it('updated generator context with encryption should be decrypted', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'Core_Trust3_With_Encryption', @@ -510,7 +548,10 @@ describe('test/heartbeat/listener.js >', () => { it('when context is updated from encryption = false to encryption = ' + 'true, decryption should happen', (done) => { const res = { - heartbeatInterval: 50, + collectorConfig: { + heartbeatInterval: 50, + maxSamplesPerBulkRequest: 10, + }, generatorsAdded: [ { name: 'Core_Trust4_With_Encryption', diff --git a/test/remoteCollection/collect.js b/test/remoteCollection/collect.js index 1f7c5ae..1ab5e1f 100644 --- a/test/remoteCollection/collect.js +++ b/test/remoteCollection/collect.js @@ -22,7 +22,6 @@ const handleCollectRes = .handleCollectResponse; const collect = require('../../src/remoteCollection/collect'); const httpStatus = require('../../src/constants').httpStatus; -const sampleQueueOps = require('../../src/sampleQueue/sampleQueueOps'); describe('test/remoteCollection/collect.js >', () => { describe('collect >', () => { @@ -213,54 +212,6 @@ describe('test/remoteCollection/collect.js >', () => { }) .catch(done); }); - - it('handleCollectResponse should work with a good response from collect', - (done) => { - const remoteUrl = 'http://bart.gov.api/'; - const generator = { - name: 'Generator0', - interval: 600, - aspects: [{ name: 'Delay', timeout: '1m' }], - ctx: {}, - generatorTemplate: { - connection: { - headers: { - Authorization: 'abddr121345bb', - }, - url: 'http://bart.gov.api/status', - bulk: true, - }, - transform: 'return [{ name: "Fremont|Delay", value: "10" }, ' + - '{ name: "UnionCity|Delay", value: "2" }]', - }, - subjects: [{ absolutePath: 'Fremont' }, { absolutePath: 'UnionCity' }], - aspects: [{ name: 'Delay', timeout: '1m' }], - }; - const remoteData = { - station: [{ name: 'Fremont|Delay', value: '10' }, - { name: 'UnionCity|Delay', value: '2' }, - ], - }; - nock(remoteUrl) - .get('/status') - .reply(httpStatus.OK, remoteData); - - nock(refocusUrl) - .post(bulkEndPoint, sampleArr) - .reply(httpStatus.CREATED, mockRest.bulkUpsertPostOk); - - handleCollectRes(collect.collect(generator)) - .then(() => { - expect(sampleQueueOps.sampleQueue.length).to.be.equal(2); - expect(sampleQueueOps.sampleQueue[0]) - .to.eql({ name: 'Fremont|Delay', value: '10' }); - expect(sampleQueueOps.sampleQueue[1]) - .to.eql({ name: 'UnionCity|Delay', value: '2' }); - sampleQueueOps.flush(100, tu.refocusInstance1); - done(); - }) - .catch(done); - }); }); // collect describe('prepareUrl >', () => { diff --git a/test/remoteCollection/handleCollectResponse.js b/test/remoteCollection/handleCollectResponse.js index 65dfecf..4c4262e 100644 --- a/test/remoteCollection/handleCollectResponse.js +++ b/test/remoteCollection/handleCollectResponse.js @@ -23,9 +23,10 @@ const errors = require('../../src/errors'); const hcr = require('../../src/remoteCollection/handleCollectResponse'); const validateCollectResponse = hcr.validateCollectResponse; const handleCollectResponse = hcr.handleCollectResponse; - +const queueUtils = require('../../src/utils/queueUtils'); const httpStatus = require('../../src/constants').httpStatus; -const sampleQueueOps = require('../../src/sampleQueue/sampleQueueOps'); +const configModule = require('../../src/config/config'); +const httpUtils = require('../../src/utils/httpUtils'); describe('test/remoteCollection/handleCollectResponse.js >', () => { describe('validateCollectResponse >', () => { @@ -178,7 +179,22 @@ describe('test/remoteCollection/handleCollectResponse.js >', () => { describe('handleCollectResponse', () => { let winstonInfoStub; + const refocusObject = { + url: refocusUrl, + token: 'abcdqwerty', + name: 'test', + }; + const config = configModule.getConfig(); before(() => { + queueUtils.createQueue('bulkUpsertSampleQueue', + config.collectorConfig.maxSamplesPerBulkRequest, + config.collectorConfig.sampleUpsertQueueTime, + false, + httpUtils.doBulkUpsert, + refocusObject + ); + + // console.log(queueUtils.getQueue('bulkUpsertSampleQueue')); // use nock to mock the response when flushing const sampleArr = [ { name: 'S1.S2|A1', value: 10 }, { name: 'S1.S2|A2', value: 2 }, @@ -193,7 +209,7 @@ describe('test/remoteCollection/handleCollectResponse.js >', () => { afterEach(() => { winstonInfoStub.reset(); - sampleQueueOps.flush(100, tu.refocusInstance1); + queueUtils.getQueue('bulkUpsertSampleQueue').Items = []; }); after(() => { @@ -368,12 +384,13 @@ describe('test/remoteCollection/handleCollectResponse.js >', () => { }); function checkLogs(expected) { - expect(winston.info.calledTwice).to.be.true; + expect(winston.info.calledOnce).to.be.true; expect(winston.info.args[0][0]).contains('generator: mockGenerator'); expect(winston.info.args[0][0]).contains(`numSamples: ${expected.length}`); - expect(sampleQueueOps.sampleQueue.length).to.be.equal(expected.length); - expect(sampleQueueOps.sampleQueue[0]).to.eql(expected[0]); - expect(sampleQueueOps.sampleQueue[1]).to.eql(expected[1]); + const queue = queueUtils.getQueue('bulkUpsertSampleQueue'); + expect(queue.items.length).to.be.equal(expected.length); + expect(queue.items[0]).to.eql(expected[0]); + expect(queue.items[1]).to.eql(expected[1]); } function defaultErrorSamples(statusCode, statusMessage) { diff --git a/test/sampleQueue/sampleQueueOps.js b/test/sampleQueue/sampleQueueOps.js deleted file mode 100644 index dd75294..0000000 --- a/test/sampleQueue/sampleQueueOps.js +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or - * https://opensource.org/licenses/BSD-3-Clause - */ - -/** - * test/sampleQueue/sampleQueueOps.js - */ -const expect = require('chai').expect; - -const configModule = require('../../src/config/config'); -const sampleUpsertUtils = require('../../src/sampleQueue/sampleUpsertUtils'); -const sinon = require('sinon'); -const tu = require('../testUtils'); -const bulkEndPoint = require('../../src/constants').bulkUpsertEndpoint; -const winston = require('winston'); -const nock = require('nock'); -const mockRest = require('../mockedResponse'); -const httpStatus = require('../../src/constants').httpStatus; -const registry = tu.config.registry; -const refocusUrl = registry.refocusInstances[ - Object.keys(tu.config.registry.refocusInstances)[0] -].url; -configModule.clearConfig(); -configModule.setRegistry(registry); -const sampleQueueOps = require('../../src/sampleQueue/sampleQueueOps'); -const ValidationError = require('../../src/errors').ValidationError; - -describe('test/sampleQueue/sampleQueueOps.js >', () => { - let samples; - let winstonInfoStub; - let winstonErrStub; - - beforeEach(() => { - samples = []; - for (let i = 0; i < 10; i++) { // create 10 samples - samples.push({ name: `sample${i.toString()}|aspName`, value: '' + i }); - } - - winstonInfoStub = sinon.stub(winston, 'info'); - winstonErrStub = sinon.stub(winston, 'error'); - }); - - afterEach(() => { - winstonInfoStub.restore(); - winstonErrStub.restore(); - }); - - describe('validateSample >', () => { - it('ok', (done) => { - const sample = { name: 'sample1|aspName', value: '0' }; - expect(() => sampleQueueOps.validateSample(sample)).to.not.throw(); - done(); - }); - - it('throws', (done) => { - const sample = 'abc'; - expect(() => sampleQueueOps.validateSample(sample)) - .to.throw(ValidationError); - done(); - }); - }); - - describe('enqueue >', () => { - it('ok', (done) => { - // check the sample queue length and contents - sampleQueueOps.enqueue(samples); - expect(sampleQueueOps.sampleQueue.length).to.be.equal(10); - expect(sampleQueueOps.sampleQueue[0].name).to.be.equal('sample0|aspName'); - expect(sampleQueueOps.sampleQueue[9].name).to.be.equal('sample9|aspName'); - - // check the logs - expect(winston.info.calledOnce).to.be.true; - expect(winston.info.calledWith('Enqueue successful for 10 samples')) - .to.be.true; - done(); - }); - - it('failed', (done) => { - sampleQueueOps.enqueue([['randomText']]); - expect(winston.error.calledOnce).to.be.true; - expect(winston.error.args[0][0]) - .contains('Enqueue failed: ValidationError'); - done(); - }); - }); - - describe('flush >', () => { - beforeEach(() => sampleQueueOps.flush(100, tu.refocusInstance1)); - - it('number of samples < maxSamplesPerBulkRequest, ok', (done) => { - // check that bulk upsert called expected number of times and with - // right arguments - sampleQueueOps.enqueue(samples); - expect(sampleQueueOps.sampleQueue.length).to.be.equal(10); - const doBulkUpsert = sinon.spy(sampleUpsertUtils, 'doBulkUpsert'); - sampleQueueOps.flush(100, tu.refocusInstance1); - sinon.assert.calledOnce(doBulkUpsert); - expect(doBulkUpsert.args[0][0].url).to.be.equal('http://www.xyz.com'); - expect(doBulkUpsert.args[0][0].token).to.be.string; - expect(doBulkUpsert.args[0][1][1].name).to.be.equal('sample1|aspName'); - expect(doBulkUpsert.args[0][1].length).to.be.equal(10); - doBulkUpsert.restore(); - expect(sampleQueueOps.sampleQueue.length).to.be.equal(0); - - done(); - }); - - it('number of samples > maxSamplesPerBulkRequest, ok', (done) => { - for (let i = 0; i < 250; i++) { // create and enqueue 250 more samples - samples.push({ name: `sample${i.toString()}|aspName`, value: '' + i }); - } - - // check that bulk upsert called expected number of times and with - // right arguments - sampleQueueOps.enqueue(samples); - const doBulkUpsert = sinon.spy(sampleUpsertUtils, 'doBulkUpsert'); - sampleQueueOps.flush(100, tu.refocusInstance1); - - // maxSamplesPerBulkRequest = 100, hence doBulkUpsert called thrice - sinon.assert.calledThrice(doBulkUpsert); - expect(doBulkUpsert.args[0][1].length).to.be.equal(100); - expect(doBulkUpsert.args[1][1].length).to.be.equal(100); - expect(doBulkUpsert.args[2][1].length).to.be.equal(60); - doBulkUpsert.restore(); - expect(sampleQueueOps.sampleQueue.length).to.be.equal(0); - done(); - }); - }); - - describe(' bulkUpsertAndLog >', () => { - afterEach(() => { - nock.cleanAll(); - }); - - // Needs setTimeout delay to pass, hence skipping. - it.skip('bulkUpsertAndLog, ok', (done) => { - // mock the bulk upsert request. - nock(refocusUrl) - .post(bulkEndPoint, samples) - .reply(httpStatus.CREATED, mockRest.bulkUpsertPostOk); - - sampleQueueOps.bulkUpsertAndLog(samples, tu.refocusInstance1); - - // Since logs are created after the bulkUpsert async call returns, hence - // setTimeout to wait for promise to complete. - setTimeout(() => { - expect(winston.info.calledOnce).to.be.true; - expect(winston.info.args[0][0].activity).to.equal('bulkUpsertSamples'); - expect(winston.info.args[0][0].sampleCount).to.equal(10); - done(); - }, 1900); - }); - - it.skip('bulkUpsertAndLog, error', (done) => { - // mock the bulk upsert request. - nock(refocusUrl) - .post(bulkEndPoint, samples) - .reply(httpStatus.BAD_REQUEST, {}); - sampleQueueOps.bulkUpsertAndLog(samples, tu.refocusInstance1); - setTimeout(() => { - // Since logs are created after the bulkUpsert async call returns, hence - // setTimeout to wait for promise to complete. - - expect(winston.error.calledOnce).to.be.true; - expect(winston.error.args[0][0]).contains( - 'doBulkUpsert failed for 10 samples' - ); - - done(); - }, 1980); - }); - }); -}); diff --git a/test/sampleQueue/sampleUpsertUtils.js b/test/utils/httpUtils.js similarity index 84% rename from test/sampleQueue/sampleUpsertUtils.js rename to test/utils/httpUtils.js index 9303e8e..c97c574 100644 --- a/test/sampleQueue/sampleUpsertUtils.js +++ b/test/utils/httpUtils.js @@ -7,16 +7,16 @@ */ /** - * test/sampleQueue/sampleUpsertUtils.js + * test/utils/httpUtils.js */ const expect = require('chai').expect; -const sampleUpsertUtils = require('../../src/sampleQueue/sampleUpsertUtils'); +const httpUtils = require('../../src/utils/httpUtils'); const request = require('superagent'); const bulkUpsertPath = require('../../src/constants').bulkUpsertEndpoint; const mock = require('superagent-mocker')(request); const httpStatus = require('../../src/constants').httpStatus; -describe('test/sampleQueue/sampleUpsertUtils.js >', () => { +describe('test/utils/httpUtils.js >', () => { const dummyStr = 'http://dummy.refocus.url'; const dummyToken = '3245678754323356475654356758675435647qwertyrytu'; const properRegistryObject = { url: dummyStr, token: dummyToken }; @@ -28,7 +28,7 @@ describe('test/sampleQueue/sampleUpsertUtils.js >', () => { after(mock.clearRoutes); it('no url in refocus instance object, gives validation error', (done) => { - sampleUpsertUtils.doBulkUpsert({ token: 'dummy' }, []) + httpUtils.doBulkUpsert({ token: 'dummy' }, []) .then(() => done(new Error('Expected validation error'))) .catch((err) => { expect(err.name).to.equal('ValidationError'); @@ -38,7 +38,7 @@ describe('test/sampleQueue/sampleUpsertUtils.js >', () => { }); it('no array input gives validation error', (done) => { - sampleUpsertUtils.doBulkUpsert(properRegistryObject) + httpUtils.doBulkUpsert(properRegistryObject) .then(() => done(new Error('Expected validation error'))) .catch((err) => { expect(err.name).to.equal('ValidationError'); @@ -48,7 +48,7 @@ describe('test/sampleQueue/sampleUpsertUtils.js >', () => { }); it('array input of non-array type gives validation error', (done) => { - sampleUpsertUtils.doBulkUpsert(properRegistryObject, dummyStr) + httpUtils.doBulkUpsert(properRegistryObject, dummyStr) .then(() => done(new Error('Expected validation error'))) .catch((err) => { expect(err.name).to.equal('ValidationError'); @@ -58,7 +58,7 @@ describe('test/sampleQueue/sampleUpsertUtils.js >', () => { }); it('no token in refocus instance object, gives validation error', (done) => { - sampleUpsertUtils.doBulkUpsert(properRegistryObject) + httpUtils.doBulkUpsert(properRegistryObject) .then(() => done(new Error('Expected validation error'))) .catch((err) => { expect(err.name).to.equal('ValidationError'); @@ -74,7 +74,7 @@ describe('test/sampleQueue/sampleUpsertUtils.js >', () => { // TODO: change to nock, stub response mock.post(properRegistryObject.url + bulkUpsertPath, () => Promise.resolve()); - sampleUpsertUtils.doBulkUpsert(properRegistryObject, []) + httpUtils.doBulkUpsert(properRegistryObject, []) .then((object) => { expect(object.status).to.equal(httpStatus.OK); done(); @@ -87,7 +87,7 @@ describe('test/sampleQueue/sampleUpsertUtils.js >', () => { // TODO: change to nock, stub response mock.post(properRegistryObject.url + bulkUpsertPath, (req) => req); - sampleUpsertUtils.doBulkUpsert(properRegistryObject, sampleArr) + httpUtils.doBulkUpsert(properRegistryObject, sampleArr) .then((object) => { // due to how superagent-mocker works, diff --git a/test/utils/queueUtils.js b/test/utils/queueUtils.js new file mode 100644 index 0000000..8fa067c --- /dev/null +++ b/test/utils/queueUtils.js @@ -0,0 +1,141 @@ +/** + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or + * https://opensource.org/licenses/BSD-3-Clause + */ + +/** + * test/utils/queueUtils.js + */ +const expect = require('chai').expect; +const queueUtils = require('../../src/utils/queueUtils'); +const config = require('../../src/config/config'); +const errors = require('../../src/errors'); + +function flushFunction(refocusObject, data) { + return data; +} + +function validateError(data) { + throw new errors.ValidationError('validateError'); +} + +const queueParams = { + name: 'test', + size: 100, + flushTimeout: 4000, + verbose: false, + flushFunction: flushFunction, +}; + +describe('test/utils/queueUtils.js - queue utils unit tests >', () => { + it('Create queue', (done) => { + queueUtils.createQueue(queueParams); + const queue = queueUtils.getQueue('test'); + expect(queue._size).to.be.equal(100); + expect(queue._flushTimeout).to.be.equal(4000); + return done(); + }); + + it('Get queue', (done) => { + queueUtils.createQueue(queueParams); + const queue = queueUtils.getQueue('test'); + expect(queue._size).to.be.equal(100); + expect(queue._flushTimeout).to.be.equal(4000); + return done(); + }); + + it('Change the queue Size', (done) => { + queueUtils.createQueue(queueParams); + const queue = queueUtils.getQueue('test'); + expect(queue._size).to.be.equal(100); + queue._size = 50; + const newQueue = queueUtils.getQueue('test'); + expect(newQueue._size).to.be.equal(50); + return done(); + }); + + it('Change the queue flush time', (done) => { + queueUtils.createQueue(queueParams); + const queue = queueUtils.getQueue('test'); + expect(queue._flushTimeout).to.be.equal(4000); + queue._flushTimeout = 500; + const newQueue = queueUtils.getQueue('test'); + expect(newQueue._flushTimeout).to.be.equal(500); + return done(); + }); + + it('Create two queue', (done) => { + queueUtils.createQueue(queueParams); + queueUtils.createQueue({ + name: 'test1', + size: 100, + flushTimeout: 3000, + verbose: false, + flushFunction: flushFunction, + }); + const queue = queueUtils.getQueue('test'); + const queue1 = queueUtils.getQueue('test1'); + expect(queue._size).to.be.equal(100); + expect(queue1._size).to.be.equal(100); + return done(); + }); + + it('Flush queue after queue size done', (done) => { + queueUtils.createQueue({ + name: 'test', + size: 3, + flushTimeout: 300, + verbose: false, + flushFunction: flushFunction, + }); + const queue = queueUtils.getQueue('test'); + queue.add(1); + queue.add(2); + queue.add(3); + queue.add(4); + expect(queue.items.length).to.be.equal(1); + return done(); + }); + + it('Flush queue after queue flushtimeout done', (done) => { + queueUtils.createQueue({ + name: 'test', + size: 100, + flushTimeout: 300, + verbose: false, + flushFunction: flushFunction, + }); + const queue = queueUtils.getQueue('test'); + queue.add(1); + queue.add(2); + queue.add(3); + queue.add(4); + + setTimeout(() => { + expect(queue.items.length).to.be.equal(0); + return done(); + }, 400); + }); + + it('Enqueue function test', (done) => { + queueUtils.createQueue(queueParams); + const queue = queueUtils.getQueue('test'); + queueUtils.enqueueFromArray('test', [1, 2, 3, 4]); + expect(queue.items.length).to.be.equal(4); + return done(); + }); + + it('Validation individual data error test', (done) => { + queueUtils.createQueue(queueParams); + const queue = queueUtils.getQueue('test'); + try { + queueUtils.enqueueFromArray('test', [1, 2, 3, 4], validateError); + } catch (err) { + expect(err.name).to.be.equal('ValidationError'); + return done(); + } + }); +});