Skip to content
Permalink
Browse files

fix(dataflow): ignore old message from dataflow queue

  • Loading branch information...
philbooth committed Sep 10, 2019
1 parent fdc3ef2 commit 74572555fba1bc133859cf95597af361ed71d5cf
@@ -374,6 +374,13 @@ module.exports = function(fs, path, url, convict) {
env: 'DATAFLOW_REPORT_ONLY',
format: Boolean,
},
ignoreOlderThan: {
doc:
'Ignore messages older than this value. Or set to `0` to never ignore old messages',
default: '1 day',
env: 'DATAFLOW_IGNORE_OLDER_THAN',
format: 'duration',
},
gcpPubSub: {
projectId: {
doc: 'GCP PubSub project ID',
@@ -26,7 +26,7 @@ module.exports = (config, log, fetchRecords, setRecords) => {
}

const { projectId, subscriptionName } = config.dataflow.gcpPubSub;
const { reportOnly } = config.dataflow;
const { ignoreOlderThan, reportOnly } = config.dataflow;

if (!projectId) {
throw new Error(
@@ -61,33 +61,39 @@ module.exports = (config, log, fetchRecords, setRecords) => {
const action = parseMessage(message);
const type = TYPES.get(action.indicator_type);

let level, op;

if (reportOnly || action.suggested_action === 'report') {
log.warn({
op: 'dataflow.report',
[type]: action.indicator,
severity: action.severity,
confidence: action.confidence,
heuristic: action.heuristic,
heuristic_description: action.heuristic_description,
reason: action.reason,
suggested_action: action.suggested_action,
});
} else {
level = 'info';
op = 'dataflow.message.report';
} else if (isFresh(action.timestamp)) {
const records = await fetchRecords({
[type]: action.indicator,
});

records[type][action.suggested_action]();

await setRecords(records);

level = 'info';
op = 'dataflow.message.success';
} else {
level = 'warn';
op = 'dataflow.message.ignore';
}

message.ack();

log.info({
op: 'dataflow.message.success',
log[level]({
op,
id: message.id,
timestamp: action.timestamp,
[type]: action.indicator,
severity: action.severity,
confidence: action.confidence,
heuristic: action.heuristic,
heuristic_description: action.heuristic_description,
reason: action.reason,
suggested_action: action.suggested_action,
reportOnly,
});
@@ -154,4 +160,19 @@ module.exports = (config, log, fetchRecords, setRecords) => {
throw new TypeError(`invalid indicator: ${type}`);
}
}

/**
* Predicate indicating whether a timestamp is younger than `config.ignoreOlderThan`.
*
* @param {String} timestamp
* @returns {Boolean}
*/
function isFresh(timestamp) {
if (ignoreOlderThan > 0) {
const date = new Date(timestamp);
return date.getTime() > Date.now() - ignoreOlderThan;
}

return true;
}
};
@@ -12,6 +12,7 @@ tapTest('dataflow', async () => {
dataflow: {
enabled: true,
reportOnly: false,
ignoreOlderThan: 1000,
gcpPubSub: {
projectId: 'foo',
subscriptionName: 'bar',
@@ -59,7 +60,7 @@ tapTest('dataflow', async () => {
heuristic: 'heurizzle',
heuristic_description: 'this is a heuristic',
reason: 'because',
suggested_action: 'report',
suggested_action: 'suspect',
details: {
key: 'value',
},
@@ -263,44 +264,83 @@ tapTest('dataflow', async () => {
assert.equal(setRecords.callCount, 0);
});

test('report email', async () => {
test('old message', async () => {
const timestamp = new Date(Date.now() - 1001).toISOString();
const message = {
ack: sandbox.spy(),
nack: sandbox.spy(),
id: 'blee',
data: Buffer.from(JSON.stringify(data)),
data: Buffer.from(
JSON.stringify({
...data,
timestamp,
})
),
};
await messageHandler(message);

assert.equal(log.warn.callCount, 1);
args = log.warn.args[0];
assert.lengthOf(args, 1);
assert.deepEqual(args[0], {
op: 'dataflow.report',
op: 'dataflow.message.ignore',
id: 'blee',
timestamp,
email: 'pb@example.com',
severity: 'warn',
confidence: 42,
heuristic: 'heurizzle',
heuristic_description: 'this is a heuristic',
reason: 'because',
suggested_action: 'report',
suggested_action: 'suspect',
reportOnly: false,
});

assert.equal(message.ack.callCount, 1);
assert.lengthOf(message.ack.args[0], 0);

assert.equal(message.nack.callCount, 0);
assert.equal(log.info.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);
});

test('report email', async () => {
const message = {
ack: sandbox.spy(),
nack: sandbox.spy(),
id: 'blee',
data: Buffer.from(
JSON.stringify({
...data,
suggested_action: 'report',
})
),
};
await messageHandler(message);

assert.equal(log.info.callCount, 1);
args = log.info.args[0];
assert.lengthOf(args, 1);
assert.deepEqual(args[0], {
op: 'dataflow.message.success',
op: 'dataflow.message.report',
id: 'blee',
timestamp: data.timestamp,
email: 'pb@example.com',
severity: 'warn',
confidence: 42,
heuristic: 'heurizzle',
heuristic_description: 'this is a heuristic',
reason: 'because',
suggested_action: 'report',
reportOnly: false,
});

assert.equal(message.ack.callCount, 1);
assert.lengthOf(message.ack.args[0], 0);

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);
@@ -314,26 +354,23 @@ tapTest('dataflow', async () => {
data: Buffer.from(
JSON.stringify({
...data,
suggested_action: 'report',
indicator_type: 'sourceaddress',
indicator: '1.1.1.1',
})
),
};
await messageHandler(message);

assert.equal(log.warn.callCount, 1);
args = log.warn.args[0];
assert.equal(args[0].ip, '1.1.1.1');
assert.isUndefined(args[0].email);

assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);
args = log.info.args[0];
assert.equal(args[0].ip, '1.1.1.1');
assert.isUndefined(args[0].email);

assert.equal(message.ack.callCount, 1);

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);
@@ -344,12 +381,7 @@ tapTest('dataflow', async () => {
ack: sandbox.spy(),
nack: sandbox.spy(),
id: 'blee',
data: Buffer.from(
JSON.stringify({
...data,
suggested_action: 'suspect',
})
),
data: Buffer.from(JSON.stringify(data)),
};
await messageHandler(message);

@@ -372,10 +404,18 @@ tapTest('dataflow', async () => {
assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);
assert.deepEqual(log.info.args[0][0], {
args = log.info.args[0];
assert.lengthOf(args, 1);
assert.deepEqual(args[0], {
op: 'dataflow.message.success',
id: 'blee',
timestamp: data.timestamp,
email: 'pb@example.com',
severity: 'warn',
confidence: 42,
heuristic: 'heurizzle',
heuristic_description: 'this is a heuristic',
reason: 'because',
suggested_action: 'suspect',
reportOnly: false,
});
@@ -398,7 +438,6 @@ tapTest('dataflow', async () => {
data: Buffer.from(
JSON.stringify({
...data,
suggested_action: 'suspect',
indicator_type: 'sourceaddress',
indicator: '1.1.1.1',
})
@@ -459,6 +498,7 @@ tapTest('dataflow', async () => {
assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);
assert.equal(log.info.args[0][0].op, 'dataflow.message.success');

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
@@ -496,6 +536,7 @@ tapTest('dataflow', async () => {
assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);
assert.equal(log.info.args[0][0].op, 'dataflow.message.success');

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
@@ -545,7 +586,7 @@ tapTest('dataflow', async () => {
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);

test('report', async () => {
test('suspect', async () => {
const message = {
ack: sandbox.spy(),
nack: sandbox.spy(),
@@ -554,50 +595,25 @@ tapTest('dataflow', async () => {
};
await messageHandler(message);

assert.equal(log.warn.callCount, 1);
assert.deepEqual(log.warn.args[0][0], {
op: 'dataflow.report',
assert.equal(log.info.callCount, 1);
assert.deepEqual(log.info.args[0][0], {
op: 'dataflow.message.report',
id: 'blee',
timestamp: data.timestamp,
email: 'pb@example.com',
severity: 'warn',
confidence: 42,
heuristic: 'heurizzle',
heuristic_description: 'this is a heuristic',
reason: 'because',
suggested_action: 'report',
suggested_action: 'suspect',
reportOnly: true,
});

assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);

assert.equal(message.nack.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);
});

test('suspect', async () => {
const message = {
ack: sandbox.spy(),
nack: sandbox.spy(),
id: 'blee',
data: Buffer.from(
JSON.stringify({
...data,
suggested_action: 'suspect',
})
),
};
await messageHandler(message);

assert.equal(log.warn.callCount, 1);
assert.deepEqual(log.warn.args[0][0].suggested_action, 'suspect');

assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);
@@ -617,14 +633,13 @@ tapTest('dataflow', async () => {
};
await messageHandler(message);

assert.equal(log.warn.callCount, 1);
assert.deepEqual(log.warn.args[0][0].suggested_action, 'block');
assert.equal(log.info.callCount, 1);
assert.deepEqual(log.info.args[0][0].suggested_action, 'block');

assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);
@@ -644,14 +659,13 @@ tapTest('dataflow', async () => {
};
await messageHandler(message);

assert.equal(log.warn.callCount, 1);
assert.deepEqual(log.warn.args[0][0].suggested_action, 'disable');
assert.equal(log.info.callCount, 1);
assert.deepEqual(log.info.args[0][0].suggested_action, 'disable');

assert.equal(message.ack.callCount, 1);

assert.equal(log.info.callCount, 1);

assert.equal(message.nack.callCount, 0);
assert.equal(log.warn.callCount, 0);
assert.equal(log.error.callCount, 0);
assert.equal(fetchRecords.callCount, 0);
assert.equal(setRecords.callCount, 0);

0 comments on commit 7457255

Please sign in to comment.
You can’t perform that action at this time.