/
root.go
207 lines (173 loc) · 5.72 KB
/
root.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package main
import (
"context"
"fmt"
"log"
"net/http"
// This package registers its HTTP endpoints for profiling using an init hook
_ "net/http/pprof" // #nosec
"os"
"runtime"
"runtime/pprof"
"sync"
"time"
"github.com/observiq/stanza/v2/service"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
// RootFlags are the root level flags that can be provided when invoking stanza from the command line
type RootFlags struct {
// DatabaseFile is the path to the stanza offset database (--database).
DatabaseFile string
// ConfigFile is the path to a config file (--config / -c).
ConfigFile string
// PluginDir is the path to the plugin directory (--plugin_dir).
PluginDir string
// PprofPort is the listen port for pprof profiling; 0 leaves the pprof server off (--pprof_port).
PprofPort int
// CPUProfile is the path to the cpu profile output; empty disables the cpu profile (--cpu_profile).
CPUProfile string
// CPUProfileDuration is the duration to run the cpu profile (--cpu_profile_duration).
CPUProfileDuration time.Duration
// MemProfile is the path to the memory profile output; empty disables the memory profile (--mem_profile).
MemProfile string
// MemProfileDelay is the time to wait before writing a memory profile (--mem_profile_delay).
MemProfileDelay time.Duration
// LogLevel sets the agent's log level (--log_level).
LogLevel string
// LogFile is the file that agent logs are written to (--log_file).
LogFile string
// MaxLogSize is the maximum size of agent log files in MB before rotating (--max_log_size).
MaxLogSize int
// MaxLogBackups is the maximum number of rotated log files to retain (--max_log_backups).
MaxLogBackups int
// MaxLogAge is the maximum number of days to retain a rotated log file (--max_log_age).
MaxLogAge int
// Debug is the deprecated debug logging flag (--debug).
Debug bool
}
// NewRootCmd builds the top-level stanza command. It registers the persistent
// flags backing a fresh RootFlags value, marks the profiling flags as hidden,
// and attaches the graph, version and offsets subcommands.
func NewRootCmd() *cobra.Command {
	flags := new(RootFlags)

	cmd := &cobra.Command{
		Use:   "stanza [-c ./config.yaml]",
		Short: "A log parser and router",
		Long:  "A log parser and router",
		Args:  cobra.NoArgs,
		Run: func(command *cobra.Command, args []string) {
			runRoot(command, args, flags)
		},
	}

	pf := cmd.PersistentFlags()

	// Logging flags
	pf.StringVar(&flags.LogLevel, "log_level", "INFO", "sets the agent's log level")
	pf.StringVar(&flags.LogFile, "log_file", "", "writes agent logs to a specified file")
	pf.IntVar(&flags.MaxLogSize, "max_log_size", 10, "sets the maximum size of agent log files in MB before rotating")
	pf.IntVar(&flags.MaxLogBackups, "max_log_backups", 5, "sets the maximum number of rotated log files to retain")
	pf.IntVar(&flags.MaxLogAge, "max_log_age", 7, "sets the maximum number of days to retain a rotated log file")
	pf.BoolVar(&flags.Debug, "debug", false, "debug logging flag - deprecated")

	// Path flags
	pf.StringVarP(&flags.ConfigFile, "config", "c", defaultConfig(), "path to a config file")
	pf.StringVar(&flags.PluginDir, "plugin_dir", defaultPluginDir(), "path to the plugin directory")
	pf.StringVar(&flags.DatabaseFile, "database", "", "path to the stanza offset database")

	// Profiling flags
	pf.IntVar(&flags.PprofPort, "pprof_port", 0, "listen port for pprof profiling")
	pf.StringVar(&flags.CPUProfile, "cpu_profile", "", "path to cpu profile output")
	pf.DurationVar(&flags.CPUProfileDuration, "cpu_profile_duration", 60*time.Second, "duration to run the cpu profile")
	pf.StringVar(&flags.MemProfile, "mem_profile", "", "path to memory profile output")
	pf.DurationVar(&flags.MemProfileDelay, "mem_profile_delay", 10*time.Second, "time to wait before writing a memory profile")

	// Every profiling flag registered above is marked hidden.
	for _, name := range [...]string{
		"pprof_port",
		"cpu_profile",
		"cpu_profile_duration",
		"mem_profile",
		"mem_profile_delay",
	} {
		if err := pf.MarkHidden(name); err != nil {
			// MarkHidden only fails if the flag does not exist
			panic(err)
		}
	}

	cmd.AddCommand(
		NewGraphCommand(flags),
		NewVersionCommand(),
		NewOffsetsCmd(flags),
	)

	return cmd
}
// runRoot is the Run handler of the root command. It creates the logger from
// the parsed flags, builds the agent service, starts any requested profiling,
// runs the service, and waits for the profiling goroutines once Run has
// returned without error.
//
// If the service cannot be built, or Run returns an error, the failure is
// logged and the process exits with status 1.
func runRoot(command *cobra.Command, _ []string, flags *RootFlags) {
	logger := newLogger(*flags).Sugar()
	defer func() {
		_ = logger.Sync()
	}()

	// Build agent service. The local is named svc so it does not shadow the
	// imported service package.
	svc, ctx, err := service.NewBuilder().
		WithConfigFile(flags.ConfigFile).
		WithDatabaseFile(flags.DatabaseFile).
		WithPluginDir(flags.PluginDir).
		WithLogger(logger).
		Build(command.Context())
	if err != nil {
		// Errorw (not Errorf): the error is attached as a structured field,
		// the message contains no format verbs.
		logger.Errorw("Failed to create agent service", zap.Error(err))
		// os.Exit does not run deferred functions, so flush the logger first.
		_ = logger.Sync()
		os.Exit(1)
	}

	profilingWg := startProfiling(ctx, flags, logger)

	if err := svc.Run(); err != nil {
		logger.Errorw("Failed to run agent service", zap.Error(err))
		// os.Exit does not run deferred functions, so flush the logger first.
		_ = logger.Sync()
		os.Exit(1)
	}

	profilingWg.Wait()
}
// startProfiling launches the profiling facilities selected in flags, each in
// its own goroutine, and returns a WaitGroup that is done once all of those
// goroutines have returned.
//
//   - PprofPort != 0: serves HTTP on that port until ctx is cancelled, then
//     shuts the server down with a one second timeout.
//   - CPUProfile != "": writes a CPU profile to that path, stopping after
//     CPUProfileDuration or when ctx is cancelled, whichever comes first.
//   - MemProfile != "": writes a heap profile to that path after
//     MemProfileDelay or when ctx is cancelled, whichever comes first.
//
// Failing to create an output file is logged and skips that profile, and
// failing to close one is logged. A failure starting the CPU profile or
// writing the heap profile terminates the process via log.Fatal.
func startProfiling(ctx context.Context, flags *RootFlags, logger *zap.SugaredLogger) *sync.WaitGroup {
	wg := &sync.WaitGroup{}

	// Start pprof listening on port
	if flags.PprofPort != 0 {
		// pprof endpoints are registered by the blank import of net/http/pprof
		var srv http.Server
		srv.Addr = fmt.Sprintf(":%d", flags.PprofPort)

		wg.Add(1)
		go func() {
			defer wg.Done()
			logger.Info(srv.ListenAndServe())
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ctx.Done()
			shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()
			if err := srv.Shutdown(shutdownCtx); err != nil {
				logger.Warnw("Errored shutting down pprof server", zap.Error(err))
			}
		}()
	}

	// Start CPU profile for configured duration
	if flags.CPUProfile != "" {
		wg.Add(1)
		go func() {
			defer wg.Done()

			f, err := os.Create(flags.CPUProfile)
			if err != nil {
				logger.Errorw("Failed to create CPU profile", zap.Error(err))
				return
			}

			if err := pprof.StartCPUProfile(f); err != nil {
				log.Fatal("could not start CPU profile: ", err)
			}

			// Profile until the duration elapses or the agent shuts down.
			select {
			case <-ctx.Done():
			case <-time.After(flags.CPUProfileDuration):
			}
			pprof.StopCPUProfile()

			// The error is passed as a structured field, never as a format
			// string, so a '%' in its text cannot be misinterpreted.
			if err := f.Close(); err != nil { // #nosec G307
				logger.Errorw("Failed to close file", zap.Error(err))
			}
		}()
	}

	// Start memory profile after configured delay
	if flags.MemProfile != "" {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Wait for the delay, or write immediately if the agent shuts down.
			select {
			case <-ctx.Done():
			case <-time.After(flags.MemProfileDelay):
			}

			f, err := os.Create(flags.MemProfile)
			if err != nil {
				logger.Errorw("Failed to create memory profile", zap.Error(err))
				// Without a file there is nothing to write to; continuing
				// would hand a nil file to WriteHeapProfile and end in
				// log.Fatal below.
				return
			}

			runtime.GC() // get up-to-date statistics
			if err := pprof.WriteHeapProfile(f); err != nil {
				log.Fatal("could not write memory profile: ", err)
			}

			if err := f.Close(); err != nil {
				logger.Errorw("Failed to close file", zap.Error(err))
			}
		}()
	}

	return wg
}