-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store shard metadata in S3, add a tailing facility #5
base: master
Are you sure you want to change the base?
Conversation
triton/uploader.go
Outdated
@@ -45,6 +47,26 @@ func (s *S3Uploader) Upload(fileName, keyName string) (err error) { | |||
return | |||
} | |||
|
|||
func (s *S3Uploader) UploadBuf(r io.Reader, keyName string) (err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name doesn't seem exactly right... the first argument is a Reader not a buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this to UploadData()
Who is supposed to call |
triton/store.go
Outdated
|
||
type streamMetadata struct { | ||
// shard ID => shardInfo | ||
Shards map[string]*shardInfo `json:"shards"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these actually need to be pointers to shardInfo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so because they are mutable
|
Tests are failing |
I think this is fine. Still seems like two totally independent designs with the Checkpointers and then this new interface, and they should be reconciled in some way. But it's not really obvious to me right now how to do that. 👍 |
What's the story on this? Ready to go in? |
I'm ended up doing my tailing code in the Triton repo and based it off of On Wed, Dec 16, 2015 at 2:23 PM, Rhett Garber notifications@github.com
|
Also, I think adding the (time->sequence number) metadata about each archive is different than checkpointing. It's just a way to skip through the stream faster |
} | ||
sort.Sort(sort.StringSlice(keys)) | ||
for _, key := range keys { | ||
if strings.HasSuffix(key, metadataSuffix) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a thought, but maybe you should just attempt to create archives for every key it finds, and let the DecodeArchiveKey figure out if it's a valid key to use or not.
Seems like it would be safer to allow unrecognizable keys to exist for future backwards compatibility reasons too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable
On Fri, Dec 18, 2015 at 3:05 PM, Rhett Garber notifications@github.com
wrote:
In triton/archive_repository.go
#5 (comment):
- keys := []string{}
- err = ar.s3Service.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(ar.bucket),
Prefix: aws.String(keyPrefix),
- }, func(output *s3.ListObjectsOutput, lastPage bool) (shouldContinue bool) {
for _, object := range output.Contents {
keys = append(keys, *object.Key)
}
return true
- })
- if err != nil {
return
- }
- sort.Sort(sort.StringSlice(keys))
- for _, key := range keys {
if strings.HasSuffix(key, metadataSuffix) {
Just a thought, but maybe you should just attempt to create archives for
every key it finds, and let the DecodeArchiveKey figure out if it's a valid
key to use or not.Seems like it would be safer to allow unrecognizable keys to exist for
future backwards compatibility reasons too?—
Reply to this email directly or view it on GitHub
https://github.com/postmates/go-triton/pull/5/files#r48064339.
Can you add examples on using Tail to the Readme. That might also help inform whether the interfaces are similar enough to the existing api. |
* Add ShardRecordReader interface * Add shardReader that uses channels * Update tail interface
Updated |
@@ -0,0 +1,198 @@ | |||
package triton | |||
|
|||
// Module for a record reader which consumes from a streamf ro a each shard |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo?
) | ||
|
||
// NewTail returns a new tailing stream starting at "at" | ||
func NewTailAt(params *NewTailAtParams) (tail *TailAt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still not loving the name here, but don't have an obvious answer.
So the current most common interface is to do 'NewStreamReader()'
Could this be 'NewTailAtReader'?
{"shards": {"shard id": {"min_sequence_number": "X", "max_sequence_number": "Y"}}}
NewTailAt()
function to tail records starting at a pointArchiveKey
)