This repository has been archived by the owner on Mar 3, 2023. It is now read-only.
A stream-flavored wrapper for the AWS DynamoDB JavaScript API

jed/dynamo-streams

dynamo-streams

A DynamoDB stream interface for the JavaScript aws-sdk library.

API

dbStreams = dynamoStreams(new aws.DynamoDB)

Extends the existing DynamoDB instance with the following stream methods. All methods encode/decode DynamoDB types (such as S, N, and B) automatically.

var aws = require("aws-sdk")
var dynamoStreams = require("dynamo-streams")

var db = new aws.DynamoDB

// create each stream as a function...
var read = dynamoStreams.createScanStream(db, {TableName: "stooges"})

// ... or as a method.
var dbStreams = dynamoStreams(db)
var read = dbStreams.createScanStream({TableName: "stooges"})
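To make the automatic type handling concrete, here is a minimal sketch of converting between plain JavaScript values and DynamoDB's typed attribute format. The helpers encodeValue, decodeValue, encodeItem, and decodeItem are hypothetical names used for illustration and cover only S, N, and B; the library's own implementation may differ.

```javascript
// Hypothetical helpers, not part of dynamo-streams: a sketch of S/N/B conversion.
function encodeValue(value) {
  if (typeof value === "string") return {S: value}
  if (typeof value === "number") return {N: String(value)} // numbers travel as strings
  if (Buffer.isBuffer(value)) return {B: value}
  throw new TypeError("Unsupported type in this sketch: " + typeof value)
}

function decodeValue(attr) {
  if ("S" in attr) return attr.S
  if ("N" in attr) return Number(attr.N)
  if ("B" in attr) return attr.B
  throw new TypeError("Unsupported attribute in this sketch")
}

// Encodes every property of a plain object.
function encodeItem(item) {
  var out = {}
  Object.keys(item).forEach(function(key) { out[key] = encodeValue(item[key]) })
  return out
}

// Decodes every attribute of a DynamoDB item.
function decodeItem(attrs) {
  var out = {}
  Object.keys(attrs).forEach(function(key) { out[key] = decodeValue(attrs[key]) })
  return out
}

var encoded = encodeItem({id: 1, name: "Moe"})
// {id: {N: "1"}, name: {S: "Moe"}}

var decoded = decodeItem(encoded)
// {id: 1, name: "Moe"}
```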

Readable streams

dbStreams#createScanStream(params)

Returns a readable stream of scanned rows. params is passed through to the underlying db.scan operation.

var read = dbStreams.createScanStream({TableName: "stooges"})

read.on("data", console.log)

// {id: 1, name: "Moe"}
// {id: 2, name: "Shemp"}
// {id: 3, name: "Larry"}
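A scan can return its results in several pages, so a stream over it has to keep requesting pages until none remain. The following is a rough sketch of that idea, not the library's actual code: createPagedScanStream is a hypothetical name, fakeDb stands in for a real aws.DynamoDB instance, and type decoding is left out.

```javascript
var Readable = require("stream").Readable

// Sketch only: wraps any object that has a scan(params, callback) method.
function createPagedScanStream(db, params) {
  var stream = new Readable({objectMode: true})
  var startKey = null
  var busy = false

  stream._read = function() {
    if (busy) return // a page request is already in flight
    busy = true

    var request = Object.assign({}, params)
    if (startKey) request.ExclusiveStartKey = startKey

    db.scan(request, function(err, data) {
      busy = false
      if (err) return stream.destroy(err)

      // LastEvaluatedKey is absent on the final page
      startKey = data.LastEvaluatedKey
      data.Items.forEach(function(item) { stream.push(item) })

      if (!startKey) stream.push(null)                  // no more pages: end the stream
      else if (data.Items.length === 0) stream._read()  // empty page: fetch the next one
    })
  }

  return stream
}

// Fake two-page table used only for this illustration.
var pages = [
  {Items: [{id: 1}, {id: 2}], LastEvaluatedKey: {id: 2}},
  {Items: [{id: 3}]}
]
var fakeDb = {
  scan: function(params, cb) {
    var page = params.ExclusiveStartKey ? pages[1] : pages[0]
    process.nextTick(function() { cb(null, page) })
  }
}

var rows = []
var read = createPagedScanStream(fakeDb, {TableName: "stooges"})
read.on("data", function(row) { rows.push(row) })
```

Once the stream ends, rows holds the items from both pages in order.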

dbStreams#createQueryStream(params)

Same as createScanStream, but for queries: params is passed through to the underlying db.query operation.

Writable streams

dbStreams#createPutStream(params)

Returns a writable stream of rows to put. params must include a TableName property specifying the DynamoDB table. Internally, writes are sent in chunks using db.batchWriteItem.

var put = dbStreams.createPutStream({TableName: "stooges"})

put.write({id: 4, name: "Curly"})
put.end()

put.on("finish", function() {
  var read = dbStreams.createScanStream({TableName: "stooges"})

  read.on("data", console.log)

  // {id: 1, name: "Moe"}
  // {id: 2, name: "Shemp"}
  // {id: 3, name: "Larry"}
  // {id: 4, name: "Curly"}
})
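The chunking mentioned above is needed because BatchWriteItem accepts at most 25 put or delete requests per call. Below is a minimal sketch of how already-encoded items could be grouped into request parameters; chunk and toPutBatches are hypothetical helpers, not functions exported by this library.

```javascript
var BATCH_LIMIT = 25 // BatchWriteItem accepts at most 25 requests per call

// Splits an array into consecutive groups of at most `size` elements.
function chunk(items, size) {
  var chunks = []
  for (var i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Builds one batchWriteItem params object per group of encoded items.
function toPutBatches(tableName, items) {
  return chunk(items, BATCH_LIMIT).map(function(group) {
    var requestItems = {}
    requestItems[tableName] = group.map(function(item) {
      return {PutRequest: {Item: item}}
    })
    return {RequestItems: requestItems}
  })
}

var items = []
for (var n = 1; n <= 60; n++) items.push({id: {N: String(n)}})

var batches = toPutBatches("stooges", items)
// 60 items become three requests holding 25, 25, and 10 writes
```

Each element of batches could then be passed to db.batchWriteItem. A complete implementation would also need to resend any UnprocessedItems the service returns.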

dbStreams#createDeleteStream(params)

Returns a writable stream of rows to delete. params must include a TableName property specifying the DynamoDB table. Internally, deletes are sent in chunks using db.batchWriteItem. Each incoming object is trimmed to its key attributes (the hash and, if present, range values).

var del = dbStreams.createDeleteStream({TableName: "stooges"})

del.write({id: 2})
del.end()

del.on("finish", function() {
  var read = dbStreams.createScanStream({TableName: "stooges"})

  read.on("data", console.log)

  // {id: 1, name: "Moe"}
  // {id: 3, name: "Larry"}
})
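Trimming to key attributes means a full row can be written to the delete stream and only its hash/range values are sent. A small sketch of that step follows; trimToKeys is a hypothetical helper, and the key names are passed in explicitly here because how the library discovers them is not described above.

```javascript
// Hypothetical helper: keeps only the named key attributes of an item.
function trimToKeys(item, keyNames) {
  var key = {}
  keyNames.forEach(function(name) {
    if (name in item) key[name] = item[name]
  })
  return key
}

var key = trimToKeys({id: 2, name: "Shemp"}, ["id"])
// {id: 2}

// Shape of the corresponding BatchWriteItem entry (before type encoding).
var request = {DeleteRequest: {Key: key}}
```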

dbStreams#createScanSyncStream(params)

Returns a writable stream representing the desired state of the database for a given scan. Internally, params is passed to createScanStream to obtain a readable stream of the existing (remote) items. That stream is diffed against the items written to the sync (local) stream, and db.batchWriteItem is then used to delete items missing from the local stream and to put items missing from the remote stream. In other words, the inbound items are compared with the existing items, and the minimum number of operations is performed to update the database.

var sync = dbStreams.createScanSyncStream({TableName: "stooges"})

sync.write({id: 1, name: "Moe"})
sync.write({id: 3, name: "Larry"})
sync.write({id: 4, name: "Curly"})

sync.end()

sync.on("finish", function() {
  var read = dbStreams.createScanStream({TableName: "stooges"})

  read.on("data", console.log)

  // {id: 1, name: "Moe"}
  // {id: 3, name: "Larry"}
  // {id: 4, name: "Curly"}
})
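The diffing step can be pictured with plain arrays in place of streams. This is only a sketch under stated assumptions: diffItems and keyOf are hypothetical, items are matched by key, and an item whose contents changed is treated as a put (compared here with JSON.stringify, which is sensitive to property order). The library may decide these cases differently.

```javascript
// Sketch: computes the puts and deletes needed to make `remote` match `local`.
// keyOf is a hypothetical function returning a string that identifies an item.
function diffItems(local, remote, keyOf) {
  var remoteByKey = {}
  remote.forEach(function(item) { remoteByKey[keyOf(item)] = item })

  var puts = []
  local.forEach(function(item) {
    var k = keyOf(item)
    var existing = remoteByKey[k]
    // put items that are new, or whose contents differ from the remote copy
    if (!existing || JSON.stringify(existing) !== JSON.stringify(item)) puts.push(item)
    delete remoteByKey[k] // whatever remains afterwards is missing locally
  })

  var deletes = Object.keys(remoteByKey).map(function(k) { return remoteByKey[k] })
  return {puts: puts, deletes: deletes}
}

var remote = [
  {id: 1, name: "Moe"},
  {id: 2, name: "Shemp"},
  {id: 3, name: "Larry"}
]
var local = [
  {id: 1, name: "Moe"},
  {id: 3, name: "Larry"},
  {id: 4, name: "Curly"}
]

var ops = diffItems(local, remote, function(item) { return String(item.id) })
// ops.puts is [{id: 4, name: "Curly"}]
// ops.deletes is [{id: 2, name: "Shemp"}]
```

Only two writes are needed here: one put for Curly and one delete for Shemp. Moe and Larry are left untouched.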

dbStreams#createQuerySyncStream(params)

Same as createScanSyncStream, but for a query instead of a scan.

Credits

Many thanks to mhart for his awesome dynalite library.
