From 9a2b5c56522433541093b6d2575662db8b4d8006 Mon Sep 17 00:00:00 2001 From: Mark Vayngrib Date: Sat, 16 Dec 2017 17:08:06 -0500 Subject: [PATCH] fix: load iot endpoint async on lambda init (for lambdas that might send messages). --- lib/bot/index.js | 2 +- lib/bot/lambda/onmessage.js | 4 ++++ lib/bot/lambda/onmessagestream.js | 4 ++++ lib/errors.js | 14 +++++++------ lib/lambda/http/inbox.js | 4 ++++ lib/lambda/http/onmessage.js | 4 ++++ lib/lambda/mqtt/onconnect.js | 4 ++++ lib/lambda/mqtt/onsubscribe.js | 4 ++++ lib/push.js | 8 ++++++-- lib/task-manager.js | 6 +++--- lib/test/utils.test.js | 29 +++++++++++++++++++++++++++ lib/user.js | 2 +- lib/utils.js | 22 +++++++++++++++++---- serverless-uncompiled.yml | 6 +++--- src/bot/index.ts | 2 +- src/bot/lambda/onmessage.ts | 5 +++++ src/bot/lambda/onmessagestream.ts | 5 +++++ src/errors.ts | 16 +++++++++------ src/lambda/http/inbox.ts | 5 +++++ src/lambda/http/onmessage.ts | 5 +++++ src/lambda/mqtt/onconnect.ts | 5 +++++ src/lambda/mqtt/onsubscribe.ts | 5 +++++ src/push.ts | 13 ++++++++---- src/task-manager.ts | 7 ++++--- src/test/utils.test.ts | 33 +++++++++++++++++++++++++++++++ src/user.ts | 2 +- src/utils.ts | 23 ++++++++++++++++++--- 27 files changed, 201 insertions(+), 38 deletions(-) diff --git a/lib/bot/index.js b/lib/bot/index.js index ecbb9cfaa..3cc101b4d 100644 --- a/lib/bot/index.js +++ b/lib/bot/index.js @@ -24,7 +24,7 @@ const convenience_1 = require("./convenience"); const { TYPE, SIG } = constants; const promisePassThrough = data => Promise.resolve(data); const PROXY_TO_TRADLE = [ - 'aws', 'objects', 'db', 'dbUtils', 'lambdaUtils', 'seals', + 'aws', 'objects', 'db', 'dbUtils', 'lambdaUtils', 'iot', 'seals', 'identities', 'history', 'messages', 'friends', 'resources', 'env', 'router', 'buckets', 'tables', 'serviceMap', 'version', 'apiBaseUrl', 'tasks' diff --git a/lib/bot/lambda/onmessage.js b/lib/bot/lambda/onmessage.js index 9a5e624ab..c93eb9fe7 100644 --- a/lib/bot/lambda/onmessage.js +++ b/lib/bot/lambda/onmessage.js @@ -6,6 +6,10 @@ exports.createLambda = (opts) => { return exports.outfitLambda(opts.bot.createLambda(Object.assign({ source: lambda_1.EventSource.LAMBDA }, opts)), opts); }; exports.outfitLambda = (lambda, opts) => { + lambda.tasks.add({ + name: 'getiotendpoint', + promiser: lambda.bot.iot.getEndpoint + }); lambda.use(onmessage_1.onmessage(lambda, opts)); return lambda; }; diff --git a/lib/bot/lambda/onmessagestream.js b/lib/bot/lambda/onmessagestream.js index 2bea6ab65..41cac42e1 100644 --- a/lib/bot/lambda/onmessagestream.js +++ b/lib/bot/lambda/onmessagestream.js @@ -28,6 +28,10 @@ exports.outfitLambda = (lambda, opts) => { throw new Error(failed[0]); } }; + lambda.tasks.add({ + name: 'getiotendpoint', + promiser: bot.iot.getEndpoint + }); lambda.use((ctx, next) => __awaiter(this, void 0, void 0, function* () { const { event } = ctx; event.bot = bot; diff --git a/lib/errors.js b/lib/errors.js index add2f7020..365675aee 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -61,7 +61,13 @@ const rethrow = (err, type) => { throw err; } }; -const HttpError = createError('HttpError'); +const _HttpError = createError('HttpError'); +class HttpError extends Error { + constructor(code, message) { + super(message); + this.status = code || 500; + } +} const errors = { ClientUnreachable: createError('ClientUnreachable'), NotFound: createError('NotFound'), @@ -80,11 +86,7 @@ const errors = { TimeTravel: createError('TimeTravel'), ExecutionTimeout: createError('ExecutionTimeout'), Exists: createError('Exists'), - HttpError: (code, message) => { - const err = new HttpError(message); - err.status = code; - return err; - }, + HttpError, Timeout: createError('Timeout'), export: (err) => { return pick(err, ['message', 'stack', 'name', 'type']); diff --git a/lib/lambda/http/inbox.js b/lib/lambda/http/inbox.js index 66edfba55..ccbfdc669 100644 --- a/lib/lambda/http/inbox.js +++ b/lib/lambda/http/inbox.js @@ -27,6 +27,10 @@ const lambda = new lambda_1.Lambda({ source: lambda_1.EventSource.HTTP, tradle: _1.tradle }); +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: _1.tradle.iot.getEndpoint +}); lambda.use(cors()); lambda.use(bodyParser({ jsonLimit: '10mb' })); const router = new Router(); diff --git a/lib/lambda/http/onmessage.js b/lib/lambda/http/onmessage.js index 0c8f28e59..d4cd9eac5 100644 --- a/lib/lambda/http/onmessage.js +++ b/lib/lambda/http/onmessage.js @@ -17,6 +17,10 @@ const lambda = new lambda_1.Lambda({ source: lambda_1.EventSource.IOT, tradle: _1.tradle }); +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: _1.tradle.iot.getEndpoint +}); const messageHandler = (ctx) => __awaiter(this, void 0, void 0, function* () { const { message } = ctx.event; const result = yield user.onSentMessage({ message }); diff --git a/lib/lambda/mqtt/onconnect.js b/lib/lambda/mqtt/onconnect.js index 28a5889e1..eaf09c6a9 100644 --- a/lib/lambda/mqtt/onconnect.js +++ b/lib/lambda/mqtt/onconnect.js @@ -14,6 +14,10 @@ const lambda = new lambda_1.Lambda({ source: lambda_1.EventSource.IOT, tradle: _1.tradle }); +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: _1.tradle.iot.getEndpoint +}); lambda.use(({ event, context }) => __awaiter(this, void 0, void 0, function* () { lambda.logger.debug('client connected', event); const { clientId } = event; diff --git a/lib/lambda/mqtt/onsubscribe.js b/lib/lambda/mqtt/onsubscribe.js index 5a08d7cf2..e4687bbe7 100644 --- a/lib/lambda/mqtt/onsubscribe.js +++ b/lib/lambda/mqtt/onsubscribe.js @@ -14,6 +14,10 @@ const lambda = new lambda_1.Lambda({ source: lambda_1.EventSource.IOT, tradle: _1.tradle }); +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: lambda.tradle.iot.getEndpoint +}); lambda.use(({ event, context }) => __awaiter(this, void 0, void 0, function* () { const { clientId, topics } = event; yield _1.tradle.user.onSubscribed({ clientId, topics }); diff --git a/lib/push.js b/lib/push.js index 82c40053b..411813fa8 100644 --- a/lib/push.js +++ b/lib/push.js @@ -8,6 +8,7 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge }); }; Object.defineProperty(exports, "__esModule", { value: true }); +const Cache = require("lru-cache"); const buildResource = require("@tradle/build-resource"); const crypto_1 = require("./crypto"); const utils_1 = require("./utils"); @@ -17,11 +18,11 @@ exports.getNotificationData = ({ nonce, seq }) => crypto_1.sha256(seq + nonce); exports.createSubscriberInfo = () => ({ seq: -1 }); class Push { constructor({ serverUrl, conf, logger }) { - this.ensureRegistered = utils_1.cachifyPromiser(({ identity, key }) => __awaiter(this, void 0, void 0, function* () { + this.ensureRegistered = ({ identity, key }) => __awaiter(this, void 0, void 0, function* () { const registered = yield this.isRegistered(); if (!registered) yield this.register({ identity, key }); - })); + }); this.isRegistered = () => this.registration.exists(this.serverUrl); this.setRegistered = () => __awaiter(this, void 0, void 0, function* () { yield this.registration.put(this.serverUrl, { @@ -109,6 +110,9 @@ class Push { this.registration = pushConf.sub(':reg'); this.subscribers = pushConf.sub(':sub'); this.serverUrl = serverUrl; + this.cache = new Cache({ max: 1 }); + this.logger = logger; + this.ensureRegistered = utils_1.cachifyFunction(this, 'ensureRegistered'); } } exports.default = Push; diff --git a/lib/task-manager.js b/lib/task-manager.js index 937e07bde..e12fcdda3 100644 --- a/lib/task-manager.js +++ b/lib/task-manager.js @@ -13,10 +13,10 @@ const settle_promise_1 = require("settle-promise"); const logger_1 = require("./logger"); const RESOLVED = Promise.resolve(); class TaskManager { - constructor(opts = {}) { + constructor({ logger } = {}) { this.add = (task) => { this.logger.debug('add', { name: task.name }); - const promise = task.promise || RESOLVED.then(task.promiser); + const promise = task.promise || RESOLVED.then(() => task.promiser()); task = Object.assign({}, task, { promise }); const start = Date.now(); promise @@ -55,7 +55,7 @@ class TaskManager { }); return results; }); - this.logger = opts.logger || new logger_1.default('task-manager'); + this.logger = logger || new logger_1.default('task-manager'); this.tasks = []; } } diff --git a/lib/test/utils.test.js b/lib/test/utils.test.js index aa151c2d0..1c1cb67b9 100644 --- a/lib/test/utils.test.js +++ b/lib/test/utils.test.js @@ -91,6 +91,35 @@ test('cachifyFunction', utils_1.loudAsync((t) => __awaiter(this, void 0, void 0, t.equal(i, 2); t.end(); }))); +test('cachifyPromiser', utils_1.loudAsync((t) => __awaiter(this, void 0, void 0, function* () { + const actions = [ + () => __awaiter(this, void 0, void 0, function* () { + throw new Error('test err'); + }), + () => __awaiter(this, void 0, void 0, function* () { + return 'a'; + }) + ]; + let i = 0; + const fn = utils_1.cachifyPromiser(() => actions[i++]()); + try { + yield fn(); + t.fail('expected error'); + } + catch (err) { + t.equal(err.message, 'test err'); + } + t.equal(yield fn(), 'a'); + t.equal(yield fn(), 'a'); + try { + fn('something'); + t.fail('expected error'); + } + catch (err) { + t.ok(/arguments/.test(err.message)); + } + t.end(); +}))); test('wrap', utils_1.loudAsync((t) => __awaiter(this, void 0, void 0, function* () { const lambdaUtils = require('../lambda-utils'); const { performServiceDiscovery } = lambdaUtils; diff --git a/lib/user.js b/lib/user.js index 7b76b3249..e4beb6267 100644 --- a/lib/user.js +++ b/lib/user.js @@ -146,7 +146,7 @@ class UserSim { error: err.stack }); if (!clientId) { - throw Errors.HttpError(400, err.message); + throw new Errors.HttpError(400, err.message); } this.tasks.add({ name: 'delivery:reject', diff --git a/lib/utils.js b/lib/utils.js index 2e1aa0f78..abb7ef21c 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -146,13 +146,27 @@ function groupBy(items, prop) { return groups; } exports.groupBy = groupBy; -function cachifyPromiser(fn) { +function cachifyPromiser(fn, opts = {}) { let promise; - return function (...args) { - if (!promise) - promise = fn.apply(this, args); + const cachified = (...args) => { + if (args.length) { + const msg = 'functions cachified with cachifyPromiser do not accept arguments'; + if (process.env.IS_OFFLINE) { + throw new Error(msg); + } + else { + console.warn(msg); + } + } + if (!promise) { + promise = fn.call(this); + promise.catch(err => { + promise = null; + }); + } return promise; }; + return cachified; } exports.cachifyPromiser = cachifyPromiser; function firstSuccess(promises) { diff --git a/serverless-uncompiled.yml b/serverless-uncompiled.yml index 50386802e..7e5f734a8 100644 --- a/serverless-uncompiled.yml +++ b/serverless-uncompiled.yml @@ -639,7 +639,7 @@ functions: inbox: handler: lib/lambda/http/inbox.handler - memorySize: 1024 + memorySize: 2048 events: - http: path: inbox @@ -714,7 +714,7 @@ functions: # and passes off to bot engine onmessage: handler: lib/lambda/mqtt/onmessage.handler - memorySize: 1024 + memorySize: 2048 # BOT_ONMESSAGE: ${{self:custom.bot_onmessage}} # SERVERLESS_PREFIX: ${{self:custom.prefix}} # SERVERLESS_STAGE: ${{self:custom.stage}} @@ -921,7 +921,7 @@ functions: # make sure graphql route gets loaded handler: lib/samplebot/lambda/http/graphql.handler # handler: lib/lambda/http/default.handler - memorySize: 512 + memorySize: 1024 events: - http: path: graphql diff --git a/src/bot/index.ts b/src/bot/index.ts index 1ff3d553b..c62e20011 100644 --- a/src/bot/index.ts +++ b/src/bot/index.ts @@ -35,7 +35,7 @@ const { TYPE, SIG } = constants const promisePassThrough = data => Promise.resolve(data) const PROXY_TO_TRADLE = [ - 'aws', 'objects', 'db', 'dbUtils', 'lambdaUtils', 'seals', + 'aws', 'objects', 'db', 'dbUtils', 'lambdaUtils', 'iot', 'seals', 'identities', 'history', 'messages', 'friends', 'resources', 'env', 'router', 'buckets', 'tables', 'serviceMap', 'version', 'apiBaseUrl', 'tasks' diff --git a/src/bot/lambda/onmessage.ts b/src/bot/lambda/onmessage.ts index 2e8ded186..c4f7708ca 100644 --- a/src/bot/lambda/onmessage.ts +++ b/src/bot/lambda/onmessage.ts @@ -9,6 +9,11 @@ export const createLambda = (opts) => { } export const outfitLambda = (lambda, opts) => { + lambda.tasks.add({ + name: 'getiotendpoint', + promiser: lambda.bot.iot.getEndpoint + }) + lambda.use(onmessage(lambda, opts)) return lambda } diff --git a/src/bot/lambda/onmessagestream.ts b/src/bot/lambda/onmessagestream.ts index 8aa95398b..213eabb72 100644 --- a/src/bot/lambda/onmessagestream.ts +++ b/src/bot/lambda/onmessagestream.ts @@ -26,6 +26,11 @@ export const outfitLambda = (lambda, opts) => { } } + lambda.tasks.add({ + name: 'getiotendpoint', + promiser: bot.iot.getEndpoint + }) + lambda.use(async (ctx, next) => { const { event } = ctx event.bot = bot diff --git a/src/errors.ts b/src/errors.ts index 9f7ab4bd2..17d3bf5f3 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -77,7 +77,15 @@ const rethrow = (err, type) => { } } -const HttpError = createError('HttpError') +const _HttpError = createError('HttpError') +class HttpError extends Error { + public status: number + constructor(code, message) { + super(message) + this.status = code || 500 + } +} + const errors = { ClientUnreachable: createError('ClientUnreachable'), NotFound: createError('NotFound'), @@ -96,11 +104,7 @@ const errors = { TimeTravel: createError('TimeTravel'), ExecutionTimeout: createError('ExecutionTimeout'), Exists: createError('Exists'), - HttpError: (code, message) => { - const err = new HttpError(message) - err.status = code - return err - }, + HttpError, Timeout: createError('Timeout'), export: (err:Error): { type:string, diff --git a/src/lambda/http/inbox.ts b/src/lambda/http/inbox.ts index 91d95ec10..40fce2aa7 100644 --- a/src/lambda/http/inbox.ts +++ b/src/lambda/http/inbox.ts @@ -26,6 +26,11 @@ const lambda = new Lambda({ tradle }) +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: tradle.iot.getEndpoint +}) + lambda.use(cors()) lambda.use(bodyParser({ jsonLimit: '10mb' })) diff --git a/src/lambda/http/onmessage.ts b/src/lambda/http/onmessage.ts index 0d238d8c5..2d90071d7 100644 --- a/src/lambda/http/onmessage.ts +++ b/src/lambda/http/onmessage.ts @@ -9,6 +9,11 @@ const lambda = new Lambda({ tradle }) +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: tradle.iot.getEndpoint +}) + const messageHandler = async (ctx) => { const { message } = ctx.event // the user sent us a message diff --git a/src/lambda/mqtt/onconnect.ts b/src/lambda/mqtt/onconnect.ts index ae958ce6a..3941df08c 100644 --- a/src/lambda/mqtt/onconnect.ts +++ b/src/lambda/mqtt/onconnect.ts @@ -6,6 +6,11 @@ const lambda = new Lambda({ tradle }) +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: tradle.iot.getEndpoint +}) + lambda.use(async ({ event, context }) => { lambda.logger.debug('client connected', event) const { clientId } = event diff --git a/src/lambda/mqtt/onsubscribe.ts b/src/lambda/mqtt/onsubscribe.ts index 633457f7a..d2d219775 100644 --- a/src/lambda/mqtt/onsubscribe.ts +++ b/src/lambda/mqtt/onsubscribe.ts @@ -6,6 +6,11 @@ const lambda = new Lambda({ tradle }) +lambda.tasks.add({ + name: 'getiotendpoint', + promiser: lambda.tradle.iot.getEndpoint +}) + lambda.use(async ({ event, context }) => { const { clientId, topics } = event await tradle.user.onSubscribed({ clientId, topics }) diff --git a/src/push.ts b/src/push.ts index 3614b52f5..6127aa48d 100644 --- a/src/push.ts +++ b/src/push.ts @@ -1,9 +1,10 @@ import superagent = require('superagent') +import Cache = require('lru-cache') import { protocol } from '@tradle/engine' import buildResource = require('@tradle/build-resource') import { ECKey, sha256, randomString } from './crypto' -import { cachifyPromiser, post } from './utils' +import { cachifyFunction, post } from './utils' import Logger from './logger' import Provider from './provider' import KeyValueTable from './key-value-table' @@ -27,10 +28,11 @@ export const getNotificationData = ({ nonce, seq }: { export const createSubscriberInfo = () => ({ seq: -1 }) export default class Push { - private logger: Logger private serverUrl: string private registration: KeyValueTable private subscribers: KeyValueTable + public cache: any + public logger: Logger constructor ({ serverUrl, conf, logger }:{ serverUrl:string conf:KeyValueTable @@ -40,12 +42,15 @@ export default class Push { this.registration = pushConf.sub(':reg') this.subscribers = pushConf.sub(':sub') this.serverUrl = serverUrl + this.cache = new Cache({ max: 1 }) + this.logger = logger + this.ensureRegistered = cachifyFunction(this, 'ensureRegistered') } - public ensureRegistered = cachifyPromiser(async ({ identity, key }) => { + public ensureRegistered = async ({ identity, key }) => { const registered = await this.isRegistered() if (!registered) await this.register({ identity, key }) - }) + } public isRegistered = () => this.registration.exists(this.serverUrl) diff --git a/src/task-manager.ts b/src/task-manager.ts index 4313311db..05ec499cf 100644 --- a/src/task-manager.ts +++ b/src/task-manager.ts @@ -12,16 +12,17 @@ const RESOLVED = Promise.resolve() export class TaskManager { private tasks:Task[] + private logger: Logger - constructor(opts={}) { - this.logger = opts.logger || new Logger('task-manager') + constructor({ logger }: { logger?:Logger }={}) { + this.logger = logger || new Logger('task-manager') this.tasks = [] } public add = (task: Task) => { this.logger.debug('add', { name: task.name }) - const promise = task.promise || RESOLVED.then(task.promiser) + const promise = task.promise || RESOLVED.then(() => task.promiser()) task = { ...task, promise } const start = Date.now() promise diff --git a/src/test/utils.test.ts b/src/test/utils.test.ts index 2d36a6deb..ab2fd2847 100644 --- a/src/test/utils.test.ts +++ b/src/test/utils.test.ts @@ -12,6 +12,7 @@ import { firstSuccess, cachify, cachifyFunction, + cachifyPromiser, clone, batchStringsBySize, promisify, @@ -113,6 +114,38 @@ test('cachifyFunction', loudAsync(async (t) => { t.end() })) +test('cachifyPromiser', loudAsync(async (t) => { + const actions = [ + async () => { + throw new Error('test err') + }, + async () => { + return 'a' + } + ] + + let i = 0 + const fn = cachifyPromiser(() => actions[i++]()) + + try { + await fn() + t.fail('expected error') + } catch (err) { + t.equal(err.message, 'test err') + } + + t.equal(await fn(), 'a') + t.equal(await fn(), 'a') + try { + fn('something') + t.fail('expected error') + } catch (err) { + t.ok(/arguments/.test(err.message)) + } + + t.end() +})) + test('wrap', loudAsync(async (t) => { const lambdaUtils = require('../lambda-utils') const { performServiceDiscovery } = lambdaUtils diff --git a/src/user.ts b/src/user.ts index 7abded543..aceee1c0f 100644 --- a/src/user.ts +++ b/src/user.ts @@ -224,7 +224,7 @@ class UserSim { }) if (!clientId) { - throw Errors.HttpError(400, err.message) + throw new Errors.HttpError(400, err.message) } this.tasks.add({ diff --git a/src/utils.ts b/src/utils.ts index 797b5e65f..0db80b64e 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,6 @@ import fs = require('fs') import Promise = require('bluebird') +import Cache = require('lru-cache') import querystring = require('querystring') import format = require('string-format') import crypto = require('crypto') @@ -147,13 +148,29 @@ export function groupBy (items, prop) { return groups } -export function cachifyPromiser (fn) { +export function cachifyPromiser (fn, opts={}) { let promise - return function (...args) { - if (!promise) promise = fn.apply(this, args) + const cachified = (...args) => { + if (args.length) { + const msg = 'functions cachified with cachifyPromiser do not accept arguments' + if (process.env.IS_OFFLINE) { + throw new Error(msg) + } else { + console.warn(msg) + } + } + + if (!promise) { + promise = fn.call(this) + promise.catch(err => { + promise = null + }) + } return promise } + + return cachified } // trick from: https://stackoverflow.com/questions/37234191/resolve-es6-promise-with-first-success