Skip to content

Commit

Permalink
feat: send multi channel message per recipient
Browse files Browse the repository at this point in the history
  • Loading branch information
lykmapipo committed Oct 14, 2019
1 parent 3be2522 commit 1d43780
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 86 deletions.
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ DEFAULT_SMS_TRANSPORT_NAME=tz-ega-sms
DEFAULT_PUSH_TRANSPORT_NAME=fcm-push


# SENDER DEFAULTS
DEFAULT_SENDER_NAME=
DEFAULT_SENDER_EMAIL=
DEFAULT_SENDER_MOBILE=
DEFAULT_SENDER_PUSH=


# TZ EGA SMS TRANSPORT
SMS_EGA_TZ_API_KEY=
SMS_EGA_TZ_API_USER=
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ MONGODB_URI=
DEFAULT_TRANSPORT_NAME=echo
DEFAULT_SMTP_TRANSPORT_NAME=smtp
DEFAULT_SMS_TRANSPORT_NAME=infobip-sms
DEFAULT_PUSH_TRANSPORT_NAME=fcm-push

# SENDER DEFAULTS
DEFAULT_SENDER_NAME=Notification
DEFAULT_SENDER_EMAIL=no-reply@example..com
DEFAULT_SENDER_SMS=15552
DEFAULT_SENDER_PUSH=588573631552

# TZ EGA SMS TRANSPORT
SMS_EGA_TZ_API_KEY=
Expand Down
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ postman.httpServer = kue.app;
/* export http server listen */
postman.listen = listen;

/* export postman worker start */
postman.start = worker.start;

/* export common constants */
_.forEach(common, (value, key) => {
postman[key] = value;
Expand Down
4 changes: 2 additions & 2 deletions lib/campaign.http.router.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ router.put(
router.delete(
PATH_SINGLE,
deleteFor({
del: (options, done) => Campaign.del(options, done),
del: (options, done) => Campaign.del(options, done), // TODO stop sending
soft: true,
})
);

/* expose campaign router */
module.exports = exports = router;
module.exports = exports = router;
162 changes: 135 additions & 27 deletions lib/campaign.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
const _ = require('lodash');
const { waterfall, parallel } = require('async');
const { getString } = require('@lykmapipo/env');
const { stringify, parse } = require('@lykmapipo/common');
const { mergeObjects, stringify, parse, uniq } = require('@lykmapipo/common');
const {
model,
copyInstance,
Schema,
Mixed,
SCHEMA_OPTIONS,
} = require('@lykmapipo/mongoose-common');
const actions = require('mongoose-rest-actions');
const exportable = require('@lykmapipo/mongoose-exportable');
const { plugin: runInBackground, worker } = require('mongoose-kue');
const { Message, Email } = require('./message.model');
const { Message, Email, SMS, Push } = require('./message.model');

/* constants */
const MODEL_NAME = getString('CAMPAIGN_MODEL_NAME', 'Campaign');
Expand All @@ -51,6 +52,7 @@ const {
AUDIENCE_CUSTOMERS,
AUDIENCE_SUBSCRIBERS,
AUDIENCE_EMPLOYEES,
AUDIENCE_ALL,
AUDIENCES,
} = require('./common');

Expand All @@ -62,8 +64,7 @@ const {
* @version 0.1.0
* @private
*/
const CampaignSchema = new Schema(
{
const CampaignSchema = new Schema({
/**
* @name form
* @description campaign form i.e Alert, Announcement etc
Expand Down Expand Up @@ -164,7 +165,7 @@ const CampaignSchema = new Schema(
/**
* @name audiences
* @description Target audiences for a campaign
* e.g SMS, EMAIL etc.
* e.g Customers, Employees etc.
* @type {Object}
* @since 0.1.0
* @version 1.0.0
Expand All @@ -173,6 +174,7 @@ const CampaignSchema = new Schema(
audiences: {
type: [String],
enum: AUDIENCES,
default: [AUDIENCE_ALL],
index: true,
searchable: true,
taggable: true,
Expand Down Expand Up @@ -225,8 +227,13 @@ const CampaignSchema = new Schema(
*/
statistics: {
type: Mixed,
default: { sent: 0, queued: 0, delivered: 0, failed: 0 },
fake: true,
fake: () => {
return {
sms: { sent: 0, delivered: 0, failed: 0 },
email: { sent: 0, delivered: 0, failed: 0 },
push: { sent: 0, delivered: 0, failed: 0 }
};
},
},

/**
Expand Down Expand Up @@ -283,6 +290,24 @@ CampaignSchema.methods.preValidate = function preValidate(next) {
//ensure `title`
this.title = _.trim(this.title) || this.form;

// ensure default sender
this.sender = mergeObjects({
name: getString('DEFAULT_SENDER_NAME'),
email: getString('DEFAULT_SENDER_EMAIL'),
mobile: getString('DEFAULT_SENDER_SMS'),
pushToken: getString('DEFAULT_SENDER_PUSH'),
}, copyInstance(this.sender));

// ensure statistics
// TODO: update statistics after each message send or read
if (_.isEmpty(this.statistics) && !_.isEmpty(this.channels)) {
const statistics = {};
_.forEach(this.channels, channel => {
statistics[_.toLower(channel)] = { sent: 0, delivered: 0, failed: 0 };
});
this.statistics = statistics;
}

next(null, this);
};

Expand All @@ -307,7 +332,6 @@ CampaignSchema.methods.send = function send(done) {
const Campaign = model(MODEL_NAME);

/* @todo refactor */
/* @todo send messages per channel per recipient */

// save campaign
const saveCampaign = next => this.save(next);
Expand All @@ -333,46 +357,132 @@ CampaignSchema.methods.send = function send(done) {
return next(null, campaign);
};

// TODO: refactor to common
// send emails
const sendEmails = (campaign, next) => {
if (!_.isEmpty(campaign.to)) {
// prepare emails to send
// TODO improve sync send strategy
let emails = _.map(campaign.to, 'email');
let emails = uniq(_.map(campaign.to, 'email'));
emails = _.map(emails, to => {
const sendEmail = then => {
const SMTP_FROM = getString('SMTP_FROM');
const email = new Email({
sender: SMTP_FROM,
sender: campaign.sender.email,
to: to,
subject: campaign.subject,
body: campaign.message,
campaign: campaign,
bulk: campaign._id,
options: campaign.metadata
});
return email.send((error, message) => {
// TODO: update email statistics
// TODO: perist message results from error
return then(null, message);
});
return email.send((error, message) => then(null, message));
};
return sendEmail;
});
// send emails
emails = _.compact(emails);
return parallel(emails, (error, messages) => {
return next(error, campaign, messages);
return next(error, _.compact(messages));
});
}
// continue without send emails
return next(null, campaign);
return next(null, []);
};

// TODO: refactor to common
// send smss
const sendSMSs = (campaign, next) => {
if (!_.isEmpty(campaign.to)) {
// prepare smss to send
// TODO improve sync send strategy
let smss = uniq(_.map(campaign.to, 'mobile'));
smss = _.map(smss, to => {
const sendSMS = then => {
const sms = new SMS({
sender: campaign.sender.mobile,
to: to,
subject: campaign.subject,
body: campaign.message,
campaign: campaign,
bulk: campaign._id,
options: campaign.metadata
});
return sms.send((error, message) => {
// TODO: update sms statistics
// TODO: perist message results from error
return then(null, message);
});
};
return sendSMS;
});
// send smss
smss = _.compact(smss);
return parallel(smss, (error, messages) => {
return next(error, _.compact(messages));
});
}
// continue without send smss
return next(null, []);
};

// TODO: refactor to common
// send pushs
const sendPushs = (campaign, next) => {
if (!_.isEmpty(campaign.to)) {
// prepare pushs to send
// TODO improve sync send strategy
let pushs = uniq(_.map(campaign.to, 'pushToken'));
pushs = _.map(pushs, to => {
const sendPush = then => {
const push = new Push({
sender: campaign.sender.pushToken,
to: to,
subject: campaign.subject,
body: campaign.message,
campaign: campaign,
bulk: campaign._id,
options: campaign.metadata
});
return push.send((error, message) => {
// TODO: update push statistics
// TODO: perist message results from error
return then(null, message);
});
};
return sendPush;
});
// send pushs
pushs = _.compact(pushs);
return parallel(pushs, (error, messages) => {
return next(error, _.compact(messages));
});
}
// continue without send pushs
return next(null, []);
};

// TODO send sms
// TODO send push
// TODO paralles(sendEmails, sendSMS, sendPush)
// TODO update statistics after send
// send in paralles(sendEmails, sendSMS, sendPush)
// TODO check allowed channels
const doSend = (campaign, next) => parallel({
email: then => sendEmails(campaign, then),
sms: then => sendSMSs(campaign, then),
push: then => sendPushs(campaign, then),
}, (error, results) => {
// TODO update statistics after send
return next(error, campaign, results);
});

// do sending
return waterfall(
[saveCampaign, fetchContacts, ensureUniqContacts, sendEmails],
done
);
return waterfall([
saveCampaign,
fetchContacts, ensureUniqContacts,
doSend
], done);
};

/**
Expand All @@ -395,7 +505,7 @@ CampaignSchema.methods.queue = function queue(done) {
const cb = _.isFunction(done) ? done : _.noop;

//persist campaign
this.save(function(error, campaign) {
this.save(function (error, campaign) {
//notify campaign queue error
if (error) {
worker.queue.emit('job error', error);
Expand All @@ -414,11 +524,8 @@ CampaignSchema.methods.queue = function queue(done) {
};
const jobDetails = _.merge({}, jobDefaults, campaign.toObject());

const job = campaign.runInBackground(jobDetails);

//ensure campaign has been queued
// TODO queue message send
job.save(function(error) {
return campaign.runInBackground(jobDetails, function (error) {
if (error) {
worker.queue.emit('job error', error);
return cb(error);
Expand Down Expand Up @@ -455,6 +562,7 @@ CampaignSchema.statics.AUDIENCE_REPORTERS = AUDIENCE_REPORTERS;
CampaignSchema.statics.AUDIENCE_CUSTOMERS = AUDIENCE_CUSTOMERS;
CampaignSchema.statics.AUDIENCE_SUBSCRIBERS = AUDIENCE_SUBSCRIBERS;
CampaignSchema.statics.AUDIENCE_EMPLOYEES = AUDIENCE_EMPLOYEES;
CampaignSchema.statics.AUDIENCE_ALL = AUDIENCE_ALL;
CampaignSchema.statics.AUDIENCES = AUDIENCES;

/*
Expand All @@ -469,4 +577,4 @@ CampaignSchema.plugin(runInBackground, {
});

/* export campaign model */
module.exports = exports = model(MODEL_NAME, CampaignSchema);
module.exports = exports = model(MODEL_NAME, CampaignSchema);
4 changes: 3 additions & 1 deletion lib/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ exports.AUDIENCE_REPORTERS = 'Reporters';
exports.AUDIENCE_CUSTOMERS = 'Customers';
exports.AUDIENCE_SUBSCRIBERS = 'Subscribers';
exports.AUDIENCE_EMPLOYEES = 'Employees';
exports.AUDIENCE_ALL = 'All';
exports.AUDIENCES = sortedUniq([
exports.AUDIENCE_REPORTERS,
exports.AUDIENCE_CUSTOMERS,
exports.AUDIENCE_SUBSCRIBERS,
exports.AUDIENCE_EMPLOYEES,
exports.AUDIENCE_ALL,
...getStrings('MESSAGE_AUDIENCES', []),
]);
]);
Loading

0 comments on commit 1d43780

Please sign in to comment.