diff --git a/cmds/contest/server/server.go b/cmds/contest/server/server.go index 88ceaa6d..72848001 100644 --- a/cmds/contest/server/server.go +++ b/cmds/contest/server/server.go @@ -19,6 +19,7 @@ import ( "github.com/linuxboot/contest/pkg/config" "github.com/linuxboot/contest/pkg/job" "github.com/linuxboot/contest/pkg/jobmanager" + "github.com/linuxboot/contest/pkg/loggerhook" "github.com/linuxboot/contest/pkg/logging" "github.com/linuxboot/contest/pkg/pluginregistry" "github.com/linuxboot/contest/pkg/storage" @@ -48,10 +49,16 @@ var ( flagTargetLocker *string flagInstanceTag *string flagLogLevel *string - flagAdminServerAddr *string flagPauseTimeout *time.Duration flagResumeJobs *bool flagTargetLockDuration *time.Duration + // http logger parameters + flagAdminServerAddr *string + flagHttpLoggerBufferSize *int + flagHttpLoggerMaxBatchSize *int + flagHttpLoggerMaxBatchCount *int + flagHttpLoggerBatchSendFreq *time.Duration + flagHttpLoggerTimeout *time.Duration ) func initFlags(cmd string) { @@ -59,6 +66,11 @@ func initFlags(cmd string) { flagDBURI = flagSet.String("dbURI", config.DefaultDBURI, "Database URI") flagListenAddr = flagSet.String("listenAddr", ":8080", "Listen address and port") flagAdminServerAddr = flagSet.String("adminServerAddr", "", "Addr of the admin server to connect to") + flagHttpLoggerBufferSize = flagSet.Int("loggerBufferSize", loggerhook.DefaultBufferSize, "buffer size for the http logger hook") + flagHttpLoggerMaxBatchSize = flagSet.Int("loggerMaxBatchSize", loggerhook.DefaultMaxBatchSize, "max size (in bytes) of a logs batch to be sent if it reaches/exceeds it") + flagHttpLoggerMaxBatchCount = flagSet.Int("loggerMaxBatchCount", loggerhook.DefaultMaxBatchCount, "max count of logs in a batch") + flagHttpLoggerBatchSendFreq = flagSet.Duration("loggerBatchSendFreq", loggerhook.DefaultBatchSendFreq, "duration that defines the batch sending freq") + flagHttpLoggerTimeout = flagSet.Duration("loggerTimeout", loggerhook.DefaultLogTimeout, "logs send timeout") flagServerID = flagSet.String("serverID", "", "Set a static server ID, e.g. the host name or another unique identifier. If unset, will use the listener's default") flagProcessTimeout = flagSet.Duration("processTimeout", api.DefaultEventTimeout, "API request processing timeout") flagTargetLocker = flagSet.String("targetLocker", "auto", "Target locker implementation to use, \"auto\" follows DBURI setting") @@ -154,7 +166,17 @@ func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os. logrusOpts := logging.DefaultOptions() if *flagAdminServerAddr != "" { - logrusOpts = append(logrusOpts, bundles.OptionHttpLoggerAddr(*flagAdminServerAddr)) + logrusOpts = append( + logrusOpts, + bundles.OptionHttpLoggerConfig(loggerhook.Config{ + Addr: *flagAdminServerAddr, + BufferSize: *flagHttpLoggerBufferSize, + MaxBatchSize: *flagHttpLoggerMaxBatchSize, + MaxBatchCount: *flagHttpLoggerMaxBatchCount, + BatchSendFreq: *flagHttpLoggerBatchSendFreq, + LogTimeout: *flagHttpLoggerTimeout, + }), + ) } ctx, cancel := logrusctx.NewContext(logLevel, logrusOpts...) diff --git a/pkg/loggerhook/httphook.go b/pkg/loggerhook/httphook.go index 96511a18..701a38c3 100644 --- a/pkg/loggerhook/httphook.go +++ b/pkg/loggerhook/httphook.go @@ -16,14 +16,23 @@ import ( "github.com/sirupsen/logrus" ) -var ( - DefaultBufferSize = 10 - MaxBatchSize = 500000 // size in bytes - MaxBatchCount = 100 - BatchSendFreq = 1 * time.Second - DefaultLogTimeout = 1 * time.Second +const ( + DefaultBufferSize = 10 + DefaultMaxBatchSize = 500000 // size in bytes + DefaultMaxBatchCount = 100 + DefaultBatchSendFreq = 1 * time.Second + DefaultLogTimeout = 1 * time.Second ) +type Config struct { + Addr string + BufferSize int + MaxBatchSize int + MaxBatchCount int + BatchSendFreq time.Duration + LogTimeout time.Duration +} + // Batch defines a log batch that handles the size in bytes of the logs type Batch struct { addr string @@ -68,6 +77,7 @@ func (b *Batch) PostAndReset() error { } type HttpHook struct { + cfg Config batch Batch batchTicker *time.Ticker @@ -75,8 +85,8 @@ type HttpHook struct { closeChan chan struct{} } -func NewHttpHook(addr string) (*HttpHook, error) { - url, err := url.ParseRequestURI(addr) +func NewHttpHook(cfg Config) (*HttpHook, error) { + url, err := url.ParseRequestURI(cfg.Addr) if err != nil { return nil, err } @@ -84,9 +94,10 @@ func NewHttpHook(addr string) (*HttpHook, error) { url.Path = path.Join(url.Path, "log") hh := HttpHook{ + cfg: cfg, batch: NewBatch(url.String()), - batchTicker: time.NewTicker(BatchSendFreq), - logChan: make(chan server.Log, DefaultBufferSize), + batchTicker: time.NewTicker(cfg.BatchSendFreq), + logChan: make(chan server.Log, cfg.BufferSize), closeChan: make(chan struct{}), } @@ -123,7 +134,7 @@ func (hh *HttpHook) Fire(entry *logrus.Entry) error { } // timeout is used to not block the service on the logging - timeout := time.After(DefaultLogTimeout) + timeout := time.After(hh.cfg.LogTimeout) select { case hh.logChan <- log: // do nothing @@ -140,7 +151,7 @@ func (hh *HttpHook) logHandler() { select { case log := <-hh.logChan: hh.batch.Add(log) - if hh.batch.Count() > MaxBatchCount || hh.batch.Size() > uint64(MaxBatchSize) { + if hh.batch.Count() > hh.cfg.MaxBatchCount || hh.batch.Size() > uint64(hh.cfg.MaxBatchSize) { err := hh.batch.PostAndReset() if err != nil { fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err) @@ -148,7 +159,7 @@ func (hh *HttpHook) logHandler() { } // if the batch is sent // to avoid ticking on an empty batch - hh.batchTicker.Reset(BatchSendFreq) + hh.batchTicker.Reset(hh.cfg.BatchSendFreq) } case <-hh.batchTicker.C: if hh.batch.Size() > 0 { diff --git a/pkg/xcontext/bundles/logrusctx/new_context.go b/pkg/xcontext/bundles/logrusctx/new_context.go index 89fe50c4..368fbff9 100644 --- a/pkg/xcontext/bundles/logrusctx/new_context.go +++ b/pkg/xcontext/bundles/logrusctx/new_context.go @@ -90,8 +90,8 @@ func NewContext(logLevel logger.Level, opts ...bundles.Option) (xcontext.Context ), ) - if cfg.HttpLoggerAddr != "" { - httpHook, err := loggerhook.NewHttpHook(cfg.HttpLoggerAddr) + if cfg.HttpLoggerConfig.Addr != "" { + httpHook, err := loggerhook.NewHttpHook(cfg.HttpLoggerConfig) if err == nil { // clean the http logger on ctx canceling go func() { diff --git a/pkg/xcontext/bundles/options.go b/pkg/xcontext/bundles/options.go index 111d04f7..04c8336b 100644 --- a/pkg/xcontext/bundles/options.go +++ b/pkg/xcontext/bundles/options.go @@ -6,6 +6,7 @@ package bundles import ( + "github.com/linuxboot/contest/pkg/loggerhook" "github.com/linuxboot/contest/pkg/xcontext" ) @@ -70,12 +71,11 @@ func (opt OptionTimestampFormat) apply(cfg *Config) { cfg.TimestampFormat = string(opt) } -// OptionHttpLoggerAddr is used to create a logger hook to send logs via http -// to admin server -type OptionHttpLoggerAddr string +// OptionHttpLoggerConfig is used to define the different config for the http logger +type OptionHttpLoggerConfig loggerhook.Config -func (opt OptionHttpLoggerAddr) apply(cfg *Config) { - cfg.HttpLoggerAddr = string(opt) +func (opt OptionHttpLoggerConfig) apply(cfg *Config) { + cfg.HttpLoggerConfig = loggerhook.Config(opt) } // Config is a configuration state resulted from Option-s. @@ -84,9 +84,10 @@ type Config struct { TracerReportCaller bool TimestampFormat string VerboseCaller bool - HttpLoggerAddr string Tracer xcontext.Tracer Format LogFormat + // http logger specific options + HttpLoggerConfig loggerhook.Config } // GetConfig processes passed Option-s and returns the resulting state as Config.