-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
113 lines (102 loc) · 3.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/wearefair/log-aggregator/pkg/cursor"
	"github.com/wearefair/log-aggregator/pkg/destinations"
	"github.com/wearefair/log-aggregator/pkg/destinations/firehose"
	"github.com/wearefair/log-aggregator/pkg/destinations/stdout"
	"github.com/wearefair/log-aggregator/pkg/pipeline"
	"github.com/wearefair/log-aggregator/pkg/sources"
	sjournal "github.com/wearefair/log-aggregator/pkg/sources/journal"
	"github.com/wearefair/log-aggregator/pkg/sources/mock"
	"github.com/wearefair/log-aggregator/pkg/transform"
	"github.com/wearefair/log-aggregator/pkg/transform/aws"
	"github.com/wearefair/log-aggregator/pkg/transform/journal"
	"github.com/wearefair/log-aggregator/pkg/transform/json"
	"github.com/wearefair/log-aggregator/pkg/transform/k8"
	"github.com/wearefair/log-aggregator/pkg/transform/kibana"
)
// Names of the environment variables that main reads to configure the
// aggregator. Grouped by the part of the pipeline they affect.
const (
	// EnvCursorPath names the variable holding the path handed to
	// cursor.New. It is required: main exits if it is empty.
	EnvCursorPath = "FAIR_LOG_CURSOR_PATH"

	// EnvMockSource names the variable that, when set to exactly "true",
	// makes main use the mock source instead of the journal source.
	EnvMockSource = "FAIR_LOG_MOCK_SOURCE"

	// EnvMockDestination names the variable that, when set to exactly
	// "true", makes main write to the stdout destination instead of
	// Firehose.
	EnvMockDestination = "FAIR_LOG_MOCK_DESTINATION"

	// EnvFirehoseStream names the variable holding the Firehose stream
	// name. It is required whenever the mock destination is not enabled.
	EnvFirehoseStream = "FAIR_LOG_FIREHOSE_STREAM"

	// EnvFirehoseCredentialsEndpoint names the variable whose value is
	// passed to the Firehose destination as its EC2MetadataEndpoint.
	EnvFirehoseCredentialsEndpoint = "FAIR_LOG_FIREHOSE_CREDENTIALS_ENDPOINT"

	// EnvK8ConfigPath names the variable holding the K8ConfigPath for the
	// k8 transformer. The transformer is only added to the pipeline when
	// this variable is non-empty.
	EnvK8ConfigPath = "FAIR_LOG_K8_CONFIG_PATH"

	// EnvK8Regex names the variable whose value is passed to the k8
	// transformer as KubernetesContainerNameRegexp.
	EnvK8Regex = "FAIR_LOG_K8_CONTAINER_NAME_REGEX"

	// EnvK8NodeName names the variable whose value is passed to the k8
	// transformer as NodeName.
	EnvK8NodeName = "EC2_METADATA_LOCAL_HOSTNAME"
)
// main assembles the log pipeline from environment configuration, starts it,
// and blocks until the process receives an interrupt or termination signal,
// at which point the pipeline is stopped.
//
// Setup happens in this order: cursor store, source, destination,
// transformers, pipeline. A missing required environment variable ends the
// process through log.Fatalf; an error returned by one of the constructors
// panics.
func main() {
	var err error
	var source sources.Source
	var destination destinations.Destination
	var logCursor cursor.DB
	var transformers []transform.Transformer

	// Setup cursor. The cursor path is required even when the mock source is
	// selected, because the pipeline itself is also given the cursor store.
	cursorPath := os.Getenv(EnvCursorPath)
	if cursorPath == "" {
		log.Fatalf("%s must be set", EnvCursorPath)
	}
	logCursor, err = cursor.New(cursorPath)
	if err != nil {
		panic(err)
	}

	// Setup source: the mock source when explicitly requested, otherwise the
	// journal source positioned at the stored cursor.
	if os.Getenv(EnvMockSource) == "true" {
		source = mock.New(time.Second * 2)
	} else {
		source, err = sjournal.New(sjournal.ClientConfig{
			Cursor: logCursor.Cursor(),
		})
		if err != nil {
			panic(err)
		}
	}

	// Setup destination: stdout when mocked, otherwise Firehose, which needs
	// a stream name.
	if os.Getenv(EnvMockDestination) == "true" {
		destination = stdout.New()
	} else {
		streamName := os.Getenv(EnvFirehoseStream)
		if streamName == "" {
			log.Fatalf("%s must be set", EnvFirehoseStream)
		}
		destination = firehose.New(firehose.Config{
			EC2MetadataEndpoint: os.Getenv(EnvFirehoseCredentialsEndpoint),
			FirehoseStream:      streamName,
		})
	}

	// Setup transformer pipeline. These four are always installed, in this
	// order.
	transformers = []transform.Transformer{
		journal.Transform,
		json.Transform,
		kibana.Transform,
		aws.New(),
	}

	// The k8 transformer is optional and is appended last, only when a k8
	// config path has been provided.
	if configPath := os.Getenv(EnvK8ConfigPath); configPath != "" {
		k8Transformer := k8.New(k8.Config{
			K8ConfigPath:                  configPath,
			NodeName:                      os.Getenv(EnvK8NodeName),
			MaxPodsCache:                  100,
			KubernetesContainerNameRegexp: os.Getenv(EnvK8Regex),
		})
		transformers = append(transformers, k8Transformer.Transform)
	}

	logPipeline, err := pipeline.New(pipeline.Config{
		MaxBuffer:    200,
		Cursor:       logCursor,
		Input:        source,
		Destination:  destination,
		Transformers: transformers,
	})
	if err != nil {
		panic(err)
	}

	logPipeline.Start()

	// Wait for a shutdown request. SIGTERM is registered alongside
	// os.Interrupt because it is the default signal sent by `kill` and by
	// service managers and container runtimes when stopping a process;
	// without it Go's default handling exits the program immediately and
	// Stop below is never reached. The channel is buffered because
	// signal.Notify does not block when delivering.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
	<-signals

	// NOTE(review): the 30s argument is presumably the shutdown deadline -
	// confirm against pipeline.Stop.
	logPipeline.Stop(time.Second * 30)
}