-
Notifications
You must be signed in to change notification settings - Fork 0
/
factory.go
97 lines (83 loc) · 2.27 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package cmd
import (
"context"
"fmt"
stdlog "log"
"os"
"github.com/mshindle/datagen"
"github.com/mshindle/datagen/elastic"
"github.com/mshindle/datagen/kafka"
"github.com/mshindle/datagen/logger"
"github.com/mshindle/zlg"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// publisherFactory is the signature shared by every sink constructor in this
// file: given a context and the application configuration it yields a
// datagen.Publisher, or an error.
type publisherFactory func(ctx context.Context, config appConfig) (datagen.Publisher, error)
// factoryLookup maps each supported sink name to the factory that builds the
// corresponding publisher. initPublisher indexes it with cfg.Sink.
var factoryLookup = map[string]publisherFactory{
"kafka": initKafka,
"elastic": initElastic,
"cloudlogs": initCloudlogs,
"logs": initLogs,
}
// initPublisher resolves cfg.Sink to a registered factory in factoryLookup and
// delegates construction of the publisher to it. A sink name with no
// registered factory produces a nil publisher and an "invalid sink option"
// error.
func initPublisher(ctx context.Context, cfg appConfig) (datagen.Publisher, error) {
	if build, found := factoryLookup[cfg.Sink]; found {
		return build(ctx, cfg)
	}
	return nil, fmt.Errorf("invalid sink option: %s", cfg.Sink)
}
// initKafka builds a Kafka publisher from cfg.Kafka.
//
// On success it starts two goroutines before returning the publisher: one
// ranges over p.Successes() and emits a debug log every 10000 messages, the
// other ranges over p.Errors() and logs each delivery error. Both loops end
// only when the ranged value is exhausted (presumably when the producer closes
// its channels — confirm against the kafka package). ctx is not used here.
//
// If kafka.New fails, the error is logged with its cause and returned
// unchanged alongside a nil publisher.
func initKafka(ctx context.Context, cfg appConfig) (datagen.Publisher, error) {
	p, err := kafka.New(cfg.Kafka)
	if err != nil {
		// Attach the cause so the log entry is useful on its own.
		log.Error().Err(err).Msg("unable to create kafka client")
		return nil, err
	}

	// Just count the number of successful messages published, reporting
	// progress every 10000 messages.
	go func() {
		var n int
		for range p.Successes() {
			n++
			if n%10000 == 0 {
				log.Debug().Int("messages", n).Msg("messages sent")
			}
		}
	}()

	// Log any errors talking to kafka.
	go func() {
		for err := range p.Errors() {
			log.Error().Err(err).Msg("failed to deliver message")
		}
	}()

	return p, nil
}
// initElastic builds an Elasticsearch publisher from cfg.Elastic and then
// calls ListIndices on it before handing it back.
//
// A failure in either elastic.New or ListIndices is logged together with its
// cause and returned unchanged alongside a nil publisher.
func initElastic(ctx context.Context, cfg appConfig) (datagen.Publisher, error) {
	p, err := elastic.New(ctx, cfg.Elastic)
	if err != nil {
		// Attach the cause so the log entry is useful on its own.
		log.Error().Err(err).Msg("unable to create elastic client")
		return nil, err
	}

	if err = p.ListIndices(); err != nil {
		log.Error().Err(err).Msg("could not pull index alias")
		return nil, err
	}

	return p, nil
}
// initCloudlogs creates a zlg writer for cfg.CloudLog.Parent and
// cfg.CloudLog.LogID, points the standard library's default logger at it with
// all flags cleared, and wraps that logger in a logger.LoggerPublisher.
//
// Note that stdlog.Default() is the process-wide logger, so the output and
// flag changes made here are visible to every other user of it. An error from
// zlg.NewWriter is returned as-is with a nil publisher.
func initCloudlogs(ctx context.Context, cfg appConfig) (datagen.Publisher, error) {
	cloudWriter, err := zlg.NewWriter(ctx, cfg.CloudLog.Parent, cfg.CloudLog.LogID)
	if err != nil {
		return nil, err
	}

	// TODO: need to properly close zlg
	base := stdlog.Default()
	base.SetFlags(0)
	base.SetOutput(cloudWriter)

	pub := logger.LoggerPublisher{Logger: base}
	return pub, nil
}
// initLogs builds a publisher that writes to standard output through zerolog.
//
// It creates a zerolog logger on os.Stdout that adds a timestamp and the field
// event="publish", installs it as the output of the standard library's default
// logger with all flags cleared, and wraps that logger in a
// logger.LoggerPublisher. stdlog.Default() is the process-wide logger, so
// these changes affect every other user of it. The returned error is always
// nil; ctx and cfg are not used.
func initLogs(ctx context.Context, cfg appConfig) (datagen.Publisher, error) {
	sink := zerolog.New(os.Stdout).
		With().
		Timestamp().
		Str("event", "publish").
		Logger()

	base := stdlog.Default()
	base.SetFlags(0)
	base.SetOutput(sink)

	pub := logger.LoggerPublisher{Logger: base}
	return pub, nil
}