Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store shard metadata in S3, add a tailing facility #5

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
build/
src/github.com/
src/gopkg.in/
/.go/
cscope.*
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,10 @@ build:

clean:
rm -rf build

cscope:
find $$GOPATH/src -type f -iname "*.go"> cscope.files
cscope -b -k

tags:

22 changes: 20 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ Triton is an opinionated set of tooling for building a data pipeline around an
AWS stack including [Kinesis](http://aws.amazon.com/kinesis/) and S3.

It provides the necessary glue for building real applications on top of the
type of architecture.
type of architecture.

## Overview ##

As your application collects data, write it to Kinesis streams as a series of
events. Other applications in your infrastructure read from this stream
providing a solid pattern for services to share data.
providing a solid pattern for services to share data.

Triton aims to provide a level of tooling, glue and utility to make this
ecosystem easy to use. Namely:
Expand Down Expand Up @@ -177,7 +177,25 @@ for {
...
}
```
### Tailing from a specific time

You can tail from a specific point in time. For example:

```go

kinesisClient := ...
s3Client := ...

tail := triton.NewTailAt(&triton.NewTailAtParams{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to rethink these 'public' interfaces.

Like perhaps the interfaces should all be "NewStreamReader"-ish with combinations for:

  • starting/maintaining a Checkpointer
  • starting at a specific Time
  • ending at a specific Time (tbd)

S3Service: ... s3 client for region where triton records live...,
KinesisService: ... kinesis client...,
StreamName: "courier_activity_prod",
Bucket: "postmates-triton-prod",
Client: "archive"
At: time.Now().Add(time.Minute * 30), // 30 minutes ago
})

```

### Other Languages ###

Expand Down
45 changes: 16 additions & 29 deletions triton/archive.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package triton

import (
"fmt"
"regexp"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -17,14 +14,13 @@ type StoreArchive struct {
Key string
ClientName string

T time.Time
SortValue int
T time.Time

s3Svc S3Service
rdr Reader
}

func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) {
func (sa *StoreArchive) ReadRecord() (rec Record, err error) {
if sa.rdr == nil {
out, err := sa.s3Svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(sa.Bucket),
Expand All @@ -37,45 +33,36 @@ func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) {

sa.rdr = NewArchiveReader(out.Body)
}

rec, err = sa.rdr.ReadRecord()
return
}

func (sa *StoreArchive) parseKeyName(keyName string) (err error) {
re := regexp.MustCompile(`(?P<day>\d{8})\/(?P<stream>.+)\-(?P<ts>\d+)\.tri$`)
res := re.FindAllStringSubmatch(keyName, -1)

if len(res) != 1 {
return fmt.Errorf("Invalid key name")
}

sa.T, err = time.Parse("20060102", res[0][1])

n, err := fmt.Sscanf(res[0][3], "%d", &sa.SortValue)
if n != 1 {
return fmt.Errorf("Failed to parse sort value")
key, err := DecodeArchiveKey(keyName)
if err != nil {
return
}
sa.T = key.Time
sa.StreamName = key.Stream
sa.ClientName = key.Client
return
}

nameParts := strings.Split(res[0][2], "-")
if len(nameParts) != 2 {
return fmt.Errorf("Failure parsing stream name: %v", res[0][2])
}
sa.StreamName = nameParts[0]
sa.ClientName = nameParts[1]
// Read the stream metadata associated with this store archive instance
func (sa *StoreArchive) GetStreamMetadata() (result *StreamMetadata, err error) {
result, err = ReadStreamMetadata(sa.s3Svc, sa.Bucket, sa.Key)

return
}

// NewStoreArchive returns a StoreArchive instance
func NewStoreArchive(bucketName, keyName string, svc S3Service) (sa StoreArchive, err error) {
sa.Bucket = bucketName
sa.Key = keyName
sa.s3Svc = svc

err = sa.parseKeyName(keyName)
if err != nil {
return sa, err
return
}

return sa, nil
return
}
82 changes: 82 additions & 0 deletions triton/archive_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package triton

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
)

// ArchiveKey is a struct representing the path value for the Triton S3 keys
type ArchiveKey struct {
Client string
Stream string
Time time.Time
}

// Path encodes the ArchiveKey to a string path
func (a ArchiveKey) Path() string {
return fmt.Sprintf("%04d%02d%02d/%s-%d.tri", a.Time.Year(), a.Time.Month(), a.Time.Day(), a.fullStreamName(), a.Time.Unix())
}

const (
metadataSuffix = ".metadata"
)

// MetadataPath encodes the ArchiveKey to a string path with the metadata suffix applied
func (a ArchiveKey) MetadataPath() string {
return a.Path() + metadataSuffix
}

// fullStreamName returns the full stream name (stream + "-" + client) if there is a client name or just stream
func (a ArchiveKey) fullStreamName() (stream string) {
stream = a.Stream
if a.Client != "" {
stream += "-" + a.Client
}
return
}

// PathPrefix returns the string key prefix without the timestamp
func (a ArchiveKey) PathPrefix() string {
return fmt.Sprintf("%04d%02d%02d/%s-", a.Time.Year(), a.Time.Month(), a.Time.Day(), a.fullStreamName())
}

func (a ArchiveKey) Equal(other ArchiveKey) (result bool) {
if a.Stream != other.Stream {
return false
}
if a.Time.Truncate(time.Second) != other.Time.Truncate(time.Second) {
return false
}
if a.Client != other.Client {
return false
}
return true
}

var archiveKeyPattern = regexp.MustCompile(`^/?(?P<day>\d{8})\/(?P<stream>.+)\-(?P<ts>\d+)\.tri$`)

// Decode an archive S3 key into an ArchiveKey
func DecodeArchiveKey(keyName string) (a ArchiveKey, err error) {
res := archiveKeyPattern.FindStringSubmatch(keyName)
if res == nil {
err = fmt.Errorf("Invalid key name")
return
}
ts, err := strconv.ParseInt(res[3], 10, 64)
if err != nil {
err = fmt.Errorf("Failed to parse timestamp value: %s", err.Error())
return
}
a.Time = time.Unix(ts, 0)
nameParts := strings.Split(res[2], "-")
if len(nameParts) != 2 {
err = fmt.Errorf("Failure parsing stream name: %v", res[2])
return
}
a.Stream = nameParts[0]
a.Client = nameParts[1]
return
}
20 changes: 20 additions & 0 deletions triton/archive_key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package triton

import (
"testing"
"time"
)

func TestArchiveKeyPathCodec(t *testing.T) {
aTime := time.Now()
archiveKey := ArchiveKey{Time: aTime, Stream: "a", Client: "b"}
archiveKey2, err := DecodeArchiveKey(archiveKey.Path())

if err != nil {
t.Fatalf("unexpected error: %s", err.Error())
}
if !archiveKey.Equal(archiveKey2) {
t.Fatalf("expecting %+v == %+v", archiveKey, archiveKey2)
}

}
10 changes: 6 additions & 4 deletions triton/archive_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ type ArchiveReader struct {
mr *msgp.Reader
}

func (r *ArchiveReader) ReadRecord() (rec map[string]interface{}, err error) {
rec = make(map[string]interface{})

func (r *ArchiveReader) ReadRecord() (sr Record, err error) {
rec := make(map[string]interface{})
err = r.mr.ReadMapStrIntf(rec)
if err != nil {
return
}
sr = rec
return
}

func NewArchiveReader(ir io.Reader) (or Reader) {
sr := snappy.NewReader(ir)
mr := msgp.NewReader(sr)

return &ArchiveReader{mr}
}
100 changes: 100 additions & 0 deletions triton/archive_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package triton

import (
"bytes"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io"
"io/ioutil"
"sort"
"strings"
"time"
)

// ArchiveRepository manages reading and writing Archives
type ArchiveRepository struct {
s3Service S3Service
s3Uploader S3UploaderService
stream string
bucket string
client string
}

func NewArchiveRepository(s3Service S3Service, s3Uploader S3UploaderService, bucket string, stream string, client string) *ArchiveRepository {
return &ArchiveRepository{
s3Service: s3Service,
s3Uploader: s3Uploader,
bucket: bucket,
stream: stream,
client: client,
}
}

// Upload the archive for a stream at Time t
func (ar *ArchiveRepository) Upload(t time.Time, contents io.ReadCloser, metadata *StreamMetadata) (err error) {
archiveKey := ArchiveKey{Stream: ar.stream, Time: t, Client: ar.client}
_, err = ar.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(ar.bucket),
Key: aws.String(archiveKey.Path()),
Body: contents,
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
return fmt.Errorf("Failed to upload: %v (%v)", awsErr.Code(), awsErr.Message())
}
return
}
var buf bytes.Buffer
err = json.NewEncoder(&buf).Encode(metadata)
if err != nil {
return
}
_, err = ar.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(ar.bucket),
Key: aws.String(archiveKey.MetadataPath()),
Body: ioutil.NopCloser(&buf),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
return fmt.Errorf("Failed to upload metadata: %v (%v)", awsErr.Code(), awsErr.Message())
}
return
}
return
}

// ArchivesAtDate lists all the archives for a stream stored at a UTC date represented by aDate
func (ar *ArchiveRepository) ArchivesAtDate(aDate time.Time) (result []StoreArchive, err error) {
keyPrefix := ArchiveKey{Time: aDate, Stream: ar.stream, Client: ar.client}.PathPrefix()
keys := []string{}
err = ar.s3Service.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(ar.bucket),
Prefix: aws.String(keyPrefix),
}, func(output *s3.ListObjectsOutput, lastPage bool) (shouldContinue bool) {
for _, object := range output.Contents {
keys = append(keys, *object.Key)
}
return true
})
if err != nil {
return
}
sort.Sort(sort.StringSlice(keys))
for _, key := range keys {
if strings.HasSuffix(key, metadataSuffix) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought, but maybe you should just attempt to create archives for every key it finds, and let the DecodeArchiveKey figure out if it's a valid key to use or not.

Seems like it would be safer to allow unrecognizable keys to exist for future backwards compatibility reasons too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable

On Fri, Dec 18, 2015 at 3:05 PM, Rhett Garber notifications@github.com
wrote:

In triton/archive_repository.go
#5 (comment):

  • keys := []string{}
  • err = ar.s3Service.ListObjectsPages(&s3.ListObjectsInput{
  •   Bucket: aws.String(ar.bucket),
    
  •   Prefix: aws.String(keyPrefix),
    
  • }, func(output *s3.ListObjectsOutput, lastPage bool) (shouldContinue bool) {
  •   for _, object := range output.Contents {
    
  •       keys = append(keys, *object.Key)
    
  •   }
    
  •   return true
    
  • })
  • if err != nil {
  •   return
    
  • }
  • sort.Sort(sort.StringSlice(keys))
  • for _, key := range keys {
  •   if strings.HasSuffix(key, metadataSuffix) {
    

Just a thought, but maybe you should just attempt to create archives for
every key it finds, and let the DecodeArchiveKey figure out if it's a valid
key to use or not.

Seems like it would be safer to allow unrecognizable keys to exist for
future backwards compatibility reasons too?


Reply to this email directly or view it on GitHub
https://github.com/postmates/go-triton/pull/5/files#r48064339.

continue
}
var sa StoreArchive
sa, err = NewStoreArchive(ar.bucket, key, ar.s3Service)
if err != nil {
err = fmt.Errorf("failed to create store archive for %q: %s", key, err)
return
}
result = append(result, sa)
}
return
}
Loading