-
Notifications
You must be signed in to change notification settings - Fork 50
/
output.go
66 lines (52 loc) · 1.8 KB
/
output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package stream
import (
"context"
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/log"
)
// WritableMessage is the contract a message has to fulfil to be accepted by an
// Output: it must be able to serialize itself both as raw bytes and as a string.
type WritableMessage interface {
// MarshalToBytes returns the serialized message as a byte slice, or an error
// if serialization fails.
MarshalToBytes() ([]byte, error)
// MarshalToString returns the serialized message as a string, or an error
// if serialization fails.
MarshalToString() (string, error)
}
// Output is a destination WritableMessage values can be written to, either one
// at a time or as a batch.
//
//go:generate mockery --name Output
type Output interface {
// WriteOne writes a single message to the output.
WriteOne(ctx context.Context, msg WritableMessage) error
// Write writes all messages of batch to the output.
Write(ctx context.Context, batch []WritableMessage) error
}
// PartitionedOutput is an Output which can additionally report whether it
// writes to more than one partition.
//
//go:generate mockery --name PartitionedOutput
type PartitionedOutput interface {
Output
// IsPartitionedOutput returns true if the output is writing to more than one shard/partition/bucket, in which case
// callers need to take care of writing messages to the correct partition.
IsPartitionedOutput() bool
}
// SizeRestrictedOutput is an Output which can additionally report limits on
// the size of a single message and on the number of messages per write.
//
//go:generate mockery --name SizeRestrictedOutput
type SizeRestrictedOutput interface {
Output
// GetMaxMessageSize returns the maximum size of a message for this output (or nil if there is no limit on message size).
GetMaxMessageSize() *int
// GetMaxBatchSize returns the maximum number of messages we can write at once to the output (or nil if there is no limit).
GetMaxBatchSize() *int
}
// OutputFactory is the signature of a constructor function which builds an
// Output from a context, the application config, a logger and a name. It
// returns the created Output or an error.
// NOTE(review): name presumably selects the output's settings in config — confirm against the implementations.
type OutputFactory func(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, error)
// MessagesToWritableMessages converts a slice of *Message into a slice of
// WritableMessage. The result has the same length and element order as batch
// and is never nil, even when batch is nil or empty.
func MessagesToWritableMessages(batch []*Message) []WritableMessage {
	// The final length is known up front, so reserve it once and append.
	converted := make([]WritableMessage, 0, len(batch))

	for _, msg := range batch {
		converted = append(converted, msg)
	}

	return converted
}
// hasAttributes is implemented by messages which carry string attributes next
// to their payload. getAttributes uses it to detect such messages.
type hasAttributes interface {
// GetAttributes returns the attributes of the message as a string map.
GetAttributes() map[string]string
}
// Compile-time assertions: ensure all the types we actually write to SQS/SNS
// implement hasAttributes. The build fails here if *Message or rawJsonMessage
// ever stops providing GetAttributes.
var (
_ hasAttributes = &Message{}
_ hasAttributes = rawJsonMessage{}
)
// getAttributes extracts the attributes of msg. If msg implements
// hasAttributes, the result of its GetAttributes method is returned unchanged;
// otherwise a new, empty map is returned.
func getAttributes(msg WritableMessage) map[string]string {
	attributed, implementsAttributes := msg.(hasAttributes)
	if !implementsAttributes {
		// Message type carries no attributes at all.
		return map[string]string{}
	}

	return attributed.GetAttributes()
}