Skip to content
This repository has been archived by the owner on Apr 3, 2019. It is now read-only.

Commit

Permalink
feat(metrics): add code and config for email service notification queue
Browse files Browse the repository at this point in the history
Fixes #2633.

Once the email service starts handling bounce and complaint events we
will need a way for it to tell the auth server to emit metrics. This
change adds a handler + config for a new SQS queue to that end. It
duplicates some of the code from other handlers but that's intentional,
we plan to remove those queues eventually.
  • Loading branch information
philbooth committed Oct 25, 2018
1 parent 93580da commit ccd5556
Show file tree
Hide file tree
Showing 4 changed files with 586 additions and 21 deletions.
47 changes: 26 additions & 21 deletions bin/email_notifications.js
Expand Up @@ -9,33 +9,38 @@
// If required, modules will be instrumented.
require('../lib/newrelic')()

var config = require('../config').getProperties()
var log = require('../lib/log')(config.log.level, 'fxa-email-bouncer')
var error = require('../lib/error')
var Token = require('../lib/tokens')(log, config)
var SQSReceiver = require('../lib/sqs')(log)
var bounces = require('../lib/email/bounces')(log, error)
var delivery = require('../lib/email/delivery')(log)
const config = require('../config').getProperties()
const log = require('../lib/log')(config.log.level, 'fxa-email-bouncer')
const error = require('../lib/error')
const Token = require('../lib/tokens')(log, config)
const SQSReceiver = require('../lib/sqs')(log)
const bounces = require('../lib/email/bounces')(log, error)
const delivery = require('../lib/email/delivery')(log)
const notifications = require('../lib/email/notifications')(log, error)

var DB = require('../lib/db')(
const DB = require('../lib/db')(
config,
log,
Token
)

var bounceQueue = new SQSReceiver(config.emailNotifications.region, [
config.emailNotifications.bounceQueueUrl,
config.emailNotifications.complaintQueueUrl
])
const {
bounceQueueUrl,
complaintQueueUrl,
deliveryQueueUrl,
notificationQueueUrl,
region
} = config.emailNotifications

var deliveryQueue = new SQSReceiver(config.emailNotifications.region, [
config.emailNotifications.deliveryQueueUrl
])
const bounceQueue = new SQSReceiver(region, [ bounceQueueUrl, complaintQueueUrl ])
const deliveryQueue = new SQSReceiver(region, [ deliveryQueueUrl ])
const notificationQueue = new SQSReceiver(region, [ notificationQueueUrl ])

DB.connect(config[config.db.backend])
.then(
function (db) {
bounces(bounceQueue, db)
delivery(deliveryQueue)
}
)
.then(db => {
// bounces and delivery are now deprecated, we'll delete them
// as soon as we're 100% confident in fxa-email-service
bounces(bounceQueue, db)
delivery(deliveryQueue)
notifications(notificationQueue, db)
})
6 changes: 6 additions & 0 deletions config/index.js
Expand Up @@ -536,6 +536,12 @@ var conf = convict({
format: String,
env: 'DELIVERY_QUEUE_URL',
default: ''
},
notificationQueueUrl: {
doc: 'Queue URL for notifications from fxa-email-service (eventually this will be the only email-related queue)',
format: String,
env: 'NOTIFICATION_QUEUE_URL',
default: ''
}
},
profileServerMessaging: {
Expand Down
63 changes: 63 additions & 0 deletions lib/email/notifications.js
@@ -0,0 +1,63 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

'use strict'

const P = require('../promise')
const utils = require('./utils/helpers')

// Account deletion threshold for new unverified accounts that receive
// a bounce or complaint notification. Unverified accounts younger than
// 6 hours old will be deleted if a bounce or complaint occurs.
const SIX_HOURS = 1000 * 60 * 60 * 6

module.exports = (log, error) => {
return (queue, db) => {
queue.start()

queue.on('data', async message => {
try {
utils.logErrorIfHeadersAreWeirdOrMissing(log, message, 'notification')

let addresses = [], eventType = 'bounced', isDeletionCandidate = false
if (message.bounce) {
addresses = mapBounceComplaintRecipients(message.bounce.bouncedRecipients)
isDeletionCandidate = true
} else if (message.complaint) {
addresses = mapBounceComplaintRecipients(message.complaint.complainedRecipients)
isDeletionCandidate = true
} else if (message.delivery) {
addresses = message.delivery.recipients
eventType = 'delivered'
}

await P.all(addresses.map(async address => {
const domain = utils.getAnonymizedEmailDomain(address)

utils.logFlowEventFromMessage(log, message, eventType)
utils.logEmailEventFromMessage(log, message, eventType, domain)

if (isDeletionCandidate) {
const emailRecord = await db.accountRecord(address)

if (! emailRecord.emailVerified && emailRecord.createdAt >= Date.now() - SIX_HOURS) {
// A bounce or complaint on a new unverified account is grounds for deletion
await db.deleteAccount(emailRecord)

log.info({ op: 'accountDeleted', ...emailRecord })
}
}
}))
} catch (err) {
log.error({ op: 'email.notification.error', err })
}

message.del()
})
}
}

function mapBounceComplaintRecipients (recipients) {
return recipients.map(recipient => recipient.emailAddress)
}

0 comments on commit ccd5556

Please sign in to comment.