From 92bfa870d96bfa2e34959b4fc9446b773c430490 Mon Sep 17 00:00:00 2001
From: Toshimitsu Takahashi
Date: Mon, 3 Oct 2016 15:41:57 +0900
Subject: [PATCH] Support wrapFunc

---
 README.md                        |   6 +
 lib/processor.js                 | 398 ++++++++++++++++---------------
 test/processor_wrap_func.test.js | 203 ++++++++++++++++
 3 files changed, 416 insertions(+), 191 deletions(-)
 create mode 100644 test/processor_wrap_func.test.js

diff --git a/README.md b/README.md
index bd297f1..4e37545 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,12 @@ $ npm install -save dynamo-processor
 const dp = require('dynamo-processor')({ region: 'ap-northeast-1' });
 ```
+`require('dynamo-processor')(options)`
+
+* **options** `<Object>` AWS Config options, plus the following:
+  * **wrapFunc** `<Boolean>` If this is true, the `proc` method returns a _Function_ that wraps the _Promise_, for cases where the promise needs to be evaluated lazily. (default is false)
+
+
 ### dp#proc
 `proc` method is to analyze an item and to process the item by the action

diff --git a/lib/processor.js b/lib/processor.js
index 7975aed..2f7f48a 100644
--- a/lib/processor.js
+++ b/lib/processor.js
@@ -7,10 +7,9 @@ const AWS = require('aws-sdk');
 const Expression = require('./expression');
 const DocClient = require('./doc_client');
-let client;
 /** Default logger to console */
-let logger = {
+const defaultLogger = {
   debug: function(){
     console.log(util.inspect(arguments[0], false, null));
   },
@@ -19,216 +18,233 @@ let logger = {
   error: function(){ console.error.apply(console, arguments) },
 }
+const MSG_INVALID_EXPRESSION = 'The document path provided in the update expression is invalid for update';
-/**
- * getItem
- * @param {string} table - TableName
- * @param {object} key - Key
- */
-function get(table, key) {
-  const params = {
-    TableName: table,
-    Key: key
-  };
-
-  return client.get(params)
-    .then((data) => {
-      if (data.Item) {
-        logger.info('Got %s from %s table', getKey(key), table);
-        return data.Item;
-      } else {
-        logger.info('Not found %s on %s table', getKey(key), table);
-        return null;
-      }
-    });
+function getKey(key) {
+  return _.values(key).slice(0, 2).join(' - ')
 }
 /**
- * batchGetItem
- * @param {string} table - TableName
- * @param {Array} keys - Keys
+ * DynamoProcessor
 */
-function batchGet(table, keys) {
-  const params = {
-    RequestItems: {}
-  };
-
-  params.RequestItems[table] = {
-    Keys: keys
-  };
-
-  return client.batchGet(params)
-    .then((data) => {
-      logger.info('Batch got from %s table', table);
-      return data.Responses[table];
-    });
-}
+class Processor {
+  constructor(opts, client) {
+    this.logger = opts.logger || defaultLogger;
+    delete opts.logger;
 /**
- * putItem
- * @param {String} table - table name
- * @param {Object} item - Item
- */
-function put(table, item) {
-  const params = {
-    TableName: table,
-    Item: item
-  };
-
-  return client.put(params)
-    .then((data) => {
-      logger.info('Put %s on %s table', getKey(item), table);
-      return data;
-    });
-}
+    this._wrapFunc = opts.wrapFunc || false;
+    delete opts.wrapFunc;
 /**
- * batchWriteItems
- * @param {String} table - table name
- * @param {Array} items - Items
- */
-function batchWrite(table, items) {
-  const params = {
-    RequestItems: {}
-  };
-
-  params.RequestItems[table] = items.map((item) => {
-    return {
-      PutRequest: {
-        Item: item
+    const awsOpts = opts || {};
+    this.dynamodb = new AWS.DynamoDB(awsOpts);
+    this.docClient = DocClient(new AWS.DynamoDB.DocumentClient(awsOpts));
+  }
+
+  /**
+   * getItem
+   * @param {string} table - TableName
+   * @param {object} key - Key
+   */
+  get(table, key) {
+    const params = {
+      TableName:
table, + Key: key + }; + + const f = () => { + return this.docClient.get(params) + .then(data => { + if (data.Item) { + this.logger.info('Got %s from %s table', getKey(key), table); + return data.Item; + } else { + this.logger.info('Not found %s on %s table', getKey(key), table); + return null; } - } - }); + }); + }; - return client.batchWrite(params) - .then((data) => { - logger.info('Batch wrote on %s table', table); - return data; - }); -} + return this._wrapFunc ? f : f(); + } -const MSG_INVALID_EXPRESSION = 'The document path provided in the update expression is invalid for update'; + /** + * batchGetItem + * @param {string} table - TableName + * @param {Array} keys - Keys + */ + batchGet(table, keys) { + const params = { + RequestItems: {} + }; + + params.RequestItems[table] = { + Keys: keys + }; + + const f = () => { + return this.docClient.batchGet(params) + .then(data => { + this.logger.info('Batch got from %s table', table); + return data.Responses[table]; + }); + }; + + return this._wrapFunc ? f : f(); + } -/** - * updateItem - * @param {string} table - TableName - * @param {object} key - Key - * @param {object} ops - Operations - * @param {object} init - Initial fields - */ -function update(table, key, ops, init) { - const exp = new Expression(init); - const params = exp.generate(table, key, ops); - const itemKey = getKey(key); - - return client.update(params) - .then((data) => { - logger.info('Updated %s on %s table', itemKey, table); - return data; - }) - .catch((err) => { - if (init && err.code === 'ValidationException' - && err.message === MSG_INVALID_EXPRESSION) { - - logger.warn('Failed to update %s on %s table because some fields not initialized', itemKey, table); - - const paramsWithInit = exp.generate(table, key, ops, true); - return client.update(paramsWithInit) - .then((data) => { - logger.info('Updated %s with initial fields on %s table', itemKey, table); - return data; - }) - .catch((err) => { - if (err.code === 'ConditionalCheckFailedException') { - // An another client has already set them same attributes. - // Try to update it at first again because attributes were initialized. - logger.warn('Failed to update %s with initial fields on %s table because of conflict', itemKey, table); - - return client.update(params) - .then((data) => { - logger.info('Updated %s on %s table', itemKey, table); - return data; - }) + /** + * putItem + * @param {String} table - table name + * @param {Object} item - Item + */ + put(table, item) { + const params = { + TableName: table, + Item: item + }; + + const f = () => { + return this.docClient.put(params) + .then(data => { + this.logger.info('Put %s on %s table', getKey(item), table); + return data; + }); + }; + + return this._wrapFunc ? f : f(); + } + + /** + * batchWriteItems + * @param {String} table - table name + * @param {Array} items - Items + */ + batchWrite(table, items) { + const params = { + RequestItems: {} + }; + + params.RequestItems[table] = items.map((item) => { + return { + PutRequest: { + Item: item } + } + }); - logger.error(err); - throw err - }); - } + const f = () => { + return this.docClient.batchWrite(params) + .then(data => { + this.logger.info('Batch wrote on %s table', table); + return data; + }); + }; - logger.error(err); - throw err - }); -} + return this._wrapFunc ? 
f : f(); + } -function getKey(key) { - return _.values(key).slice(0, 2).join(' - ') -} + /** + * updateItem + * @param {string} table - TableName + * @param {object} key - Key + * @param {object} ops - Operations + * @param {object} init - Initial fields + */ + update(table, key, ops, init) { + const exp = new Expression(init); + const params = exp.generate(table, key, ops); + const itemKey = getKey(key); + + const f = () => { + return this.docClient.update(params) + .then(data => { + this.logger.info('Updated %s on %s table', itemKey, table); + return data; + }) + .catch((err) => { + if (init && err.code === 'ValidationException' + && err.message === MSG_INVALID_EXPRESSION) { + + this.logger.warn('Failed to update %s on %s table because some fields not initialized', itemKey, table); + + const paramsWithInit = exp.generate(table, key, ops, true); + return this.docClient.update(paramsWithInit) + .then(data => { + this.logger.info('Updated %s with initial fields on %s table', itemKey, table); + return data; + }) + .catch(err => { + if (err.code === 'ConditionalCheckFailedException') { + // An another client has already set them same attributes. + // Try to update it at first again because attributes were initialized. + this.logger.warn('Failed to update %s with initial fields on %s table because of conflict', itemKey, table); + + return this.docClient.update(params) + .then(data => { + this.logger.info('Updated %s on %s table', itemKey, table); + return data; + }) + } + + this.logger.error(err); + throw err + }); + } -function proc(data, opts) { - const opts_ = opts || {}; - const useBatch = 'useBatch' in opts_ ? opts_.useBatch : true; - const table = data.table || opts_.table; - - if (data.action === 'put' || data.item || data.items) { - if (data.item) { - return put(table, data.item); - } else if (data.items) { - if (useBatch) { - return batchWrite(table, data.items); - } else { - return data.items.map((item) => { - return put(table, item); - }); + this.logger.error(err); + throw err + }); + }; + + return this._wrapFunc ? f : f(); + } + + proc(data, opts) { + const opts_ = opts || {}; + const useBatch = 'useBatch' in opts_ ? 
opts_.useBatch : true; + const table = data.table || opts_.table; + + if (data.action === 'put' || data.item || data.items) { + if (data.item) { + return this.put(table, data.item); + } else if (data.items) { + if (useBatch) { + return this.batchWrite(table, data.items); + } else { + return data.items.map((item) => { + return this.put(table, item); + }); + } } - } - } else if (data.action === 'update' || data.set - || data.add || data.remove || data.pushset) { - - return update(table, data.key, { - set: data.set, - add: data.add, - remove: data.remove, - pushset: data.pushset - }, opts_.initFields); - - } else if (data.action === 'get' || data.key || data.keys) { - if (data.key) { - return get(table, data.key); - } else if (data.keys) { - if (useBatch) { - return batchGet(table, data.keys); - } else { - return data.keys.map((key) => { - return get(table, key); - }); + } else if (data.action === 'update' || data.set + || data.add || data.remove || data.pushset) { + + return this.update(table, data.key, { + set: data.set, + add: data.add, + remove: data.remove, + pushset: data.pushset + }, opts_.initFields); + + } else if (data.action === 'get' || data.key || data.keys) { + if (data.key) { + return this.get(table, data.key); + } else if (data.keys) { + if (useBatch) { + return this.batchGet(table, data.keys); + } else { + return data.keys.map((key) => { + return this.get(table, key); + }); + } } } - } - return Promise.resolve(); + return Promise.resolve(); + } } -module.exports = function(opts) { - const opts_ = opts || {}; - if (opts_.logger) { - logger = opts_.logger; - delete opts_.logger; - } - - const awsOpts = opts_ || {}; - const documentClient = new AWS.DynamoDB.DocumentClient(awsOpts); - client = DocClient(documentClient); - - return { - dynamodb: new AWS.DynamoDB(awsOpts), - docClient: client, - get: get, - put: put, - update: update, - batchGet: batchGet, - batchWrite: batchWrite, - proc: proc, - }; +module.exports = function(options) { + return new Processor(options || {}); } diff --git a/test/processor_wrap_func.test.js b/test/processor_wrap_func.test.js new file mode 100644 index 0000000..f0561b4 --- /dev/null +++ b/test/processor_wrap_func.test.js @@ -0,0 +1,203 @@ +const AWS = require('aws-sdk'); +const _ = require('lodash'); +const chai = require('chai'); +const expect = chai.expect; +const assert = chai.assert; +const helper = require('./helper'); + +const opts = _.cloneDeep(helper.awsOpts); +opts.wrapFunc = true; + +const dp = require('../main')(opts); + +describe('DynamoProcessor with wrapFunc = true', () => { + before(helper.createTable); + after(helper.deleteTable); + + describe('#proc', () => { + const data = { + id: 1, + name: 'Taro', + age: 16, + weight: 55.3 + }; + + before(() => { + return helper.putDoc(data); + }); + + it('gets an item', () => { + return dp.proc({ + table: 'tests', + key: { id: 1 } + })() + .then((item) => { + expect(item).to.deep.equal(data); + }); + + }); + + it('gets null', () => { + return dp.proc({ + table: 'tests', + key: { id: -1 } + })() + .then((item) => { + expect(item).to.be.null; + }); + }); + + it('puts an item', () => { + data.id = 2; + return dp.proc({ + table: 'tests', + item: data + })() + .then(() => { + return helper.getDoc(2); + }) + .then((dbItem) => { + expect(dbItem).to.deep.equal(data); + }); + }); + + it('updates an item', () => { + return dp.proc({ + table: 'tests', + key: { id: 3 }, + set: { name: 'Ken' }, + add: { age: 10 }, + pushset: { cards:[1, 2] } + })() + .then(() => { + return helper.getDoc(3); + }) + 
.then((dbItem) => { + expect(dbItem).to.deep.equal({ + id: 3, name: 'Ken', age: 10, + cards: helper.docClient.createSet([1, 2]) + }); + }); + }); + + it('updates an item with remove', () => { + return dp.proc({ + table: 'tests', + key: { id: 2 }, + remove: ['weight'] + })() + .then((item) => { + delete data.weight; + expect(item).to.deep.equal(data); + }); + }); + + context('with initFields', () => { + it('updates an item with initial fields', () => { + return dp.proc({ + table: 'tests', + key: { id: 4 }, + set: { + 'map1 foo': 1 + }, + pushset: { + 'map2 bar': 'a' + }, + add: { + 'map2 size': 3 + } + }, { + initFields: { + map1: {}, map2: {}, list: [] + } + })() + .then((item) => { + expect(item).to.deep.equal({ + id: 4, + list: [], + map1: { foo: 1 }, + map2: { + bar: helper.docClient.createSet(['a']), + size: 3 + } + }); + }); + }) + }); + + context('multiple items', () => { + const data1 = { id: 10, name: 'Karen' }; + const data2 = { id: 11, name: 'Hana' }; + const data3 = { id: 12, name: 'Nancy' }; + const data4 = { id: 13, name: 'Jiro' }; + + before(() => { + return Promise.all([ + helper.putDoc(data1), + helper.putDoc(data2), + ]) + }); + + it('gets items', () => { + return dp.proc({ + table: 'tests', + keys: [{ id: 10 }, { id: 11 }] + })() + .then(items => { + expect(_.sortBy(items, 'id')) + .to.deep.equal(_.sortBy([data1, data2], 'id')); + }); + }); + + it('gets items as function array', () => { + const promises = dp.proc({ + table: 'tests', + keys: [{ id: 10 }, { id: 11 }] + }, { useBatch: false }) + .map(f => f()); + + return Promise.all(promises) + .then((items) => { + expect(items).to.deep.equal([data1, data2]); + }); + }); + + it('puts items', () => { + return dp.proc({ + table: 'tests', + items: [data3, data4] + })() + .then(() => { + return helper.getDoc(12); + }) + .then(dbItem => { + expect(dbItem).to.deep.equal(data3); + return helper.getDoc(13); + }) + .then(dbItem => { + expect(dbItem).to.deep.equal(data4); + }); + }); + + it('puts items as function array', () => { + const promises = dp.proc({ + table: 'tests', + items: [data3, data4] + }, { useBatch: false }) + .map(f => f()); + + return Promise.all(promises) + .then(item => { + return helper.getDoc(12); + }) + .then(dbItem => { + expect(dbItem).to.deep.equal(data3); + return helper.getDoc(13); + }) + .then(dbItem => { + expect(dbItem).to.deep.equal(data4); + }); + }); + }); + }); +});
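
For reference, a minimal usage sketch of the `wrapFunc` option described in the README hunk above, based on the behaviour exercised by the new tests: with `wrapFunc: true`, `proc` returns a _Function_ wrapping the _Promise_ (or an array of such functions when `useBatch: false` is passed with multiple keys), so evaluation can be deferred. The region, table name `tests`, and ids below are illustrative assumptions, not part of the patch.

```js
// Usage sketch (assumed values: region, table name 'tests', ids).
const dp = require('dynamo-processor')({
  region: 'ap-northeast-1',
  wrapFunc: true   // proc now returns Function(s) that wrap the Promise(s)
});

// With useBatch: false, each key yields a wrapped get; nothing has run yet.
const tasks = dp.proc({
  table: 'tests',
  keys: [{ id: 10 }, { id: 11 }, { id: 12 }]
}, { useBatch: false });

// Evaluate lazily: run the wrapped promises one at a time, in order.
tasks.reduce((chain, task) => {
  return chain.then((items) => task().then((item) => items.concat([item])));
}, Promise.resolve([]))
  .then((items) => console.log(items));
```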