writer.go
package writer

import (
	"context"
	"fmt"
	"hash/maphash"
	"sort"
	"time"

	"github.com/kelindar/talaria/internal/column/computed"
	"github.com/kelindar/talaria/internal/config"
	"github.com/kelindar/talaria/internal/encoding/typeof"
	"github.com/kelindar/talaria/internal/monitor"
	"github.com/kelindar/talaria/internal/monitor/errors"
	"github.com/kelindar/talaria/internal/storage"
	"github.com/kelindar/talaria/internal/storage/compact"
	"github.com/kelindar/talaria/internal/storage/flush"
	"github.com/kelindar/talaria/internal/storage/writer/azure"
	"github.com/kelindar/talaria/internal/storage/writer/bigquery"
	"github.com/kelindar/talaria/internal/storage/writer/console"
	"github.com/kelindar/talaria/internal/storage/writer/file"
	"github.com/kelindar/talaria/internal/storage/writer/gcs"
	"github.com/kelindar/talaria/internal/storage/writer/multi"
	"github.com/kelindar/talaria/internal/storage/writer/noop"
	"github.com/kelindar/talaria/internal/storage/writer/pubsub"
	"github.com/kelindar/talaria/internal/storage/writer/s3"
	"github.com/kelindar/talaria/internal/storage/writer/talaria"
)

var seed = maphash.MakeSeed()

// ForStreaming creates a streaming writer
func ForStreaming(config config.Streams, monitor monitor.Monitor) (storage.Streamer, error) {
	writer, err := newStreamer(config, monitor)
	if err != nil {
		// Construction errors are reported to the monitor rather than returned to the caller
		monitor.Error(err)
	}

	return writer.(storage.Streamer), nil
}
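
// Usage sketch (illustrative only, not part of the original file; the
// variable names below are assumptions): the streaming path takes the full
// list of stream sinks and returns a storage.Streamer whose background Run
// loop has already been started by newStreamer further down.
//
//	var streamCfg config.Streams // assumed to come from the loaded service configuration
//	streamer, err := ForStreaming(streamCfg, mon)
//	if err != nil {
//		mon.Error(err)
//	}
//	_ = streamer // presumably handed to the ingestion path that emits rows to the sinks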

// ForCompaction creates a compaction writer
func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store storage.Storage) (*compact.Storage, error) {
	writer, err := newWriter(config.Sinks, monitor)
	if err != nil {
		return nil, err
	}

	// Configure the flush interval, default to 30s
	interval := 30 * time.Second
	if config.Interval > 0 {
		interval = time.Duration(config.Interval) * time.Second
	}

	// If a name function was specified, use it instead of the default
	nameFunc := defaultNameFunc
	if config.NameFunc != "" {
		if fn, err := computed.NewComputed("nameFunc", "main", typeof.String, config.NameFunc, monitor); err == nil {
			nameFunc = func(row map[string]interface{}) (s string, e error) {
				val, err := fn.Value(row)
				if err != nil {
					monitor.Error(err)
					return "", err
				}
				return val.(string), err
			}
		}
	}

	// Create the flusher
	monitor.Info("server: setting up compaction %T to run every %.0fs...", writer, interval.Seconds())

	// TODO: once we have everything working, consider making the flusher per writer (requires changing all writers)
	flusher, err := flush.ForCompaction(monitor, writer, nameFunc)
	if err != nil {
		return nil, err
	}

	return compact.New(store, flusher, monitor, interval), nil
}
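
// Usage sketch (illustrative only, not part of the original file; the
// variable names below are assumptions): given a parsed compaction
// configuration, a monitor and the underlying store, the compaction
// pipeline is wired roughly as follows.
//
//	var compCfg *config.Compaction // assumed to come from the loaded service configuration
//	compacted, err := ForCompaction(compCfg, mon, baseStore)
//	if err != nil {
//		return err // no sink was configured, or a sink failed to initialize
//	}
//	// compacted wraps baseStore and flushes to the configured sinks on the
//	// interval resolved above (30s unless compCfg.Interval overrides it).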

// newWriter creates a new writer from the configuration.
func newWriter(sinks []config.Sink, monitor monitor.Monitor) (flush.Writer, error) {
	var writers []multi.SubWriter
	for _, config := range sinks {

		// Configure console writer if present
		if config.Console != nil {
			w, err := console.New(config.Console.Filter, config.Console.Encoder, monitor)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure S3 writer if present
		if config.S3 != nil {
			w, err := s3.New(monitor, config.S3.Bucket, config.S3.Prefix, config.S3.Region, config.S3.Endpoint, config.S3.SSE, config.S3.AccessKey, config.S3.SecretKey, config.S3.Filter, config.S3.Encoder, config.S3.Concurrency)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure Azure MultiAccount writer if present
		if config.Azure != nil && len(config.Azure.StorageAccounts) > 0 {
			w, err := azure.NewMultiAccountWriter(monitor, config.Azure.Filter, config.Azure.Encoder, config.Azure.BlobServiceURL, config.Azure.Container, config.Azure.Prefix, config.Azure.StorageAccounts, config.Azure.StorageAccountWeights, config.Azure.Parallelism, config.Azure.BlockSize)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure Azure SingleAccount writer if present
		if config.Azure != nil && len(config.Azure.StorageAccounts) == 0 {
			w, err := azure.New(config.Azure.Container, config.Azure.Prefix, config.Azure.Filter, config.Azure.Encoder, monitor)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure GCS writer if present
		if config.GCS != nil {
			w, err := gcs.New(config.GCS.Bucket, config.GCS.Prefix, config.GCS.Filter, config.GCS.Encoder, monitor)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure BigQuery writer if present
		if config.BigQuery != nil {
			w, err := bigquery.New(config.BigQuery.Project, config.BigQuery.Dataset, config.BigQuery.Table, config.BigQuery.Encoder, config.BigQuery.Filter, monitor)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure File writer if present
		if config.File != nil {
			w, err := file.New(config.File.Directory, config.File.Filter, config.File.Encoder, monitor)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure Talaria writer if present
		if config.Talaria != nil {
			w, err := talaria.New(config.Talaria.Endpoint, config.Talaria.Filter, config.Talaria.Encoder, monitor, config.Talaria.CircuitTimeout, config.Talaria.MaxConcurrent, config.Talaria.ErrorPercentThreshold, config.Talaria.MaxCallSendMsgSize, config.Talaria.MaxCallRecvMsgSize)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// Configure Google Pub/Sub writer if present
		if config.PubSub != nil {
			w, err := pubsub.New(config.PubSub.Project, config.PubSub.Topic, config.PubSub.Encoder, config.PubSub.Filter, monitor)
			if err != nil {
				return nil, err
			}
			writers = append(writers, w)
		}

		// If no writers were configured, error out
		if len(writers) == 0 {
			return noop.New(), errors.New("compact: writer was not configured")
		}
	}

	// Setup a multi-writer for all configured writers
	return multi.New(writers...), nil
}

// newStreamer creates a new streamer from the configuration.
func newStreamer(sinks config.Streams, monitor monitor.Monitor) (flush.Writer, error) {
	var writers []multi.SubWriter

	// If no streams were configured, error out
	if len(sinks) == 0 {
		return noop.New(), errors.New("stream: writer was not configured")
	}

	// Build a dedicated writer for each configured stream sink
	for _, v := range sinks {
		conf := []config.Sink{v}
		w, err := newWriter(conf, monitor)
		if err != nil {
			return noop.New(), err
		}
		writers = append(writers, w)
	}

	// Setup a multi-writer for all configured writers and start its background run loop
	multiWriters := multi.New(writers...)
	_, err := multiWriters.Run(context.Background())
	return multiWriters, err
}

// defaultNameFunc represents a default name function
func defaultNameFunc(row map[string]interface{}) (s string, e error) {
	return fmt.Sprintf("%s-%x.orc",
		time.Now().UTC().Format("year=2006/month=1/day=2/15-04-05"),
		hashOfRow(row),
	), nil
}

// hashOfRow computes a hash of the row, for the default filename
func hashOfRow(row map[string]interface{}) uint64 {

	// Sort the formatted key=value pairs so the hash is independent of map iteration order
	str := make([]string, 0, len(row))
	for k, v := range row {
		str = append(str, fmt.Sprintf("%s=%v", k, v))
	}
	sort.Strings(str)

	// Compute the hash
	var hash maphash.Hash
	hash.SetSeed(seed)
	for _, v := range str {
		hash.WriteString(v)
	}
	return hash.Sum64()
}
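
// Illustrative sketch (not part of the original file): defaultNameFunc
// produces time-partitioned ORC object names with a hash of the row contents
// appended, so different rows flushed within the same second map to different
// names. The row below is made up.
//
//	name, _ := defaultNameFunc(map[string]interface{}{"event": "click", "id": 42})
//	// name has the form "year=2024/month=7/day=3/15-04-05-1a2b3c4d5e6f7788.orc",
//	// where the hex suffix is hashOfRow(row).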