diff --git a/bin/email_notifications.js b/bin/email_notifications.js index e0493e3c2..ff4d81004 100644 --- a/bin/email_notifications.js +++ b/bin/email_notifications.js @@ -9,33 +9,38 @@ // If required, modules will be instrumented. require('../lib/newrelic')() -var config = require('../config').getProperties() -var log = require('../lib/log')(config.log.level, 'fxa-email-bouncer') -var error = require('../lib/error') -var Token = require('../lib/tokens')(log, config) -var SQSReceiver = require('../lib/sqs')(log) -var bounces = require('../lib/email/bounces')(log, error) -var delivery = require('../lib/email/delivery')(log) +const config = require('../config').getProperties() +const log = require('../lib/log')(config.log.level, 'fxa-email-bouncer') +const error = require('../lib/error') +const Token = require('../lib/tokens')(log, config) +const SQSReceiver = require('../lib/sqs')(log) +const bounces = require('../lib/email/bounces')(log, error) +const delivery = require('../lib/email/delivery')(log) +const notifications = require('../lib/email/notifications')(log, error) -var DB = require('../lib/db')( +const DB = require('../lib/db')( config, log, Token ) -var bounceQueue = new SQSReceiver(config.emailNotifications.region, [ - config.emailNotifications.bounceQueueUrl, - config.emailNotifications.complaintQueueUrl -]) +const { + bounceQueueUrl, + complaintQueueUrl, + deliveryQueueUrl, + notificationQueueUrl, + region +} = config.emailNotifications -var deliveryQueue = new SQSReceiver(config.emailNotifications.region, [ - config.emailNotifications.deliveryQueueUrl -]) +const bounceQueue = new SQSReceiver(region, [ bounceQueueUrl, complaintQueueUrl ]) +const deliveryQueue = new SQSReceiver(region, [ deliveryQueueUrl ]) +const notificationQueue = new SQSReceiver(region, [ notificationQueueUrl ]) DB.connect(config[config.db.backend]) - .then( - function (db) { - bounces(bounceQueue, db) - delivery(deliveryQueue) - } - ) + .then(db => { + // bounces and delivery are now deprecated, we'll delete them + // as soon as we're 100% confident in fxa-email-service + bounces(bounceQueue, db) + delivery(deliveryQueue) + notifications(notificationQueue, db) + }) diff --git a/config/index.js b/config/index.js index c625ad657..4fabe8922 100644 --- a/config/index.js +++ b/config/index.js @@ -536,6 +536,12 @@ var conf = convict({ format: String, env: 'DELIVERY_QUEUE_URL', default: '' + }, + notificationQueueUrl: { + doc: 'Queue URL for notifications from fxa-email-service (eventually this will be the only email-related queue)', + format: String, + env: 'NOTIFICATION_QUEUE_URL', + default: '' } }, profileServerMessaging: { diff --git a/lib/email/notifications.js b/lib/email/notifications.js new file mode 100644 index 000000000..3176fe14e --- /dev/null +++ b/lib/email/notifications.js @@ -0,0 +1,63 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +'use strict' + +const P = require('../promise') +const utils = require('./utils/helpers') + +// Account deletion threshold for new unverified accounts that receive +// a bounce or complaint notification. Unverified accounts younger than +// 6 hours old will be deleted if a bounce or complaint occurs. +const SIX_HOURS = 1000 * 60 * 60 * 6 + +module.exports = (log, error) => { + return (queue, db) => { + queue.start() + + queue.on('data', async message => { + try { + utils.logErrorIfHeadersAreWeirdOrMissing(log, message, 'notification') + + let addresses = [], eventType = 'bounced', isDeletionCandidate = false + if (message.bounce) { + addresses = mapBounceComplaintRecipients(message.bounce.bouncedRecipients) + isDeletionCandidate = true + } else if (message.complaint) { + addresses = mapBounceComplaintRecipients(message.complaint.complainedRecipients) + isDeletionCandidate = true + } else if (message.delivery) { + addresses = message.delivery.recipients + eventType = 'delivered' + } + + await P.all(addresses.map(async address => { + const domain = utils.getAnonymizedEmailDomain(address) + + utils.logFlowEventFromMessage(log, message, eventType) + utils.logEmailEventFromMessage(log, message, eventType, domain) + + if (isDeletionCandidate) { + const emailRecord = await db.accountRecord(address) + + if (! emailRecord.emailVerified && emailRecord.createdAt >= Date.now() - SIX_HOURS) { + // A bounce or complaint on a new unverified account is grounds for deletion + await db.deleteAccount(emailRecord) + + log.info({ op: 'accountDeleted', ...emailRecord }) + } + } + })) + } catch (err) { + log.error({ op: 'email.notification.error', err }) + } + + message.del() + }) + } +} + +function mapBounceComplaintRecipients (recipients) { + return recipients.map(recipient => recipient.emailAddress) +} diff --git a/test/local/email/notifications.js b/test/local/email/notifications.js new file mode 100644 index 000000000..520c969d4 --- /dev/null +++ b/test/local/email/notifications.js @@ -0,0 +1,491 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +'use strict' + +const ROOT_DIR = '../../..' + +const { assert } = require('chai') +const error = require(`${ROOT_DIR}/lib/error`) +const { mockLog } = require('../../mocks') +const notifications = require(`${ROOT_DIR}/lib/email/notifications`) +const P = require(`${ROOT_DIR}/lib/promise`) +const sinon = require('sinon') + +const SIX_HOURS = 1000 * 60 * 60 * 6 + +describe('lib/email/notifications:', () => { + let now, del, log, queue, emailRecord, db + + beforeEach(() => { + now = Date.now() + sinon.stub(Date, 'now', () => now) + del = sinon.spy() + log = mockLog() + queue = { + start: sinon.spy(), + on: sinon.spy() + } + emailRecord = { + emailVerified: false, + createdAt: now - SIX_HOURS - 1 + } + db = { + accountRecord: sinon.spy(() => P.resolve(emailRecord)), + deleteAccount: sinon.spy(() => P.resolve()) + } + notifications(log, error)(queue, db) + }) + + afterEach(() => { + Date.now.restore() + }) + + it('called queue.start', () => { + assert.equal(queue.start.callCount, 1) + assert.lengthOf(queue.start.args[0], 0) + }) + + it('called queue.on', () => { + assert.equal(queue.on.callCount, 1) + + const args = queue.on.args[0] + assert.lengthOf(args, 2) + assert.equal(args[0], 'data') + assert.isFunction(args[1]) + assert.lengthOf(args[1], 1) + }) + + describe('bounce message:', () => { + beforeEach(() => { + return queue.on.args[0][1]({ + del, + mail: { + headers: { + 'Content-Language': 'en-gb', + 'X-Flow-Begin-Time': now - 1, + 'X-Flow-Id': 'foo', + 'X-Template-Name': 'bar', + 'X-Template-Version': 'baz', + } + }, + bounce: { + bouncedRecipients: [ { emailAddress: 'wibble@example.com' } ] + } + }) + }) + + it('logged a flow event', () => { + assert.equal(log.flowEvent.callCount, 1) + const args = log.flowEvent.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + event: 'email.bar.bounced', + flow_id: 'foo', + flow_time: 1, + time: now + }) + }) + + it('logged an email event', () => { + assert.equal(log.info.callCount, 1) + const args = log.info.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + bounced: true, + domain: 'other', + flow_id: 'foo', + locale: 'en-gb', + op: 'emailEvent', + template: 'bar', + templateVersion: 'baz', + type: 'bounced' + }) + }) + + it('did not delete the account', () => { + assert.equal(db.accountRecord.callCount, 1) + const args = db.accountRecord.args[0] + assert.lengthOf(args, 1) + assert.equal(args[0], 'wibble@example.com') + + assert.equal(db.deleteAccount.callCount, 0) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + assert.lengthOf(del.args[0], 0) + }) + + it('did not log an error', () => { + assert.equal(log.error.callCount, 0) + }) + }) + + describe('complaint message, 2 recipients:', () => { + beforeEach(() => { + return queue.on.args[0][1]({ + del, + mail: { + headers: { + 'Content-Language': 'fr', + 'X-Flow-Begin-Time': now - 2, + 'X-Flow-Id': 'wibble', + 'X-Template-Name': 'blee' + } + }, + complaint: { + complainedRecipients: [ + { emailAddress: 'foo@example.com' }, + { emailAddress: 'pmbooth@gmail.com' } + ] + } + }) + }) + + it('logged 2 flow events', () => { + assert.equal(log.flowEvent.callCount, 2) + + let args = log.flowEvent.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + event: 'email.blee.bounced', + flow_id: 'wibble', + flow_time: 2, + time: now + }) + + args = log.flowEvent.args[1] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + event: 'email.blee.bounced', + flow_id: 'wibble', + flow_time: 2, + time: now + }) + }) + + it('logged 2 email events', () => { + assert.equal(log.info.callCount, 2) + + let args = log.info.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + complaint: true, + domain: 'other', + flow_id: 'wibble', + locale: 'fr', + op: 'emailEvent', + template: 'blee', + templateVersion: '', + type: 'bounced' + }) + + args = log.info.args[1] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + complaint: true, + domain: 'gmail.com', + flow_id: 'wibble', + locale: 'fr', + op: 'emailEvent', + template: 'blee', + templateVersion: '', + type: 'bounced' + }) + }) + + it('did not delete the accounts', () => { + assert.equal(db.accountRecord.callCount, 2) + + let args = db.accountRecord.args[0] + assert.lengthOf(args, 1) + assert.equal(args[0], 'foo@example.com') + + args = db.accountRecord.args[1] + assert.lengthOf(args, 1) + assert.equal(args[0], 'pmbooth@gmail.com') + + assert.equal(db.deleteAccount.callCount, 0) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + }) + + it('did not log an error', () => { + assert.equal(log.error.callCount, 0) + }) + }) + + describe('bounce message, 2 recipients, new unverified account:', () => { + beforeEach(() => { + emailRecord.createdAt += 1 + return queue.on.args[0][1]({ + del, + mail: { + headers: { + 'Content-Language': 'en-gb', + 'X-Flow-Begin-Time': now - 1, + 'X-Flow-Id': 'foo', + 'X-Template-Name': 'bar', + 'X-Template-Version': 'baz', + } + }, + bounce: { + bouncedRecipients: [ + { emailAddress: 'wibble@example.com' }, + { emailAddress: 'blee@example.com' } + ] + } + }) + }) + + it('logged events', () => { + assert.equal(log.flowEvent.callCount, 2) + + assert.equal(log.info.callCount, 4) + + let args = log.info.args[2] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + op: 'accountDeleted', + emailVerified: false, + createdAt: emailRecord.createdAt + }) + + args = log.info.args[3] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + op: 'accountDeleted', + emailVerified: false, + createdAt: emailRecord.createdAt + }) + }) + + it('deleted the accounts', () => { + assert.equal(db.accountRecord.callCount, 2) + + let args = db.accountRecord.args[0] + assert.lengthOf(args, 1) + assert.equal(args[0], 'wibble@example.com') + + args = db.accountRecord.args[1] + assert.lengthOf(args, 1) + assert.equal(args[0], 'blee@example.com') + + assert.equal(db.deleteAccount.callCount, 2) + + args = db.deleteAccount.args[0] + assert.lengthOf(args, 1) + assert.equal(args[0], emailRecord) + + args = db.deleteAccount.args[1] + assert.lengthOf(args, 1) + assert.equal(args[0], emailRecord) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + }) + + it('did not log an error', () => { + assert.equal(log.error.callCount, 0) + }) + }) + + describe('complaint message, new unverified account:', () => { + beforeEach(() => { + emailRecord.createdAt += 1 + return queue.on.args[0][1]({ + del, + mail: { + headers: { + 'Content-Language': 'fr', + 'X-Flow-Begin-Time': now - 2, + 'X-Flow-Id': 'wibble', + 'X-Template-Name': 'blee' + } + }, + complaint: { + complainedRecipients: [ + { emailAddress: 'foo@example.com' } + ] + } + }) + }) + + it('logged events', () => { + assert.equal(log.flowEvent.callCount, 1) + assert.equal(log.info.callCount, 2) + }) + + it('deleted the account', () => { + assert.equal(db.accountRecord.callCount, 1) + assert.equal(db.deleteAccount.callCount, 1) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + }) + + it('did not log an error', () => { + assert.equal(log.error.callCount, 0) + }) + }) + + describe('bounce message, new verified account:', () => { + beforeEach(() => { + emailRecord.createdAt += 1 + emailRecord.emailVerified = true + return queue.on.args[0][1]({ + del, + mail: { + headers: { + 'Content-Language': 'en-gb', + 'X-Flow-Begin-Time': now - 1, + 'X-Flow-Id': 'foo', + 'X-Template-Name': 'bar', + 'X-Template-Version': 'baz', + } + }, + bounce: { + bouncedRecipients: [ + { emailAddress: 'wibble@example.com' } + ] + } + }) + }) + + it('logged events', () => { + assert.equal(log.flowEvent.callCount, 1) + assert.equal(log.info.callCount, 1) + }) + + it('did not delete the account', () => { + assert.equal(db.accountRecord.callCount, 1) + assert.equal(db.deleteAccount.callCount, 0) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + }) + + it('did not log an error', () => { + assert.equal(log.error.callCount, 0) + }) + }) + + describe('delivery message, new unverified account:', () => { + beforeEach(() => { + emailRecord.createdAt += 1 + return queue.on.args[0][1]({ + del, + mail: { + headers: { + 'Content-Language': 'en-gb', + 'X-Flow-Begin-Time': now - 1, + 'X-Flow-Id': 'foo', + 'X-Template-Name': 'bar', + 'X-Template-Version': 'baz', + } + }, + delivery: { + recipients: [ 'wibble@example.com' ] + } + }) + }) + + it('logged a flow event', () => { + assert.equal(log.flowEvent.callCount, 1) + const args = log.flowEvent.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + event: 'email.bar.delivered', + flow_id: 'foo', + flow_time: 1, + time: now + }) + }) + + it('logged an email event', () => { + assert.equal(log.info.callCount, 1) + const args = log.info.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + domain: 'other', + flow_id: 'foo', + locale: 'en-gb', + op: 'emailEvent', + template: 'bar', + templateVersion: 'baz', + type: 'delivered' + }) + }) + + it('did not delete the account', () => { + assert.equal(db.accountRecord.callCount, 0) + assert.equal(db.deleteAccount.callCount, 0) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + }) + + it('did not log an error', () => { + assert.equal(log.error.callCount, 0) + }) + }) + + describe('missing headers:', () => { + beforeEach(() => { + return queue.on.args[0][1]({ + del, + mail: {}, + bounce: { + bouncedRecipients: [ { emailAddress: 'wibble@example.com' } ] + } + }) + }) + + it('logged an error', () => { + assert.isAtLeast(log.error.callCount, 1) + + const args = log.error.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + op: 'emailHeaders.missing', + origin: 'notification' + }) + }) + + it('did not log a flow event', () => { + assert.equal(log.flowEvent.callCount, 0) + }) + + it('logged an email event', () => { + assert.equal(log.info.callCount, 1) + const args = log.info.args[0] + assert.lengthOf(args, 1) + assert.deepEqual(args[0], { + bounced: true, + domain: 'other', + locale: '', + op: 'emailEvent', + template: '', + templateVersion: '', + type: 'bounced' + }) + }) + + it('did not delete the account', () => { + assert.equal(db.accountRecord.callCount, 1) + assert.equal(db.deleteAccount.callCount, 0) + }) + + it('called message.del', () => { + assert.equal(del.callCount, 1) + }) + }) +})