diff --git a/README.md b/README.md
index eb49012..a08ace5 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,7 @@ Can be run as a command line script or as an npm module.
     -i, --included-tables <list>          only backup these tables
     -p, --backup-path <path>              backup path to store table dumps in. default is DynamoDB-backup-YYYY-MM-DD-HH-mm-ss
     -e, --base64-encode-binary            if passed, encode binary fields in base64 before exporting
+    -d, --save-data-pipeline-format       save in a format compatible with AWS Data Pipeline import. Defaults to false (save in the format exported by DynamoDB)
     --aws-key <key>                       AWS access key. Will use AWS_ACCESS_KEY_ID env var if --aws-key not set
     --aws-secret <secret>                 AWS secret key. Will use AWS_SECRET_ACCESS_KEY env var if --aws-secret not set
     --aws-region <region>                 AWS region. Will use AWS_DEFAULT_REGION env var if --aws-region not set
@@ -76,6 +77,7 @@ var options = {
     readPercentage: /* only consume this much capacity. expressed as a decimal (i.e. .5 means use 50% of table read capacity). default: .25 */,
     bucket: /* bucket to upload the backup to */,
     stopOnFailure: /* whether or not to continue backing up if a single table fails to back up */,
+    saveDataPipelineFormat: /* save in a format compatible with AWS Data Pipeline import. Defaults to false (save in the format exported by DynamoDB) */,
     awsAccessKey: /* AWS access key */,
     awsSecretKey: /* AWS secret key */,
     awsRegion: /* AWS region */,
diff --git a/bin/dynamo-backup-to-s3 b/bin/dynamo-backup-to-s3
index 8f34f0d..d4a21a3 100755
--- a/bin/dynamo-backup-to-s3
+++ b/bin/dynamo-backup-to-s3
@@ -25,6 +25,7 @@ program
     .option('-i, --included-tables <list>', 'only backup these tables', list)
     .option('-p, --backup-path <path>', 'backup path to store table dumps in. default is DynamoDB-backup-YYYY-MM-DD-HH-mm-ss')
     .option('-e, --base64-encode-binary', 'encode binary fields in base64 before exporting')
+    .option('-d, --save-data-pipeline-format', 'save in a format compatible with AWS Data Pipeline import. Defaults to false (save in the format exported by DynamoDB)')
     .option('--aws-key <key>', 'AWS access key. Will use AWS_ACCESS_KEY_ID env var if --aws-key not set')
     .option('--aws-secret <secret>', 'AWS secret key. Will use AWS_SECRET_ACCESS_KEY env var if --aws-secret not set')
     .option('--aws-region <region>', 'AWS region. Will use AWS_DEFAULT_REGION env var if --aws-region not set')
@@ -44,7 +45,8 @@ var dynamoBackup = new DynamoBackup({
     includedTables: program.includedTables,
     readPercentage: program.readPercentage,
     stopOnFailure: program.stopOnFailure,
-    base64Binary: program.base64EncodeBinary
+    base64Binary: program.base64EncodeBinary,
+    saveDataPipelineFormat: program.saveDataPipelineFormat
 });
 
 dynamoBackup.on('error', function(data) {
diff --git a/lib/dynamo-backup.js b/lib/dynamo-backup.js
index 63976d8..cbfc48c 100644
--- a/lib/dynamo-backup.js
+++ b/lib/dynamo-backup.js
@@ -20,6 +20,7 @@ function DynamoBackup(options) {
     this.bucket = options.bucket;
     this.stopOnFailure = options.stopOnFailure || false;
     this.base64Binary = options.base64Binary || false;
+    this.saveDataPipelineFormat = options.saveDataPipelineFormat || false;
     this.awsAccessKey = options.awsAccessKey || process.env.AWS_ACCESS_KEY_ID;
     this.awsSecretKey = options.awsSecretKey || process.env.AWS_SECRET_ACCESS_KEY;
     this.awsRegion = options.awsRegion || process.env.AWS_DEFAULT_REGION || 'us-east-1';
@@ -49,22 +50,22 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback) {
     }
 
     var upload = new Uploader({
-        accessKey:  self.awsAccessKey,
-        secretKey:  self.awsSecretKey,
-        region:     self.awsRegion,
-        bucket:     self.bucket,
+        accessKey: self.awsAccessKey,
+        secretKey: self.awsSecretKey,
+        region: self.awsRegion,
+        bucket: self.bucket,
         objectName: path.join(backupPath, tableName + '.json'),
-        stream:     stream,
-        debug:      self.debug
+        stream: stream,
+        debug: self.debug
     });
 
     var startTime = moment.utc();
     self.emit('start-backup', tableName, startTime);
-    upload.send(function(err) {
+    upload.send(function (err) {
         if (err) {
             self.emit('error', {
                 table: tableName,
-                err:   err
+                err: err
             });
         }
         var endTime = moment.utc();
@@ -75,8 +76,8 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback) {
 
     self._copyTable(
         tableName,
-        function(items) {
-            items.forEach(function(item) {
+        function (items) {
+            items.forEach(function (item) {
                 if (self.base64Binary) {
                     _.each(item, function (value, key) {
                         if (value && value.B) {
@@ -84,27 +85,32 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback) {
                         }
                     });
                 }
-                stream.append(JSON.stringify(item));
+
+                if (self.saveDataPipelineFormat) {
+                    stream.append(self._formatForDataPipeline(item));
+                } else {
+                    stream.append(JSON.stringify(item));
+                }
                 stream.append('\n');
             });
         },
-        function(err) {
+        function (err) {
             stream.end();
-            if(err) {
+            if (err) {
                 self.emit('error', {
                     table: tableName,
-                    err:   err
+                    err: err
                 });
             }
         }
-  );
+    );
 }
 
 DynamoBackup.prototype.backupAllTables = function (callback) {
     var self = this;
     var backupPath = self._getBackupPath();
-    self.listTables(function(err, tables) {
+    self.listTables(function (err, tables) {
         if (err) {
             return callback(err);
         }
 
@@ -113,8 +119,8 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
         tables = _.intersection(tables, includedTables);
 
         async.each(tables,
-            function(tableName, done) {
-                self.backupTable(tableName, backupPath, function(err) {
+            function (tableName, done) {
+                self.backupTable(tableName, backupPath, function (err) {
                     if (err) {
                         if (self.stopOnFailure) {
                             return done(err);
@@ -124,11 +130,11 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
                 })
             },
             callback
-      );
+            );
     });
 }
 
-DynamoBackup.prototype._getBackupPath = function() {
+DynamoBackup.prototype._getBackupPath = function () {
     var self = this;
     var now = moment.utc();
     return self.backupPath || ('DynamoDB-backup-' + now.format('YYYY-MM-DD-HH-mm-ss'));
@@ -137,13 +143,13 @@ DynamoBackup.prototype._getBackupPath = function() {
 DynamoBackup.prototype._copyTable = function (tableName, itemsReceived, callback) {
     var self = this;
     var ddb = new AWS.DynamoDB();
-    ddb.describeTable({ TableName: tableName }, function(err, data) {
+    ddb.describeTable({ TableName: tableName }, function (err, data) {
         if (err) {
             return callback(err);
         }
 
         var readPercentage = self.readPercentage;
-        var limit = Math.max((data.Table.ProvisionedThroughput.ReadCapacityUnits * readPercentage)|0, 1);
+        var limit = Math.max((data.Table.ProvisionedThroughput.ReadCapacityUnits * readPercentage) | 0, 1);
 
         self._streamItems(tableName, null, limit, itemsReceived, callback);
     });
@@ -160,12 +166,12 @@ DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, itemsReceived, callback) {
     if (startKey) {
         params.ExclusiveStartKey = startKey;
     }
-    ddb.scan(params, function(err, data) {
+    ddb.scan(params, function (err, data) {
         if (err) {
             return callback(err);
         }
 
-        if(data.Items.length > 0) {
+        if (data.Items.length > 0) {
             itemsReceived(data.Items);
         }
 
@@ -176,14 +182,14 @@ DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, itemsReceived, callback) {
     });
 }
 
-DynamoBackup.prototype._fetchTables = function(lastTable, tables, callback) {
+DynamoBackup.prototype._fetchTables = function (lastTable, tables, callback) {
     var self = this;
     var ddb = new AWS.DynamoDB();
     var params = {};
     if (lastTable) {
         params.ExclusiveStartTableName = lastTable;
     }
-    ddb.listTables(params, function(err, data) {
+    ddb.listTables(params, function (err, data) {
         if (err) {
             return callback(err, null);
         }
@@ -196,4 +202,45 @@ DynamoBackup.prototype._fetchTables = function(lastTable, tables, callback) {
     });
 };
 
-module.exports = DynamoBackup;
+/**
+ * AWS Data Pipeline import requires that each AttributeValue key be
+ * lower-cased, and that set types start with a lower-case character
+ * followed by an upper-case 'S' (e.g. 'SS' becomes 'sS').
+ *
+ * Go through each attribute and create a new entry with the correct casing.
+ */
+DynamoBackup.prototype._formatForDataPipeline = function (item) {
+    var self = this;
+    _.each(item, function (value, key) {
+        // value will be of the form: { S: 'xxx' }. Convert the key.
+        _.each(value, function (v, k) {
+            var dataPipelineValueKey = self._getDataPipelineAttributeValueKey(k);
+            value[dataPipelineValueKey] = v;
+            value[k] = undefined;
+        });
+    });
+    return JSON.stringify(item);
+};
+
+DynamoBackup.prototype._getDataPipelineAttributeValueKey = function (type) {
+    switch (type) {
+        case 'S':
+        case 'N':
+        case 'B':
+        case 'M':
+        case 'L':
+        case 'NULL':
+        case 'BOOL':
+            return type.toLowerCase();
+        case 'SS':
+            return 'sS';
+        case 'NS':
+            return 'nS';
+        case 'BS':
+            return 'bS';
+        default:
+            throw new Error('Unknown AttributeValue key: ' + type);
+    }
+};
+
+module.exports = DynamoBackup;
\ No newline at end of file
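
For reference, the transformation that the new `_formatForDataPipeline` helper applies can be seen in isolation below. This is a minimal standalone sketch, not part of the patch: the sample item and the `toDataPipelineKey` name are invented for illustration, while the key mapping mirrors `_getDataPipelineAttributeValueKey` above.

    // Standalone sketch (invented sample item): how the Data Pipeline
    // re-casing transforms one item scanned from DynamoDB.
    var _ = require('lodash');

    function toDataPipelineKey(type) {
        switch (type) {
            case 'S': case 'N': case 'B': case 'M':
            case 'L': case 'NULL': case 'BOOL':
                return type.toLowerCase();   // 'S' -> 's', 'BOOL' -> 'bool'
            case 'SS': return 'sS';          // set types keep the trailing upper-case 'S'
            case 'NS': return 'nS';
            case 'BS': return 'bS';
            default: throw new Error('Unknown AttributeValue key: ' + type);
        }
    }

    // A single item in DynamoDB JSON, as returned by a scan:
    var item = { id: { S: 'user-1' }, score: { N: '42' }, tags: { SS: ['a', 'b'] } };

    _.each(item, function (value) {
        _.each(value, function (v, k) {
            value[toDataPipelineKey(k)] = v;
            value[k] = undefined; // JSON.stringify skips keys whose value is undefined
        });
    });

    console.log(JSON.stringify(item));
    // => {"id":{"s":"user-1"},"score":{"n":"42"},"tags":{"sS":["a","b"]}}

Note that assigning `undefined` to the old key (rather than deleting it) is sufficient here, because `JSON.stringify` omits object properties whose value is `undefined`.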