Skip to content
This repository has been archived by the owner on Mar 20, 2022. It is now read-only.

Commit

Permalink
feat(pubsub): log a warning when really old messages arrive
Browse files Browse the repository at this point in the history
  • Loading branch information
philbooth authored and jbuck committed Jun 7, 2019
1 parent 64f951b commit d1e67a5
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@ if (! AMPLITUDE_API_KEY || ! HMAC_KEY || ! PUBSUB_PROJECT || ! PUBSUB_TOPIC || !
process.exit(1)
}

const TIMEOUT_THRESHOLD = parseInt(process.env.TIMEOUT_THRESHOLD || '60000');
const SECOND = 1000;
const MINUTE = SECOND * 60;
const HOUR = MINUTE * 60;
const DAY = HOUR * 24;

// If TIMEOUT_THRESHOLD milliseconds pass with no messages arriving, the script will abort
const TIMEOUT_THRESHOLD = parseInt(process.env.TIMEOUT_THRESHOLD || MINUTE);

// If a message older than WARNING_THRESHOLD milliseconds arrives, the script will log a warning
const WARNING_THRESHOLD = parseInt(process.env.WARNING_THRESHOLD || DAY * 3);

const IGNORED_EVENTS = new Map()
if (process.env.IGNORED_EVENTS) {
Expand Down Expand Up @@ -138,6 +147,10 @@ function setupCargo (endpoint, key) {
function processMessage (cargo, message) {
const { httpapi, identify } = parseMessage(message)

if (message.publishTime < Date.now() - WARNING_THRESHOLD) {
console.log(timestamp(), 'Warning: Old message', { httpapi, identify })
}

if (httpapi) {
MESSAGES.set(httpapi.insert_id, { message, payloadCount: identify ? 2 : 1 })
cargo.httpapi.push(httpapi)
Expand Down Expand Up @@ -172,7 +185,7 @@ function parseMessage (message) {
}

if (! isEventOk(event)) {
console.warn(timestamp(), 'Warning: Skipping malformed event', event)
console.log(timestamp(), 'Warning: Skipping malformed event', event)
return {}
}

Expand Down

0 comments on commit d1e67a5

Please sign in to comment.