Skip to content

Commit

Permalink
Merge branch 'master' into delete-rlinks-tags--sample-endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
pallavi2209 committed Mar 17, 2017
2 parents daac9e7 + 5705608 commit dbd375e
Show file tree
Hide file tree
Showing 16 changed files with 412 additions and 65 deletions.
27 changes: 24 additions & 3 deletions api/v1/controllers/samples.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
const featureToggles = require('feature-toggles');

const helper = require('../helpers/nouns/samples');
const subHelper = require('../helpers/nouns/subjects');
const doDelete = require('../helpers/verbs/doDelete');
const doFind = require('../helpers/verbs/doFind');
const doGet = require('../helpers/verbs/doGet');
Expand All @@ -25,7 +26,8 @@ const httpStatus = require('../constants').httpStatus;
const sampleStore = require('../../../cache/sampleStore');
const constants = sampleStore.constants;
const redisModelSample = require('../../../cache/models/samples');

const publisher = u.publisher;
const event = u.realtimeEvents;
module.exports = {

/**
Expand Down Expand Up @@ -192,6 +194,12 @@ module.exports = {
u.removeFieldsFromResponse(helper.fieldsToExclude, dataValues);
}

/*
*send the upserted sample to the client by publishing it to the redis
*channel
*/
publisher.publishSample(samp, subHelper.model);

u.logAPI(req, resultObj, dataValues);
return res.status(httpStatus.OK)
.json(u.responsify(samp, helper, req.method));
Expand Down Expand Up @@ -230,10 +238,23 @@ module.exports = {

const j = jobWrapper.createJob(jobType.BULKUPSERTSAMPLES,
wrappedBulkUpsertData, req);
} else if (featureToggles.isFeatureEnabled(constants.featureName)) {
redisModelSample.bulkUpsertSample(value);
} else {
const bulkUpsertPromise =
featureToggles.isFeatureEnabled(constants.featureName) ?
redisModelSample.bulkUpsertSample(value) :
helper.model.bulkUpsertByName(value, userName);

/*
*send the upserted sample to the client by publishing it to the redis
*channel
*/
bulkUpsertPromise.then((samples) => {
samples.forEach((sample) => {
if (!sample.isFailed) {
publisher.publishSample(sample, subHelper.model);
}
});
});
}
});

Expand Down
8 changes: 8 additions & 0 deletions api/v1/helpers/verbs/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const constants = require('../../constants');
const commonDbUtil = require('../../../../db/helpers/common');
const jwtUtil = require('../../../../utils/jwtUtil');
const logAPI = require('../../../../utils/apiLog').logAPI;
const publisher = require('../../../../realtime/redisPublisher');
const realtimeEvents = require('../../../../realtime/constants').events;

/**
* In-place removal of certain keys from the input object
Expand All @@ -30,6 +32,7 @@ function removeFieldsFromResponse(fieldsToExclude, responseObj) {
delete responseObj[fieldsToExclude[i]];
}
}

/**
* This function adds the association scope name to the as the to all
* the elements of the associaton array
Expand Down Expand Up @@ -711,7 +714,12 @@ function checkDuplicateRLinks(rLinkArr) {
// ----------------------------------------------------------------------------

module.exports = {
realtimeEvents,

publisher,

logAPI,

buildFieldList,

/**
Expand Down
5 changes: 3 additions & 2 deletions cache/models/samples.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,13 @@ function createSampHsetCommand(qbObj, sampObj, aspectObj, isBulk) {
}
}

const dateNow = new Date().toString();
if (!sampObj) { // new sample
qbObj[sampFields.CREATED_AT] = new Date().toString();
qbObj[sampFields.CREATED_AT] = dateNow;
qbObj[sampFields.IS_DELETED] = '0';
}

qbObj[sampFields.UPD_AT] = new Date().toString();
qbObj[sampFields.UPD_AT] = dateNow;
}

/**
Expand Down
33 changes: 23 additions & 10 deletions cache/sampleStoreTimeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const sampleStore = require('./sampleStore');
const redisClient = require('./redisCache').client.sampleStore;
const isTimedOut = require('../db/helpers/sampleUtils').isTimedOut;
const constants = require('../api/v1/constants');
const fieldsToStringify = require('./sampleStore').constants.fieldsToStringify;
const redisErrors = require('./redisErrors');
const ONE = 1;

Expand All @@ -27,10 +28,13 @@ const ONE = 1;
* @param {Array} samples - Samples to be timedout
* @param {Hash} aspects - Aspects hash, name-> object
* @param {Date} curr - Current datetime
* @returns {Array} Commands needed to timeout samples
* @returns {Object} - with sampCmds and timedOutSamples properties.
* sampCmds contains the commands needed to timeout samples and timedOutSamples
* contains the samples that needs to be timedout
*/
function getSampleTimeoutCommands(samples, aspects, curr) {
function getSampleTimeoutComponents(samples, aspects, curr) {
const sampCmds = [];
const timedOutSamples = [];

for (let num = 0; num < samples.length; num++) {
const samp = samples[num];
Expand All @@ -45,8 +49,10 @@ function getSampleTimeoutCommands(samples, aspects, curr) {
const asp = aspects[aspName.toLowerCase()];
const sampUpdDateTime = new Date(samp.updatedAt);

/* Update sample if aspect exists, sample status is other than TimeOut and
sample is timed out.*/
/*
* Update sample if aspect exists, sample status is other than TimeOut and
* sample is timed out.
*/
if (asp && isTimedOut(asp.timeout, curr, sampUpdDateTime)) {
const objToUpdate = {
value: constants.statuses.Timeout,
Expand All @@ -55,6 +61,11 @@ function getSampleTimeoutCommands(samples, aspects, curr) {
statusChangedAt: new Date().toString(),
updatedAt: new Date().toString(),
};
const fullSampObj = Object.assign({}, objToUpdate);
fullSampObj.name = samp.name;
fullSampObj.aspect =
sampleStore.arrayStringsToJson(asp, fieldsToStringify.aspect);
timedOutSamples.push(fullSampObj);
sampCmds.push([
'hmset',
sampleStore.toKey(sampleStore.constants.objectType.sample, samp.name),
Expand All @@ -63,8 +74,8 @@ function getSampleTimeoutCommands(samples, aspects, curr) {
}
}

return sampCmds;
}
return { sampCmds, timedOutSamples };
} // getSampleTimeoutComponents

module.exports = {

Expand All @@ -82,7 +93,7 @@ module.exports = {
let numberTimedOut = 0;
let numberEvaluated = 0;
let samplesCount = 0;

let timedOutSamples;
return redisClient.smembersAsync(sampleStore.constants.indexKey.sample)
.then((allSamples) => {
const commands = [];
Expand Down Expand Up @@ -125,17 +136,19 @@ module.exports = {
}
}

const sampCmds = getSampleTimeoutCommands(samples, aspects, curr);
const retObj = getSampleTimeoutComponents(samples, aspects, curr);
timedOutSamples = retObj.timedOutSamples;
const sampCmds = retObj.sampCmds;
numberEvaluated = samples.length;
numberTimedOut = sampCmds.length;
return redisClient.batch(sampCmds).execAsync();
})
.then(() => {
const res = { numberEvaluated, numberTimedOut };
const res = { numberEvaluated, numberTimedOut, timedOutSamples };
return res;
})
.catch((err) => {
throw err;
});
},
}, //doTimeout
};
21 changes: 17 additions & 4 deletions clock/scheduledJobs/sampleTimeoutJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
*/
const featureToggles = require('feature-toggles');
const dbSample = require('../../db/index').Sample;
const dbSubject = require('../../db/index').Subject;
const publisher = require('../../realtime/redisPublisher');
const sampleEvent = require('../../realtime/constants').events.sample;
const sampleStoreTimeout = require('../../cache/sampleStoreTimeout');

/**
Expand All @@ -22,11 +25,21 @@ const sampleStoreTimeout = require('../../cache/sampleStoreTimeout');
* @returns {Promise}
*/
function execute() {
if (featureToggles.isFeatureEnabled('enableRedisSampleStore')) {
return sampleStoreTimeout.doTimeout();
}
const sampleHandle = featureToggles
.isFeatureEnabled('enableRedisSampleStore') ?
sampleStoreTimeout : dbSample;

return dbSample.doTimeout();
return sampleHandle.doTimeout()
.then((dbRes) => {
// send the timeoutsample to the client by publishing it to redis channel
if (dbRes.timedOutSamples) {
dbRes.timedOutSamples.forEach((sample) => {
publisher.publishSample(sample, dbSubject, sampleEvent.upd);
});
}

return Promise.resolve(dbRes);
});
} // execute

module.exports = {
Expand Down
47 changes: 21 additions & 26 deletions db/model/sample.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const u = require('../helpers/sampleUtils');
const common = require('../helpers/common');
const ResourceNotFoundError = require('../dbErrors').ResourceNotFoundError;
const UpdateDeleteForbidden = require('../dbErrors').UpdateDeleteForbidden;
const config = require('../../config');
const eventName = {
add: 'refocus.internal.realtime.sample.add',
upd: 'refocus.internal.realtime.sample.update',
Expand Down Expand Up @@ -159,7 +158,7 @@ module.exports = function sample(seq, dataTypes) {
include: [
{
association: assoc.aspect,
attributes: ['timeout', 'isPublished'],
attributes: ['timeout', 'isPublished', 'tags', 'name'],
},
],
});
Expand Down Expand Up @@ -244,7 +243,9 @@ module.exports = function sample(seq, dataTypes) {

/**
* Invalidates samples which were last updated before the "timeout"
* specified by the aspect.
* specified by the aspect and resolves to an object containing the
* number of samples evaluated, the number of samples timedout and an
* array containing a "copy" of the sample object.
*
* @param {Date} now - For testing, pass in a Date object to represent
* the current time
Expand All @@ -254,16 +255,32 @@ module.exports = function sample(seq, dataTypes) {
const curr = now || new Date();
let numberTimedOut = 0;
let numberEvaluated = 0;
const timedOutSamples = [];
return new seq.Promise((resolve, reject) => {
Sample.scope('checkTimeout').findAll()
.each((s) => {
numberEvaluated++;
if (s.aspect && u.isTimedOut(s.aspect.timeout, curr, s.updatedAt)) {
numberTimedOut++;

/*
* NOTE: a separate copy of the sample needed to be made (by
* deep cloning the required fields) to let the garbage collector
* cleanup the timedOutSamples array.
*/
const timedOutSample = {
value: constants.statuses.Timeout,
status: constants.statuses.Timeout,
name: s.name.split()[0],
statusChangedAt: new Date(),
aspect: JSON.parse(JSON.stringify(s.aspect)),
};
timedOutSamples.push(timedOutSample);
return s.update({ value: constants.statuses.Timeout });
}
})
.then(() => resolve({ numberEvaluated, numberTimedOut }))
.then(() => resolve({ numberEvaluated, numberTimedOut,
timedOutSamples, }))
.catch(reject);
});
}, // doTimeout
Expand Down Expand Up @@ -348,28 +365,6 @@ module.exports = function sample(seq, dataTypes) {
});
}, // hooks.afterDelete

/**
* If the sample changed significantly,
* publish the updated and former sample to redis channel.
*
* @param {Sample} inst - The updated instance
*/
afterUpdate(inst /* , opts */) {
const changedKeys = Object.keys(inst._changed);
const ignoreAttributes = ['isDeleted'];

return common.sampleAspectAndSubjectArePublished(seq, inst)
.then((published) => {
if (published) {
// augument the sample instance with the subject instance to enable
// filtering by subjecttags in the realtime socketio module
common.augmentSampleWithSubjectAspectInfo(seq, inst)
.then(() => common.publishChange(inst, eventName.upd, changedKeys,
ignoreAttributes));
}
});
},

/**
* Update isDeleted.
* Publishes the deleted sample to redis channel.
Expand Down
19 changes: 19 additions & 0 deletions realtime/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,23 @@ module.exports = {
fieldTypeFieldSeparator: '=',

filterTypeInclude: 'INCLUDE',

events: {
subject: {
add: 'refocus.internal.realtime.subject.add',
upd: 'refocus.internal.realtime.subject.update',
del: 'refocus.internal.realtime.subject.remove',
},

sample: {
add: 'refocus.internal.realtime.sample.add',
upd: 'refocus.internal.realtime.sample.update',
del: 'refocus.internal.realtime.sample.remove',
},

perspective: {
initialize: 'refocus.internal.realtime.perspective.namespace.initialize',
},
},

};

0 comments on commit dbd375e

Please sign in to comment.