Skip to content
This repository has been archived by the owner on Jul 31, 2024. It is now read-only.

Commit

Permalink
Merge pull request #4 from suzuken/time_based_iterator
Browse files Browse the repository at this point in the history
support time based shard iterator
  • Loading branch information
suzuken committed Apr 20, 2016
2 parents 992e47b + 8e62a52 commit 706ab14
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ Usage of kineis-tail:
-interval duration
seconds for waiting next GetRecords request. (default 3s)
-iterator-type string
iterator type. Choose from TRIM_HORIZON(default), AT_SEQUENCE_NUMBER, or LATEST. (default "TRIM_HORIZON")
iterator type. Choose from TRIM_HORIZON(default), AT_SEQUENCE_NUMBER, AT_TIMESTAMP or LATEST. (default "TRIM_HORIZON")
-limit int
limit records length of each GetRecords request. (default 100)
-max-item-size int
max byte size per item for printing. (default 4096)
-region string
the AWS region where your Kinesis Stream is. (default "ap-northeast-1")
-start-time string
timestamp to start reading. only enable when iterator type is AT_TIMESTAMP. acceptable format is YYYY-MM-DDThh:mm:ss.sssTZD (RFC3339 format)
-stream string
your stream name (default "your-stream")
```
Expand Down
19 changes: 16 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package main
import (
"flag"
"fmt"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"os"
"time"
)

var (
stream = flag.String("stream", "your-stream", "your stream name")
region = flag.String("region", "ap-northeast-1", "the AWS region where your Kinesis Stream is.")
iteratorType = flag.String("iterator-type", "TRIM_HORIZON", "iterator type. Choose from TRIM_HORIZON(default), AT_SEQUENCE_NUMBER, or LATEST.")
iteratorType = flag.String("iterator-type", "TRIM_HORIZON", "iterator type. Choose from TRIM_HORIZON(default), AT_SEQUENCE_NUMBER, AT_TIMESTAMP or LATEST.")
maxItemSize = flag.Int("max-item-size", 4096, "max byte size per item for printing.")
forever = flag.Bool("f", true, "tailing kinesis stream forever or not (like: tail -f)")
limit = flag.Int64("limit", 100, "limit records length of each GetRecords request.")
interval = flag.Duration("interval", time.Second*3, "seconds for waiting next GetRecords request.")
startTime = flag.String("start-time", "", "timestamp to start reading. only enable when iterator type is AT_TIMESTAMP. acceptable format is YYYY-MM-DDThh:mm:ss.sssTZD (RFC3339 format). For example, 2016-04-20T12:00:00+09:00 is acceptable.")
)

type Client struct {
Expand All @@ -32,6 +34,16 @@ func main() {
s := session.New(&aws.Config{Region: aws.String(*region)})
c := &Client{Kinesis: kinesis.New(s), maxItemSize: *maxItemSize, Limit: limit}

var start time.Time
if *iteratorType == "AT_TIMESTAMP" && *startTime != "" {
t, err := time.Parse(time.RFC3339, *startTime)
if err != nil {
fmt.Fprintf(os.Stderr, "parse time failed. -start-time format should be RFC3339 format.: %s", err)
os.Exit(1)
}
start = t
}

streamName := aws.String(*stream)

streams, err := c.DescribeStream(&kinesis.DescribeStreamInput{StreamName: streamName})
Expand All @@ -46,6 +58,7 @@ func main() {
ShardId: streams.StreamDescription.Shards[0].ShardId,
ShardIteratorType: aws.String(*iteratorType),
StreamName: streamName,
Timestamp: aws.Time(start),
})
if err != nil {
fmt.Fprintf(os.Stderr, "cannot get iterator: %s", err)
Expand Down

0 comments on commit 706ab14

Please sign in to comment.