


Fluentd input plugin for AWS DynamoDB Streams.


Create an IAM user with a policy like the following.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
      ],
      "Resource": "*"
    }
  ]
}

Or define aws_key_id and aws_sec_key in your config file.
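
The action list of the policy above is not shown. As a rough sketch, the following Ruby snippet generates a policy that allows the four read operations of the DynamoDB Streams API (DescribeStream, GetRecords, GetShardIterator, ListStreams). Which of these the plugin actually needs is an assumption, not something this README states, and `stream_read_policy` is a hypothetical helper name.

```ruby
require "json"

# Assumption: these are the read operations of the DynamoDB Streams API.
# The plugin may need only a subset of them.
STREAM_READ_ACTIONS = %w[
  dynamodb:DescribeStream
  dynamodb:GetRecords
  dynamodb:GetShardIterator
  dynamodb:ListStreams
].freeze

# Hypothetical helper: builds an IAM policy document as a Ruby hash.
# `resource` defaults to "*", matching the policy shown above.
def stream_read_policy(resource = "*")
  {
    "Version" => "2012-10-17",
    "Statement" => [
      {
        "Effect" => "Allow",
        "Action" => STREAM_READ_ACTIONS,
        "Resource" => resource
      }
    ]
  }
end

puts JSON.pretty_generate(stream_read_policy)
```

Passing a stream ARN instead of the default "*" would narrow the policy to a single stream.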


Add this line to your application's Gemfile:

gem 'fluent-plugin-dynamodb-streams'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-dynamodb-streams


<source>
  type dynamodb_streams
  #aws_key_id  AWS_ACCESS_KEY_ID
  #aws_sec_key AWS_SECRET_ACCESS_KEY
  #aws_region  AWS_DEFAULT_REGION
  stream_arn arn:aws:dynamodb:ap-northeast-1:000000000000:table/table_name/stream/2015-01-01T00:00:00.000
  pos_file /var/lib/fluent/dynamodb_streams_table_name
  fetch_interval 1
  fetch_size 1
</source>

  • tag: Fluentd tag.
  • stream_arn: DynamoDB Streams ARN.
  • pos_file: File in which the last processed sequence number is stored.
  • fetch_interval: Interval between fetches, in seconds. Default is 1.
  • fetch_size: Maximum number of records to fetch in each iteration. Default is 1.
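
To illustrate how pos_file, fetch_interval and fetch_size might interact, here is a minimal Ruby sketch of a polling loop that resumes from a stored sequence number. It is not the plugin's actual implementation: `PositionFile`, `poll`, and the in-memory `fetcher` lambda are hypothetical names, and the fetcher stands in for real DynamoDB Streams API calls.

```ruby
require "tmpdir"

# Hypothetical helper: persists the last processed sequence number
# so that a restart can resume after it.
class PositionFile
  def initialize(path)
    @path = path
  end

  # Returns the stored sequence number, or nil if none has been saved yet.
  def load
    return nil unless File.exist?(@path)
    value = File.read(@path).strip
    value.empty? ? nil : value
  end

  # Writes to a temporary file and renames it, so an interrupted write
  # does not leave a truncated position behind.
  def save(sequence_number)
    tmp = "#{@path}.tmp"
    File.write(tmp, sequence_number.to_s)
    File.rename(tmp, @path)
  end
end

# Calls `fetcher` once per iteration with the last saved position and the
# batch limit, records each returned sequence number, then sleeps.
def poll(fetcher, pos, fetch_interval:, fetch_size:, iterations:)
  emitted = []
  iterations.times do
    fetcher.call(pos.load, fetch_size).each do |record|
      emitted << record
      pos.save(record["dynamodb"]["sequence_number"])
    end
    sleep fetch_interval
  end
  emitted
end

# In-memory stand-in for a stream: five records with increasing sequence numbers.
stream = (1..5).map { |n| { "dynamodb" => { "sequence_number" => format("%021d", n) } } }
fetcher = lambda do |after, limit|
  stream.select { |r| after.nil? || r["dynamodb"]["sequence_number"].to_i > after.to_i }
        .first(limit)
end

Dir.mktmpdir do |dir|
  pos = PositionFile.new(File.join(dir, "dynamodb_streams_table_name"))
  poll(fetcher, pos, fetch_interval: 0, fetch_size: 2, iterations: 2)
  puts pos.load # sequence number of the last record processed
end
```

With the defaults of fetch_interval 1 and fetch_size 1, a loop like this would process at most one record per second, so raising fetch_size is the likely knob for busier tables.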

{
  "aws_region": "ap-northeast-1",
  "event_source": "aws:dynamodb",
  "event_version": "1.0",
  "event_id": "dfbdf4fe-6f2b-4b34-9b17-4b8caae561fa",
  "event_name": "INSERT",
  "dynamodb": {
    "stream_view_type": "NEW_AND_OLD_IMAGES",
    "sequence_number": "000000000000000000001",
    "size_bytes": 14,
    "keys": {
      "key": "value2"
    },
    "old_image": {
      "key": "value1"
    },
    "new_image": {
      "key": "value2"
    }
  }
}

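A downstream consumer might compare old_image and new_image to see which attributes changed. The following Ruby sketch shows one way to do that with a record shaped like the example above. `changed_attributes` is a hypothetical helper, not part of the plugin, and either image can be absent depending on the stream view type, so the code treats a missing image as empty.

```ruby
require "json"

# Hypothetical helper: returns a hash of attributes whose values differ
# between old_image and new_image. A missing image is treated as empty.
def changed_attributes(record)
  ddb = record.fetch("dynamodb", {})
  old_image = ddb["old_image"] || {}
  new_image = ddb["new_image"] || {}

  (old_image.keys | new_image.keys).each_with_object({}) do |attr, diff|
    before = old_image[attr]
    after = new_image[attr]
    diff[attr] = { "before" => before, "after" => after } unless before == after
  end
end

# A trimmed copy of the example record above.
record = JSON.parse(<<~RECORD)
  {
    "event_name": "INSERT",
    "dynamodb": {
      "keys": { "key": "value2" },
      "old_image": { "key": "value1" },
      "new_image": { "key": "value2" }
    }
  }
RECORD

puts JSON.generate(changed_attributes(record))
```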

TODO:

  • Store the sequence number in DynamoDB.
  • Fetch records from each shard concurrently.