Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add costLogger #158

Merged
merged 14 commits into from
Mar 14, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## v1.6.0

- Support passing a `costLogger` which will be called with casted `ConsumedCapacity`
- Support reusing DynamoDB Client by passing `dynoInstance`

## v1.5.2

- update reduceCapacity to support new data shape [#157](https://github.com/mapbox/dyno/pull/157)
Expand Down
106 changes: 72 additions & 34 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* eslint-env es6 */
var AWS = require('aws-sdk');
var DynamoDBSet = require('aws-sdk/lib/dynamodb/set');
var _ = require('underscore');
const util = require('./lib/util');

module.exports = Dyno;

Expand All @@ -24,6 +26,8 @@ module.exports = Dyno;
* @param {string} [options.sessionToken] - credentials for the client to utilize
* @param {object} [options.logger] - a writable stream for detailed logging from the aws-sdk. See [constructor docs for details](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property).
* @param {number} [options.maxRetries] - number of times to retry on retryable errors. See [constructor docs for details](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property).
* @param {function} [options.costLogger] - a function that will be called with consumedCapacity
miafan23 marked this conversation as resolved.
Show resolved Hide resolved
* @param {Dyno} [options.dynoInstance] - a Dyno instance that will share the underlying AWS client with this one
* @returns {client} a dyno client
* @example
* var Dyno = require('dyno');
Expand All @@ -33,7 +37,6 @@ module.exports = Dyno;
* });
*/
function Dyno(options) {

/**
* A dyno client which extends the [aws-sdk's DocumentClient](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html).
*
Expand All @@ -58,26 +61,52 @@ function Dyno(options) {
* be present if the `ReturnConsumedCapacity` parameter was set on the initial
* request.
*/
if (!options) {
throw new Error('options is required');
}

if (!options.table) throw new Error('table is required'); // Demand table be specified
if (!options.region) throw new Error('region is required');
let client;
let docClient;
let tableFreeClient;
let tableFreeDocClient;
let config = {};

var config = {
region: options.region,
endpoint: options.endpoint,
params: { TableName: options.table }, // Sets `TableName` in every request
httpOptions: options.httpOptions || { timeout: 5000 }, // Default appears to be 2 min
accessKeyId: options.accessKeyId,
secretAccessKey: options.secretAccessKey,
sessionToken: options.sessionToken,
logger: options.logger,
maxRetries: options.maxRetries
};
// Reuse DynamoDB Client
if (options.dynoInstance && options.dynoInstance._client) {
const dynoInstance = options.dynoInstance;
if (Object.keys(options).some(o => ['region', 'endpoint', 'table'].includes(o))) {
Copy link
Contributor Author

@miafan23 miafan23 Mar 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AlexanderBelokon I made some changes there to allow for options other than costLogger, as there might be other options like read and write. I only rejected options containing basic table config to remind the user that there's no need to provide DynamoDB config.

throw new Error('No need to provide DynamoDB config when reusing Dynamodb client');
}
client = dynoInstance._client;
docClient = dynoInstance._docClient;
tableFreeClient = dynoInstance._tableFreeClient;
tableFreeDocClient = dynoInstance._tableFreeDocClient;
config = dynoInstance.config;
} else {
if (!options.table) throw new Error('table is required'); // Demand table be specified
if (!options.region) throw new Error('region is required');

var client = new AWS.DynamoDB(config);
var docClient = new AWS.DynamoDB.DocumentClient({ service: client });
var tableFreeClient = new AWS.DynamoDB(_(config).omit('params')); // no TableName in batch requests
var tableFreeDocClient = new AWS.DynamoDB.DocumentClient({ service: tableFreeClient });
config = {
region: options.region,
endpoint: options.endpoint,
params: { TableName: options.table }, // Sets `TableName` in every request
httpOptions: options.httpOptions || { timeout: 5000 }, // Default appears to be 2 min
accessKeyId: options.accessKeyId,
secretAccessKey: options.secretAccessKey,
sessionToken: options.sessionToken,
logger: options.logger,
maxRetries: options.maxRetries
};

client = new AWS.DynamoDB(config);
docClient = new AWS.DynamoDB.DocumentClient({ service: client });
tableFreeClient = new AWS.DynamoDB(_(config).omit('params')); // no TableName in batch requests
tableFreeDocClient = new AWS.DynamoDB.DocumentClient({ service: tableFreeClient });

}

const wrappedDocClient = util.wrapDocClient(docClient, options.costLogger);
const wrappedTableFreeDocClient = util.wrapDocClient(tableFreeDocClient, options.costLogger);

// Straight-up inherit several functions from aws-sdk so we can also inherit docs and tests
var nativeFunctions = {
Expand Down Expand Up @@ -110,7 +139,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.batchGet](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchGet-property) for details.
* @returns {Request}
*/
batchGetItem: tableFreeDocClient.batchGet.bind(tableFreeDocClient),
batchGetItem: wrappedTableFreeDocClient.batchGet,
/**
* Perform a batch of write operations. Passthrough to [DocumentClient.batchWrite](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchWrite-property).
*
Expand All @@ -120,7 +149,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.batchWrite](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchWrite-property) for details.
* @returns {Request}
*/
batchWriteItem: tableFreeDocClient.batchWrite.bind(tableFreeDocClient),
batchWriteItem: wrappedTableFreeDocClient.batchWrite,
/**
* Delete a single record. Passthrough to [DocumentClient.delete](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#delete-property).
*
Expand All @@ -130,7 +159,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.delete](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#delete-property) for details.
* @returns {Request}
*/
deleteItem: docClient.delete.bind(docClient),
deleteItem: wrappedDocClient.delete,
/**
* Get a single record. Passthrough to [DocumentClient.get](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#get-property).
*
Expand All @@ -140,7 +169,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.get](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#get-property) for details.
* @returns {Request}
*/
getItem: docClient.get.bind(docClient),
getItem: wrappedDocClient.get,
/**
* Put a single record. Passthrough to [DocumentClient.put](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#put-property).
*
Expand All @@ -150,7 +179,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.put](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#put-property) for details.
* @returns {Request}
*/
putItem: docClient.put.bind(docClient),
putItem: wrappedDocClient.put,
/**
* Update a single record. Passthrough to [DocumentClient.update](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#update-property).
*
Expand All @@ -160,7 +189,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.update](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#update-property) for details.
* @returns {Request}
*/
updateItem: docClient.update.bind(docClient)
updateItem: wrappedDocClient.update
};

/**
Expand Down Expand Up @@ -226,7 +255,7 @@ function Dyno(options) {
* @param {object} params - unbounded batchGetItem request parameters. See [DocumentClient.batchGet](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchGet-property) for details.
* @returns {RequestSet}
*/
batchGetItemRequests: require('./lib/requests')(docClient).batchGetItemRequests,
batchGetItemRequests: require('./lib/requests')(wrappedDocClient).batchGetItemRequests,
/**
* Break a large batch of write operations into a set of requests that can be
* sent individually or concurrently.
Expand All @@ -236,7 +265,7 @@ function Dyno(options) {
* @param {object} params - unbounded batchWriteItem request parameters. See [DocumentClient.batchWrite](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchWrite-property) for details.
* @returns {RequestSet}
*/
batchWriteItemRequests: require('./lib/requests')(docClient).batchWriteItemRequests,
batchWriteItemRequests: require('./lib/requests')(wrappedDocClient).batchWriteItemRequests,
/**
* Break a large batch of get operations into a set of requests that are intended
* to be sent concurrently.
Expand All @@ -246,7 +275,7 @@ function Dyno(options) {
* @param {object} params - unbounded batchGetItem request parameters. See [DocumentClient.batchGet](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchGet-property) for details.
* @returns {CompleteRequestSet}
*/
batchGetAll: require('./lib/requests')(docClient).batchGetAll,
batchGetAll: require('./lib/requests')(wrappedDocClient).batchGetAll,
/**
* Break a large batch of write operations into a set of requests that are intended
* to be sent concurrently.
Expand All @@ -256,7 +285,7 @@ function Dyno(options) {
* @param {object} params - unbounded batchWriteItem request parameters. See [DocumentClient.batchWrite](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#batchWrite-property) for details.
* @returns {CompleteRequestSet}
*/
batchWriteAll: require('./lib/requests')(docClient).batchWriteAll,
batchWriteAll: require('./lib/requests')(wrappedDocClient).batchWriteAll,
/**
* Create a table. Passthrough to [DynamoDB.createTable](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#createTable-property),
* except the function polls DynamoDB until the table is ready to accept
Expand Down Expand Up @@ -291,7 +320,7 @@ function Dyno(options) {
* @param {object} params - query request parameters. See [DocumentClient.query](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#query-property) for details.
* @returns {ReadableStream}
*/
queryStream: require('./lib/stream')(docClient).query,
queryStream: require('./lib/stream')(wrappedDocClient).query,
/**
* Provide the results of a scan as a [Readable Stream](https://nodejs.org/api/stream.html#stream_class_stream_readable).
* This function will paginate through query responses, making HTTP requests
Expand All @@ -302,7 +331,7 @@ function Dyno(options) {
* @param {object} params - scan request parameters. See [DocumentClient.scan](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#scan-property) for details.
* @returns {ReadableStream}
*/
scanStream: require('./lib/stream')(docClient).scan,
scanStream: require('./lib/stream')(wrappedDocClient).scan,
/**
* Creates a [Writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable).
* Writing individual records to the stream will aggregate them into sets of
Expand All @@ -316,7 +345,7 @@ function Dyno(options) {
* on outgoing BatchWriteItem requests.
* @returns a [Writable stream](https://nodejs.org/api/stream.html#stream_class_stream_writable)
*/
putStream: require('./lib/stream')(docClient, options.table).put,
putStream: require('./lib/stream')(wrappedDocClient, options.table).put,
/**
* Query a table or secondary index. Passthrough to [DocumentClient.query](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#query-property).
*
Expand All @@ -327,7 +356,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.query](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#query-property) for details.
* @returns a Request if not paginating, or a ReadableStream if multiple pages were requested
*/
query: require('./lib/paginated')(docClient).query,
query: require('./lib/paginated')(wrappedDocClient).query,
/**
* Scan a table or secondary index. Passthrough to [DocumentClient.scan](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#scan-property).
*
Expand All @@ -338,7 +367,7 @@ function Dyno(options) {
* @param {function} [callback] - a function to handle the response. See [DocumentClient.scan](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html#scan-property) for details.
* @returns a Request if not paginating, or a ReadableStream if multiple pages were requested
*/
scan: require('./lib/paginated')(docClient).scan
scan: require('./lib/paginated')(wrappedDocClient).scan
};

// Drop specific functions from read/write only clients
Expand All @@ -364,7 +393,14 @@ function Dyno(options) {
}

// Glue everything together
return _({ config: config, defaultTable: options.tableName }).extend(nativeFunctions, dynoExtensions);
return _({
config: config,
defaultTable: options.tableName,
_client: client,
_docClient: docClient,
_tableFreeClient: tableFreeClient,
_tableFreeDocClient: tableFreeDocClient,
}).extend(nativeFunctions, dynoExtensions);
}

/**
Expand All @@ -381,6 +417,8 @@ Dyno.multi = function(readOptions, writeOptions) {

return _({}).extend(write, read, {
config: { read: read.config, write: write.config },
read: read,
write: write,
createTable: require('./lib/table')(read, write).multiCreate,
deleteTable: require('./lib/table')(read, write).multiDelete
});
Expand Down
14 changes: 10 additions & 4 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ var reduceCapacity = require('./util').reduceCapacity;
module.exports = function(client, tableName) {
var stream = {};

function readableStream(request, params, options) {
function readableStream(requestType, params, options) {
options = options || {};
var pages = options.pages || Infinity;

var readable = new Readable(_({ objectMode: true }).defaults(options));
var nextRequest = client[request](params);
var nextRequest = client[requestType](params);
var pending = false;
var items = [];

Expand All @@ -37,7 +36,14 @@ module.exports = function(client, tableName) {
}

pages--;
nextRequest = pages && response.hasNextPage() ? response.nextPage() : false;

nextRequest = pages && response.hasNextPage()
? client[requestType](
Object.assign({}, params, {
ExclusiveStartKey: readable.LastEvaluatedKey,
})
)
: false;

response.data.Items.forEach(function(item) { items.push(item); });
readable._read();
Expand Down
Loading