Skip to content

Commit

Permalink
feat: support sending message batches
Browse files Browse the repository at this point in the history
  • Loading branch information
mvayngrib committed Apr 20, 2018
1 parent c83bc96 commit 6e7d1ea
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 42 deletions.
4 changes: 2 additions & 2 deletions npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
"localstack:restart": "./src/scripts/localstack.sh down && ./src/scripts/localstack.sh up -d",
"localstack:update": "./src/scripts/localstack.sh pull tradle_web",
"models:update": "node ./lib/scripts/update-models.js",
"redis:stop": "redis-cli shutdown",
"tail": "./src/scripts/tail.sh",
"info": "serverless info",
"tswatch": "tsc -w",
Expand Down
98 changes: 61 additions & 37 deletions src/bot/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import {
batchProcess,
getResourceIdentifier,
pickBacklinks,
omitBacklinks
omitBacklinks,
pluck
} from '../utils'

import { addLinks } from '../crypto'
Expand Down Expand Up @@ -308,55 +309,78 @@ export class Bot extends EventEmitter implements IReady, IHasModels {
}
}

public send = async (opts) => {
const batch = await Promise.all([].concat(opts)
.map(oneOpts => normalizeSendOpts(this, oneOpts)))
public toMessageBatch = (batch) => {
const recipients = pluck(batch, 'to').map(normalizeRecipient)
if (_.uniq(recipients).length > 1) {
throw new Errors.InvalidInput(`expected a single recipient`)
}

const n = batch.length
return batch.map((opts, i) => ({
...opts,
other: {
..._.clone(opts.other || {}),
iOfN: {
i: i + 1,
n
}
}
}))
}

public sendBatch = async (batch) => {
return this.send(this.toMessageBatch(batch))
}

public send = async (opts) => {
const batch = await Promise.map([].concat(opts), oneOpts => normalizeSendOpts(this, oneOpts))
const byRecipient = _.groupBy(batch, 'recipient')
const recipients = Object.keys(byRecipient)
this.logger.debug(`queueing messages to ${recipients.length} recipients`, {
recipients
})

const results = await Promise.all(recipients.map(async (recipient) => {
const subBatch = byRecipient[recipient]
const types = subBatch.map(m => m[TYPE]).join(', ')
this.logger.debug(`sending to ${recipient}: ${types}`)

await this.outboundMessageLocker.lock(recipient)
let messages
try {
messages = await this.provider.sendMessageBatch(subBatch)
this.tasks.add({
name: 'delivery:live',
promiser: () => this.provider.attemptLiveDelivery({
recipient,
messages
})
})

const user = await this.users.get(recipient)
await this._fireMessageBatchEvent({
spread: true,
batch: messages.map(message => toBotMessageEvent({
bot: this,
message,
user
}))
})
} finally {
this.outboundMessageLocker.unlock(recipient)
}

return messages
}))
const results = await Promise.map(recipients, async (recipient) => {
return await this._sendBatch({ recipient, batch: byRecipient[recipient] })
})

const messages = _.flatten(results)
if (messages) {
return Array.isArray(opts) ? messages : messages[0]
}
}

public _sendBatch = async ({ recipient, batch }) => {
const types = batch.map(m => m[TYPE]).join(', ')
this.logger.debug(`sending to ${recipient}: ${types}`)
await this.outboundMessageLocker.lock(recipient)
let messages
try {
messages = await this.provider.sendMessageBatch(batch)
this.tasks.add({
name: 'delivery:live',
promiser: () => this.provider.attemptLiveDelivery({
recipient,
messages
})
})

const user = await this.users.get(recipient)
await this._fireMessageBatchEvent({
spread: true,
batch: messages.map(message => toBotMessageEvent({
bot: this,
message,
user
}))
})
} finally {
this.outboundMessageLocker.unlock(recipient)
}

return messages
}

public sendPushNotification = (recipient: string) => this.provider.sendPushNotification(recipient)
public registerWithPushNotificationsServer = () => this.provider.registerWithPushNotificationsServer()
public sendSimpleMessage = async ({ to, message }) => {
Expand Down Expand Up @@ -664,7 +688,7 @@ export class Bot extends EventEmitter implements IReady, IHasModels {
const { changes, async, spread } = opts
const base = EventTopics.resource.save
const topic = async ? base.async : base.sync
const payloads = await Promise.all(changes.map(change => maybeAddOld(this, change, async)))
const payloads = await Promise.map(changes, change => maybeAddOld(this, change, async))
return spread
? await this.fireBatch(topic, payloads)
: await this.fire(topic.batch, payloads)
Expand Down
1 change: 1 addition & 0 deletions src/bot/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const normalizeSendOpts = async (bot, opts) => {

bot.objects.presignEmbeddedMediaLinks(object)
opts = _.omit(opts, 'to')
opts.object = object
opts.recipient = normalizeRecipient(to)
// if (typeof opts.object === 'string') {
// opts.object = {
Expand Down
5 changes: 3 additions & 2 deletions src/in-house-bot/plugins/centrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ class CentrixAPI {
let status
try {
this.logger.debug(`running ${centrixOpName} with Centrix`)
// rawData = FIXTURES[idType === DOCUMENT_TYPES.passport ? 'passport' : 'license']
rawData = await this.centrix[method](props)
debugger
rawData = FIXTURES[idType === DOCUMENT_TYPES.passport ? 'passport' : 'license']
// rawData = await this.centrix[method](props)
} catch (err) {
this.logger.debug(`Centrix ${centrixOpName} verification failed`, err.stack)
rawData = {}
Expand Down

0 comments on commit 6e7d1ea

Please sign in to comment.